Loading ifs/configure.hpp +9 −3 Original line number Diff line number Diff line Loading @@ -2,13 +2,13 @@ #ifndef FS_CONFIGURE_H #define FS_CONFIGURE_H // To enabled logging with info level // To enabled logging for daemon #define LOG_INFO //#define LOG_DEBUG //#define LOG_TRACE #define LOG_DAEMON_PATH "/tmp/adafs_daemon.log" // Enable logging for daemon // Enable logging for preload #define LOG_PRELOAD_INFO //#define LOG_PRELOAD_DEBUG //#define LOG_PRELOAD_TRACE Loading Loading @@ -43,7 +43,13 @@ // RPC configuration #define RPCPORT 4433 #define RPC_TRIES 3 #define RPC_TIMEOUT 150000 // rpc timeout to try again in milliseconds #define RPC_TIMEOUT 180000 // enables timing of sending rpcs //#define MARGO_FORWARD_TIMER // sets the threshold in milliseconds when a log entry should be created #define MARGO_FORWARD_TIMER_THRESHOLD 1000 // Set a hostname suffix when a connection is built. E.g., "-ib" to use Infiniband #define HOSTNAME_SUFFIX "" //#define MARGODIAG // enables diagnostics of margo (printed after shutting down Loading ifs/include/preload/rpc/ld_rpc_metadentry.hpp +4 −0 Original line number Diff line number Diff line Loading @@ -6,6 +6,10 @@ #include <rpc/rpc_types.hpp> #include <iostream> inline hg_return_t margo_forward_timed_wrap_timer(hg_handle_t& handle, void* in_struct, const char* func); inline hg_return_t margo_forward_timed_wrap(hg_handle_t& handle, void* in_struct); void send_minimal_rpc(hg_id_t minimal_id); int rpc_send_mk_node(const std::string& path, const mode_t mode); Loading ifs/src/preload/adafs_functions.cpp +2 −1 Original line number Diff line number Diff line Loading @@ -62,6 +62,7 @@ int adafs_access(const std::string& path, const int mask) { int adafs_stat(const std::string& path, struct stat* buf) { string attr = ""s; auto err = rpc_send_stat(path, attr); if (err == 0) db_val_to_stat(path, attr, *buf); return err; } Loading ifs/src/preload/preload.cpp +2 −2 Original line number Diff line number Diff line Loading @@ -232,9 +232,9 @@ void init_preload() { void destroy_preload() { #ifdef MARGODIAG cout << "\n####################\n\nMargo IPC client stats: " << endl; margo_diag_dump(margo_ipc_id_, "-", 0); margo_diag_dump(ld_margo_ipc_id, "-", 0); cout << "\n####################\n\nMargo RPC client stats: " << endl; margo_diag_dump(margo_rpc_id_, "-", 0); margo_diag_dump(ld_margo_rpc_id, "-", 0); #endif ld_logger->info("{}() Freeing Margo daemon addr ...", __func__); auto ret = margo_addr_free(ld_margo_ipc_id, daemon_svr_addr); Loading ifs/src/preload/rpc/ld_rpc_metadentry.cpp +154 −108 Original line number Diff line number Diff line Loading @@ -4,45 +4,71 @@ using namespace std; using ns = chrono::nanoseconds; using get_time = chrono::steady_clock; inline hg_return_t margo_forward_timed_wrap_timer(hg_handle_t& handle, void* in_struct, const char* func) { auto start_t = get_time::now(); auto ret = margo_forward_timed_wrap(handle, in_struct); auto diff_count = chrono::duration_cast<ns>(get_time::now() - start_t).count(); if (((diff_count) / 1000000.) > MARGO_FORWARD_TIMER_THRESHOLD) ld_logger->info("{}() rpc_time: {} ms", func, ((diff_count) / 1000000.)); return ret; } inline hg_return_t margo_forward_timed_wrap(hg_handle_t& handle, void* in_struct) { hg_return_t ret = HG_OTHER_ERROR; for (int i = 0; i < RPC_TRIES; ++i) { ret = margo_forward_timed(handle, in_struct, RPC_TIMEOUT); if (ret == HG_SUCCESS) { break; } } return ret; } int rpc_send_mk_node(const std::string& path, const mode_t mode) { hg_handle_t handle; hg_addr_t svr_addr = HG_ADDR_NULL; rpc_mk_node_in_t in{}; rpc_err_out_t out{}; hg_return_t ret; int err = EUNKNOWN; // fill in in.path = path.c_str(); in.mode = mode; // Create handle ld_logger->debug("{}() Creating Mercury handle ...", __func__); margo_create_wrap(ipc_mk_node_id, rpc_mk_node_id, path, handle, svr_addr, false); ret = HG_OTHER_ERROR; ld_logger->debug("{}() About to send RPC ...", __func__); for (int i = 0; i < RPC_TRIES; ++i) { ret = margo_forward_timed(handle, &in, RPC_TIMEOUT); if (ret == HG_SUCCESS) { break; } auto ret = margo_create_wrap(ipc_mk_node_id, rpc_mk_node_id, path, handle, svr_addr, false); if (ret != HG_SUCCESS) { errno = EBUSY; return -1; } // Send rpc ld_logger->debug("{}() About to send RPC ...", __func__); #if defined(MARGO_FORWARD_TIMER) ret = margo_forward_timed_wrap_timer(handle, &in, __func__); #else ret = margo_forward_timed_wrap(handle, &in); #endif // Get response if (ret == HG_SUCCESS) { /* decode response */ ld_logger->trace("{}() Waiting for response", __func__); ret = margo_get_output(handle, &out); if (ret == HG_SUCCESS) { ld_logger->debug("{}() Got response success: {}", __func__, out.err); if (out.err != 0) errno = out.err; else err = 0; err = out.err; } else { // something is wrong errno = EBUSY; ld_logger->error("{}() while getting rpc output", __func__); } /* clean up resources consumed by this rpc */ margo_free_output(handle, &out); } else { ld_logger->warn("{}() timed out"); errno = EBUSY; } margo_destroy(handle); return err; } Loading @@ -52,42 +78,40 @@ int rpc_send_access(const std::string& path, const int mask) { hg_addr_t svr_addr = HG_ADDR_NULL; rpc_access_in_t in{}; rpc_err_out_t out{}; hg_return_t ret; int err = EUNKNOWN; // fill in in.path = path.c_str(); in.mask = mask; ld_logger->debug("{}() Creating Mercury handle ...", __func__); margo_create_wrap(ipc_access_id, rpc_access_id, path, handle, svr_addr, false); ret = HG_OTHER_ERROR; ld_logger->debug("{}() About to send RPC ...", __func__); for (int i = 0; i < RPC_TRIES; ++i) { ret = margo_forward_timed(handle, &in, RPC_TIMEOUT); if (ret == HG_SUCCESS) { break; } auto ret = margo_create_wrap(ipc_access_id, rpc_access_id, path, handle, svr_addr, false); if (ret != HG_SUCCESS) { errno = EBUSY; return -1; } // Send rpc ld_logger->debug("{}() About to send RPC ...", __func__); #if defined(MARGO_FORWARD_TIMER) ret = margo_forward_timed_wrap_timer(handle, &in, __func__); #else ret = margo_forward_timed_wrap(handle, &in); #endif // Get response if (ret == HG_SUCCESS) { /* decode response */ ld_logger->trace("{}() Waiting for response", __func__); ret = margo_get_output(handle, &out); if (ret != HG_SUCCESS) { ld_logger->error("{}() while getting margo output", __func__); errno = EIO; margo_free_output(handle, &out); margo_destroy(handle); return -1; } if (ret == HG_SUCCESS) { ld_logger->debug("{}() Got response success: {}", __func__, out.err); if (out.err != 0) errno = out.err; else err = 0; err = out.err; } else { // something is wrong errno = EBUSY; ld_logger->error("{}() while getting rpc output", __func__); } /* clean up resources consumed by this rpc */ margo_free_output(handle, &out); } else { ld_logger->warn("{}() timed out"); errno = EBUSY; } margo_destroy(handle); Loading @@ -99,35 +123,39 @@ int rpc_send_stat(const std::string& path, string& attr) { hg_addr_t svr_addr = HG_ADDR_NULL; rpc_stat_in_t in{}; rpc_stat_out_t out{}; hg_return_t ret; int err = EUNKNOWN; // fill in in.path = path.c_str(); ld_logger->debug("{}() Creating Mercury handle ...", __func__); margo_create_wrap(ipc_stat_id, rpc_stat_id, path, handle, svr_addr, false); ld_logger->debug("{}() About to send RPC ...", __func__); ret = HG_OTHER_ERROR; for (int i = 0; i < RPC_TRIES; ++i) { ret = margo_forward_timed(handle, &in, RPC_TIMEOUT); if (ret == HG_SUCCESS) { break; } auto ret = margo_create_wrap(ipc_stat_id, rpc_stat_id, path, handle, svr_addr, false); if (ret != HG_SUCCESS) { errno = EBUSY; return -1; } // Send rpc #if defined(MARGO_FORWARD_TIMER) ret = margo_forward_timed_wrap_timer(handle, &in, __func__); #else ret = margo_forward_timed_wrap(handle, &in); #endif // Get response if (ret == HG_SUCCESS) { /* decode response */ ld_logger->trace("{}() Waiting for response", __func__); ret = margo_get_output(handle, &out); if (ret == HG_SUCCESS) { ld_logger->debug("{}() Got response success: {}", __func__, out.err); err = out.err; if (out.err == 0) attr = out.db_val; } else { // something is wrong errno = EBUSY; ld_logger->error("{}() while getting rpc output", __func__); } /* clean up resources consumed by this rpc */ margo_free_output(handle, &out); } else { err = 1; ld_logger->warn("{}() timed out", __func__); ld_logger->warn("{}() timed out"); errno = EBUSY; } margo_destroy(handle); return err; Loading @@ -143,27 +171,34 @@ int rpc_send_rm_node(const std::string& path) { in.path = path.c_str(); ld_logger->debug("{}() Creating Mercury handle ...", __func__); margo_create_wrap(ipc_rm_node_id, rpc_rm_node_id, path, handle, svr_addr, false); ld_logger->debug("{}() About to send RPC ...", __func__); auto ret = HG_OTHER_ERROR; for (int i = 0; i < RPC_TRIES; ++i) { ret = margo_forward_timed(handle, &in, RPC_TIMEOUT); if (ret == HG_SUCCESS) { break; } auto ret = margo_create_wrap(ipc_rm_node_id, rpc_rm_node_id, path, handle, svr_addr, false); if (ret != HG_SUCCESS) { errno = EBUSY; return -1; } // Send rpc #if defined(MARGO_FORWARD_TIMER) ret = margo_forward_timed_wrap_timer(handle, &in, __func__); #else ret = margo_forward_timed_wrap(handle, &in); #endif // Get response if (ret == HG_SUCCESS) { /* decode response */ ld_logger->trace("{}() Waiting for response", __func__); ret = margo_get_output(handle, &out); if (ret == HG_SUCCESS) { ld_logger->debug("{}() Got response success: {}", __func__, out.err); err = out.err; } else { // something is wrong errno = EBUSY; ld_logger->error("{}() while getting rpc output", __func__); } /* clean up resources consumed by this rpc */ margo_free_output(handle, &out); } else { ld_logger->warn("{}() timed out", __func__); ld_logger->warn("{}() timed out"); errno = EBUSY; } margo_destroy(handle); return err; Loading Loading @@ -200,28 +235,34 @@ int rpc_send_update_metadentry(const string& path, const Metadentry& md, const M in.ctime_flag = bool_to_merc_bool(md_flags.ctime); ld_logger->debug("{}() Creating Mercury handle ...", __func__); margo_create_wrap(ipc_update_metadentry_id, rpc_update_metadentry_id, path, handle, svr_addr, false); ld_logger->debug("{}() About to send RPC ...", __func__); int send_ret = HG_FALSE; for (int i = 0; i < RPC_TRIES; ++i) { send_ret = margo_forward_timed(handle, &in, RPC_TIMEOUT); if (send_ret == HG_SUCCESS) { break; } auto ret = margo_create_wrap(ipc_update_metadentry_id, rpc_update_metadentry_id, path, handle, svr_addr, false); if (ret != HG_SUCCESS) { errno = EBUSY; return -1; } if (send_ret == HG_SUCCESS) { /* decode response */ // Send rpc #if defined(MARGO_FORWARD_TIMER) ret = margo_forward_timed_wrap_timer(handle, &in, __func__); #else ret = margo_forward_timed_wrap(handle, &in); #endif // Get response if (ret == HG_SUCCESS) { ld_logger->trace("{}() Waiting for response", __func__); margo_get_output(handle, &out); ret = margo_get_output(handle, &out); if (ret == HG_SUCCESS) { ld_logger->debug("{}() Got response success: {}", __func__, out.err); err = out.err; } else { // something is wrong errno = EBUSY; ld_logger->error("{}() while getting rpc output", __func__); } /* clean up resources consumed by this rpc */ margo_free_output(handle, &out); } else { ld_logger->warn("{}() timed out", __func__); ld_logger->warn("{}() timed out"); errno = EBUSY; } margo_destroy(handle); Loading @@ -245,33 +286,38 @@ int rpc_send_update_metadentry_size(const string& path, const size_t size, const int err = EUNKNOWN; ld_logger->debug("{}() Creating Mercury handle ...", __func__); margo_create_wrap(ipc_update_metadentry_size_id, rpc_update_metadentry_size_id, path, handle, svr_addr, false); ld_logger->debug("{}() About to send RPC ...", __func__); int send_ret = HG_FALSE; for (int i = 0; i < RPC_TRIES; ++i) { send_ret = margo_forward_timed(handle, &in, RPC_TIMEOUT); if (send_ret == HG_SUCCESS) { break; } auto ret = margo_create_wrap(ipc_update_metadentry_size_id, rpc_update_metadentry_size_id, path, handle, svr_addr, false); if (ret != HG_SUCCESS) { errno = EBUSY; return -1; } if (send_ret == HG_SUCCESS) { /* decode response */ // Send rpc #if defined(MARGO_FORWARD_TIMER) ret = margo_forward_timed_wrap_timer(handle, &in, __func__); #else ret = margo_forward_timed_wrap(handle, &in); #endif // Get response if (ret == HG_SUCCESS) { ld_logger->trace("{}() Waiting for response", __func__); if (margo_get_output(handle, &out) != HG_SUCCESS) { ld_logger->error("{}() Unable to get rpc output", __func__); ret_size = 0; } else { ret = margo_get_output(handle, &out); if (ret == HG_SUCCESS) { ld_logger->debug("{}() Got response success: {}", __func__, out.err); err = out.err; ret_size = out.ret_size; } else { // something is wrong errno = EBUSY; ret_size = 0; ld_logger->error("{}() while getting rpc output", __func__); } /* clean up resources consumed by this rpc */ margo_free_output(handle, &out); } } else { ld_logger->warn("{}() timed out", __func__); ld_logger->warn("{}() timed out"); errno = EBUSY; } margo_destroy(handle); return err; } No newline at end of file Loading
ifs/configure.hpp +9 −3 Original line number Diff line number Diff line Loading @@ -2,13 +2,13 @@ #ifndef FS_CONFIGURE_H #define FS_CONFIGURE_H // To enabled logging with info level // To enabled logging for daemon #define LOG_INFO //#define LOG_DEBUG //#define LOG_TRACE #define LOG_DAEMON_PATH "/tmp/adafs_daemon.log" // Enable logging for daemon // Enable logging for preload #define LOG_PRELOAD_INFO //#define LOG_PRELOAD_DEBUG //#define LOG_PRELOAD_TRACE Loading Loading @@ -43,7 +43,13 @@ // RPC configuration #define RPCPORT 4433 #define RPC_TRIES 3 #define RPC_TIMEOUT 150000 // rpc timeout to try again in milliseconds #define RPC_TIMEOUT 180000 // enables timing of sending rpcs //#define MARGO_FORWARD_TIMER // sets the threshold in milliseconds when a log entry should be created #define MARGO_FORWARD_TIMER_THRESHOLD 1000 // Set a hostname suffix when a connection is built. E.g., "-ib" to use Infiniband #define HOSTNAME_SUFFIX "" //#define MARGODIAG // enables diagnostics of margo (printed after shutting down Loading
ifs/include/preload/rpc/ld_rpc_metadentry.hpp +4 −0 Original line number Diff line number Diff line Loading @@ -6,6 +6,10 @@ #include <rpc/rpc_types.hpp> #include <iostream> inline hg_return_t margo_forward_timed_wrap_timer(hg_handle_t& handle, void* in_struct, const char* func); inline hg_return_t margo_forward_timed_wrap(hg_handle_t& handle, void* in_struct); void send_minimal_rpc(hg_id_t minimal_id); int rpc_send_mk_node(const std::string& path, const mode_t mode); Loading
ifs/src/preload/adafs_functions.cpp +2 −1 Original line number Diff line number Diff line Loading @@ -62,6 +62,7 @@ int adafs_access(const std::string& path, const int mask) { int adafs_stat(const std::string& path, struct stat* buf) { string attr = ""s; auto err = rpc_send_stat(path, attr); if (err == 0) db_val_to_stat(path, attr, *buf); return err; } Loading
ifs/src/preload/preload.cpp +2 −2 Original line number Diff line number Diff line Loading @@ -232,9 +232,9 @@ void init_preload() { void destroy_preload() { #ifdef MARGODIAG cout << "\n####################\n\nMargo IPC client stats: " << endl; margo_diag_dump(margo_ipc_id_, "-", 0); margo_diag_dump(ld_margo_ipc_id, "-", 0); cout << "\n####################\n\nMargo RPC client stats: " << endl; margo_diag_dump(margo_rpc_id_, "-", 0); margo_diag_dump(ld_margo_rpc_id, "-", 0); #endif ld_logger->info("{}() Freeing Margo daemon addr ...", __func__); auto ret = margo_addr_free(ld_margo_ipc_id, daemon_svr_addr); Loading
ifs/src/preload/rpc/ld_rpc_metadentry.cpp +154 −108 Original line number Diff line number Diff line Loading @@ -4,45 +4,71 @@ using namespace std; using ns = chrono::nanoseconds; using get_time = chrono::steady_clock; inline hg_return_t margo_forward_timed_wrap_timer(hg_handle_t& handle, void* in_struct, const char* func) { auto start_t = get_time::now(); auto ret = margo_forward_timed_wrap(handle, in_struct); auto diff_count = chrono::duration_cast<ns>(get_time::now() - start_t).count(); if (((diff_count) / 1000000.) > MARGO_FORWARD_TIMER_THRESHOLD) ld_logger->info("{}() rpc_time: {} ms", func, ((diff_count) / 1000000.)); return ret; } inline hg_return_t margo_forward_timed_wrap(hg_handle_t& handle, void* in_struct) { hg_return_t ret = HG_OTHER_ERROR; for (int i = 0; i < RPC_TRIES; ++i) { ret = margo_forward_timed(handle, in_struct, RPC_TIMEOUT); if (ret == HG_SUCCESS) { break; } } return ret; } int rpc_send_mk_node(const std::string& path, const mode_t mode) { hg_handle_t handle; hg_addr_t svr_addr = HG_ADDR_NULL; rpc_mk_node_in_t in{}; rpc_err_out_t out{}; hg_return_t ret; int err = EUNKNOWN; // fill in in.path = path.c_str(); in.mode = mode; // Create handle ld_logger->debug("{}() Creating Mercury handle ...", __func__); margo_create_wrap(ipc_mk_node_id, rpc_mk_node_id, path, handle, svr_addr, false); ret = HG_OTHER_ERROR; ld_logger->debug("{}() About to send RPC ...", __func__); for (int i = 0; i < RPC_TRIES; ++i) { ret = margo_forward_timed(handle, &in, RPC_TIMEOUT); if (ret == HG_SUCCESS) { break; } auto ret = margo_create_wrap(ipc_mk_node_id, rpc_mk_node_id, path, handle, svr_addr, false); if (ret != HG_SUCCESS) { errno = EBUSY; return -1; } // Send rpc ld_logger->debug("{}() About to send RPC ...", __func__); #if defined(MARGO_FORWARD_TIMER) ret = margo_forward_timed_wrap_timer(handle, &in, __func__); #else ret = margo_forward_timed_wrap(handle, &in); #endif // Get response if (ret == HG_SUCCESS) { /* decode response */ ld_logger->trace("{}() Waiting for response", __func__); ret = margo_get_output(handle, &out); if (ret == HG_SUCCESS) { ld_logger->debug("{}() Got response success: {}", __func__, out.err); if (out.err != 0) errno = out.err; else err = 0; err = out.err; } else { // something is wrong errno = EBUSY; ld_logger->error("{}() while getting rpc output", __func__); } /* clean up resources consumed by this rpc */ margo_free_output(handle, &out); } else { ld_logger->warn("{}() timed out"); errno = EBUSY; } margo_destroy(handle); return err; } Loading @@ -52,42 +78,40 @@ int rpc_send_access(const std::string& path, const int mask) { hg_addr_t svr_addr = HG_ADDR_NULL; rpc_access_in_t in{}; rpc_err_out_t out{}; hg_return_t ret; int err = EUNKNOWN; // fill in in.path = path.c_str(); in.mask = mask; ld_logger->debug("{}() Creating Mercury handle ...", __func__); margo_create_wrap(ipc_access_id, rpc_access_id, path, handle, svr_addr, false); ret = HG_OTHER_ERROR; ld_logger->debug("{}() About to send RPC ...", __func__); for (int i = 0; i < RPC_TRIES; ++i) { ret = margo_forward_timed(handle, &in, RPC_TIMEOUT); if (ret == HG_SUCCESS) { break; } auto ret = margo_create_wrap(ipc_access_id, rpc_access_id, path, handle, svr_addr, false); if (ret != HG_SUCCESS) { errno = EBUSY; return -1; } // Send rpc ld_logger->debug("{}() About to send RPC ...", __func__); #if defined(MARGO_FORWARD_TIMER) ret = margo_forward_timed_wrap_timer(handle, &in, __func__); #else ret = margo_forward_timed_wrap(handle, &in); #endif // Get response if (ret == HG_SUCCESS) { /* decode response */ ld_logger->trace("{}() Waiting for response", __func__); ret = margo_get_output(handle, &out); if (ret != HG_SUCCESS) { ld_logger->error("{}() while getting margo output", __func__); errno = EIO; margo_free_output(handle, &out); margo_destroy(handle); return -1; } if (ret == HG_SUCCESS) { ld_logger->debug("{}() Got response success: {}", __func__, out.err); if (out.err != 0) errno = out.err; else err = 0; err = out.err; } else { // something is wrong errno = EBUSY; ld_logger->error("{}() while getting rpc output", __func__); } /* clean up resources consumed by this rpc */ margo_free_output(handle, &out); } else { ld_logger->warn("{}() timed out"); errno = EBUSY; } margo_destroy(handle); Loading @@ -99,35 +123,39 @@ int rpc_send_stat(const std::string& path, string& attr) { hg_addr_t svr_addr = HG_ADDR_NULL; rpc_stat_in_t in{}; rpc_stat_out_t out{}; hg_return_t ret; int err = EUNKNOWN; // fill in in.path = path.c_str(); ld_logger->debug("{}() Creating Mercury handle ...", __func__); margo_create_wrap(ipc_stat_id, rpc_stat_id, path, handle, svr_addr, false); ld_logger->debug("{}() About to send RPC ...", __func__); ret = HG_OTHER_ERROR; for (int i = 0; i < RPC_TRIES; ++i) { ret = margo_forward_timed(handle, &in, RPC_TIMEOUT); if (ret == HG_SUCCESS) { break; } auto ret = margo_create_wrap(ipc_stat_id, rpc_stat_id, path, handle, svr_addr, false); if (ret != HG_SUCCESS) { errno = EBUSY; return -1; } // Send rpc #if defined(MARGO_FORWARD_TIMER) ret = margo_forward_timed_wrap_timer(handle, &in, __func__); #else ret = margo_forward_timed_wrap(handle, &in); #endif // Get response if (ret == HG_SUCCESS) { /* decode response */ ld_logger->trace("{}() Waiting for response", __func__); ret = margo_get_output(handle, &out); if (ret == HG_SUCCESS) { ld_logger->debug("{}() Got response success: {}", __func__, out.err); err = out.err; if (out.err == 0) attr = out.db_val; } else { // something is wrong errno = EBUSY; ld_logger->error("{}() while getting rpc output", __func__); } /* clean up resources consumed by this rpc */ margo_free_output(handle, &out); } else { err = 1; ld_logger->warn("{}() timed out", __func__); ld_logger->warn("{}() timed out"); errno = EBUSY; } margo_destroy(handle); return err; Loading @@ -143,27 +171,34 @@ int rpc_send_rm_node(const std::string& path) { in.path = path.c_str(); ld_logger->debug("{}() Creating Mercury handle ...", __func__); margo_create_wrap(ipc_rm_node_id, rpc_rm_node_id, path, handle, svr_addr, false); ld_logger->debug("{}() About to send RPC ...", __func__); auto ret = HG_OTHER_ERROR; for (int i = 0; i < RPC_TRIES; ++i) { ret = margo_forward_timed(handle, &in, RPC_TIMEOUT); if (ret == HG_SUCCESS) { break; } auto ret = margo_create_wrap(ipc_rm_node_id, rpc_rm_node_id, path, handle, svr_addr, false); if (ret != HG_SUCCESS) { errno = EBUSY; return -1; } // Send rpc #if defined(MARGO_FORWARD_TIMER) ret = margo_forward_timed_wrap_timer(handle, &in, __func__); #else ret = margo_forward_timed_wrap(handle, &in); #endif // Get response if (ret == HG_SUCCESS) { /* decode response */ ld_logger->trace("{}() Waiting for response", __func__); ret = margo_get_output(handle, &out); if (ret == HG_SUCCESS) { ld_logger->debug("{}() Got response success: {}", __func__, out.err); err = out.err; } else { // something is wrong errno = EBUSY; ld_logger->error("{}() while getting rpc output", __func__); } /* clean up resources consumed by this rpc */ margo_free_output(handle, &out); } else { ld_logger->warn("{}() timed out", __func__); ld_logger->warn("{}() timed out"); errno = EBUSY; } margo_destroy(handle); return err; Loading Loading @@ -200,28 +235,34 @@ int rpc_send_update_metadentry(const string& path, const Metadentry& md, const M in.ctime_flag = bool_to_merc_bool(md_flags.ctime); ld_logger->debug("{}() Creating Mercury handle ...", __func__); margo_create_wrap(ipc_update_metadentry_id, rpc_update_metadentry_id, path, handle, svr_addr, false); ld_logger->debug("{}() About to send RPC ...", __func__); int send_ret = HG_FALSE; for (int i = 0; i < RPC_TRIES; ++i) { send_ret = margo_forward_timed(handle, &in, RPC_TIMEOUT); if (send_ret == HG_SUCCESS) { break; } auto ret = margo_create_wrap(ipc_update_metadentry_id, rpc_update_metadentry_id, path, handle, svr_addr, false); if (ret != HG_SUCCESS) { errno = EBUSY; return -1; } if (send_ret == HG_SUCCESS) { /* decode response */ // Send rpc #if defined(MARGO_FORWARD_TIMER) ret = margo_forward_timed_wrap_timer(handle, &in, __func__); #else ret = margo_forward_timed_wrap(handle, &in); #endif // Get response if (ret == HG_SUCCESS) { ld_logger->trace("{}() Waiting for response", __func__); margo_get_output(handle, &out); ret = margo_get_output(handle, &out); if (ret == HG_SUCCESS) { ld_logger->debug("{}() Got response success: {}", __func__, out.err); err = out.err; } else { // something is wrong errno = EBUSY; ld_logger->error("{}() while getting rpc output", __func__); } /* clean up resources consumed by this rpc */ margo_free_output(handle, &out); } else { ld_logger->warn("{}() timed out", __func__); ld_logger->warn("{}() timed out"); errno = EBUSY; } margo_destroy(handle); Loading @@ -245,33 +286,38 @@ int rpc_send_update_metadentry_size(const string& path, const size_t size, const int err = EUNKNOWN; ld_logger->debug("{}() Creating Mercury handle ...", __func__); margo_create_wrap(ipc_update_metadentry_size_id, rpc_update_metadentry_size_id, path, handle, svr_addr, false); ld_logger->debug("{}() About to send RPC ...", __func__); int send_ret = HG_FALSE; for (int i = 0; i < RPC_TRIES; ++i) { send_ret = margo_forward_timed(handle, &in, RPC_TIMEOUT); if (send_ret == HG_SUCCESS) { break; } auto ret = margo_create_wrap(ipc_update_metadentry_size_id, rpc_update_metadentry_size_id, path, handle, svr_addr, false); if (ret != HG_SUCCESS) { errno = EBUSY; return -1; } if (send_ret == HG_SUCCESS) { /* decode response */ // Send rpc #if defined(MARGO_FORWARD_TIMER) ret = margo_forward_timed_wrap_timer(handle, &in, __func__); #else ret = margo_forward_timed_wrap(handle, &in); #endif // Get response if (ret == HG_SUCCESS) { ld_logger->trace("{}() Waiting for response", __func__); if (margo_get_output(handle, &out) != HG_SUCCESS) { ld_logger->error("{}() Unable to get rpc output", __func__); ret_size = 0; } else { ret = margo_get_output(handle, &out); if (ret == HG_SUCCESS) { ld_logger->debug("{}() Got response success: {}", __func__, out.err); err = out.err; ret_size = out.ret_size; } else { // something is wrong errno = EBUSY; ret_size = 0; ld_logger->error("{}() while getting rpc output", __func__); } /* clean up resources consumed by this rpc */ margo_free_output(handle, &out); } } else { ld_logger->warn("{}() timed out", __func__); ld_logger->warn("{}() timed out"); errno = EBUSY; } margo_destroy(handle); return err; } No newline at end of file