Loading include/daemon/handler/rpc_defs.hpp +0 −2 Original line number Diff line number Diff line Loading @@ -94,7 +94,5 @@ DECLARE_MARGO_RPC_HANDLER(rpc_srv_expand_finalize) DECLARE_MARGO_RPC_HANDLER(rpc_srv_migrate_metadata) DECLARE_MARGO_RPC_HANDLER(rpc_srv_migrate_data) #endif // GKFS_DAEMON_RPC_DEFS_HPP include/daemon/malleability/malleable_manager.hpp +3 −3 Original line number Diff line number Diff line Loading @@ -48,15 +48,15 @@ private: connect_to_hosts( const std::vector<std::pair<std::string, std::string>>& hosts); static void expand_abt(void* _arg); int redistribute_metadata(); void redistribute_data(); static void expand_abt(void* _arg); public: void expand_start(int old_server_conf, int new_server_conf); Loading src/client/malleability.cpp +8 −5 Original line number Diff line number Diff line Loading @@ -59,19 +59,22 @@ expand_start(int old_server_conf, int new_server_conf) { // TODO check that hostsfile contains endmarker return gkfs::malleable::rpc::forward_expand_start(old_server_conf, new_server_conf); // return 0; } int expand_status() { LOG(INFO, "{}() Expand operation status", __func__); return gkfs::malleable::rpc::forward_expand_status(); LOG(INFO, "{}() enter", __func__); auto res = gkfs::malleable::rpc::forward_expand_status(); LOG(INFO, "{}() '{}' nodes working on extend operation.", __func__, res); return res; } int expand_finalize() { LOG(INFO, "{}() Expand operation finalize", __func__); return gkfs::malleable::rpc::forward_expand_finalize(); LOG(INFO, "{}() enter", __func__); auto res = gkfs::malleable::rpc::forward_expand_finalize(); LOG(INFO, "{}() extend operation finalized. ", __func__); return res; } } // namespace gkfs::malleable No newline at end of file src/client/rpc/forward_malleability.cpp +11 −11 Original line number Diff line number Diff line Loading @@ -132,19 +132,20 @@ forward_expand_status() { // wait for RPC responses for(std::size_t i = 0; i < handles.size(); ++i) { gkfs::malleable::rpc::expand_status::output out; try { out = handles[i].get().at(0); if(out.err() != 0) { if(out.err() > 0) { LOG(DEBUG, "{}() Host '{}' not done yet with malleable operation.", __func__, targets[i]); err += out.err(); } if(out.err() < 0) { // ignore. shouldn't happen for now LOG(ERROR, "{}() Failed to retrieve dir entries from host '{}'. Error '{}'", __func__, targets[i], strerror(out.err())); err = out.err(); // We need to gather all responses before exiting continue; "{}() Host '{}' is unable to check for expansion progress. (shouldn't happen)", __func__, targets[i]); } } catch(const std::exception& ex) { LOG(ERROR, Loading Loading @@ -200,8 +201,7 @@ forward_expand_finalize() { out = handles[i].get().at(0); if(out.err() != 0) { LOG(ERROR, "{}() Failed to retrieve dir entries from host '{}'. Error '{}'", LOG(ERROR, "{}() Failed finalize on host '{}'. Error '{}'", __func__, targets[i], strerror(out.err())); err = out.err(); // We need to gather all responses before exiting Loading src/daemon/handler/srv_malleability.cpp +0 −7 Original line number Diff line number Diff line Loading @@ -138,11 +138,6 @@ rpc_srv_migrate_metadata(hg_handle_t handle) { return gkfs::rpc::cleanup_respond(&handle, &in, &out); } hg_return_t rpc_srv_migrate_data(hg_handle_t handle) { return HG_SUCCESS; } } // namespace DEFINE_MARGO_RPC_HANDLER(rpc_srv_expand_start) Loading @@ -152,5 +147,3 @@ DEFINE_MARGO_RPC_HANDLER(rpc_srv_expand_status) DEFINE_MARGO_RPC_HANDLER(rpc_srv_expand_finalize) DEFINE_MARGO_RPC_HANDLER(rpc_srv_migrate_metadata) DEFINE_MARGO_RPC_HANDLER(rpc_srv_migrate_data) Loading
include/daemon/handler/rpc_defs.hpp +0 −2 Original line number Diff line number Diff line Loading @@ -94,7 +94,5 @@ DECLARE_MARGO_RPC_HANDLER(rpc_srv_expand_finalize) DECLARE_MARGO_RPC_HANDLER(rpc_srv_migrate_metadata) DECLARE_MARGO_RPC_HANDLER(rpc_srv_migrate_data) #endif // GKFS_DAEMON_RPC_DEFS_HPP
include/daemon/malleability/malleable_manager.hpp +3 −3 Original line number Diff line number Diff line Loading @@ -48,15 +48,15 @@ private: connect_to_hosts( const std::vector<std::pair<std::string, std::string>>& hosts); static void expand_abt(void* _arg); int redistribute_metadata(); void redistribute_data(); static void expand_abt(void* _arg); public: void expand_start(int old_server_conf, int new_server_conf); Loading
src/client/malleability.cpp +8 −5 Original line number Diff line number Diff line Loading @@ -59,19 +59,22 @@ expand_start(int old_server_conf, int new_server_conf) { // TODO check that hostsfile contains endmarker return gkfs::malleable::rpc::forward_expand_start(old_server_conf, new_server_conf); // return 0; } int expand_status() { LOG(INFO, "{}() Expand operation status", __func__); return gkfs::malleable::rpc::forward_expand_status(); LOG(INFO, "{}() enter", __func__); auto res = gkfs::malleable::rpc::forward_expand_status(); LOG(INFO, "{}() '{}' nodes working on extend operation.", __func__, res); return res; } int expand_finalize() { LOG(INFO, "{}() Expand operation finalize", __func__); return gkfs::malleable::rpc::forward_expand_finalize(); LOG(INFO, "{}() enter", __func__); auto res = gkfs::malleable::rpc::forward_expand_finalize(); LOG(INFO, "{}() extend operation finalized. ", __func__); return res; } } // namespace gkfs::malleable No newline at end of file
src/client/rpc/forward_malleability.cpp +11 −11 Original line number Diff line number Diff line Loading @@ -132,19 +132,20 @@ forward_expand_status() { // wait for RPC responses for(std::size_t i = 0; i < handles.size(); ++i) { gkfs::malleable::rpc::expand_status::output out; try { out = handles[i].get().at(0); if(out.err() != 0) { if(out.err() > 0) { LOG(DEBUG, "{}() Host '{}' not done yet with malleable operation.", __func__, targets[i]); err += out.err(); } if(out.err() < 0) { // ignore. shouldn't happen for now LOG(ERROR, "{}() Failed to retrieve dir entries from host '{}'. Error '{}'", __func__, targets[i], strerror(out.err())); err = out.err(); // We need to gather all responses before exiting continue; "{}() Host '{}' is unable to check for expansion progress. (shouldn't happen)", __func__, targets[i]); } } catch(const std::exception& ex) { LOG(ERROR, Loading Loading @@ -200,8 +201,7 @@ forward_expand_finalize() { out = handles[i].get().at(0); if(out.err() != 0) { LOG(ERROR, "{}() Failed to retrieve dir entries from host '{}'. Error '{}'", LOG(ERROR, "{}() Failed finalize on host '{}'. Error '{}'", __func__, targets[i], strerror(out.err())); err = out.err(); // We need to gather all responses before exiting Loading
src/daemon/handler/srv_malleability.cpp +0 −7 Original line number Diff line number Diff line Loading @@ -138,11 +138,6 @@ rpc_srv_migrate_metadata(hg_handle_t handle) { return gkfs::rpc::cleanup_respond(&handle, &in, &out); } hg_return_t rpc_srv_migrate_data(hg_handle_t handle) { return HG_SUCCESS; } } // namespace DEFINE_MARGO_RPC_HANDLER(rpc_srv_expand_start) Loading @@ -152,5 +147,3 @@ DEFINE_MARGO_RPC_HANDLER(rpc_srv_expand_status) DEFINE_MARGO_RPC_HANDLER(rpc_srv_expand_finalize) DEFINE_MARGO_RPC_HANDLER(rpc_srv_migrate_metadata) DEFINE_MARGO_RPC_HANDLER(rpc_srv_migrate_data)