Verified Commit 145f53e6 authored by Alberto Miranda's avatar Alberto Miranda ♨️
Browse files

Remove references to Margo in client code

parent b223f6eb
Loading
Loading
Loading
Loading
+0 −16
Original line number Diff line number Diff line
@@ -22,10 +22,6 @@
#include <iostream>
#include <map>

extern "C" {
#include <margo.h>
}

struct MetadentryUpdateFlags {
    bool atime = false;
    bool mtime = false;
@@ -39,9 +35,6 @@ struct MetadentryUpdateFlags {
    bool path = false;
};

// Margo instances
extern margo_instance_id ld_margo_rpc_id;

// Hermes instance
namespace hermes { class async_engine; }
extern std::unique_ptr<hermes::async_engine> ld_network_service;
@@ -77,13 +70,4 @@ hg_addr_t get_local_addr();
void load_hosts();
bool lookup_all_hosts();

void cleanup_addresses();

hg_return margo_create_wrap_helper(const hg_id_t rpc_id, uint64_t recipient,
                                   hg_handle_t& handle);

hg_return margo_create_wrap(const hg_id_t rpc_id, const std::string&,
                            hg_handle_t& handle);


#endif //IFS_PRELOAD_UTIL_HPP
+14 −125
Original line number Diff line number Diff line
@@ -30,8 +30,8 @@


using namespace std;
//
// thread to initialize the whole margo shazaam only once per process

// make sure that things are only initialized once
static pthread_once_t init_env_thread = PTHREAD_ONCE_INIT;

// RPC IDs
@@ -49,8 +49,6 @@ hg_id_t rpc_read_data_id;
hg_id_t rpc_trunc_data_id;
hg_id_t rpc_get_dirents_id;
hg_id_t rpc_chunk_stat_id;
// Margo instances
margo_instance_id ld_margo_rpc_id;

std::unique_ptr<hermes::async_engine> ld_network_service;

@@ -61,91 +59,13 @@ static inline void exit_error_msg(int errcode, const string& msg) {
}

/**
 * Registers a margo instance with all used RPC
 * Note that the r(pc tags are redundant for rpc
 * @param mid
 * @param mode
 * Initializes the Hermes client for a given transport prefix
 * @param transport_prefix
 * @return true if succesfully initialized; false otherwise
 */
void register_client_rpcs(margo_instance_id mid) {

    rpc_config_id = MARGO_REGISTER(mid,
        hg_tag::fs_config,
        void,
        rpc_config_out_t,
        NULL);

    rpc_mk_node_id = MARGO_REGISTER(mid, hg_tag::create, rpc_mk_node_in_t, rpc_err_out_t, NULL);
    rpc_stat_id = MARGO_REGISTER(mid, hg_tag::stat, rpc_path_only_in_t, rpc_stat_out_t, NULL);
    rpc_rm_node_id = MARGO_REGISTER(mid, hg_tag::remove, rpc_rm_node_in_t,
                                    rpc_err_out_t, NULL);

    rpc_decr_size_id = MARGO_REGISTER(mid,
        hg_tag::decr_size,
        rpc_trunc_in_t,
        rpc_err_out_t,
        NULL);

    rpc_update_metadentry_id = MARGO_REGISTER(mid, hg_tag::update_metadentry, rpc_update_metadentry_in_t,
                                              rpc_err_out_t, NULL);
    rpc_get_metadentry_size_id = MARGO_REGISTER(mid, hg_tag::get_metadentry_size, rpc_path_only_in_t,
                                                rpc_get_metadentry_size_out_t, NULL);
    rpc_update_metadentry_size_id = MARGO_REGISTER(mid, hg_tag::update_metadentry_size,
                                                   rpc_update_metadentry_size_in_t,
                                                   rpc_update_metadentry_size_out_t,
                                                   NULL);

#ifdef HAS_SYMLINKS
    rpc_mk_symlink_id = MARGO_REGISTER(mid,
         hg_tag::mk_symlink,
         rpc_mk_symlink_in_t,
         rpc_err_out_t,
         NULL);
#endif

    rpc_write_data_id = MARGO_REGISTER(mid, hg_tag::write_data, rpc_write_data_in_t, rpc_data_out_t,
                                       NULL);
    rpc_read_data_id = MARGO_REGISTER(mid, hg_tag::read_data, rpc_read_data_in_t, rpc_data_out_t,
                                      NULL);

    rpc_trunc_data_id = MARGO_REGISTER(mid,
         hg_tag::trunc_data,
         rpc_trunc_in_t,
         rpc_err_out_t,
         NULL);

    rpc_get_dirents_id = MARGO_REGISTER(mid, hg_tag::get_dirents, rpc_get_dirents_in_t, rpc_get_dirents_out_t,
                                      NULL);

    rpc_chunk_stat_id = MARGO_REGISTER(mid,
        hg_tag::chunk_stat,
        rpc_chunk_stat_in_t,
        rpc_chunk_stat_out_t,
        NULL);

    fmt::print(stdout, "rpc_config_id: {}\n", rpc_config_id);
    fmt::print(stdout, "rpc_mk_node_id: {}\n", rpc_mk_node_id);
    fmt::print(stdout, "rpc_stat_id: {}\n", rpc_stat_id);
    fmt::print(stdout, "rpc_rm_node_id: {}\n", rpc_rm_node_id);
    fmt::print(stdout, "rpc_decr_size_id: {}\n", rpc_decr_size_id);
    fmt::print(stdout, "rpc_update_metadentry_id: {}\n", rpc_update_metadentry_id);
    fmt::print(stdout, "rpc_get_metadentry_size_id: {}\n", rpc_get_metadentry_size_id);
    fmt::print(stdout, "rpc_update_metadentry_size_id: {}\n", rpc_update_metadentry_size_id);
    fmt::print(stdout, "rpc_mk_symlink_id: {}\n", rpc_mk_symlink_id);
    fmt::print(stdout, "rpc_write_data_id: {}\n", rpc_write_data_id);
    fmt::print(stdout, "rpc_read_data_id: {}\n", rpc_read_data_id);
    fmt::print(stdout, "rpc_trunc_data_id: {}\n", rpc_trunc_data_id);
    fmt::print(stdout, "rpc_get_dirents_id: {}\n", rpc_get_dirents_id);
    fmt::print(stdout, "rpc_chunk_stat_id: {}\n", rpc_chunk_stat_id);

}
bool init_hermes_client(const std::string& transport_prefix) {

/**
 * Initializes the Margo client for a given na_plugin
 * @param mode
 * @param na_plugin
 * @return
 */
bool init_margo_client(const std::string& na_plugin) {
#if 0
    // IMPORTANT: this struct needs to be zeroed before use
    struct hg_init_info hg_options = {};
#if USE_SHM
@@ -155,29 +75,7 @@ bool init_margo_client(const std::string& na_plugin) {
#endif
    hg_options.stats = HG_FALSE;
    hg_options.na_class = nullptr;

    ld_margo_rpc_id = margo_init_opt(na_plugin.c_str(),
                                     MARGO_CLIENT_MODE,
                                     &hg_options,
                                     HG_FALSE,
                                     1);
    if (ld_margo_rpc_id == MARGO_INSTANCE_NULL) {
        CTX->log()->error("{}() margo_init_pool failed to initialize the Margo client", __func__);
        return false;
    }
    register_client_rpcs(ld_margo_rpc_id);
    return true;
}




/**
 * Initializes the Hermes client for a given transport prefix
 * @param transport_prefix
 * @return true if succesfully initialized; false otherwise
 */
bool init_hermes_client(const std::string& transport_prefix) {
#endif

    try {
        ld_network_service = 
@@ -214,15 +112,11 @@ bool init_hermes_client(const std::string& transport_prefix) {


/**
 * This function is only called in the preload constructor and initializes Argobots and Margo clients
 * This function is only called in the preload constructor and initializes 
 * the file system client
 */
void init_ld_environment_() {

    //use rpc_addresses here to avoid "static initialization order problem"
    if (!init_margo_client(RPC_PROTOCOL)) {
        exit_error_msg(EXIT_FAILURE, "Unable to initializa Margo RPC client");
    }

    // initialize Hermes interface to Mercury
    if (!init_hermes_client(RPC_PROTOCOL)) {
        exit_error_msg(EXIT_FAILURE, "Unable to initialize Hermes RPC client");
@@ -305,20 +199,15 @@ void init_preload() {
 * Called last when preload library is used with the LD_PRELOAD environment variable
 */
void destroy_preload() {

    stop_interception();
    CTX->disable_interception();
    if (ld_margo_rpc_id == nullptr) {
        CTX->log()->debug("{}() No services in preload library used. Nothing to shut down.", __func__);
        return;
    }
    cleanup_addresses();

    CTX->clear_hosts();
    CTX->log()->debug("{}() About to finalize the Hermes RPC client", __func__);

    ld_network_service.reset();

    CTX->log()->debug("{}() About to finalize the margo RPC client", __func__);
    // XXX Sometimes this hangs on the cluster. Investigate.
    margo_finalize(ld_margo_rpc_id);
    CTX->log()->debug("{}() Shut down Margo RPC client successful", __func__);
    CTX->log()->debug("{}() Shut down Hermes RPC client successful", __func__);
    CTX->log()->info("All services shut down. Client shutdown complete.");
}
+0 −84
Original line number Diff line number Diff line
@@ -133,28 +133,6 @@ hermes::endpoint lookup_endpoint(const std::string& uri,
                        uri, error_msg));
}

hg_addr_t margo_addr_lookup_retry(const std::string& uri) {
    CTX->log()->debug("{}() Lookink up address '{}'", __func__, uri);
    // try to look up 3 times before erroring out
    hg_return_t ret;
    hg_addr_t remote_addr = HG_ADDR_NULL;
    ::random_device rd; // obtain a random number from hardware
    unsigned int attempts = 0;
    do {
        ret = margo_addr_lookup(ld_margo_rpc_id, uri.c_str(), &remote_addr);
        if (ret == HG_SUCCESS) {
            return remote_addr;
        }
        CTX->log()->warn("{}() Failed to lookup address '{}'. Attempts [{}/3]", __func__, uri, attempts + 1);
        // Wait a random amount of time and try again
        ::mt19937 g(rd()); // seed the random generator
        ::uniform_int_distribution<> distr(50, 50 * (attempts + 2)); // define the range
        ::this_thread::sleep_for(std::chrono::milliseconds(distr(g)));
    } while (++attempts < 3);
    throw runtime_error(
            fmt::format("Failed to lookup address '{}', error: {}", uri, HG_Error_to_string(ret)));
}

void load_hosts() {
    string hosts_file;
    try {
@@ -210,9 +188,6 @@ void load_hosts() {
        auto it = std::next(addrs2.begin(), id);
        addrs2.emplace(it, endp);

        auto addr = margo_addr_lookup_retry(uri); // TODO(amiranda) remove
        addrs.at(id) = addr; // TODO(amiranda) remove

        if (!local_host_found && hostname == local_hostname) {
            CTX->log()->debug("{}() Found local host: {}", __func__, hostname);
            CTX->local_host_id(id);
@@ -220,32 +195,6 @@ void load_hosts() {
        }
    }

#if 0
    fmt::print(stdout, " YYY hi!\n");

    std::for_each(
        addrs.begin(), 
        addrs.end(), 
        [](hg_addr_t addr) {
            hg_class_t* hg_class = margo_get_class(ld_margo_rpc_id);
            hg_size_t bsize = 0;
            hg_return ret = HG_Addr_to_string(hg_class, NULL, &bsize, addr);

            const auto buffer = std::make_unique<char[]>(bsize);
            HG_Addr_to_string(hg_class, buffer.get(), &bsize, addr);
            fmt::print(stdout, " XXX {}\n", std::string(buffer.get()));
        }
    );

    std::for_each(
        addrs2.begin(), 
        addrs2.end(), 
        [](const hermes::endpoint& endp) {
            fmt::print(stdout, " ZZZ {}\n", endp.to_string());
        }
    );
#endif

    if (!local_host_found) {
        CTX->log()->warn("{}() Failed to find local host."
                            "Fallback: use host id '0' as local host", __func__);
@@ -257,36 +206,3 @@ void load_hosts() {
#endif
    CTX->hosts2(addrs2);
}

void cleanup_addresses() {
#if 1 //TODO(amiranda) remove
    for (auto& addr: CTX->hosts()) {
        margo_addr_free(ld_margo_rpc_id, addr);
    }
#endif

    CTX->clear_hosts();
}



hg_return
margo_create_wrap_helper(const hg_id_t rpc_id, uint64_t recipient, hg_handle_t& handle) {
    auto ret = margo_create(ld_margo_rpc_id, CTX->hosts().at(recipient), rpc_id, &handle);
    if (ret != HG_SUCCESS) {
        CTX->log()->error("{}() creating handle FAILED", __func__);
        return HG_OTHER_ERROR;
    }
    return ret;
}

/**
 * Wraps certain margo functions to create a Mercury handle
 * @param path
 * @param handle
 * @return
 */
hg_return margo_create_wrap(const hg_id_t rpc_id, const std::string& path, hg_handle_t& handle) {
    auto recipient = CTX->distributor()->locate_file_metadata(path);
    return margo_create_wrap_helper(rpc_id, recipient, handle);
}
+0 −5
Original line number Diff line number Diff line
@@ -25,11 +25,6 @@ namespace rpc_send {

using namespace std;

static inline hg_return_t
margo_forward_timed_wrap(const hg_handle_t& handle, void* in_struct) {
    return margo_forward_timed(handle, in_struct, RPC_TIMEOUT);
}

int mk_node(const std::string& path, const mode_t mode) {

    int err = EUNKNOWN;