Commit 87687a45 authored by Marc Vef's avatar Marc Vef
Browse files

Omnipath support added. First cut

parent a2776cd4
Loading
Loading
Loading
Loading
+3 −0
Original line number Diff line number Diff line
@@ -29,6 +29,7 @@ struct FsConfig {

    // rpc infos
    std::map<uint64_t, std::string> hosts;
    std::map<std::string, std::string> sys_hostfile;
    uint64_t host_id; // my host number
    size_t host_size;
    std::string rpc_port;
@@ -114,6 +115,8 @@ std::string daemon_register_path(int pid);

bool get_daemon_auxiliaries();

bool read_system_hostfile();

bool get_addr_by_hostid(uint64_t hostid, hg_addr_t& svr_addr);

size_t get_rpc_node(const std::string& to_hash);
+5 −1
Original line number Diff line number Diff line
@@ -141,7 +141,7 @@ bool init_margo_client(Margo_mode mode, const string na_plugin) {
    // Init Mercury layer (must be finalized when finished)
    hg_class_t* hg_class;
    hg_context_t* hg_context;
    hg_class = HG_Init(na_plugin.c_str(), HG_FALSE);
    hg_class = HG_Init((na_plugin + "://localhost"s).c_str(), HG_FALSE);
    if (hg_class == nullptr) {
        ld_logger->error("{}() HG_Init() Failed to init Mercury client layer", __func__);
        return false;
@@ -217,6 +217,10 @@ void init_ld_environment_() {
        ld_logger->error("{}() Unable to initialize Margo RPC client.", __func__);
        exit(EXIT_FAILURE);
    }
    if (!read_system_hostfile()) {
        ld_logger->error("{}() Unable to read system hostfile /etc/hosts for address mapping.", __func__);
        exit(EXIT_FAILURE);
    }
    ld_logger->info("{}() Environment initialization successful.", __func__);
}

+49 −6
Original line number Diff line number Diff line
@@ -3,6 +3,8 @@

#include <dirent.h>
#include <fstream>
#include <iterator>
#include <sstream>

using namespace std;

@@ -250,6 +252,33 @@ bool get_daemon_auxiliaries() {
    return ret;
}

/**
 * Read /etc/hosts and put hostname - ip association into a map in fs config.
 * We are working with hostnames but some network layers (such as Omnipath) does not look into /etc/hosts.
 * Hence, we have to store the mapping ourselves.
 * @return success
 */
bool read_system_hostfile() {
    ifstream hostfile("/etc/hosts");
    if (!hostfile.is_open())
        return false;
    string line;
    map<string, string> sys_hostfile;
    while (getline(hostfile, line)) {
        if (line.empty() || line == "\n" || line.at(0) == '#')
            continue;
        std::istringstream iss(line);
        std::vector<string> tmp_list((istream_iterator<string>(iss)), istream_iterator<string>());
        for (unsigned int i = 1; i < tmp_list.size(); i++) {
            if (tmp_list[i].find(HOSTNAME_SUFFIX) != string::npos)
                sys_hostfile.insert(make_pair(tmp_list[i], tmp_list[0]));
        }
    }
    fs_config->sys_hostfile = sys_hostfile;
    ld_logger->info("{}() /etc/hosts successfully mapped into ADA-FS", __func__);
    return true;
}

/**
 * Creates an abstract rpc address for a given hostid and puts it into an address cache map
 * @param hostid
@@ -264,20 +293,34 @@ bool get_addr_by_hostid(const uint64_t hostid, hg_addr_t& svr_addr) {
        return true;
    } else {
        ld_logger->trace("not found in lrucache");
        string remote_addr;
        // not found, manual lookup and add address mapping to LRU cache
        auto hostname = RPC_PROTOCOL + "://"s + fs_config->hosts.at(hostid) + HOSTNAME_SUFFIX + ":"s +
                        fs_config->rpc_port; // convert hostid to hostname and port
        ld_logger->trace("generated hostname {} with rpc_port {}", hostname, fs_config->rpc_port);
        // ofi+psm2 requires an IP. An remote_addr, even if put in /etc/hosts, does not suffice
        if (string(RPC_PROTOCOL) ==
            "ofi+psm2") { // XXX dangerous. redo. The whole definition insanity needs to be const expr anyways
            // first get the hostname with the hostid
            auto hostname = fs_config->hosts.at(hostid) + HOSTNAME_SUFFIX;
            // then get the ip address from /etc/hosts which is mapped to the sys_hostfile map
            if (fs_config->sys_hostfile.count(hostname) == 1) {
                auto remote_ip = fs_config->sys_hostfile.at(hostname);
                remote_addr = RPC_PROTOCOL + "://"s + remote_ip + ":"s + fs_config->rpc_port;
            }
        }
        if (remote_addr.empty()) {
            remote_addr = RPC_PROTOCOL + "://"s + fs_config->hosts.at(hostid) + HOSTNAME_SUFFIX + ":"s +
                          fs_config->rpc_port; // convert hostid to remote_addr and port
        }
        ld_logger->trace("generated remote_addr {} with rpc_port {}", remote_addr, fs_config->rpc_port);
        // try to look up 3 times before erroring out
        hg_return_t ret;
        // TODO If this is solution is somewhat helpful, write a more versatile solution
        for (unsigned int i = 0; i < 3; i++) {
            ret = margo_addr_lookup(ld_margo_rpc_id, hostname.c_str(), &svr_addr);
            ret = margo_addr_lookup(ld_margo_rpc_id, remote_addr.c_str(), &svr_addr);
            if (ret != HG_SUCCESS) {
                // still not working after 5 tries.
                if (i == 4) {
                    ld_logger->error("{}() Unable to lookup address {} from host {}", __func__,
                                     hostname, fs_config->hosts.at(fs_config->host_id));
                                     remote_addr, fs_config->hosts.at(fs_config->host_id));
                    return false;
                }
                // Wait a second then try again
@@ -288,7 +331,7 @@ bool get_addr_by_hostid(const uint64_t hostid, hg_addr_t& svr_addr) {
        }
        if (svr_addr == HG_ADDR_NULL) {
            ld_logger->error("{}() looked up address is NULL for address {} from host {}", __func__,
                             hostname, fs_config->hosts.at(fs_config->host_id));
                             remote_addr, fs_config->hosts.at(fs_config->host_id));
            return false;
        }
        rpc_address_cache.insert(hostid, svr_addr);