Commit c264a827 authored by Marc Vef's avatar Marc Vef
Browse files

Merge branch '45-improve-client-rpc-address-lookups' into 'master'

Resolve "Improve client RPC address lookups"

Closes #45

See merge request zdvresearch_bsc/adafs!31
parents f62ad5bb aee6f5e5
Loading
Loading
Loading
Loading
+4 −3
Original line number Diff line number Diff line
@@ -97,9 +97,8 @@ extern hg_id_t rpc_read_data_id;
extern std::shared_ptr<struct FsConfig> fs_config;
// global logger instance
extern std::shared_ptr<spdlog::logger> ld_logger;
// rpc address cache
extern std::map<uint64_t, hg_addr_t> rpc_address_cache;
extern ABT_mutex rpc_address_cache_mutex;
// rpc addresses. Populated when environment is initialized. After that it is read-only accessed
extern std::map<uint64_t, hg_addr_t> rpc_addresses;
// file descriptor index validation flag
extern std::atomic<bool> fd_validation_needed;

@@ -120,6 +119,8 @@ int get_daemon_pid();

bool read_system_hostfile();

bool lookup_all_hosts();

bool get_addr_by_hostid(uint64_t hostid, hg_addr_t& svr_addr);

bool is_local_op(size_t recipient);
+5 −11
Original line number Diff line number Diff line
@@ -43,8 +43,7 @@ hg_id_t rpc_read_data_id;
margo_instance_id ld_margo_ipc_id;
margo_instance_id ld_margo_rpc_id;
// rpc address cache
std::map<uint64_t, hg_addr_t> rpc_address_cache;
ABT_mutex rpc_address_cache_mutex;
std::map<uint64_t, hg_addr_t> rpc_addresses;
// local daemon IPC address
hg_addr_t daemon_svr_addr = HG_ADDR_NULL;

@@ -237,8 +236,8 @@ void init_ld_environment_() {
        ld_logger->error("{}() Unable to read system hostfile /etc/hosts for address mapping.", __func__);
        exit(EXIT_FAILURE);
    }
    if (ABT_mutex_create(&rpc_address_cache_mutex) != ABT_SUCCESS) {
        ld_logger->error("{}() Unable to create RPC address cache mutex.", __func__);
    if (!lookup_all_hosts()) {
        ld_logger->error("{}() Unable to lookup all host RPC addresses.", __func__);
        exit(EXIT_FAILURE);
    }
    ld_logger->info("{}() Environment initialization successful.", __func__);
@@ -282,16 +281,11 @@ void destroy_preload() {
        margo_diag_dump(ld_margo_rpc_id, "-", 0);
    }
#endif
    if (services_used) {
        ld_logger->debug("{}() Freeing ABT constructs ...", __func__);
        ABT_mutex_free(&rpc_address_cache_mutex);
        ld_logger->debug("{}() Freeing ABT constructs successful", __func__);
    }
    // Shut down RPC client if used
    if (ld_margo_rpc_id != nullptr) {
        // free all rpc addresses in LRU map and finalize margo rpc
        ld_logger->debug("{}() Freeing Margo RPC svr addresses ...", __func__);
        for (auto& e : rpc_address_cache) {
        for (auto& e : rpc_addresses) {
            ld_logger->info("{}() Trying to free hostid {}", __func__, e.first);
            if (margo_addr_free(ld_margo_rpc_id, e.second) != HG_SUCCESS) {
                ld_logger->warn("{}() Unable to free RPC client's svr address: {}.", __func__, e.first);
@@ -311,7 +305,7 @@ void destroy_preload() {
        ld_logger->debug("{}() Shut down Margo IPC client successful", __func__);
    }
    if (services_used) {
        rpc_address_cache.clear();
        rpc_addresses.clear();
        ld_logger->info("All services shut down. Client shutdown complete.");
    }
    else
+45 −35
Original line number Diff line number Diff line

#include <preload/preload_util.hpp>
#include <global/rpc/rpc_utils.hpp>
#include <global/global_func.hpp>

#include <fstream>
#include <iterator>
#include <sstream>
#include <global/rpc/rpc_utils.hpp>
#include <global/global_func.hpp>
#include <csignal>
#include <random>

using namespace std;

@@ -283,33 +284,23 @@ bool read_system_hostfile() {
    return true;
}

/**
 * Creates an abstract rpc address for a given hostid and puts it into an address cache map
 * @param hostid
 * @param svr_addr
 * @return
 */
bool get_addr_by_hostid(const uint64_t hostid, hg_addr_t& svr_addr) {
bool lookup_all_hosts() {
    vector<uint64_t> hosts(fs_config->host_size);
    // populate vector with [0, ..., host_size - 1]
    ::iota(::begin(hosts), ::end(hosts), 0);
    /*
     * This function might get called from within an Argobots thread.
     * A std::mutex would lead to a deadlock as it does not yield the thread by sending it to BLOCKING state
     * Shuffle hosts to balance addr lookups to all hosts
     * Too many concurrent lookups send to same host could overwhelm the server, returning error when addr lookup
     */
    ABT_mutex_lock(rpc_address_cache_mutex);
    auto address_lookup = rpc_address_cache.find(hostid);
    if (address_lookup != rpc_address_cache.end()) {
        svr_addr = address_lookup->second;
        ld_logger->trace("RPC address lookup success with hostid {}", address_lookup->first);
        //found
        ABT_mutex_unlock(rpc_address_cache_mutex);
        return true;
    } else {
        // not found, manual lookup and add address mapping to LRU cache
        ld_logger->trace("not found in lrucache");
    ::random_device rd; // obtain a random number from hardware
    ::mt19937 g(rd()); // seed the random generator
    ::shuffle(hosts.begin(), hosts.end(), g); // Shuffle hosts vector
    // lookup addresses and put abstract server addresses into rpc_addresses
    for (auto& host : hosts) {
        string remote_addr;
        // Try to get the ip of remote addr. If it cannot be found, use hostname
        // first get the hostname with the hostid
        auto hostname = fs_config->hosts.at(hostid) + HOSTNAME_SUFFIX;
        // then get the ip address from /etc/hosts which is mapped to the sys_hostfile map
        hg_addr_t svr_addr = HG_ADDR_NULL;
        auto hostname = fs_config->hosts.at(host) + HOSTNAME_SUFFIX;
        // get the ip address from /etc/hosts which is mapped to the sys_hostfile map
        if (fs_config->sys_hostfile.count(hostname) == 1) {
            auto remote_ip = fs_config->sys_hostfile.at(hostname);
            remote_addr = RPC_PROTOCOL + "://"s + remote_ip + ":"s + fs_config->rpc_port;
@@ -323,20 +314,19 @@ bool get_addr_by_hostid(const uint64_t hostid, hg_addr_t& svr_addr) {
                         remote_addr, hostname, fs_config->rpc_port);
        // try to look up 3 times before erroring out
        hg_return_t ret;
        // TODO If this is solution is somewhat helpful, write a more versatile solution
        for (unsigned int i = 0; i < 3; i++) {
        for (uint32_t i = 0; i < 4; i++) {
            ret = margo_addr_lookup(ld_margo_rpc_id, remote_addr.c_str(), &svr_addr);
            if (ret != HG_SUCCESS) {
                // still not working after 5 tries.
                if (i == 4) {
                    ld_logger->error("{}() Unable to lookup address {} from host {}", __func__,
                                     remote_addr, fs_config->hosts.at(fs_config->host_id));
                    ABT_mutex_unlock(rpc_address_cache_mutex);
                    return false;
                }
                // Wait a second then try again
                // TODO fix that terrible solution
                sleep(1 * (i + 1));
                // Wait a random amount of time and try again
                ::mt19937 eng(rd()); // seed the random generator
                ::uniform_int_distribution<> distr(50, 50 * (i + 2)); // define the range
                ::this_thread::sleep_for(std::chrono::milliseconds(distr(eng)));
            } else {
                break;
            }
@@ -344,13 +334,33 @@ bool get_addr_by_hostid(const uint64_t hostid, hg_addr_t& svr_addr) {
        if (svr_addr == HG_ADDR_NULL) {
            ld_logger->error("{}() looked up address is NULL for address {} from host {}", __func__,
                             remote_addr, fs_config->hosts.at(fs_config->host_id));
            ABT_mutex_unlock(rpc_address_cache_mutex);
            return false;
        }
        rpc_address_cache.insert(make_pair(hostid, svr_addr));
        ABT_mutex_unlock(rpc_address_cache_mutex);
        rpc_addresses.insert(make_pair(host, svr_addr));
    }
    return true;
}

/**
 * Retrieve abstract svr address handle for hostid
 * @param hostid
 * @param svr_addr
 * @return
 */
bool get_addr_by_hostid(const uint64_t hostid, hg_addr_t& svr_addr) {
    auto address_lookup = rpc_addresses.find(hostid);
    auto found = address_lookup != rpc_addresses.end();
    if (found) {
        svr_addr = address_lookup->second;
        ld_logger->trace("{}() RPC address lookup success with hostid {}", __func__, address_lookup->first);
        return true;
    } else {
        // not found, unexpected host.
        // This should not happen because all addresses are looked when the environment is initialized.
        ld_logger->error("{}() Unexpected host id {}. Not found in RPC address cache", __func__, hostid);
        assert(found && "Unexpected host id for rpc address lookup. ID was not found in RPC address cache.");
    }
    return false;
}

/**