Loading ifs/include/preload/preload_util.hpp +1 −2 Original line number Diff line number Diff line Loading @@ -132,8 +132,7 @@ bool get_addr_by_hostid(uint64_t hostid, hg_addr_t& svr_addr); bool is_local_op(size_t recipient); template<typename T> hg_return margo_create_wrap(hg_id_t ipc_id, hg_id_t rpc_id, const T&, hg_handle_t& handle, hg_addr_t& svr_addr, bool force_rpc); hg_return margo_create_wrap(hg_id_t ipc_id, hg_id_t rpc_id, const T&, hg_handle_t& handle, bool force_rpc); #endif //IFS_PRELOAD_UTIL_HPP ifs/src/preload/preload_util.cpp +6 −8 Original line number Diff line number Diff line Loading @@ -398,13 +398,13 @@ bool is_local_op(const size_t recipient) { inline hg_return margo_create_wrap_helper(const hg_id_t ipc_id, const hg_id_t rpc_id, const size_t recipient, hg_handle_t& handle, hg_addr_t& svr_addr, bool force_rpc) { bool force_rpc) { hg_return_t ret; if (is_local_op(recipient) && !force_rpc) { // local ret = margo_create(ld_margo_ipc_id, daemon_svr_addr, ipc_id, &handle); ld_logger->debug("{}() to local daemon (IPC)", __func__); } else { // remote // TODO HG_ADDR_T is never freed atm. Need to change LRUCache hg_addr_t svr_addr = HG_ADDR_NULL; if (!get_addr_by_hostid(recipient, svr_addr)) { ld_logger->error("{}() server address not resolvable for host id {}", __func__, recipient); return HG_OTHER_ERROR; Loading @@ -425,14 +425,12 @@ margo_create_wrap_helper(const hg_id_t ipc_id, const hg_id_t rpc_id, const size_ * @param rpc_id * @param path * @param handle * @param svr_addr * @return */ template<> hg_return margo_create_wrap(const hg_id_t ipc_id, const hg_id_t rpc_id, const std::string& path, hg_handle_t& handle, hg_addr_t& svr_addr, bool force_rpc) { return margo_create_wrap_helper(ipc_id, rpc_id, adafs_hash_path(path, fs_config->host_size), handle, svr_addr, force_rpc); bool force_rpc) { return margo_create_wrap_helper(ipc_id, rpc_id, adafs_hash_path(path, fs_config->host_size), handle, force_rpc); } /** Loading @@ -446,6 +444,6 @@ hg_return margo_create_wrap(const hg_id_t ipc_id, const hg_id_t rpc_id, const st */ template<> hg_return margo_create_wrap(const hg_id_t ipc_id, const hg_id_t rpc_id, const size_t& recipient, hg_handle_t& handle, hg_addr_t& svr_addr, bool force_rpc) { return margo_create_wrap_helper(ipc_id, rpc_id, recipient, handle, svr_addr, force_rpc); bool force_rpc) { return margo_create_wrap_helper(ipc_id, rpc_id, recipient, handle, force_rpc); } No newline at end of file ifs/src/preload/rpc/ld_rpc_data_ws.cpp +4 −4 Original line number Diff line number Diff line Loading @@ -78,7 +78,7 @@ ssize_t rpc_send_write(const string& path, const void* buf, const bool append_fl in.total_chunk_size = total_chunk_size; // total size to write in.bulk_handle = (dest_idx[i] == fs_config->host_id) ? ipc_bulk_handle : rpc_bulk_handle; // TODO remove svr_addr from this call margo_create_wrap(ipc_write_data_id, rpc_write_data_id, dest_idx[i], rpc_handles[i], svr_addr, false); margo_create_wrap(ipc_write_data_id, rpc_write_data_id, dest_idx[i], rpc_handles[i], false); ret = margo_iforward(rpc_handles[i], &in, &rpc_waiters[i]); if (ret != HG_SUCCESS) { Loading Loading @@ -193,7 +193,7 @@ ssize_t rpc_send_read(const string& path, void* buf, const off64_t offset, const in.total_chunk_size = total_chunk_size; // total size to write in.bulk_handle = (dest_idx[i] == fs_config->host_id) ? ipc_bulk_handle : rpc_bulk_handle; // TODO remove svr_addr from this call margo_create_wrap(ipc_read_data_id, rpc_read_data_id, dest_idx[i], rpc_handles[i], svr_addr, false); margo_create_wrap(ipc_read_data_id, rpc_read_data_id, dest_idx[i], rpc_handles[i], false); ret = margo_iforward(rpc_handles[i], &in, &rpc_waiters[i]); if (ret != HG_SUCCESS) { Loading Loading @@ -266,7 +266,7 @@ void rpc_send_write_abt(void* _arg) { in.chunk_end = arg->chnk_end; in.total_chunk_size = arg->total_chunk_size; margo_create_wrap(ipc_write_data_id, rpc_write_data_id, arg->recipient, handle, svr_addr, false); margo_create_wrap(ipc_write_data_id, rpc_write_data_id, arg->recipient, handle, false); auto used_mid = margo_hg_handle_get_instance(handle); Loading Loading @@ -332,7 +332,7 @@ void rpc_send_read_abt(void* _arg) { in.chunk_end = arg->chnk_end; in.total_chunk_size = arg->total_chunk_size; margo_create_wrap(ipc_read_data_id, rpc_read_data_id, arg->recipient, handle, svr_addr, false); margo_create_wrap(ipc_read_data_id, rpc_read_data_id, arg->recipient, handle, false); auto used_mid = margo_hg_handle_get_instance(handle); Loading ifs/src/preload/rpc/ld_rpc_metadentry.cpp +7 −16 Original line number Diff line number Diff line Loading @@ -30,7 +30,6 @@ inline hg_return_t margo_forward_timed_wrap(hg_handle_t& handle, void* in_struct int rpc_send_mk_node(const std::string& path, const mode_t mode) { hg_handle_t handle; hg_addr_t svr_addr = HG_ADDR_NULL; rpc_mk_node_in_t in{}; rpc_err_out_t out{}; int err = EUNKNOWN; Loading @@ -39,7 +38,7 @@ int rpc_send_mk_node(const std::string& path, const mode_t mode) { in.mode = mode; // Create handle ld_logger->debug("{}() Creating Mercury handle ...", __func__); auto ret = margo_create_wrap(ipc_mk_node_id, rpc_mk_node_id, path, handle, svr_addr, false); auto ret = margo_create_wrap(ipc_mk_node_id, rpc_mk_node_id, path, handle, false); if (ret != HG_SUCCESS) { errno = EBUSY; return -1; Loading Loading @@ -75,7 +74,6 @@ int rpc_send_mk_node(const std::string& path, const mode_t mode) { int rpc_send_access(const std::string& path, const int mask) { hg_handle_t handle; hg_addr_t svr_addr = HG_ADDR_NULL; rpc_access_in_t in{}; rpc_err_out_t out{}; int err = EUNKNOWN; Loading @@ -83,7 +81,7 @@ int rpc_send_access(const std::string& path, const int mask) { in.path = path.c_str(); in.mask = mask; ld_logger->debug("{}() Creating Mercury handle ...", __func__); auto ret = margo_create_wrap(ipc_access_id, rpc_access_id, path, handle, svr_addr, false); auto ret = margo_create_wrap(ipc_access_id, rpc_access_id, path, handle, false); if (ret != HG_SUCCESS) { errno = EBUSY; return -1; Loading Loading @@ -120,14 +118,13 @@ int rpc_send_access(const std::string& path, const int mask) { int rpc_send_stat(const std::string& path, string& attr) { hg_handle_t handle; hg_addr_t svr_addr = HG_ADDR_NULL; rpc_path_only_in_t in{}; rpc_stat_out_t out{}; int err = EUNKNOWN; // fill in in.path = path.c_str(); ld_logger->debug("{}() Creating Mercury handle ...", __func__); auto ret = margo_create_wrap(ipc_stat_id, rpc_stat_id, path, handle, svr_addr, false); auto ret = margo_create_wrap(ipc_stat_id, rpc_stat_id, path, handle, false); if (ret != HG_SUCCESS) { errno = EBUSY; return -1; Loading Loading @@ -170,13 +167,12 @@ int rpc_send_rm_node(const std::string& path) { rpc_rm_node_in_t in{}; rpc_err_out_t out{}; hg_handle_t handle; hg_addr_t svr_addr = HG_ADDR_NULL; int err = EUNKNOWN; // fill in in.path = path.c_str(); ld_logger->debug("{}() Creating Mercury handle ...", __func__); auto ret = margo_create_wrap(ipc_rm_node_id, rpc_rm_node_id, path, handle, svr_addr, false); auto ret = margo_create_wrap(ipc_rm_node_id, rpc_rm_node_id, path, handle, false); if (ret != HG_SUCCESS) { errno = EBUSY; return -1; Loading Loading @@ -212,7 +208,6 @@ int rpc_send_rm_node(const std::string& path) { int rpc_send_update_metadentry(const string& path, const Metadentry& md, const MetadentryUpdateFlags& md_flags) { hg_handle_t handle; hg_addr_t svr_addr = HG_ADDR_NULL; rpc_update_metadentry_in_t in{}; rpc_err_out_t out{}; int err = EUNKNOWN; Loading Loading @@ -240,7 +235,7 @@ int rpc_send_update_metadentry(const string& path, const Metadentry& md, const M in.ctime_flag = bool_to_merc_bool(md_flags.ctime); ld_logger->debug("{}() Creating Mercury handle ...", __func__); auto ret = margo_create_wrap(ipc_update_metadentry_id, rpc_update_metadentry_id, path, handle, svr_addr, false); auto ret = margo_create_wrap(ipc_update_metadentry_id, rpc_update_metadentry_id, path, handle, false); if (ret != HG_SUCCESS) { errno = EBUSY; return -1; Loading Loading @@ -277,7 +272,6 @@ int rpc_send_update_metadentry(const string& path, const Metadentry& md, const M int rpc_send_update_metadentry_size(const string& path, const size_t size, const off64_t offset, const bool append_flag, off64_t& ret_size) { hg_handle_t handle; hg_addr_t svr_addr = HG_ADDR_NULL; rpc_update_metadentry_size_in_t in{}; rpc_update_metadentry_size_out_t out{}; // add data Loading @@ -291,8 +285,7 @@ int rpc_send_update_metadentry_size(const string& path, const size_t size, const int err = EUNKNOWN; ld_logger->debug("{}() Creating Mercury handle ...", __func__); auto ret = margo_create_wrap(ipc_update_metadentry_size_id, rpc_update_metadentry_size_id, path, handle, svr_addr, false); auto ret = margo_create_wrap(ipc_update_metadentry_size_id, rpc_update_metadentry_size_id, path, handle, false); if (ret != HG_SUCCESS) { errno = EBUSY; return -1; Loading Loading @@ -329,7 +322,6 @@ int rpc_send_update_metadentry_size(const string& path, const size_t size, const int rpc_send_get_metadentry_size(const std::string& path, off64_t& ret_size) { hg_handle_t handle; hg_addr_t svr_addr = HG_ADDR_NULL; rpc_path_only_in_t in{}; rpc_get_metadentry_size_out_t out{}; // add data Loading @@ -337,8 +329,7 @@ int rpc_send_get_metadentry_size(const std::string& path, off64_t& ret_size) { int err = EUNKNOWN; ld_logger->debug("{}() Creating Mercury handle ...", __func__); auto ret = margo_create_wrap(ipc_get_metadentry_size_id, rpc_get_metadentry_size_id, path, handle, svr_addr, false); auto ret = margo_create_wrap(ipc_get_metadentry_size_id, rpc_get_metadentry_size_id, path, handle, false); if (ret != HG_SUCCESS) { errno = EBUSY; return -1; Loading Loading
ifs/include/preload/preload_util.hpp +1 −2 Original line number Diff line number Diff line Loading @@ -132,8 +132,7 @@ bool get_addr_by_hostid(uint64_t hostid, hg_addr_t& svr_addr); bool is_local_op(size_t recipient); template<typename T> hg_return margo_create_wrap(hg_id_t ipc_id, hg_id_t rpc_id, const T&, hg_handle_t& handle, hg_addr_t& svr_addr, bool force_rpc); hg_return margo_create_wrap(hg_id_t ipc_id, hg_id_t rpc_id, const T&, hg_handle_t& handle, bool force_rpc); #endif //IFS_PRELOAD_UTIL_HPP
ifs/src/preload/preload_util.cpp +6 −8 Original line number Diff line number Diff line Loading @@ -398,13 +398,13 @@ bool is_local_op(const size_t recipient) { inline hg_return margo_create_wrap_helper(const hg_id_t ipc_id, const hg_id_t rpc_id, const size_t recipient, hg_handle_t& handle, hg_addr_t& svr_addr, bool force_rpc) { bool force_rpc) { hg_return_t ret; if (is_local_op(recipient) && !force_rpc) { // local ret = margo_create(ld_margo_ipc_id, daemon_svr_addr, ipc_id, &handle); ld_logger->debug("{}() to local daemon (IPC)", __func__); } else { // remote // TODO HG_ADDR_T is never freed atm. Need to change LRUCache hg_addr_t svr_addr = HG_ADDR_NULL; if (!get_addr_by_hostid(recipient, svr_addr)) { ld_logger->error("{}() server address not resolvable for host id {}", __func__, recipient); return HG_OTHER_ERROR; Loading @@ -425,14 +425,12 @@ margo_create_wrap_helper(const hg_id_t ipc_id, const hg_id_t rpc_id, const size_ * @param rpc_id * @param path * @param handle * @param svr_addr * @return */ template<> hg_return margo_create_wrap(const hg_id_t ipc_id, const hg_id_t rpc_id, const std::string& path, hg_handle_t& handle, hg_addr_t& svr_addr, bool force_rpc) { return margo_create_wrap_helper(ipc_id, rpc_id, adafs_hash_path(path, fs_config->host_size), handle, svr_addr, force_rpc); bool force_rpc) { return margo_create_wrap_helper(ipc_id, rpc_id, adafs_hash_path(path, fs_config->host_size), handle, force_rpc); } /** Loading @@ -446,6 +444,6 @@ hg_return margo_create_wrap(const hg_id_t ipc_id, const hg_id_t rpc_id, const st */ template<> hg_return margo_create_wrap(const hg_id_t ipc_id, const hg_id_t rpc_id, const size_t& recipient, hg_handle_t& handle, hg_addr_t& svr_addr, bool force_rpc) { return margo_create_wrap_helper(ipc_id, rpc_id, recipient, handle, svr_addr, force_rpc); bool force_rpc) { return margo_create_wrap_helper(ipc_id, rpc_id, recipient, handle, force_rpc); } No newline at end of file
ifs/src/preload/rpc/ld_rpc_data_ws.cpp +4 −4 Original line number Diff line number Diff line Loading @@ -78,7 +78,7 @@ ssize_t rpc_send_write(const string& path, const void* buf, const bool append_fl in.total_chunk_size = total_chunk_size; // total size to write in.bulk_handle = (dest_idx[i] == fs_config->host_id) ? ipc_bulk_handle : rpc_bulk_handle; // TODO remove svr_addr from this call margo_create_wrap(ipc_write_data_id, rpc_write_data_id, dest_idx[i], rpc_handles[i], svr_addr, false); margo_create_wrap(ipc_write_data_id, rpc_write_data_id, dest_idx[i], rpc_handles[i], false); ret = margo_iforward(rpc_handles[i], &in, &rpc_waiters[i]); if (ret != HG_SUCCESS) { Loading Loading @@ -193,7 +193,7 @@ ssize_t rpc_send_read(const string& path, void* buf, const off64_t offset, const in.total_chunk_size = total_chunk_size; // total size to write in.bulk_handle = (dest_idx[i] == fs_config->host_id) ? ipc_bulk_handle : rpc_bulk_handle; // TODO remove svr_addr from this call margo_create_wrap(ipc_read_data_id, rpc_read_data_id, dest_idx[i], rpc_handles[i], svr_addr, false); margo_create_wrap(ipc_read_data_id, rpc_read_data_id, dest_idx[i], rpc_handles[i], false); ret = margo_iforward(rpc_handles[i], &in, &rpc_waiters[i]); if (ret != HG_SUCCESS) { Loading Loading @@ -266,7 +266,7 @@ void rpc_send_write_abt(void* _arg) { in.chunk_end = arg->chnk_end; in.total_chunk_size = arg->total_chunk_size; margo_create_wrap(ipc_write_data_id, rpc_write_data_id, arg->recipient, handle, svr_addr, false); margo_create_wrap(ipc_write_data_id, rpc_write_data_id, arg->recipient, handle, false); auto used_mid = margo_hg_handle_get_instance(handle); Loading Loading @@ -332,7 +332,7 @@ void rpc_send_read_abt(void* _arg) { in.chunk_end = arg->chnk_end; in.total_chunk_size = arg->total_chunk_size; margo_create_wrap(ipc_read_data_id, rpc_read_data_id, arg->recipient, handle, svr_addr, false); margo_create_wrap(ipc_read_data_id, rpc_read_data_id, arg->recipient, handle, false); auto used_mid = margo_hg_handle_get_instance(handle); Loading
ifs/src/preload/rpc/ld_rpc_metadentry.cpp +7 −16 Original line number Diff line number Diff line Loading @@ -30,7 +30,6 @@ inline hg_return_t margo_forward_timed_wrap(hg_handle_t& handle, void* in_struct int rpc_send_mk_node(const std::string& path, const mode_t mode) { hg_handle_t handle; hg_addr_t svr_addr = HG_ADDR_NULL; rpc_mk_node_in_t in{}; rpc_err_out_t out{}; int err = EUNKNOWN; Loading @@ -39,7 +38,7 @@ int rpc_send_mk_node(const std::string& path, const mode_t mode) { in.mode = mode; // Create handle ld_logger->debug("{}() Creating Mercury handle ...", __func__); auto ret = margo_create_wrap(ipc_mk_node_id, rpc_mk_node_id, path, handle, svr_addr, false); auto ret = margo_create_wrap(ipc_mk_node_id, rpc_mk_node_id, path, handle, false); if (ret != HG_SUCCESS) { errno = EBUSY; return -1; Loading Loading @@ -75,7 +74,6 @@ int rpc_send_mk_node(const std::string& path, const mode_t mode) { int rpc_send_access(const std::string& path, const int mask) { hg_handle_t handle; hg_addr_t svr_addr = HG_ADDR_NULL; rpc_access_in_t in{}; rpc_err_out_t out{}; int err = EUNKNOWN; Loading @@ -83,7 +81,7 @@ int rpc_send_access(const std::string& path, const int mask) { in.path = path.c_str(); in.mask = mask; ld_logger->debug("{}() Creating Mercury handle ...", __func__); auto ret = margo_create_wrap(ipc_access_id, rpc_access_id, path, handle, svr_addr, false); auto ret = margo_create_wrap(ipc_access_id, rpc_access_id, path, handle, false); if (ret != HG_SUCCESS) { errno = EBUSY; return -1; Loading Loading @@ -120,14 +118,13 @@ int rpc_send_access(const std::string& path, const int mask) { int rpc_send_stat(const std::string& path, string& attr) { hg_handle_t handle; hg_addr_t svr_addr = HG_ADDR_NULL; rpc_path_only_in_t in{}; rpc_stat_out_t out{}; int err = EUNKNOWN; // fill in in.path = path.c_str(); ld_logger->debug("{}() Creating Mercury handle ...", __func__); auto ret = margo_create_wrap(ipc_stat_id, rpc_stat_id, path, handle, svr_addr, false); auto ret = margo_create_wrap(ipc_stat_id, rpc_stat_id, path, handle, false); if (ret != HG_SUCCESS) { errno = EBUSY; return -1; Loading Loading @@ -170,13 +167,12 @@ int rpc_send_rm_node(const std::string& path) { rpc_rm_node_in_t in{}; rpc_err_out_t out{}; hg_handle_t handle; hg_addr_t svr_addr = HG_ADDR_NULL; int err = EUNKNOWN; // fill in in.path = path.c_str(); ld_logger->debug("{}() Creating Mercury handle ...", __func__); auto ret = margo_create_wrap(ipc_rm_node_id, rpc_rm_node_id, path, handle, svr_addr, false); auto ret = margo_create_wrap(ipc_rm_node_id, rpc_rm_node_id, path, handle, false); if (ret != HG_SUCCESS) { errno = EBUSY; return -1; Loading Loading @@ -212,7 +208,6 @@ int rpc_send_rm_node(const std::string& path) { int rpc_send_update_metadentry(const string& path, const Metadentry& md, const MetadentryUpdateFlags& md_flags) { hg_handle_t handle; hg_addr_t svr_addr = HG_ADDR_NULL; rpc_update_metadentry_in_t in{}; rpc_err_out_t out{}; int err = EUNKNOWN; Loading Loading @@ -240,7 +235,7 @@ int rpc_send_update_metadentry(const string& path, const Metadentry& md, const M in.ctime_flag = bool_to_merc_bool(md_flags.ctime); ld_logger->debug("{}() Creating Mercury handle ...", __func__); auto ret = margo_create_wrap(ipc_update_metadentry_id, rpc_update_metadentry_id, path, handle, svr_addr, false); auto ret = margo_create_wrap(ipc_update_metadentry_id, rpc_update_metadentry_id, path, handle, false); if (ret != HG_SUCCESS) { errno = EBUSY; return -1; Loading Loading @@ -277,7 +272,6 @@ int rpc_send_update_metadentry(const string& path, const Metadentry& md, const M int rpc_send_update_metadentry_size(const string& path, const size_t size, const off64_t offset, const bool append_flag, off64_t& ret_size) { hg_handle_t handle; hg_addr_t svr_addr = HG_ADDR_NULL; rpc_update_metadentry_size_in_t in{}; rpc_update_metadentry_size_out_t out{}; // add data Loading @@ -291,8 +285,7 @@ int rpc_send_update_metadentry_size(const string& path, const size_t size, const int err = EUNKNOWN; ld_logger->debug("{}() Creating Mercury handle ...", __func__); auto ret = margo_create_wrap(ipc_update_metadentry_size_id, rpc_update_metadentry_size_id, path, handle, svr_addr, false); auto ret = margo_create_wrap(ipc_update_metadentry_size_id, rpc_update_metadentry_size_id, path, handle, false); if (ret != HG_SUCCESS) { errno = EBUSY; return -1; Loading Loading @@ -329,7 +322,6 @@ int rpc_send_update_metadentry_size(const string& path, const size_t size, const int rpc_send_get_metadentry_size(const std::string& path, off64_t& ret_size) { hg_handle_t handle; hg_addr_t svr_addr = HG_ADDR_NULL; rpc_path_only_in_t in{}; rpc_get_metadentry_size_out_t out{}; // add data Loading @@ -337,8 +329,7 @@ int rpc_send_get_metadentry_size(const std::string& path, off64_t& ret_size) { int err = EUNKNOWN; ld_logger->debug("{}() Creating Mercury handle ...", __func__); auto ret = margo_create_wrap(ipc_get_metadentry_size_id, rpc_get_metadentry_size_id, path, handle, svr_addr, false); auto ret = margo_create_wrap(ipc_get_metadentry_size_id, rpc_get_metadentry_size_id, path, handle, false); if (ret != HG_SUCCESS) { errno = EBUSY; return -1; Loading