Commit 815e76eb authored by Marc Vef's avatar Marc Vef
Browse files

Preload: RPC addresses of hosts are now obtained at startup

Previously, host rpc addresses have been looked up lazily when a request
was done. This is not necessary as all hosts are looked up anyways
while the file system is running.

Now, all RPC addresses are looked up when the environment is initialized.
After that, the rpc_address map is only accessed read-only and does not
require a mutex anymore.
parent f62ad5bb
Loading
Loading
Loading
Loading
+4 −3
Original line number Diff line number Diff line
@@ -97,9 +97,8 @@ extern hg_id_t rpc_read_data_id;
extern std::shared_ptr<struct FsConfig> fs_config;
// global logger instance
extern std::shared_ptr<spdlog::logger> ld_logger;
// rpc address cache
extern std::map<uint64_t, hg_addr_t> rpc_address_cache;
extern ABT_mutex rpc_address_cache_mutex;
// rpc addresses. Populated when environment is initialized. After that it is read-only accessed
extern std::map<uint64_t, hg_addr_t> rpc_addresses;
// file descriptor index validation flag
extern std::atomic<bool> fd_validation_needed;

@@ -120,6 +119,8 @@ int get_daemon_pid();

bool read_system_hostfile();

bool lookup_all_hosts();

bool get_addr_by_hostid(uint64_t hostid, hg_addr_t& svr_addr);

bool is_local_op(size_t recipient);
+5 −11
Original line number Diff line number Diff line
@@ -43,8 +43,7 @@ hg_id_t rpc_read_data_id;
margo_instance_id ld_margo_ipc_id;
margo_instance_id ld_margo_rpc_id;
// rpc address cache
std::map<uint64_t, hg_addr_t> rpc_address_cache;
ABT_mutex rpc_address_cache_mutex;
std::map<uint64_t, hg_addr_t> rpc_addresses;
// local daemon IPC address
hg_addr_t daemon_svr_addr = HG_ADDR_NULL;

@@ -237,8 +236,8 @@ void init_ld_environment_() {
        ld_logger->error("{}() Unable to read system hostfile /etc/hosts for address mapping.", __func__);
        exit(EXIT_FAILURE);
    }
    if (ABT_mutex_create(&rpc_address_cache_mutex) != ABT_SUCCESS) {
        ld_logger->error("{}() Unable to create RPC address cache mutex.", __func__);
    if (!lookup_all_hosts()) {
        ld_logger->error("{}() Unable to lookup all host RPC addresses.", __func__);
        exit(EXIT_FAILURE);
    }
    ld_logger->info("{}() Environment initialization successful.", __func__);
@@ -282,16 +281,11 @@ void destroy_preload() {
        margo_diag_dump(ld_margo_rpc_id, "-", 0);
    }
#endif
    if (services_used) {
        ld_logger->debug("{}() Freeing ABT constructs ...", __func__);
        ABT_mutex_free(&rpc_address_cache_mutex);
        ld_logger->debug("{}() Freeing ABT constructs successful", __func__);
    }
    // Shut down RPC client if used
    if (ld_margo_rpc_id != nullptr) {
        // free all rpc addresses in LRU map and finalize margo rpc
        ld_logger->debug("{}() Freeing Margo RPC svr addresses ...", __func__);
        for (auto& e : rpc_address_cache) {
        for (auto& e : rpc_addresses) {
            ld_logger->info("{}() Trying to free hostid {}", __func__, e.first);
            if (margo_addr_free(ld_margo_rpc_id, e.second) != HG_SUCCESS) {
                ld_logger->warn("{}() Unable to free RPC client's svr address: {}.", __func__, e.first);
@@ -311,7 +305,7 @@ void destroy_preload() {
        ld_logger->debug("{}() Shut down Margo IPC client successful", __func__);
    }
    if (services_used) {
        rpc_address_cache.clear();
        rpc_addresses.clear();
        ld_logger->info("All services shut down. Client shutdown complete.");
    }
    else
+42 −35
Original line number Diff line number Diff line

#include <preload/preload_util.hpp>
#include <global/rpc/rpc_utils.hpp>
#include <global/global_func.hpp>

#include <fstream>
#include <iterator>
#include <sstream>
#include <global/rpc/rpc_utils.hpp>
#include <global/global_func.hpp>
#include <csignal>
#include <random>

using namespace std;

@@ -283,33 +284,23 @@ bool read_system_hostfile() {
    return true;
}

/**
 * Creates an abstract rpc address for a given hostid and puts it into an address cache map
 * @param hostid
 * @param svr_addr
 * @return
 */
bool get_addr_by_hostid(const uint64_t hostid, hg_addr_t& svr_addr) {
bool lookup_all_hosts() {
    vector<uint64_t> hosts(fs_config->host_size);
    // populate vector with [0, ..., host_size - 1]
    ::iota(::begin(hosts), ::end(hosts), 0);
    /*
     * This function might get called from within an Argobots thread.
     * A std::mutex would lead to a deadlock as it does not yield the thread by sending it to BLOCKING state
     * Shuffle hosts to balance addr lookups to all hosts
     * Too many concurrent lookups send to same host could overwhelm the server, returning error when addr lookup
     */
    ABT_mutex_lock(rpc_address_cache_mutex);
    auto address_lookup = rpc_address_cache.find(hostid);
    if (address_lookup != rpc_address_cache.end()) {
        svr_addr = address_lookup->second;
        ld_logger->trace("RPC address lookup success with hostid {}", address_lookup->first);
        //found
        ABT_mutex_unlock(rpc_address_cache_mutex);
        return true;
    } else {
        // not found, manual lookup and add address mapping to LRU cache
        ld_logger->trace("not found in lrucache");
    ::random_device rd; // obtain a random number from hardware
    ::mt19937 g(rd()); // seed the random generator
    ::shuffle(hosts.begin(), hosts.end(), g); // Shuffle hosts vector
    // lookup addresses and put abstract server addresses into rpc_addresses
    for (auto& host : hosts) {
        string remote_addr;
        // Try to get the ip of remote addr. If it cannot be found, use hostname
        // first get the hostname with the hostid
        auto hostname = fs_config->hosts.at(hostid) + HOSTNAME_SUFFIX;
        // then get the ip address from /etc/hosts which is mapped to the sys_hostfile map
        hg_addr_t svr_addr = HG_ADDR_NULL;
        auto hostname = fs_config->hosts.at(host) + HOSTNAME_SUFFIX;
        // get the ip address from /etc/hosts which is mapped to the sys_hostfile map
        if (fs_config->sys_hostfile.count(hostname) == 1) {
            auto remote_ip = fs_config->sys_hostfile.at(hostname);
            remote_addr = RPC_PROTOCOL + "://"s + remote_ip + ":"s + fs_config->rpc_port;
@@ -323,20 +314,19 @@ bool get_addr_by_hostid(const uint64_t hostid, hg_addr_t& svr_addr) {
                         remote_addr, hostname, fs_config->rpc_port);
        // try to look up 3 times before erroring out
        hg_return_t ret;
        // TODO If this is solution is somewhat helpful, write a more versatile solution
        for (unsigned int i = 0; i < 3; i++) {
        for (uint32_t i = 0; i < 4; i++) {
            ret = margo_addr_lookup(ld_margo_rpc_id, remote_addr.c_str(), &svr_addr);
            if (ret != HG_SUCCESS) {
                // still not working after 5 tries.
                if (i == 4) {
                    ld_logger->error("{}() Unable to lookup address {} from host {}", __func__,
                                     remote_addr, fs_config->hosts.at(fs_config->host_id));
                    ABT_mutex_unlock(rpc_address_cache_mutex);
                    return false;
                }
                // Wait a second then try again
                // TODO fix that terrible solution
                sleep(1 * (i + 1));
                // Wait a random amount of time and try again
                ::mt19937 eng(rd()); // seed the random generator
                ::uniform_int_distribution<> distr(50, 50 * (i + 2)); // define the range
                ::this_thread::sleep_for(std::chrono::milliseconds(distr(eng)));
            } else {
                break;
            }
@@ -344,13 +334,30 @@ bool get_addr_by_hostid(const uint64_t hostid, hg_addr_t& svr_addr) {
        if (svr_addr == HG_ADDR_NULL) {
            ld_logger->error("{}() looked up address is NULL for address {} from host {}", __func__,
                             remote_addr, fs_config->hosts.at(fs_config->host_id));
            ABT_mutex_unlock(rpc_address_cache_mutex);
            return false;
        }
        rpc_address_cache.insert(make_pair(hostid, svr_addr));
        ABT_mutex_unlock(rpc_address_cache_mutex);
        rpc_addresses.insert(make_pair(host, svr_addr));
    }
    return true;
}

/**
 * Retrieve abstract svr address handle for hostid
 * @param hostid
 * @param svr_addr
 * @return
 */
bool get_addr_by_hostid(const uint64_t hostid, hg_addr_t& svr_addr) {
    auto address_lookup = rpc_addresses.find(hostid);
    if (address_lookup != rpc_addresses.end()) {
        svr_addr = address_lookup->second;
        ld_logger->trace("{}() RPC address lookup success with hostid {}", __func__, address_lookup->first);
        return true;
    } else {
        // not found, unexpected host
        ld_logger->error("{}() Unexpected host id {}. Not found in RPC address cache", __func__);
    }
    return false;
}

/**