Commit 6d9d3ff1 authored by Jean Bez's avatar Jean Bez
Browse files

use a file to map clients to forwarders

parent 368f38cb
Loading
Loading
Loading
Loading
+3 −1
Original line number Diff line number Diff line
@@ -66,12 +66,14 @@ extern hg_id_t rpc_mk_symlink_id;
int metadata_to_stat(const std::string& path, const Metadata& md, struct stat& attr);

std::vector<std::pair<std::string, std::string>> load_hosts_file(const std::string& lfpath);
std::map<std::string, uint64_t> load_forwarding_map_file(const std::string& lfpath);

hg_addr_t get_local_addr();

void load_hosts();
bool lookup_all_hosts();
uint64_t get_my_forwarder();
//uint64_t get_my_forwarder();
void load_forwarding_map();

void cleanup_addresses();

+1 −0
Original line number Diff line number Diff line
@@ -55,6 +55,7 @@
#define RPC_TIMEOUT 3000

#define DEFAULT_HOSTS_FILE "./gkfs_hosts.txt"
#define DEFAULT_FORWARDING_MAP_FILE "./gkfs_forwarding.map"

//size of preallocated buffer to hold directory entries in rpc call
#define RPC_DIRENTS_BUFF_SIZE (8 * 1024 * 1024) // 8 mega
+9 −1
Original line number Diff line number Diff line
@@ -169,7 +169,7 @@ void init_ld_environment_() {
    //auto simple_hash_dist = std::make_shared<SimpleHashDistributor>(CTX->local_host_id(), CTX->hosts().size());
    //CTX->distributor(simple_hash_dist);

    try {
    /*try {
        CTX->fwd_host_id(get_my_forwarder());
        if (CTX->fwd_host_id() > CTX->hosts().size()) {
            throw std::runtime_error("Invalid forwarding host");
@@ -178,6 +178,14 @@ void init_ld_environment_() {
        CTX->log()->debug("{}() Forward to {}", __func__, CTX->fwd_host_id());
    } catch (std::exception& e){
        exit_error_msg(EXIT_FAILURE, fmt::format("Unable set the forwarding host '{}'", e.what()));
    }*/

    try {
        load_forwarding_map();

        CTX->log()->info("{}() Forward to {}", __func__, CTX->fwd_host_id());
    } catch (std::exception& e){
        exit_error_msg(EXIT_FAILURE, fmt::format("Unable set the forwarding host '{}'", e.what()));
    }
    
    auto forwarder_dist = std::make_shared<ForwarderDistributor>(CTX->fwd_host_id(), CTX->hosts().size());
+62 −1
Original line number Diff line number Diff line
@@ -103,6 +103,34 @@ vector<pair<string, string>> load_hosts_file(const std::string& lfpath) {
    return hosts;
}

map<string, uint64_t> load_forwarding_map_file(const std::string& lfpath) {
    CTX->log()->debug("{}() Loading forwarding map file: '{}'", __func__, lfpath);
    ifstream lf(lfpath);
    if (!lf) {
        throw runtime_error(fmt::format("Failed to open forwarding map file '{}': {}",
                            lfpath, strerror(errno)));
    }
    map<string, uint64_t> forwarding_map;
    const regex line_re("^(\\S+)\\s+(\\S+)$",
                        regex::ECMAScript | regex::optimize);
    string line;
    string host;
    uint64_t forwarder;
    std::smatch match;
    while (getline(lf, line)) {
        if (!regex_match(line, match, line_re)) {
            spdlog::error("{}() Unrecognized line format: [path: '{}', line: '{}']",
                          __func__, lfpath, line);
            throw runtime_error(
                    fmt::format("unrecognized line format: '{}'", line));
        }
        host = match[1];
        forwarder = std::stoi(match[2].str());
        forwarding_map[host] = forwarder;
    }
    return forwarding_map;
}

hg_addr_t margo_addr_lookup_retry(const std::string& uri) {
    CTX->log()->debug("{}() Looking up address '{}'", __func__, uri);
    // try to look up 3 times before erroring out
@@ -125,13 +153,14 @@ hg_addr_t margo_addr_lookup_retry(const std::string& uri) {
            fmt::format("Failed to lookup address '{}', error: {}", uri, HG_Error_to_string(ret)));
}

uint64_t get_my_forwarder() {
/*uint64_t get_my_forwarder() {
    uint64_t forwarder;

    string forwarder_host;
    char *parsed;

    forwarder_host = gkfs::get_env_own("FORWARDER");

    forwarder = strtoul(forwarder_host.c_str(), &parsed, 10);

    if (parsed != forwarder_host.c_str() + forwarder_host.size()) {
@@ -140,6 +169,38 @@ uint64_t get_my_forwarder() {
    }
    
    return forwarder;
}*/

void load_forwarding_map() {
    string forwarding_map_file;
    try {
        forwarding_map_file = gkfs::get_env_own("FORWARDING_MAP_FILE");
    } catch (const exception& e) {
        CTX->log()->info("{}() Failed to get the forwarding map file path"
                         " from environment, using default: '{}'",
                         __func__, DEFAULT_FORWARDING_MAP_FILE);
        forwarding_map_file = DEFAULT_FORWARDING_MAP_FILE;
    }

    map<string, uint64_t> forwarding_map;
    try {
        forwarding_map = load_forwarding_map_file(forwarding_map_file);
    } catch (const exception& e) {
        auto emsg = fmt::format("Failed to load forwarding map file: {}", e.what());
        throw runtime_error(emsg);
    }

    if (forwarding_map.size() == 0) {
        throw runtime_error(fmt::format("Forwarding map file is empty: '{}'", forwarding_map_file));
    }

    auto local_hostname = get_my_hostname(true);

    if (forwarding_map.find(local_hostname) == forwarding_map.end()) {
        throw runtime_error(fmt::format("Unable to determine the forwarder for host: '{}'", local_hostname));   
    }

    CTX->fwd_host_id(forwarding_map[local_hostname]);
}

void load_hosts() {
+3 −3
Original line number Diff line number Diff line
@@ -161,7 +161,7 @@ static hg_return_t rpc_srv_write_data(hg_handle_t handle) {
        return rpc_cleanup_respond(&handle, &in, &out, &bulk_handle);
    }
    auto const host_id = in.host_id;
    auto const host_size = in.host_size;
    //auto const host_size = in.host_size;
    //SimpleHashDistributor distributor(host_id, host_size);
    //ForwarderDistributor distributor(host_id, host_size);

@@ -365,8 +365,8 @@ static hg_return_t rpc_srv_read_data(hg_handle_t handle) {
        ADAFS_DATA->spdlogger()->error("{}() Failed to access allocated buffer from bulk handle", __func__);
        return rpc_cleanup_respond(&handle, &in, &out, &bulk_handle);
    }
    auto const host_id = in.host_id;
    auto const host_size = in.host_size;
    //auto const host_id = in.host_id;
    //auto const host_size = in.host_size;
    //SimpleHashDistributor distributor(host_id, host_size);
    //ForwarderDistributor distributor(host_id, host_size);