Loading ifs/include/preload/preload_util.hpp +2 −2 Original line number Diff line number Diff line Loading @@ -70,8 +70,6 @@ extern hg_id_t rpc_update_metadentry_size_id; extern hg_id_t rpc_write_data_id; extern hg_id_t rpc_read_data_id; extern hg_id_t rpc_get_dirents_id; // rpc addresses. Populated when environment is initialized. After that it is read-only accessed extern std::map<uint64_t, hg_addr_t> rpc_addresses; // function definitions Loading @@ -88,6 +86,8 @@ bool read_system_hostfile(); bool lookup_all_hosts(); void cleanup_addresses(); bool get_addr_by_hostid(uint64_t hostid, hg_addr_t& svr_addr); bool is_local_op(size_t recipient); Loading ifs/src/preload/preload.cpp +1 −10 Original line number Diff line number Diff line Loading @@ -45,8 +45,6 @@ hg_id_t rpc_get_dirents_id; // Margo instances margo_instance_id ld_margo_ipc_id; margo_instance_id ld_margo_rpc_id; // rpc address cache std::map<uint64_t, hg_addr_t> rpc_addresses; // local daemon IPC address hg_addr_t daemon_svr_addr = HG_ADDR_NULL; Loading Loading @@ -322,13 +320,7 @@ void destroy_preload() { // Shut down RPC client if used if (ld_margo_rpc_id != nullptr) { // free all rpc addresses in LRU map and finalize margo rpc CTX->log()->debug("{}() Freeing Margo RPC svr addresses ...", __func__); for (auto& e : rpc_addresses) { CTX->log()->info("{}() Trying to free hostid {}", __func__, e.first); if (margo_addr_free(ld_margo_rpc_id, e.second) != HG_SUCCESS) { CTX->log()->warn("{}() Unable to free RPC client's svr address: {}.", __func__, e.first); } } cleanup_addresses(); CTX->log()->debug("{}() About to finalize the margo RPC client. Actually not doing it XXX", __func__); // XXX Sometimes this hangs on the cluster. Investigate. // Might been solved in margo 0.3. It is not an issue with Omnipath for sure. Maybe CCI only issue. Loading @@ -345,7 +337,6 @@ void destroy_preload() { CTX->log()->debug("{}() Shut down Margo IPC client successful", __func__); } if (services_used) { rpc_addresses.clear(); CTX->log()->info("All services shut down. Client shutdown complete."); } else Loading ifs/src/preload/preload_util.cpp +19 −3 Original line number Diff line number Diff line Loading @@ -6,6 +6,8 @@ #include <fstream> #include <iterator> #include <memory> #include <unordered_map> #include <sstream> #include <csignal> #include <random> Loading @@ -14,6 +16,9 @@ using namespace std; static const std::string dentry_val_delim = ","s; // rpc address cache std::unique_ptr<std::unordered_map<uint64_t, hg_addr_t>> rpc_addresses; bool is_fs_path(const char* path) { return strstr(path, CTX->mountdir().c_str()) == path; } Loading Loading @@ -257,6 +262,7 @@ bool read_system_hostfile() { } bool lookup_all_hosts() { rpc_addresses = std::make_unique<std::unordered_map<uint64_t, hg_addr_t>>(); vector<uint64_t> hosts(CTX->fs_conf()->host_size); // populate vector with [0, ..., host_size - 1] ::iota(::begin(hosts), ::end(hosts), 0); Loading Loading @@ -308,11 +314,21 @@ bool lookup_all_hosts() { remote_addr, CTX->fs_conf()->hosts.at(CTX->fs_conf()->host_id)); return false; } rpc_addresses.insert(make_pair(host, svr_addr)); rpc_addresses->insert(make_pair(host, svr_addr)); } return true; } void cleanup_addresses() { CTX->log()->debug("{}() Freeing Margo RPC svr addresses ...", __func__); for (const auto& e : *rpc_addresses) { CTX->log()->info("{}() Trying to free hostid {}", __func__, e.first); if (margo_addr_free(ld_margo_rpc_id, e.second) != HG_SUCCESS) { CTX->log()->warn("{}() Unable to free RPC client's svr address: {}.", __func__, e.first); } } } /** * Retrieve abstract svr address handle for hostid * @param hostid Loading @@ -320,8 +336,8 @@ bool lookup_all_hosts() { * @return */ bool get_addr_by_hostid(const uint64_t hostid, hg_addr_t& svr_addr) { auto address_lookup = rpc_addresses.find(hostid); auto found = address_lookup != rpc_addresses.end(); auto address_lookup = rpc_addresses->find(hostid); auto found = address_lookup != rpc_addresses->end(); if (found) { svr_addr = address_lookup->second; CTX->log()->trace("{}() RPC address lookup success with hostid {}", __func__, address_lookup->first); Loading ifs/include/preload/open_file_map.hpp +1 −1 File changed.Contains only whitespace changes. Show changes Loading
ifs/include/preload/preload_util.hpp +2 −2 Original line number Diff line number Diff line Loading @@ -70,8 +70,6 @@ extern hg_id_t rpc_update_metadentry_size_id; extern hg_id_t rpc_write_data_id; extern hg_id_t rpc_read_data_id; extern hg_id_t rpc_get_dirents_id; // rpc addresses. Populated when environment is initialized. After that it is read-only accessed extern std::map<uint64_t, hg_addr_t> rpc_addresses; // function definitions Loading @@ -88,6 +86,8 @@ bool read_system_hostfile(); bool lookup_all_hosts(); void cleanup_addresses(); bool get_addr_by_hostid(uint64_t hostid, hg_addr_t& svr_addr); bool is_local_op(size_t recipient); Loading
ifs/src/preload/preload.cpp +1 −10 Original line number Diff line number Diff line Loading @@ -45,8 +45,6 @@ hg_id_t rpc_get_dirents_id; // Margo instances margo_instance_id ld_margo_ipc_id; margo_instance_id ld_margo_rpc_id; // rpc address cache std::map<uint64_t, hg_addr_t> rpc_addresses; // local daemon IPC address hg_addr_t daemon_svr_addr = HG_ADDR_NULL; Loading Loading @@ -322,13 +320,7 @@ void destroy_preload() { // Shut down RPC client if used if (ld_margo_rpc_id != nullptr) { // free all rpc addresses in LRU map and finalize margo rpc CTX->log()->debug("{}() Freeing Margo RPC svr addresses ...", __func__); for (auto& e : rpc_addresses) { CTX->log()->info("{}() Trying to free hostid {}", __func__, e.first); if (margo_addr_free(ld_margo_rpc_id, e.second) != HG_SUCCESS) { CTX->log()->warn("{}() Unable to free RPC client's svr address: {}.", __func__, e.first); } } cleanup_addresses(); CTX->log()->debug("{}() About to finalize the margo RPC client. Actually not doing it XXX", __func__); // XXX Sometimes this hangs on the cluster. Investigate. // Might been solved in margo 0.3. It is not an issue with Omnipath for sure. Maybe CCI only issue. Loading @@ -345,7 +337,6 @@ void destroy_preload() { CTX->log()->debug("{}() Shut down Margo IPC client successful", __func__); } if (services_used) { rpc_addresses.clear(); CTX->log()->info("All services shut down. Client shutdown complete."); } else Loading
ifs/src/preload/preload_util.cpp +19 −3 Original line number Diff line number Diff line Loading @@ -6,6 +6,8 @@ #include <fstream> #include <iterator> #include <memory> #include <unordered_map> #include <sstream> #include <csignal> #include <random> Loading @@ -14,6 +16,9 @@ using namespace std; static const std::string dentry_val_delim = ","s; // rpc address cache std::unique_ptr<std::unordered_map<uint64_t, hg_addr_t>> rpc_addresses; bool is_fs_path(const char* path) { return strstr(path, CTX->mountdir().c_str()) == path; } Loading Loading @@ -257,6 +262,7 @@ bool read_system_hostfile() { } bool lookup_all_hosts() { rpc_addresses = std::make_unique<std::unordered_map<uint64_t, hg_addr_t>>(); vector<uint64_t> hosts(CTX->fs_conf()->host_size); // populate vector with [0, ..., host_size - 1] ::iota(::begin(hosts), ::end(hosts), 0); Loading Loading @@ -308,11 +314,21 @@ bool lookup_all_hosts() { remote_addr, CTX->fs_conf()->hosts.at(CTX->fs_conf()->host_id)); return false; } rpc_addresses.insert(make_pair(host, svr_addr)); rpc_addresses->insert(make_pair(host, svr_addr)); } return true; } void cleanup_addresses() { CTX->log()->debug("{}() Freeing Margo RPC svr addresses ...", __func__); for (const auto& e : *rpc_addresses) { CTX->log()->info("{}() Trying to free hostid {}", __func__, e.first); if (margo_addr_free(ld_margo_rpc_id, e.second) != HG_SUCCESS) { CTX->log()->warn("{}() Unable to free RPC client's svr address: {}.", __func__, e.first); } } } /** * Retrieve abstract svr address handle for hostid * @param hostid Loading @@ -320,8 +336,8 @@ bool lookup_all_hosts() { * @return */ bool get_addr_by_hostid(const uint64_t hostid, hg_addr_t& svr_addr) { auto address_lookup = rpc_addresses.find(hostid); auto found = address_lookup != rpc_addresses.end(); auto address_lookup = rpc_addresses->find(hostid); auto found = address_lookup != rpc_addresses->end(); if (found) { svr_addr = address_lookup->second; CTX->log()->trace("{}() RPC address lookup success with hostid {}", __func__, address_lookup->first); Loading
ifs/include/preload/open_file_map.hpp +1 −1 File changed.Contains only whitespace changes. Show changes