Loading include/client/rpc/forward_data.hpp +2 −0 Original line number Diff line number Diff line Loading @@ -24,6 +24,8 @@ struct ChunkStat { unsigned long chunk_free; }; // TODO once we have LEAF, remove all the error code returns and throw them as an exception. std::pair<int, ssize_t> forward_write(const std::string& path, const void* buf, bool append_flag, off64_t in_offset, size_t write_size, int64_t updated_metadentry_size); Loading include/client/rpc/forward_metadata.hpp +4 −1 Original line number Diff line number Diff line Loading @@ -16,6 +16,7 @@ #define GEKKOFS_CLIENT_FORWARD_METADATA_HPP #include <string> #include <memory> /* Forward declaration */ namespace gkfs { Loading @@ -27,6 +28,8 @@ struct MetadentryUpdateFlags; class Metadata; } // TODO once we have LEAF, remove all the error code returns and throw them as an exception. namespace rpc { int forward_create(const std::string& path, mode_t mode); Loading @@ -45,7 +48,7 @@ forward_update_metadentry_size(const std::string& path, size_t size, off64_t off std::pair<int, off64_t> forward_get_metadentry_size(const std::string& path); int forward_get_dirents(gkfs::filemap::OpenDir& open_dir); std::pair<int, std::shared_ptr<gkfs::filemap::OpenDir>> forward_get_dirents(const std::string& path); #ifdef HAS_SYMLINKS Loading src/client/gkfs_functions.cpp +8 −5 Original line number Diff line number Diff line Loading @@ -818,13 +818,14 @@ int gkfs_opendir(const std::string& path) { return -1; } auto open_dir = std::make_shared<gkfs::filemap::OpenDir>(path); auto err = gkfs::rpc::forward_get_dirents(*open_dir); auto ret = gkfs::rpc::forward_get_dirents(path); auto err = ret.first; if (err) { errno = err; return -1; } return CTX->file_map()->add(open_dir); assert(ret.second); return CTX->file_map()->add(ret.second); } /** Loading @@ -846,12 +847,14 @@ int gkfs_rmdir(const std::string& path) { return -1; } auto open_dir = std::make_shared<gkfs::filemap::OpenDir>(path); auto err = gkfs::rpc::forward_get_dirents(*open_dir); auto ret = gkfs::rpc::forward_get_dirents(path); auto err = ret.first; if (err) { errno = err; return -1; } assert(ret.second); auto open_dir = ret.second; if (open_dir->size() != 0) { errno = ENOTEMPTY; return -1; Loading src/client/rpc/forward_metadata.cpp +20 −14 Original line number Diff line number Diff line Loading @@ -353,12 +353,11 @@ pair<int, off64_t> forward_get_metadentry_size(const std::string& path) { * @param open_dir * @return error code */ int forward_get_dirents(gkfs::filemap::OpenDir& open_dir) { pair<int, shared_ptr<gkfs::filemap::OpenDir>> forward_get_dirents(const string& path) { LOG(DEBUG, "{}() enter for path '{}'", __func__, open_dir.path()) LOG(DEBUG, "{}() enter for path '{}'", __func__, path) auto const root_dir = open_dir.path(); auto const targets = CTX->distributor()->locate_directory_metadata(root_dir); auto const targets = CTX->distributor()->locate_directory_metadata(path); /* preallocate receiving buffer. The actual size is not known yet. * Loading Loading @@ -387,10 +386,11 @@ int forward_get_dirents(gkfs::filemap::OpenDir& open_dir) { hermes::access_mode::write_only)); } catch (const std::exception& ex) { LOG(ERROR, "{}() Failed to expose buffers for RMA. err '{}'", __func__, ex.what()); return EBUSY; return make_pair(EBUSY, nullptr); } } auto err = 0; // send RPCs std::vector<hermes::rpc_handle<gkfs::rpc::get_dirents>> handles; Loading @@ -399,23 +399,25 @@ int forward_get_dirents(gkfs::filemap::OpenDir& open_dir) { // Setup rpc input parameters for each host auto endp = CTX->hosts().at(targets[i]); gkfs::rpc::get_dirents::input in(root_dir, exposed_buffers[i]); gkfs::rpc::get_dirents::input in(path, exposed_buffers[i]); try { LOG(DEBUG, "{}() Sending RPC to host: '{}'", __func__, targets[i]); handles.emplace_back(ld_network_service->post<gkfs::rpc::get_dirents>(endp, in)); } catch (const std::exception& ex) { LOG(ERROR, "{}() Unable to send non-blocking get_dirents() on {} [peer: {}] err '{}'", __func__, root_dir, LOG(ERROR, "{}() Unable to send non-blocking get_dirents() on {} [peer: {}] err '{}'", __func__, path, targets[i], ex.what()); return EBUSY; err = EBUSY; break; // we need to gather responses from already sent RPCS } } LOG(INFO, "{}() path '{}' send rpc_srv_get_dirents() rpc to '{}' targets. per_host_buff_size '{}' Waiting on reply next and deserialize", __func__, open_dir.path(), targets.size(), per_host_buff_size); __func__, path, targets.size(), per_host_buff_size); auto err = 0; auto send_error = err != 0; auto open_dir = make_shared<gkfs::filemap::OpenDir>(path); // wait for RPC responses for (std::size_t i = 0; i < handles.size(); ++i) { Loading @@ -425,17 +427,21 @@ int forward_get_dirents(gkfs::filemap::OpenDir& open_dir) { // XXX We might need a timeout here to not wait forever for an // output that never comes? out = handles[i].get().at(0); // skip processing dirent data if there was an error during send // In this case all responses are gathered but their contents skipped if (send_error) continue; if (out.err() != 0) { LOG(ERROR, "{}() Failed to retrieve dir entries from host '{}'. Error '{}', path '{}'", __func__, targets[i], strerror(out.err()), root_dir); strerror(out.err()), path); err = out.err(); // We need to gather all responses before exiting continue; } } catch (const std::exception& ex) { LOG(ERROR, "{}() Failed to get rpc output.. [path: {}, target host: {}] err '{}'", __func__, root_dir, LOG(ERROR, "{}() Failed to get rpc output.. [path: {}, target host: {}] err '{}'", __func__, path, targets[i], ex.what()); err = EBUSY; // We need to gather all responses before exiting Loading Loading @@ -465,10 +471,10 @@ int forward_get_dirents(gkfs::filemap::OpenDir& open_dir) { // number of characters in entry + \0 terminator names_ptr += name.size() + 1; open_dir.add(name, ftype); open_dir->add(name, ftype); } } return err; return make_pair(err, open_dir); } #ifdef HAS_SYMLINKS Loading Loading
include/client/rpc/forward_data.hpp +2 −0 Original line number Diff line number Diff line Loading @@ -24,6 +24,8 @@ struct ChunkStat { unsigned long chunk_free; }; // TODO once we have LEAF, remove all the error code returns and throw them as an exception. std::pair<int, ssize_t> forward_write(const std::string& path, const void* buf, bool append_flag, off64_t in_offset, size_t write_size, int64_t updated_metadentry_size); Loading
include/client/rpc/forward_metadata.hpp +4 −1 Original line number Diff line number Diff line Loading @@ -16,6 +16,7 @@ #define GEKKOFS_CLIENT_FORWARD_METADATA_HPP #include <string> #include <memory> /* Forward declaration */ namespace gkfs { Loading @@ -27,6 +28,8 @@ struct MetadentryUpdateFlags; class Metadata; } // TODO once we have LEAF, remove all the error code returns and throw them as an exception. namespace rpc { int forward_create(const std::string& path, mode_t mode); Loading @@ -45,7 +48,7 @@ forward_update_metadentry_size(const std::string& path, size_t size, off64_t off std::pair<int, off64_t> forward_get_metadentry_size(const std::string& path); int forward_get_dirents(gkfs::filemap::OpenDir& open_dir); std::pair<int, std::shared_ptr<gkfs::filemap::OpenDir>> forward_get_dirents(const std::string& path); #ifdef HAS_SYMLINKS Loading
src/client/gkfs_functions.cpp +8 −5 Original line number Diff line number Diff line Loading @@ -818,13 +818,14 @@ int gkfs_opendir(const std::string& path) { return -1; } auto open_dir = std::make_shared<gkfs::filemap::OpenDir>(path); auto err = gkfs::rpc::forward_get_dirents(*open_dir); auto ret = gkfs::rpc::forward_get_dirents(path); auto err = ret.first; if (err) { errno = err; return -1; } return CTX->file_map()->add(open_dir); assert(ret.second); return CTX->file_map()->add(ret.second); } /** Loading @@ -846,12 +847,14 @@ int gkfs_rmdir(const std::string& path) { return -1; } auto open_dir = std::make_shared<gkfs::filemap::OpenDir>(path); auto err = gkfs::rpc::forward_get_dirents(*open_dir); auto ret = gkfs::rpc::forward_get_dirents(path); auto err = ret.first; if (err) { errno = err; return -1; } assert(ret.second); auto open_dir = ret.second; if (open_dir->size() != 0) { errno = ENOTEMPTY; return -1; Loading
src/client/rpc/forward_metadata.cpp +20 −14 Original line number Diff line number Diff line Loading @@ -353,12 +353,11 @@ pair<int, off64_t> forward_get_metadentry_size(const std::string& path) { * @param open_dir * @return error code */ int forward_get_dirents(gkfs::filemap::OpenDir& open_dir) { pair<int, shared_ptr<gkfs::filemap::OpenDir>> forward_get_dirents(const string& path) { LOG(DEBUG, "{}() enter for path '{}'", __func__, open_dir.path()) LOG(DEBUG, "{}() enter for path '{}'", __func__, path) auto const root_dir = open_dir.path(); auto const targets = CTX->distributor()->locate_directory_metadata(root_dir); auto const targets = CTX->distributor()->locate_directory_metadata(path); /* preallocate receiving buffer. The actual size is not known yet. * Loading Loading @@ -387,10 +386,11 @@ int forward_get_dirents(gkfs::filemap::OpenDir& open_dir) { hermes::access_mode::write_only)); } catch (const std::exception& ex) { LOG(ERROR, "{}() Failed to expose buffers for RMA. err '{}'", __func__, ex.what()); return EBUSY; return make_pair(EBUSY, nullptr); } } auto err = 0; // send RPCs std::vector<hermes::rpc_handle<gkfs::rpc::get_dirents>> handles; Loading @@ -399,23 +399,25 @@ int forward_get_dirents(gkfs::filemap::OpenDir& open_dir) { // Setup rpc input parameters for each host auto endp = CTX->hosts().at(targets[i]); gkfs::rpc::get_dirents::input in(root_dir, exposed_buffers[i]); gkfs::rpc::get_dirents::input in(path, exposed_buffers[i]); try { LOG(DEBUG, "{}() Sending RPC to host: '{}'", __func__, targets[i]); handles.emplace_back(ld_network_service->post<gkfs::rpc::get_dirents>(endp, in)); } catch (const std::exception& ex) { LOG(ERROR, "{}() Unable to send non-blocking get_dirents() on {} [peer: {}] err '{}'", __func__, root_dir, LOG(ERROR, "{}() Unable to send non-blocking get_dirents() on {} [peer: {}] err '{}'", __func__, path, targets[i], ex.what()); return EBUSY; err = EBUSY; break; // we need to gather responses from already sent RPCS } } LOG(INFO, "{}() path '{}' send rpc_srv_get_dirents() rpc to '{}' targets. per_host_buff_size '{}' Waiting on reply next and deserialize", __func__, open_dir.path(), targets.size(), per_host_buff_size); __func__, path, targets.size(), per_host_buff_size); auto err = 0; auto send_error = err != 0; auto open_dir = make_shared<gkfs::filemap::OpenDir>(path); // wait for RPC responses for (std::size_t i = 0; i < handles.size(); ++i) { Loading @@ -425,17 +427,21 @@ int forward_get_dirents(gkfs::filemap::OpenDir& open_dir) { // XXX We might need a timeout here to not wait forever for an // output that never comes? out = handles[i].get().at(0); // skip processing dirent data if there was an error during send // In this case all responses are gathered but their contents skipped if (send_error) continue; if (out.err() != 0) { LOG(ERROR, "{}() Failed to retrieve dir entries from host '{}'. Error '{}', path '{}'", __func__, targets[i], strerror(out.err()), root_dir); strerror(out.err()), path); err = out.err(); // We need to gather all responses before exiting continue; } } catch (const std::exception& ex) { LOG(ERROR, "{}() Failed to get rpc output.. [path: {}, target host: {}] err '{}'", __func__, root_dir, LOG(ERROR, "{}() Failed to get rpc output.. [path: {}, target host: {}] err '{}'", __func__, path, targets[i], ex.what()); err = EBUSY; // We need to gather all responses before exiting Loading Loading @@ -465,10 +471,10 @@ int forward_get_dirents(gkfs::filemap::OpenDir& open_dir) { // number of characters in entry + \0 terminator names_ptr += name.size() + 1; open_dir.add(name, ftype); open_dir->add(name, ftype); } } return err; return make_pair(err, open_dir); } #ifdef HAS_SYMLINKS Loading