Commit bd82757d authored by Ramon Nou's avatar Ramon Nou
Browse files

Merge branch 'marc/17-remove-old-preload-init-function' into 'master'

Resolve "remove old preload init function"

Closes #17

See merge request !136
parents d87a4df1 5283e1ef
Loading
Loading
Loading
Loading
Loading
+2 −0
Original line number Diff line number Diff line
@@ -31,6 +31,8 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
  shutdown ([!110](https://storage.bsc.es/gitlab/hpc/gekkofs/-/merge_requests/110)).

### Removed
- Removed old initialization code in the GekkoFS
  client ([!136](https://storage.bsc.es/gitlab/hpc/gekkofs/-/merge_requests/136)).

### Fixed

+66 −76
Original line number Diff line number Diff line
@@ -41,17 +41,12 @@

#include <hermes.hpp>

extern "C" {
#include <sys/types.h>
}

using namespace std;

std::unique_ptr<hermes::async_engine> ld_network_service; // extern variable

namespace {
// make sure that things are only initialized once
pthread_once_t init_env_thread = PTHREAD_ONCE_INIT;

#ifdef GKFS_ENABLE_FORWARDING
pthread_t mapper;
@@ -103,74 +98,6 @@ init_hermes_client() {
    return true;
}

/**
 * This function is only called in the preload constructor and initializes
 * the file system client
 */
void
init_ld_environment_() {

    vector<pair<string, string>> hosts{};
    try {
        LOG(INFO, "Loading peer addresses...");
        hosts = gkfs::utils::read_hosts_file();
    } catch(const std::exception& e) {
        exit_error_msg(EXIT_FAILURE,
                       "Failed to load hosts addresses: "s + e.what());
    }

    // initialize Hermes interface to Mercury
    LOG(INFO, "Initializing RPC subsystem...");

    if(!init_hermes_client()) {
        exit_error_msg(EXIT_FAILURE, "Unable to initialize RPC subsystem");
    }

    try {
        gkfs::utils::connect_to_hosts(hosts);
    } catch(const std::exception& e) {
        exit_error_msg(EXIT_FAILURE,
                       "Failed to connect to hosts: "s + e.what());
    }

    /* Setup distributor */
#ifdef GKFS_ENABLE_FORWARDING
    try {
        gkfs::utils::load_forwarding_map();

        LOG(INFO, "{}() Forward to {}", __func__, CTX->fwd_host_id());
    } catch(std::exception& e) {
        exit_error_msg(
                EXIT_FAILURE,
                fmt::format("Unable set the forwarding host '{}'", e.what()));
    }

    auto forwarder_dist = std::make_shared<gkfs::rpc::ForwarderDistributor>(
            CTX->fwd_host_id(), CTX->hosts().size());
    CTX->distributor(forwarder_dist);
#else
#ifdef GKFS_USE_GUIDED_DISTRIBUTION
    auto distributor = std::make_shared<gkfs::rpc::GuidedDistributor>(
            CTX->local_host_id(), CTX->hosts().size());
#else
    auto distributor = std::make_shared<gkfs::rpc::SimpleHashDistributor>(
            CTX->local_host_id(), CTX->hosts().size());
#endif
    CTX->distributor(distributor);
#endif


    LOG(INFO, "Retrieving file system configuration...");

    if(!gkfs::rpc::forward_get_fs_config()) {
        exit_error_msg(
                EXIT_FAILURE,
                "Unable to fetch file system configurations from daemon process through RPC.");
    }

    LOG(INFO, "Environment initialization successful.");
}

#ifdef GKFS_ENABLE_FORWARDING
void*
forwarding_mapper(void* p) {
@@ -245,9 +172,72 @@ log_prog_name() {

namespace gkfs::preload {

/**
 * This function is only called in the preload constructor and initializes
 * the file system client
 */
void
init_ld_env_if_needed() {
    pthread_once(&init_env_thread, init_ld_environment_);
init_environment() {

    vector<pair<string, string>> hosts{};
    try {
        LOG(INFO, "Loading peer addresses...");
        hosts = gkfs::utils::read_hosts_file();
    } catch(const std::exception& e) {
        exit_error_msg(EXIT_FAILURE,
                       "Failed to load hosts addresses: "s + e.what());
    }

    // initialize Hermes interface to Mercury
    LOG(INFO, "Initializing RPC subsystem...");

    if(!init_hermes_client()) {
        exit_error_msg(EXIT_FAILURE, "Unable to initialize RPC subsystem");
    }

    try {
        gkfs::utils::connect_to_hosts(hosts);
    } catch(const std::exception& e) {
        exit_error_msg(EXIT_FAILURE,
                       "Failed to connect to hosts: "s + e.what());
    }

    /* Setup distributor */
#ifdef GKFS_ENABLE_FORWARDING
    try {
        gkfs::utils::load_forwarding_map();

        LOG(INFO, "{}() Forward to {}", __func__, CTX->fwd_host_id());
    } catch(std::exception& e) {
        exit_error_msg(
                EXIT_FAILURE,
                fmt::format("Unable set the forwarding host '{}'", e.what()));
    }

    auto forwarder_dist = std::make_shared<gkfs::rpc::ForwarderDistributor>(
            CTX->fwd_host_id(), CTX->hosts().size());
    CTX->distributor(forwarder_dist);
#else
#ifdef GKFS_USE_GUIDED_DISTRIBUTION
    auto distributor = std::make_shared<gkfs::rpc::GuidedDistributor>(
            CTX->local_host_id(), CTX->hosts().size());
#else
    auto distributor = std::make_shared<gkfs::rpc::SimpleHashDistributor>(
            CTX->local_host_id(), CTX->hosts().size());
#endif
    CTX->distributor(distributor);
#endif


    LOG(INFO, "Retrieving file system configuration...");

    if(!gkfs::rpc::forward_get_fs_config()) {
        exit_error_msg(
                EXIT_FAILURE,
                "Unable to fetch file system configurations from daemon process through RPC.");
    }

    LOG(INFO, "Environment initialization successful.");
}

} // namespace gkfs::preload
@@ -283,7 +273,7 @@ init_preload() {
    gkfs::path::init_cwd();

    LOG(DEBUG, "Current working directory: '{}'", CTX->cwd());
    gkfs::preload::init_ld_env_if_needed();
    gkfs::preload::init_environment();
    CTX->enable_interception();

    CTX->unprotect_user_fds();