Loading ifs/src/daemon/adafs_ops/data.cpp +1 −28 Original line number Diff line number Diff line Loading @@ -17,45 +17,18 @@ std::string path_to_fspath(const std::string& path) { return fs_path; } /** * Creates the directory in the chunk dir for a file to hold data * @param inode * @return */ // XXX this might be just a temp function as long as we don't use chunks // XXX this function creates not only the chunk folder but also a single file which holds the data of the 'real' file int init_chunk_space(const std::string& path) { auto fs_path = path_to_fspath(path); auto chnk_path = bfs::path(ADAFS_DATA->chunk_path()); chnk_path /= fs_path; // create chunk dir bfs::create_directories(chnk_path); // XXX create temp big file. remember also to modify the return value chnk_path /= "data"s; bfs::ofstream ofs{chnk_path}; // return static_cast<int>(bfs::exists(chnk_path)); return 0; } /** * Remove the directory in the chunk dir of a file. * @param inode * @return */ // XXX this might be just a temp function as long as we don't use chunks int destroy_chunk_space(const std::string& path) { auto fs_path = path_to_fspath(path); auto chnk_path = bfs::path(ADAFS_DATA->chunk_path()); chnk_path /= fs_path; // create chunk dir // remove chunk dir with all contents if path exists bfs::remove_all(chnk_path); // return static_cast<int>(!bfs::exists(chnk_path)); return 0; } Loading ifs/src/daemon/adafs_ops/metadentry.cpp +17 −24 Original line number Diff line number Diff line Loading @@ -5,31 +5,12 @@ using namespace std; static const std::string dentry_val_delim = ","s; // XXX this needs to be global. ino_t generate_inode_no() { std::lock_guard<std::mutex> inode_lock(ADAFS_DATA->inode_mutex); // TODO check that our inode counter is within boundaries of inode numbers in the given node return ADAFS_DATA->raise_inode_count(1); } /** * Creates a file system node of any type (such as file or directory) * @param path * @param uid * @param gid * @param mode * @return */ int create_node(const std::string& path, const uid_t uid, const gid_t gid, mode_t mode) { auto err = create_metadentry(path, mode); // XXX errorhandling // XXX Only do that for files and not for directories init_chunk_space(path); return err; } /** * Creates metadata (if required) and dentry at the same time * @param path Loading Loading @@ -78,16 +59,28 @@ int get_metadentry(const std::string& path, Metadata& md) { return 0; } /** * Wrapper to remove a KV store entry with the path as key * @param path * @return */ int remove_metadentry(const string& path) { return db_delete_metadentry(path) ? 0 : -1; } /** * Remove metadentry if exists and try to remove all chunks for path * @param path * @return */ int remove_node(const string& path) { auto err = remove_metadentry(path); // XXX Only do that with a file. Directory needs to be handled differently if (err == 0) destroy_chunk_space( path); // XXX This removes only the data on that node. Leaving everything in inconsistent state int err = 0; // assume we succeed Metadata md{}; // If metadentry exists, try to remove it if (get_metadentry(path, md) == 0) { err = remove_metadentry(path); } destroy_chunk_space(path); // destroys all chunks for the path on this node return err; } Loading ifs/src/daemon/handler/h_metadentry.cpp +1 −1 Original line number Diff line number Diff line Loading @@ -131,7 +131,7 @@ static hg_return_t rpc_srv_rm_node(hg_handle_t handle) { assert(ret == HG_SUCCESS); ADAFS_DATA->spdlogger()->debug("Got remove node RPC with path {}", in.path); // do remove // Remove metadentry if exists on the node but also remove all chunks for that path out.err = remove_node(in.path); ADAFS_DATA->spdlogger()->debug("Sending output {}", out.err); Loading ifs/src/preload/adafs_functions.cpp +5 −0 Original line number Diff line number Diff line Loading @@ -44,6 +44,11 @@ int adafs_mk_node(const std::string& path, const mode_t mode) { return rpc_send_mk_node(path, mode); } /** * This sends internally a broadcast (i.e. n RPCs) to clean their chunk folders for that path * @param path * @return */ int adafs_rm_node(const std::string& path) { init_ld_env_if_needed(); return rpc_send_rm_node(path); Loading ifs/src/preload/rpc/ld_rpc_metadentry.cpp +45 −27 Original line number Diff line number Diff line Loading @@ -164,44 +164,62 @@ int rpc_send_stat(const std::string& path, string& attr) { } int rpc_send_rm_node(const std::string& path) { rpc_rm_node_in_t in{}; rpc_err_out_t out{}; hg_handle_t handle; int err = EUNKNOWN; hg_return_t ret; int err = 0; // assume we succeed ld_logger->debug("{}() Creating Mercury handles for all nodes ...", __func__); vector<hg_handle_t> rpc_handles(fs_config->host_size); vector<margo_request> rpc_waiters(fs_config->host_size); vector<rpc_rm_node_in_t> rpc_in(fs_config->host_size); // Send rpc to all nodes as all of them can have chunks for this path for (size_t i = 0; i < fs_config->host_size; i++) { // fill in in.path = path.c_str(); rpc_in[i].path = path.c_str(); // create handle ret = margo_create_wrap(ipc_rm_node_id, rpc_rm_node_id, i, rpc_handles[i], false); if (ret != HG_SUCCESS) { ld_logger->warn("{}() Unable to create Mercury handle", __func__); // We use continue here to remove at least some data // XXX In the future we can discuss RPC retrying. This should be a function to be used in general errno = EBUSY; err = -1; } // send async rpc ret = margo_iforward(rpc_handles[i], &rpc_in[i], &rpc_waiters[i]); if (ret != HG_SUCCESS) { ld_logger->warn("{}() Unable to create Mercury handle", __func__); errno = EBUSY; err = -1; } } ld_logger->debug("{}() Creating Mercury handle ...", __func__); auto ret = margo_create_wrap(ipc_rm_node_id, rpc_rm_node_id, path, handle, false); // Wait for RPC responses and then get response for (size_t i = 0; i < fs_config->host_size; i++) { // XXX We might need a timeout here to not wait forever for an output that never comes? ret = margo_wait(rpc_waiters[i]); if (ret != HG_SUCCESS) { ld_logger->warn("{}() Unable to wait for margo_request handle for path {} recipient {}", __func__, path, i); errno = EBUSY; return -1; err = -1; } // Send rpc #if defined(MARGO_FORWARD_TIMER) ret = margo_forward_timed_wrap_timer(handle, &in, __func__); #else ret = margo_forward_timed_wrap(handle, &in); #endif // Get response if (ret == HG_SUCCESS) { ld_logger->trace("{}() Waiting for response", __func__); ret = margo_get_output(handle, &out); rpc_err_out_t out{}; ret = margo_get_output(rpc_handles[i], &out); if (ret == HG_SUCCESS) { ld_logger->debug("{}() Got response success: {}", __func__, out.err); err = out.err; if (err != 0) { errno = out.err; err = -1; } } else { // something is wrong errno = EBUSY; err = -1; ld_logger->error("{}() while getting rpc output", __func__); } /* clean up resources consumed by this rpc */ margo_free_output(handle, &out); } else { ld_logger->warn("{}() timed out"); errno = EBUSY; margo_free_output(rpc_handles[i], &out); margo_destroy(rpc_handles[i]); } margo_destroy(handle); return err; } Loading Loading
ifs/src/daemon/adafs_ops/data.cpp +1 −28 Original line number Diff line number Diff line Loading @@ -17,45 +17,18 @@ std::string path_to_fspath(const std::string& path) { return fs_path; } /** * Creates the directory in the chunk dir for a file to hold data * @param inode * @return */ // XXX this might be just a temp function as long as we don't use chunks // XXX this function creates not only the chunk folder but also a single file which holds the data of the 'real' file int init_chunk_space(const std::string& path) { auto fs_path = path_to_fspath(path); auto chnk_path = bfs::path(ADAFS_DATA->chunk_path()); chnk_path /= fs_path; // create chunk dir bfs::create_directories(chnk_path); // XXX create temp big file. remember also to modify the return value chnk_path /= "data"s; bfs::ofstream ofs{chnk_path}; // return static_cast<int>(bfs::exists(chnk_path)); return 0; } /** * Remove the directory in the chunk dir of a file. * @param inode * @return */ // XXX this might be just a temp function as long as we don't use chunks int destroy_chunk_space(const std::string& path) { auto fs_path = path_to_fspath(path); auto chnk_path = bfs::path(ADAFS_DATA->chunk_path()); chnk_path /= fs_path; // create chunk dir // remove chunk dir with all contents if path exists bfs::remove_all(chnk_path); // return static_cast<int>(!bfs::exists(chnk_path)); return 0; } Loading
ifs/src/daemon/adafs_ops/metadentry.cpp +17 −24 Original line number Diff line number Diff line Loading @@ -5,31 +5,12 @@ using namespace std; static const std::string dentry_val_delim = ","s; // XXX this needs to be global. ino_t generate_inode_no() { std::lock_guard<std::mutex> inode_lock(ADAFS_DATA->inode_mutex); // TODO check that our inode counter is within boundaries of inode numbers in the given node return ADAFS_DATA->raise_inode_count(1); } /** * Creates a file system node of any type (such as file or directory) * @param path * @param uid * @param gid * @param mode * @return */ int create_node(const std::string& path, const uid_t uid, const gid_t gid, mode_t mode) { auto err = create_metadentry(path, mode); // XXX errorhandling // XXX Only do that for files and not for directories init_chunk_space(path); return err; } /** * Creates metadata (if required) and dentry at the same time * @param path Loading Loading @@ -78,16 +59,28 @@ int get_metadentry(const std::string& path, Metadata& md) { return 0; } /** * Wrapper to remove a KV store entry with the path as key * @param path * @return */ int remove_metadentry(const string& path) { return db_delete_metadentry(path) ? 0 : -1; } /** * Remove metadentry if exists and try to remove all chunks for path * @param path * @return */ int remove_node(const string& path) { auto err = remove_metadentry(path); // XXX Only do that with a file. Directory needs to be handled differently if (err == 0) destroy_chunk_space( path); // XXX This removes only the data on that node. Leaving everything in inconsistent state int err = 0; // assume we succeed Metadata md{}; // If metadentry exists, try to remove it if (get_metadentry(path, md) == 0) { err = remove_metadentry(path); } destroy_chunk_space(path); // destroys all chunks for the path on this node return err; } Loading
ifs/src/daemon/handler/h_metadentry.cpp +1 −1 Original line number Diff line number Diff line Loading @@ -131,7 +131,7 @@ static hg_return_t rpc_srv_rm_node(hg_handle_t handle) { assert(ret == HG_SUCCESS); ADAFS_DATA->spdlogger()->debug("Got remove node RPC with path {}", in.path); // do remove // Remove metadentry if exists on the node but also remove all chunks for that path out.err = remove_node(in.path); ADAFS_DATA->spdlogger()->debug("Sending output {}", out.err); Loading
ifs/src/preload/adafs_functions.cpp +5 −0 Original line number Diff line number Diff line Loading @@ -44,6 +44,11 @@ int adafs_mk_node(const std::string& path, const mode_t mode) { return rpc_send_mk_node(path, mode); } /** * This sends internally a broadcast (i.e. n RPCs) to clean their chunk folders for that path * @param path * @return */ int adafs_rm_node(const std::string& path) { init_ld_env_if_needed(); return rpc_send_rm_node(path); Loading
ifs/src/preload/rpc/ld_rpc_metadentry.cpp +45 −27 Original line number Diff line number Diff line Loading @@ -164,44 +164,62 @@ int rpc_send_stat(const std::string& path, string& attr) { } int rpc_send_rm_node(const std::string& path) { rpc_rm_node_in_t in{}; rpc_err_out_t out{}; hg_handle_t handle; int err = EUNKNOWN; hg_return_t ret; int err = 0; // assume we succeed ld_logger->debug("{}() Creating Mercury handles for all nodes ...", __func__); vector<hg_handle_t> rpc_handles(fs_config->host_size); vector<margo_request> rpc_waiters(fs_config->host_size); vector<rpc_rm_node_in_t> rpc_in(fs_config->host_size); // Send rpc to all nodes as all of them can have chunks for this path for (size_t i = 0; i < fs_config->host_size; i++) { // fill in in.path = path.c_str(); rpc_in[i].path = path.c_str(); // create handle ret = margo_create_wrap(ipc_rm_node_id, rpc_rm_node_id, i, rpc_handles[i], false); if (ret != HG_SUCCESS) { ld_logger->warn("{}() Unable to create Mercury handle", __func__); // We use continue here to remove at least some data // XXX In the future we can discuss RPC retrying. This should be a function to be used in general errno = EBUSY; err = -1; } // send async rpc ret = margo_iforward(rpc_handles[i], &rpc_in[i], &rpc_waiters[i]); if (ret != HG_SUCCESS) { ld_logger->warn("{}() Unable to create Mercury handle", __func__); errno = EBUSY; err = -1; } } ld_logger->debug("{}() Creating Mercury handle ...", __func__); auto ret = margo_create_wrap(ipc_rm_node_id, rpc_rm_node_id, path, handle, false); // Wait for RPC responses and then get response for (size_t i = 0; i < fs_config->host_size; i++) { // XXX We might need a timeout here to not wait forever for an output that never comes? ret = margo_wait(rpc_waiters[i]); if (ret != HG_SUCCESS) { ld_logger->warn("{}() Unable to wait for margo_request handle for path {} recipient {}", __func__, path, i); errno = EBUSY; return -1; err = -1; } // Send rpc #if defined(MARGO_FORWARD_TIMER) ret = margo_forward_timed_wrap_timer(handle, &in, __func__); #else ret = margo_forward_timed_wrap(handle, &in); #endif // Get response if (ret == HG_SUCCESS) { ld_logger->trace("{}() Waiting for response", __func__); ret = margo_get_output(handle, &out); rpc_err_out_t out{}; ret = margo_get_output(rpc_handles[i], &out); if (ret == HG_SUCCESS) { ld_logger->debug("{}() Got response success: {}", __func__, out.err); err = out.err; if (err != 0) { errno = out.err; err = -1; } } else { // something is wrong errno = EBUSY; err = -1; ld_logger->error("{}() while getting rpc output", __func__); } /* clean up resources consumed by this rpc */ margo_free_output(handle, &out); } else { ld_logger->warn("{}() timed out"); errno = EBUSY; margo_free_output(rpc_handles[i], &out); margo_destroy(rpc_handles[i]); } margo_destroy(handle); return err; } Loading