/*
  Copyright 2018-2024, Barcelona Supercomputing Center (BSC), Spain
  Copyright 2015-2024, Johannes Gutenberg Universitaet Mainz, Germany

  This software was partially supported by the
  EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu).

  This software was partially supported by the
  ADA-FS project under the SPPEXA project funded by the DFG.

  This file is part of GekkoFS' POSIX interface.

  GekkoFS' POSIX interface is free software: you can redistribute it and/or
  modify it under the terms of the GNU Lesser General Public License as
  published by the Free Software Foundation, either version 3 of the License,
  or (at your option) any later version.

  GekkoFS' POSIX interface is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  GNU Lesser General Public License for more details.

  You should have received a copy of the GNU Lesser General Public License
  along with GekkoFS' POSIX interface.  If not, see
  <https://www.gnu.org/licenses/>.

  SPDX-License-Identifier: LGPL-3.0-or-later
*/

#include <client/preload_util.hpp>
#include <client/env.hpp>
#include <client/logging.hpp>
#include <client/rpc/forward_metadata.hpp>
Marc Vef's avatar
Marc Vef committed
#include <client/rpc/forward_metadata_proxy.hpp>
#include <common/rpc/distributor.hpp>
#include <common/rpc/rpc_util.hpp>
#include <common/env_util.hpp>
#include <common/common_defs.hpp>
#include <hermes.hpp>
#include <sstream>
#include <regex>
#include <sys/sysmacros.h>
/**
 * Looks up a host endpoint via Hermes
 * @param uri
 * @param max_retries
 * @return hermes endpoint, if successful
 * @throws std::runtime_error
 */
Marc Vef's avatar
Marc Vef committed
lookup_endpoint(const std::string& uri, bool use_proxy = false,
                std::size_t max_retries = 3) {

    LOG(DEBUG, "Looking up address \"{}\"", uri);

    std::random_device rd; // obtain a random number from hardware
    std::size_t attempts = 0;
    std::string error_msg;

    do {
        try {
Marc Vef's avatar
Marc Vef committed
            if(use_proxy)
                return ld_proxy_service->lookup(uri);
            else
                return ld_network_service->lookup(uri);
        } catch(const exception& ex) {
            LOG(WARNING, "Failed to lookup address '{}'. Attempts [{}/{}]", uri,
                attempts + 1, max_retries);

            // Wait a random amount of time and try again
            std::mt19937 g(rd()); // seed the random generator
            std::uniform_int_distribution<> distr(
                    50, 50 * (attempts + 2)); // define the range
            std::this_thread::sleep_for(std::chrono::milliseconds(distr(g)));
            continue;
        }
    } while(++attempts < max_retries);

    throw std::runtime_error(
            fmt::format("Endpoint for address '{}' could not be found ({})",
                        uri, error_msg));
}

/**
 * extracts protocol from a given URI generated by the RPC server of the daemon
 * @param uri
 * @throws std::runtime_error
 */
void
extract_protocol(const string& uri) {
    if(uri.rfind("://") == string::npos) {
        // invalid format. kill client
        throw runtime_error(fmt::format("Invalid format for URI: '{}'", uri));
    }
    string protocol{};
Marc Vef's avatar
Marc Vef committed
    for(const auto& valid_protocol :
        gkfs::rpc::protocol::all_remote_protocols) {
        if(uri.find(valid_protocol) != string::npos) {
            protocol = valid_protocol;
            break;
        }
Marc Vef's avatar
Marc Vef committed
    // check for shared memory protocol. Can be plain shared memory or real
    // ofi protocol + auto_sm
    if(uri.find(gkfs::rpc::protocol::na_sm) != string::npos) {
        if(protocol.empty())
            protocol = gkfs::rpc::protocol::na_sm;
        else
            CTX->auto_sm(true);
    }
    if(protocol.empty()) {
        // unsupported protocol. kill client
        throw runtime_error(fmt::format(
                "Unsupported RPC protocol found in hosts file with URI: '{}'",
                uri));
    LOG(INFO,
        "RPC protocol '{}' extracted from hosts file. Using auto_sm is '{}'",
        protocol, CTX->auto_sm());
 * Reads the daemon generator hosts file by a given path, returning hosts and
 * URI addresses
 * @param path to hosts file
 * @return vector<pair<hosts, URI>>
 * @throws std::runtime_error
 */
vector<pair<string, string>>
load_hostfile(const std::string& path) {

    LOG(DEBUG, "Loading hosts file: \"{}\"", path);

    ifstream lf(path);
        throw runtime_error(fmt::format("Failed to open hosts file '{}': {}",
                                        path, strerror(errno)));
    }
    vector<pair<string, string>> hosts;
Marc Vef's avatar
Marc Vef committed
    const regex line_re("^(\\S+)\\s+(\\S+)\\s*(\\S*)$",
                        regex::ECMAScript | regex::optimize);
    string line;
    string host;
    string uri;
    std::smatch match;
    while(getline(lf, line)) {
        // If the line contains the end marker (hostsfile_end_str), it
        // indicates the end of the current FS instance. Further hosts are not
        // part of the file system instance yet and are therefore skipped. The
        // hostfile is ordered, so nothing below this line can contain valid
        // hosts.
        if(line.find(gkfs::client::hostsfile_end_str) != string::npos)
            break;
        if(!regex_match(line, match, line_re)) {
            LOG(ERROR, "Unrecognized line format: [path: '{}', line: '{}']",
                path, line);
            throw runtime_error(
                    fmt::format("unrecognized line format: '{}'", line));
        }
        host = match[1];
        uri = match[2];
        hosts.emplace_back(host, uri);
    }
    if(hosts.empty()) {
        throw runtime_error(
                "Hosts file found but no suitable addresses could be extracted");
    // sort hosts so that data always hashes to the same place during restart
    std::sort(hosts.begin(), hosts.end());
    // remove rootdir suffix from host after sorting as no longer required
    for(auto& h : hosts) {
        auto idx = h.first.rfind("#");
        if(idx != string::npos)
            h.first.erase(idx, h.first.length());
    }
namespace gkfs::utils {
 * Retrieve metadata from daemon and return Metadata object
 * errno may be set
 * @param path
 * @param follow_links
optional<gkfs::metadata::Metadata>
get_metadata(const string& path, bool follow_links) {
Marc Vef's avatar
Marc Vef committed
    int err{};
    // Use file metadata from dentry cache if available
    if(CTX->use_dentry_cache()) {
        // get parent and filename path to retrieve the cache entry
        std::filesystem::path p(path);
        auto parent = p.parent_path().string();
        auto filename = p.filename().string();
        auto cache_entry = CTX->dentry_cache()->get(parent, filename);
            LOG(DEBUG, "{}(): Dentry cache hit for file '{}'", __func__, path);
            // if cache_entry exists, generate a Metadata object from it.
            mode_t mode = gkfs::config::syscall::stat::file_mode_default;
            if(cache_entry->file_type == gkfs::filemap::FileType::directory) {
                mode = gkfs::config::syscall::stat::dir_mode_default;
            }
            gkfs::metadata::Metadata md{};
            md.mode(mode);
            md.ctime(cache_entry->ctime);
            md.size(cache_entry->size);
            return md;
        }
    }
Marc Vef's avatar
Marc Vef committed
    if(gkfs::config::proxy::fwd_stat && CTX->use_proxy()) {
        err = gkfs::rpc::forward_stat_proxy(path, attr);
    } else {
        err = gkfs::rpc::forward_stat(path, attr, 0);
        // TODO: retry on failure
        if(err) {
Marc Vef's avatar
Marc Vef committed
            auto copy = 1;
            while(copy < CTX->get_replicas() + 1 && err) {
                LOG(ERROR, "Retrying Stat on replica {} {}", copy,
                    follow_links);
                err = gkfs::rpc::forward_stat(path, attr, copy);
                copy++;
            }
Marc Vef's avatar
Marc Vef committed
    if(err) {
        errno = err;
        return {};
    }
    if(follow_links) {
        gkfs::metadata::Metadata md{attr};
        while(md.is_link()) {
Marc Vef's avatar
Marc Vef committed
            if(gkfs::config::proxy::fwd_stat && CTX->use_proxy()) {
                err = gkfs::rpc::forward_stat_proxy(md.target_path(), attr);
            } else {
                err = gkfs::rpc::forward_stat(md.target_path(), attr, 0);
            }
Marc Vef's avatar
Marc Vef committed
                errno = err;
            }
            md = gkfs::metadata::Metadata{attr};
        }
    }
#endif
    return gkfs::metadata::Metadata{attr};
 * Converts the Metadata object into a stat struct, which is needed by Linux
int
metadata_to_stat(const std::string& path, const gkfs::metadata::Metadata& md,
                 struct stat& attr) {
    /* Populate default values */
    attr.st_dev = makedev(0, 0);
    attr.st_ino = std::hash<std::string>{}(path);
    attr.st_nlink = 1;
    attr.st_uid = CTX->fs_conf()->uid;
    attr.st_gid = CTX->fs_conf()->gid;
    attr.st_rdev = 0;
    attr.st_blksize = gkfs::config::rpc::chunksize;
    attr.st_blocks = 0;

    memset(&attr.st_atim, 0, sizeof(timespec));
    memset(&attr.st_mtim, 0, sizeof(timespec));
    memset(&attr.st_ctim, 0, sizeof(timespec));
    attr.st_mode = md.mode();

#ifdef HAS_SYMLINKS
        attr.st_size = md.target_path().size() + CTX->mountdir().size();
    else
#endif
    if(CTX->fs_conf()->atime_state) {
        attr.st_atim.tv_sec = md.atime();
    if(CTX->fs_conf()->mtime_state) {
        attr.st_mtim.tv_sec = md.mtime();
    if(CTX->fs_conf()->ctime_state) {
        attr.st_ctim.tv_sec = md.ctime();
    if(CTX->fs_conf()->link_cnt_state) {
        attr.st_nlink = md.link_count();
Marc Vef's avatar
Marc Vef committed
    if(CTX->fs_conf()->blocks_state) {
        attr.st_blocks = md.blocks();
Marc Vef's avatar
Marc Vef committed
        attr.st_blocks = md.size() / gkfs::config::syscall::stat::st_nblocksize;
pair<int, off64_t>
update_file_size(const std::string& path, size_t count, off64_t offset,
                 bool is_append) {
    LOG(DEBUG, "{}() path: '{}', count: '{}', offset: '{}', is_append: '{}'",
        __func__, path, count, offset, is_append);
    pair<int, long> ret_offset;
    auto num_replicas = CTX->get_replicas();
    if(gkfs::config::proxy::fwd_update_size && CTX->use_proxy()) {
        ret_offset = gkfs::rpc::forward_update_metadentry_size_proxy(
                path, count, offset, is_append);
    } else {
        ret_offset = gkfs::rpc::forward_update_metadentry_size(
                path, count, offset, is_append, num_replicas);
    }
    return ret_offset;
}

map<string, uint64_t>
load_forwarding_map_file(const std::string& lfpath) {

    LOG(DEBUG, "Loading forwarding map file file: \"{}\"", lfpath);

    ifstream lf(lfpath);
    if(!lf) {
        throw runtime_error(
                fmt::format("Failed to open forwarding map file '{}': {}",
                            lfpath, strerror(errno)));
    }
    map<string, uint64_t> forwarding_map;
    const regex line_re("^(\\S+)\\s+(\\S+)$",
                        regex::ECMAScript | regex::optimize);
    string line;
    string host;
    uint64_t forwarder;
    std::smatch match;
    while(getline(lf, line)) {
        if(!regex_match(line, match, line_re)) {
            LOG(ERROR, "Unrecognized line format: [path: '{}', line: '{}']",
                lfpath, line);

            throw runtime_error(
                    fmt::format("unrecognized line format: '{}'", line));
        }
        host = match[1];
        forwarder = std::stoi(match[2].str());
        forwarding_map[host] = forwarder;
    }
    return forwarding_map;
}

void
load_forwarding_map() {
    string forwarding_map_file;
    forwarding_map_file = gkfs::env::get_var(
            gkfs::env::FORWARDING_MAP_FILE, gkfs::config::forwarding_file_path);

    map<string, uint64_t> forwarding_map;

    while(forwarding_map.size() == 0) {
        try {
            forwarding_map = load_forwarding_map_file(forwarding_map_file);
        } catch(const exception& e) {
            auto emsg = fmt::format("Failed to load forwarding map file: {}",
                                    e.what());
            throw runtime_error(emsg);
        }
    }

    // if (forwarding_map.size() == 0) {
    //    throw runtime_error(fmt::format("Forwarding map file is empty: '{}'",
    //    forwarding_map_file));
    auto local_hostname = gkfs::rpc::get_my_hostname(true);
    if(forwarding_map.find(local_hostname) == forwarding_map.end()) {
        throw runtime_error(
                fmt::format("Unable to determine the forwarder for host: '{}'",
                            local_hostname));
    LOG(INFO, "Forwarding map loaded for '{}' as '{}'", local_hostname,
        forwarding_map[local_hostname]);

    CTX->fwd_host_id(forwarding_map[local_hostname]);
}

vector<pair<string, string>>
read_hosts_file() {
    hostfile = gkfs::env::get_var(gkfs::env::HOSTS_FILE,
                                  gkfs::config::hostfile_path);
    vector<pair<string, string>> hosts;
    try {
        hosts = load_hostfile(hostfile);
    } catch(const exception& e) {
        auto emsg = fmt::format("Failed to load hosts file: {}", e.what());
        throw runtime_error(emsg);
    if(hosts.empty()) {
        throw runtime_error(fmt::format("Hostfile empty: '{}'", hostfile));
    LOG(INFO, "Hosts pool size: {}", hosts.size());
    return hosts;
}

/**
 * Connects to daemons and lookup Mercury URI addresses via Hermes
 * @param hosts vector<pair<hostname, Mercury URI address>>
 * @throws std::runtime_error through lookup_endpoint()
 */
void
connect_to_hosts(const vector<pair<string, string>>& hosts) {
    auto local_hostname = gkfs::rpc::get_my_hostname(true);
    bool local_host_found = false;
Alberto Miranda's avatar
Alberto Miranda committed
    std::vector<hermes::endpoint> addrs;
    addrs.resize(hosts.size());

    vector<uint64_t> host_ids(hosts.size());
    // populate vector with [0, ..., host_size - 1]
    ::iota(::begin(host_ids), ::end(host_ids), 0);
     * Shuffle hosts to balance addr lookups to all hosts
     * Too many concurrent lookups send to same host
     * could overwhelm the server,
     * returning error when addr lookup
    ::random_device rd; // obtain a random number from hardware
    ::mt19937 g(rd());  // seed the random generator
    ::shuffle(host_ids.begin(), host_ids.end(), g); // Shuffle hosts vector
    // lookup addresses and put abstract server addresses into rpc_addresses
    for(const auto& id : host_ids) {
        const auto& hostname = hosts.at(id).first;
        const auto& uri = hosts.at(id).second;
        addrs[id] = lookup_endpoint(uri);
        if(!local_host_found && hostname == local_hostname) {
            LOG(DEBUG, "Found local host: {}", hostname);
            CTX->local_host_id(id);
            local_host_found = true;
        LOG(DEBUG, "Found peer: {}", addrs[id].to_string());
    if(!local_host_found) {
        LOG(WARNING, "Failed to find local host. Using host '0' as local host");
        CTX->local_host_id(0);
    CTX->hosts(addrs);
Marc Vef's avatar
Marc Vef committed
/**
 * Looks for a proxy pid file. If it exists and describes a running proxy, the
 * proxy address is stored in the preload context and proxy usage is enabled.
 *
 * Expected pid file content:
 * 1. line: process id (used to check for process existence)
 * 2. line: na_sm address to connect to
 *
 * If the file cannot be opened, a line is missing or empty, the pid is not a
 * positive number, or the process check fails, the reason is logged and the
 * function returns without enabling the proxy.
 */
void
check_for_proxy() {
    auto pid_path = gkfs::env::get_var(gkfs::env::PROXY_PID_FILE,
                                       gkfs::config::proxy::pid_path);
    std::ifstream ifs(pid_path, std::ifstream::in);
    if(!ifs) {
        LOG(INFO, "Proxy pid file NOT FOUND. Proxy will NOT be used!");
        return;
    }

    // get PID
    std::string running_pid;
    if(!std::getline(ifs, running_pid) || running_pid.empty()) {
        LOG(WARNING,
            "Proxy pid file '{}' first line is empty. Will NOT use proxy",
            pid_path);
        return;
    }

    // The file content is not trusted: a non-numeric or out-of-range value
    // must disable the proxy instead of throwing out of this function.
    int pid = 0;
    try {
        pid = std::stoi(running_pid);
    } catch(const std::exception&) {
        pid = 0;
    }
    // kill() treats pid 0 and negative pids as process groups / "all
    // processes", which would make the existence check below meaningless.
    if(pid <= 0) {
        LOG(WARNING,
            "Proxy pid file '{}' contains invalid pid '{}'. Will NOT use proxy",
            pid_path, running_pid);
        return;
    }

    // check if process exists without killing it. Signal 0 doesn't kill
    // NOTE(review): kill() also fails with EPERM for an existing process owned
    // by another user; that case is treated as "not found" here as before.
    if(0 != ::kill(pid, 0)) {
        LOG(WARNING,
            "Proxy pid file '{}' found but process with pid '{}' was not found. Will NOT use proxy",
            pid_path, running_pid);
        return;
    }

    // get proxy address
    std::string proxy_address{};
    if(!std::getline(ifs, proxy_address) || proxy_address.empty()) {
        LOG(WARNING,
            "Proxy pid file '{}' second line is empty. Will NOT use proxy",
            pid_path);
        return;
    }
    CTX->proxy_address_str(proxy_address);

    LOG(INFO, "Proxy is enabled and will be used!");
    CTX->use_proxy(true);
}

/**
 * Lookup proxy address via hermes RPC client
 * @throws runtime_error
 */
void
lookup_proxy_addr() {
    auto addr = lookup_endpoint(CTX->proxy_address_str(), true);
    LOG(DEBUG, "Found proxy peer: {}", addr.to_string());
    CTX->proxy_host(addr);
}

} // namespace gkfs::utils