Loading ifs/include/preload/preload_util.hpp +2 −2 Original line number Diff line number Diff line Loading @@ -61,8 +61,6 @@ extern hg_id_t rpc_read_data_id; extern hg_id_t rpc_trunc_data_id; extern hg_id_t rpc_get_dirents_id; extern hg_id_t rpc_chunk_stat_id; // rpc addresses. Populated when environment is initialized. After that it is read-only accessed extern std::map<uint64_t, hg_addr_t> rpc_addresses; // function definitions Loading @@ -76,6 +74,8 @@ bool read_system_hostfile(); bool lookup_all_hosts(); void cleanup_addresses(); bool get_addr_by_hostid(uint64_t hostid, hg_addr_t& svr_addr); bool is_local_op(size_t recipient); Loading ifs/src/preload/preload.cpp +1 −12 Original line number Diff line number Diff line Loading @@ -55,8 +55,6 @@ hg_id_t rpc_chunk_stat_id; // Margo instances margo_instance_id ld_margo_ipc_id; margo_instance_id ld_margo_rpc_id; // rpc address cache std::map<uint64_t, hg_addr_t> rpc_addresses; // local daemon IPC address hg_addr_t daemon_svr_addr = HG_ADDR_NULL; Loading Loading @@ -271,8 +269,6 @@ void init_ld_environment_() { CTX->log()->error("{}() Unable to read system hostfile /etc/hosts for address mapping.", __func__); exit(EXIT_FAILURE); } //use rpc_addresses here to avoid "static initialization order problem" rpc_addresses.clear(); if (!lookup_all_hosts()) { CTX->log()->error("{}() Unable to lookup all host RPC addresses.", __func__); exit(EXIT_FAILURE); Loading Loading @@ -365,13 +361,7 @@ void destroy_preload() { // Shut down RPC client if used if (ld_margo_rpc_id != nullptr) { // free all rpc addresses in LRU map and finalize margo rpc CTX->log()->debug("{}() Freeing Margo RPC svr addresses ...", __func__); for (auto& e : rpc_addresses) { CTX->log()->info("{}() Trying to free hostid {}", __func__, e.first); if (margo_addr_free(ld_margo_rpc_id, e.second) != HG_SUCCESS) { CTX->log()->warn("{}() Unable to free RPC client's svr address: {}.", __func__, e.first); } } cleanup_addresses(); CTX->log()->debug("{}() About to finalize the margo RPC client. Actually not doing it XXX", __func__); // XXX Sometimes this hangs on the cluster. Investigate. // Might been solved in margo 0.3. It is not an issue with Omnipath for sure. Maybe CCI only issue. Loading @@ -388,7 +378,6 @@ void destroy_preload() { CTX->log()->debug("{}() Shut down Margo IPC client successful", __func__); } if (services_used) { rpc_addresses.clear(); CTX->log()->info("All services shut down. Client shutdown complete."); } else Loading ifs/src/preload/preload_util.cpp +20 −3 Original line number Diff line number Diff line Loading @@ -6,6 +6,8 @@ #include <fstream> #include <iterator> #include <memory> #include <unordered_map> #include <sstream> #include <csignal> #include <random> Loading @@ -13,6 +15,10 @@ using namespace std; // rpc address cache std::unique_ptr<std::unordered_map<uint64_t, hg_addr_t>> rpc_addresses; bool is_fs_path(const char* path) { return strstr(path, CTX->mountdir().c_str()) == path; } Loading Loading @@ -141,6 +147,7 @@ bool read_system_hostfile() { } bool lookup_all_hosts() { rpc_addresses = std::make_unique<std::unordered_map<uint64_t, hg_addr_t>>(); vector<uint64_t> hosts(CTX->fs_conf()->host_size); // populate vector with [0, ..., host_size - 1] ::iota(::begin(hosts), ::end(hosts), 0); Loading Loading @@ -192,11 +199,21 @@ bool lookup_all_hosts() { remote_addr, CTX->fs_conf()->hosts.at(CTX->fs_conf()->host_id)); return false; } rpc_addresses.insert(make_pair(host, svr_addr)); rpc_addresses->insert(make_pair(host, svr_addr)); } return true; } void cleanup_addresses() { CTX->log()->debug("{}() Freeing Margo RPC svr addresses ...", __func__); for (const auto& e : *rpc_addresses) { CTX->log()->info("{}() Trying to free hostid {}", __func__, e.first); if (margo_addr_free(ld_margo_rpc_id, e.second) != HG_SUCCESS) { CTX->log()->warn("{}() Unable to free RPC client's svr address: {}.", __func__, e.first); } } } /** * Retrieve abstract svr address handle for hostid * @param hostid Loading @@ -204,8 +221,8 @@ bool lookup_all_hosts() { * @return */ bool get_addr_by_hostid(const uint64_t hostid, hg_addr_t& svr_addr) { auto address_lookup = rpc_addresses.find(hostid); auto found = address_lookup != rpc_addresses.end(); auto address_lookup = rpc_addresses->find(hostid); auto found = address_lookup != rpc_addresses->end(); if (found) { svr_addr = address_lookup->second; CTX->log()->trace("{}() RPC address lookup success with hostid {}", __func__, address_lookup->first); Loading Loading
ifs/include/preload/preload_util.hpp +2 −2 Original line number Diff line number Diff line Loading @@ -61,8 +61,6 @@ extern hg_id_t rpc_read_data_id; extern hg_id_t rpc_trunc_data_id; extern hg_id_t rpc_get_dirents_id; extern hg_id_t rpc_chunk_stat_id; // rpc addresses. Populated when environment is initialized. After that it is read-only accessed extern std::map<uint64_t, hg_addr_t> rpc_addresses; // function definitions Loading @@ -76,6 +74,8 @@ bool read_system_hostfile(); bool lookup_all_hosts(); void cleanup_addresses(); bool get_addr_by_hostid(uint64_t hostid, hg_addr_t& svr_addr); bool is_local_op(size_t recipient); Loading
ifs/src/preload/preload.cpp +1 −12 Original line number Diff line number Diff line Loading @@ -55,8 +55,6 @@ hg_id_t rpc_chunk_stat_id; // Margo instances margo_instance_id ld_margo_ipc_id; margo_instance_id ld_margo_rpc_id; // rpc address cache std::map<uint64_t, hg_addr_t> rpc_addresses; // local daemon IPC address hg_addr_t daemon_svr_addr = HG_ADDR_NULL; Loading Loading @@ -271,8 +269,6 @@ void init_ld_environment_() { CTX->log()->error("{}() Unable to read system hostfile /etc/hosts for address mapping.", __func__); exit(EXIT_FAILURE); } //use rpc_addresses here to avoid "static initialization order problem" rpc_addresses.clear(); if (!lookup_all_hosts()) { CTX->log()->error("{}() Unable to lookup all host RPC addresses.", __func__); exit(EXIT_FAILURE); Loading Loading @@ -365,13 +361,7 @@ void destroy_preload() { // Shut down RPC client if used if (ld_margo_rpc_id != nullptr) { // free all rpc addresses in LRU map and finalize margo rpc CTX->log()->debug("{}() Freeing Margo RPC svr addresses ...", __func__); for (auto& e : rpc_addresses) { CTX->log()->info("{}() Trying to free hostid {}", __func__, e.first); if (margo_addr_free(ld_margo_rpc_id, e.second) != HG_SUCCESS) { CTX->log()->warn("{}() Unable to free RPC client's svr address: {}.", __func__, e.first); } } cleanup_addresses(); CTX->log()->debug("{}() About to finalize the margo RPC client. Actually not doing it XXX", __func__); // XXX Sometimes this hangs on the cluster. Investigate. // Might been solved in margo 0.3. It is not an issue with Omnipath for sure. Maybe CCI only issue. Loading @@ -388,7 +378,6 @@ void destroy_preload() { CTX->log()->debug("{}() Shut down Margo IPC client successful", __func__); } if (services_used) { rpc_addresses.clear(); CTX->log()->info("All services shut down. Client shutdown complete."); } else Loading
ifs/src/preload/preload_util.cpp +20 −3 Original line number Diff line number Diff line Loading @@ -6,6 +6,8 @@ #include <fstream> #include <iterator> #include <memory> #include <unordered_map> #include <sstream> #include <csignal> #include <random> Loading @@ -13,6 +15,10 @@ using namespace std; // rpc address cache std::unique_ptr<std::unordered_map<uint64_t, hg_addr_t>> rpc_addresses; bool is_fs_path(const char* path) { return strstr(path, CTX->mountdir().c_str()) == path; } Loading Loading @@ -141,6 +147,7 @@ bool read_system_hostfile() { } bool lookup_all_hosts() { rpc_addresses = std::make_unique<std::unordered_map<uint64_t, hg_addr_t>>(); vector<uint64_t> hosts(CTX->fs_conf()->host_size); // populate vector with [0, ..., host_size - 1] ::iota(::begin(hosts), ::end(hosts), 0); Loading Loading @@ -192,11 +199,21 @@ bool lookup_all_hosts() { remote_addr, CTX->fs_conf()->hosts.at(CTX->fs_conf()->host_id)); return false; } rpc_addresses.insert(make_pair(host, svr_addr)); rpc_addresses->insert(make_pair(host, svr_addr)); } return true; } void cleanup_addresses() { CTX->log()->debug("{}() Freeing Margo RPC svr addresses ...", __func__); for (const auto& e : *rpc_addresses) { CTX->log()->info("{}() Trying to free hostid {}", __func__, e.first); if (margo_addr_free(ld_margo_rpc_id, e.second) != HG_SUCCESS) { CTX->log()->warn("{}() Unable to free RPC client's svr address: {}.", __func__, e.first); } } } /** * Retrieve abstract svr address handle for hostid * @param hostid Loading @@ -204,8 +221,8 @@ bool lookup_all_hosts() { * @return */ bool get_addr_by_hostid(const uint64_t hostid, hg_addr_t& svr_addr) { auto address_lookup = rpc_addresses.find(hostid); auto found = address_lookup != rpc_addresses.end(); auto address_lookup = rpc_addresses->find(hostid); auto found = address_lookup != rpc_addresses->end(); if (found) { svr_addr = address_lookup->second; CTX->log()->trace("{}() RPC address lookup success with hostid {}", __func__, address_lookup->first); Loading