Program Listing for File preload.cpp

Return to documentation for file (src/client/preload.cpp)

/*
  Copyright 2018-2025, Barcelona Supercomputing Center (BSC), Spain
  Copyright 2015-2025, Johannes Gutenberg Universitaet Mainz, Germany

  This software was partially supported by the
  EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu).

  This software was partially supported by the
  ADA-FS project under the SPPEXA project funded by the DFG.

  This software was partially supported by the
  European Union’s Horizon 2020 JTI-EuroHPC research and
  innovation programme, by the project ADMIRE (Project ID: 956748,
  admire-eurohpc.eu)

  This project was partially promoted by the Ministry for Digital Transformation
  and the Civil Service, within the framework of the Recovery,
  Transformation and Resilience Plan - Funded by the European Union
  -NextGenerationEU.

  This file is part of GekkoFS' POSIX interface.

  GekkoFS' POSIX interface is free software: you can redistribute it and/or
  modify it under the terms of the GNU Lesser General Public License as
  published by the Free Software Foundation, either version 3 of the License,
  or (at your option) any later version.

  GekkoFS' POSIX interface is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  GNU Lesser General Public License for more details.

  You should have received a copy of the GNU Lesser General Public License
  along with GekkoFS' POSIX interface.  If not, see
  <https://www.gnu.org/licenses/>.

  SPDX-License-Identifier: LGPL-3.0-or-later
*/

#include <client/preload.hpp>
#include <client/path.hpp>
#include <client/logging.hpp>
#include <client/rpc/forward_management.hpp>
#include <client/preload_util.hpp>
#include <client/intercept.hpp>
#include <client/cache.hpp>

#include <common/rpc/distributor.hpp>
#include <common/common_defs.hpp>
#ifdef GKFS_ENABLE_CLIENT_METRICS
#include <common/msgpack_util.hpp>
#endif

#include <algorithm>
#include <atomic>
#include <cstdlib>
#include <ctime>
#include <fstream>

#include <hermes.hpp>


using namespace std;

// RPC engines shared with the rest of the client. In this file they are
// created by init_hermes_client() and released by destroy_preload() and
// gkfs_end().
// ld_network_service uses the transport selected by CTX->rpc_protocol();
// ld_proxy_service is only created when CTX->use_proxy() is set and uses the
// "na+sm" transport.
std::unique_ptr<hermes::async_engine> ld_network_service; // extern variable
std::unique_ptr<hermes::async_engine> ld_proxy_service;   // extern variable

namespace {

// FORWARDING
// State shared between the forwarding mapper thread and the thread that
// starts and stops it.

// Handle of the background thread that executes forwarding_mapper().
pthread_t mapper;
// Loop condition of the mapper thread. It is written by the controlling
// thread and read by the mapper thread, hence atomic.
std::atomic<bool> forwarding_running{false};

// Mutex/condition pair the mapper thread sleeps on between two reloads.
// Both are statically initialized because they are never handed to
// pthread_mutex_init()/pthread_cond_init() in this file.
pthread_mutex_t remap_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t remap_signal = PTHREAD_COND_INITIALIZER;
// END FORWARDING

/**
 * Reports a fatal error and terminates the process.
 *
 * The message is emitted through the client logger and additionally written
 * to stderr. If syscall interception is currently enabled it is shut down
 * before the process exits.
 *
 * @param errcode exit status handed to ::exit()
 * @param msg     text describing the failure
 */
inline void
exit_error_msg(int errcode, const string& msg) {

    LOG_ERROR("{}", msg);
    gkfs::log::logger::log_message(stderr, "{}\n", msg);

    // Interception has to be switched off before ::exit() is called:
    // otherwise the syscall hooks may find shared state (e.g. the logger) in
    // an inconsistent condition and crash.
    const bool hooks_active = CTX->interception_enabled();
    if(hooks_active) {
        gkfs::preload::stop_interception();
        CTX->disable_interception();
    }

    ::exit(errcode);
}

bool
init_hermes_client() {

    try {

        hermes::engine_options opts{};

        if(CTX->auto_sm())
            opts |= hermes::use_auto_sm;
        if(gkfs::rpc::protocol::ofi_psm2 == CTX->rpc_protocol()) {
            opts |= hermes::force_no_block_progress;
        }

        opts |= hermes::process_may_fork;

        ld_network_service = std::make_unique<hermes::async_engine>(
                hermes::get_transport_type(CTX->rpc_protocol()), opts);
        ld_network_service->run();
    } catch(const std::exception& ex) {
        fmt::print(stderr, "Failed to initialize Hermes RPC client {}\n",
                   ex.what());
        return false;
    }
    if(CTX->use_proxy()) {
        try {
            LOG(INFO, "Initializing IPC proxy subsystem...");
            hermes::engine_options opts{};
            ld_proxy_service = std::make_unique<hermes::async_engine>(
                    hermes::get_transport_type("na+sm"), opts, "", false, 1);
            ld_proxy_service->run();
        } catch(const std::exception& ex) {
            fmt::print(stderr,
                       "Failed to initialize Hermes IPC client for proxy {}\n",
                       ex.what());
            return false;
        }
    }

    return true;
}

/**
 * Entry point of the forwarding mapper thread.
 *
 * While forwarding_running is set, the thread calls
 * gkfs::utils::load_forwarding_map(), logs the forwarding host whenever it
 * differs from the one logged last, and then sleeps on remap_signal for at
 * most ten seconds. A failure to load the map terminates the process through
 * exit_error_msg().
 *
 * @param p unused thread argument
 * @return always nullptr
 */
void*
forwarding_mapper(void* p) {
    // maximum time to sleep between two reloads of the forwarding map
    constexpr int remap_interval_secs = 10;

    // -1 marks that no forwarding host has been logged yet. A 64 bit value is
    // used so that large host ids are not truncated when remembered.
    int64_t previous = -1;

    while(forwarding_running) {
        try {
            gkfs::utils::load_forwarding_map();

            const auto current = static_cast<int64_t>(CTX->fwd_host_id());
            if(previous != current) {
                LOG(INFO, "{}() Forward to {}", __func__, CTX->fwd_host_id());

                previous = current;
            }
        } catch(const std::exception& e) {
            exit_error_msg(EXIT_FAILURE,
                           fmt::format("Unable set the forwarding host '{}'",
                                       e.what()));
        }

        // pthread_cond_timedwait() takes an ABSOLUTE deadline, so it has to
        // be recomputed for every wait. A deadline computed only once lies in
        // the past after the first interval, which makes every later wait
        // return immediately and turns this loop into a busy loop.
        struct timespec timeout;
        clock_gettime(CLOCK_REALTIME, &timeout);
        timeout.tv_sec += remap_interval_secs;

        pthread_mutex_lock(&remap_mutex);
        // A stop request may have arrived while the map was being reloaded;
        // do not go to sleep in that case.
        if(forwarding_running) {
            pthread_cond_timedwait(&remap_signal, &remap_mutex, &timeout);
        }
        pthread_mutex_unlock(&remap_mutex);
    }

    return nullptr;
}

void
init_forwarding_mapper() {
    forwarding_running = true;

    pthread_create(&mapper, NULL, forwarding_mapper, NULL);
}

void
destroy_forwarding_mapper() {
    forwarding_running = false;

    pthread_cond_signal(&remap_signal);

    pthread_join(mapper, NULL);
}

/**
 * Logs the command line of the current process as found in
 * /proc/self/cmdline.
 *
 * @throws std::runtime_error if the file cannot be opened or read
 */
void
log_prog_name() {
    std::ifstream cmdline("/proc/self/cmdline");
    if(!cmdline.is_open()) {
        LOG(ERROR, "Unable to open cmdline file");
        throw std::runtime_error("Unable to open cmdline file");
    }

    std::string line;
    if(!getline(cmdline, line)) {
        throw std::runtime_error("Unable to read cmdline file");
    }

    // the arguments are separated by NUL bytes; make them printable
    std::replace(line.begin(), line.end(), '\0', ' ');

    // Drop the blank produced by the NUL that terminates the last argument.
    // This is guarded because the line may be empty (getline() succeeds on a
    // leading newline) and because the last character is not necessarily a
    // separator; an unconditional erase would throw or eat a real character.
    if(!line.empty() && line.back() == ' ') {
        line.pop_back();
    }

    LOG(INFO, "Process cmdline: '{}'", line);
    // the stream is closed by its destructor
}

} // namespace

namespace gkfs::preload {

void
init_environment() {

    vector<pair<string, string>> hosts{};
    try {
        LOG(INFO, "Loading peer addresses...");
        hosts = gkfs::utils::read_hosts_file();
    } catch(const std::exception& e) {
        exit_error_msg(EXIT_FAILURE,
                       "Failed to load hosts addresses: "s + e.what());
    }

    LOG(INFO, "Checking for GKFS Proxy");
    gkfs::utils::check_for_proxy();

    // initialize Hermes interface to Mercury
    LOG(INFO, "Initializing RPC subsystem...");

    if(!init_hermes_client()) {
        exit_error_msg(EXIT_FAILURE, "Unable to initialize RPC subsystem");
    }

    try {
        gkfs::utils::connect_to_hosts(hosts);
        if(CTX->use_proxy()) {
            LOG(INFO, "Connecting to proxy...");
            gkfs::utils::lookup_proxy_addr();
        }
    } catch(const std::exception& e) {
        exit_error_msg(EXIT_FAILURE,
                       "Failed to connect to hosts: "s + e.what());
    }

    /* Setup distributor */
    auto forwarding_map_file = gkfs::env::get_var(
            gkfs::env::FORWARDING_MAP_FILE, gkfs::config::forwarding_file_path);

    if(!forwarding_map_file.empty()) {
        try {
            gkfs::utils::load_forwarding_map();

            LOG(INFO, "{}() Forward to {}", __func__, CTX->fwd_host_id());
        } catch(std::exception& e) {
            exit_error_msg(EXIT_FAILURE,
                           fmt::format("Unable set the forwarding host '{}'",
                                       e.what()));
        }

        auto forwarder_dist = std::make_shared<gkfs::rpc::ForwarderDistributor>(
                CTX->fwd_host_id(), CTX->hosts().size());
        CTX->distributor(forwarder_dist);
    } else {

#ifdef GKFS_USE_GUIDED_DISTRIBUTION
        auto distributor = std::make_shared<gkfs::rpc::GuidedDistributor>(
                CTX->local_host_id(), CTX->hosts().size());
#else
        auto distributor = std::make_shared<gkfs::rpc::SimpleHashDistributor>(
                CTX->local_host_id(), CTX->hosts().size());
#endif
        CTX->distributor(distributor);
    }

    auto use_dcache = gkfs::env::get_var(gkfs::env::cache::DENTRY,
                                         gkfs::config::cache::use_dentry_cache
                                                 ? "ON"
                                                 : "OFF") == "ON";
    if(use_dcache) {
        try {
            LOG(INFO, "Initializing dentry caching...");
            auto dentry_cache =
                    std::make_shared<gkfs::cache::dir::DentryCache>();
            CTX->dentry_cache(dentry_cache);
            LOG(INFO, "dentry caching enabled.");
            CTX->use_dentry_cache(true);
        } catch(const std::exception& e) {
            exit_error_msg(EXIT_FAILURE,
                           "Failed to initialize dentry cache: "s + e.what());
        }
    } else {
        if(gkfs::env::var_is_set(gkfs::env::cache::DENTRY)) {
            LOG(INFO, "Dentry cache is disabled by environment variable.");
        } else {
            LOG(INFO, "Dentry cache is disabled by configuration.");
        }
    }

    auto use_write_size_cache =
            gkfs::env::get_var(gkfs::env::cache::WRITE_SIZE,
                               gkfs::config::cache::use_write_size_cache
                                       ? "ON"
                                       : "OFF") == "ON";
    if(use_write_size_cache) {
        try {
            LOG(INFO, "Initializing write size cache...");
            auto write_size_cache =
                    std::make_shared<gkfs::cache::file::WriteSizeCache>();
            CTX->write_size_cache(write_size_cache);
            CTX->write_size_cache()->flush_threshold(gkfs::env::get_var(
                    gkfs::env::cache::WRITE_SIZE_THRESHOLD,
                    gkfs::config::cache::write_size_flush_threshold));
            CTX->use_write_size_cache(true);
            if(CTX->write_size_cache()->flush_threshold() == 0) {
                LOG(WARNING,
                    "Write size cache is enabled but flush threshold is set to 0. Cache is disabled as a result.");
                CTX->use_write_size_cache(false);
            } else {
                LOG(INFO, "Write size cache enabled. Flushing at '{}' writes",
                    CTX->write_size_cache()->flush_threshold());
            }
        } catch(const std::exception& e) {
            exit_error_msg(EXIT_FAILURE,
                           "Failed to initialize write size cache: "s +
                                   e.what());
        }
    } else {
        if(gkfs::env::var_is_set(gkfs::env::cache::WRITE_SIZE)) {
            LOG(INFO, "Write size cache is disabled by environment variable.");
        } else {
            LOG(INFO, "Write size cache is disabled by configuration.");
        }
    }

    LOG(INFO, "Retrieving file system configuration...");

    if(!gkfs::rpc::forward_get_fs_config()) {
        exit_error_msg(
                EXIT_FAILURE,
                "Unable to fetch file system configurations from daemon process through RPC.");
    }
    // Initialize random number generator and seed for replica selection
    // in case of failure, a new replica will be selected
    if(CTX->get_replicas() > 0) {
        srand(time(nullptr));
    }

    LOG(INFO, "Environment initialization successful.");
}

} // namespace gkfs::preload


// Set by init_preload() the first time it runs; guards the registration of
// the fork handlers through pthread_atfork().
std::atomic<bool> init{false};

/**
 * Initializes the preloaded client.
 *
 * Registers the fork handlers (first call only), enables interception,
 * initializes logging, protects the user fd range while the internal
 * initialization runs, initializes the environment, starts the forwarding
 * mapper when a forwarding map file is configured, starts syscall
 * interception and initializes the client metrics.
 *
 * The errno value seen on entry is restored before returning. A failure to
 * initialize the metrics ends the process through exit_error_msg().
 */
void
init_preload() {
    // The original errno value is restored once initialization has finished
    // so that internal error codes do not leak to the application
    const auto oerrno = errno;

    // exchange() tests and sets the flag in a single atomic step, so the
    // fork handlers are registered exactly once
    if(!init.exchange(true)) {
        pthread_atfork(&at_fork_syscall, &at_parent_syscall, &at_child_syscall);
    }
    CTX->enable_interception();
    gkfs::preload::start_self_interception();

    CTX->init_logging();
    // from here onwards it is safe to print messages
    LOG(DEBUG, "Logging subsystem initialized");

    // Kernel modules such as ib_uverbs may create fds in kernel space and pass
    // them to user-space processes using ioctl()-like interfaces. if this
    // happens during our internal initialization, there's no way for us to
    // control this creation and the fd will be created in the
    // [0, MAX_USER_FDS) range rather than in our private
    // [MAX_USER_FDS, GKFS_MAX_OPEN_FDS) range.
    // with MAX_USER_FDS = GKFS_MAX_OPEN_FDS - GKFS_MAX_INTERNAL_FDS
    // To prevent this for our internal
    // initialization code, we forcefully occupy the user fd range to force
    // such modules to create fds in our private range.
    CTX->protect_user_fds();

    log_prog_name();
    gkfs::path::init_cwd();

    LOG(DEBUG, "Current working directory: '{}'", CTX->cwd());
    LOG(DEBUG, "Number of replicas : '{}'", CTX->get_replicas());
    gkfs::preload::init_environment();
    // NOTE(review): interception was already enabled at the top of this
    // function; the second call is kept as found - confirm whether it is
    // still required.
    CTX->enable_interception();

    CTX->unprotect_user_fds();

    auto forwarding_map_file = gkfs::env::get_var(
            gkfs::env::FORWARDING_MAP_FILE, gkfs::config::forwarding_file_path);
    if(!forwarding_map_file.empty()) {
        init_forwarding_mapper();
    }

    gkfs::preload::start_interception();

    if(!CTX->init_metrics()) {
        exit_error_msg(EXIT_FAILURE,
                       "Unable to initialize client metrics. Exiting...");
    }

    // Restore errno as the very last step: everything above, the metrics
    // initialization included, may have modified it.
    errno = oerrno;
}

/**
 * Shuts the preloaded client down.
 *
 * Stops the forwarding mapper when a forwarding map file is configured,
 * flushes and destroys the client metrics (only when built with
 * GKFS_ENABLE_CLIENT_METRICS), forgets the known hosts, releases the proxy
 * and network engines and finally stops syscall interception if it is still
 * enabled.
 */
void
destroy_preload() {
    // the mapper thread only exists when a forwarding map file is configured
    const auto fwd_map_path = gkfs::env::get_var(
            gkfs::env::FORWARDING_MAP_FILE, gkfs::config::forwarding_file_path);
    const bool forwarding_configured = !fwd_map_path.empty();
    if(forwarding_configured) {
        destroy_forwarding_mapper();
    }

#ifdef GKFS_ENABLE_CLIENT_METRICS
    // push out whatever the metrics still hold before they are destroyed
    LOG(INFO, "Flushing final metrics...");
    CTX->write_metrics()->flush_msgpack();
    CTX->read_metrics()->flush_msgpack();
    LOG(INFO, "Metrics flushed. Total flush operations: {}",
        CTX->write_metrics()->flush_count());
    CTX->destroy_metrics();
#endif

    CTX->clear_hosts();
    LOG(DEBUG, "Peer information deleted");

    // the IPC engine is released before the network engine
    if(CTX->use_proxy()) {
        CTX->clear_proxy_host();
        LOG(DEBUG, "Shutting down IPC subsystem");
        ld_proxy_service.reset();
    }

    LOG(DEBUG, "Shutting down RPC subsystem");
    ld_network_service.reset();
    LOG(DEBUG, "RPC subsystem shut down");

    // interception goes last
    const bool hooks_active = CTX->interception_enabled();
    if(hooks_active) {
        gkfs::preload::stop_interception();
        CTX->disable_interception();
        LOG(DEBUG, "Syscall interception stopped");
    }

    LOG(INFO, "All subsystems shut down. Client shutdown complete.");
}


/**
 * C entry point that initializes the client without the preload machinery:
 * it sets up logging and then runs gkfs::preload::init_environment().
 *
 * Failures inside init_environment() end the process, so this function only
 * returns on success.
 *
 * @return always 0
 */
extern "C" int
gkfs_init() {
    CTX->init_logging();

    // from here onwards it is safe to print messages
    LOG(DEBUG, "Logging subsystem initialized");

    gkfs::preload::init_environment();

    return 0;
}


extern "C" int
gkfs_end() {
    CTX->clear_hosts();
    LOG(DEBUG, "Peer information deleted");

    ld_network_service.reset();
    LOG(DEBUG, "RPC subsystem shut down");

    LOG(INFO, "All subsystems shut down. Client shutdown complete.");

    return 0;
}

// Fork handler registered as the `prepare` callback of pthread_atfork() in
// init_preload(): tears the client down before the process forks.
void
at_fork_syscall() {
    destroy_preload();
}
// Fork handler registered as the `parent` callback of pthread_atfork() in
// init_preload(): re-initializes the client in the parent after the fork.
void
at_parent_syscall() {
    init_preload();
}

// Fork handler registered as the `child` callback of pthread_atfork() in
// init_preload(): initializes the client in the newly created child process.
void
at_child_syscall() {
    init_preload();
}