Commit 6a659634 authored by Marc Vef's avatar Marc Vef
Browse files

The GKFS_RPC_PROTOCOL environment variable is now used instead of the CMake argument to select the RPC protocol

parent 8b523c1e
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -108,7 +108,7 @@ set_property(CACHE RPC_PROTOCOL PROPERTY STRINGS
   "ofi+verbs"
   "ofi+psm2"
)
message(STATUS "[gekkofs] RPC protocol: '${RPC_PROTOCOL}'")
#message(STATUS "[gekkofs] RPC protocol: '${RPC_PROTOCOL}'")

option(USE_SHM "Use shared memory for intra-node communication" OFF)
message(STATUS "[gekkofs] Shared-memory communication: ${USE_SHM}")
+1 −0
Original line number Diff line number Diff line
@@ -185,6 +185,7 @@ can be provided to set the path to the log file, and the log module can be
selected with the `GKFS_LOG_LEVEL={off,critical,err,warn,info,debug,trace}`
environment variable.

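The RPC protocol is selected at runtime with the `GKFS_RPC_PROTOCOL` environment variable. It defaults to `ofi+sockets` and can be set to another supported protocol, e.g., `ofi+verbs` or `ofi+psm2`.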

### Acknowledgment

+5 −1
Original line number Diff line number Diff line
@@ -20,6 +20,7 @@
#define CLIENT_ENV_PREFIX "LIBGKFS_"
#define DAEMON_ENV_PREFIX "GKFS_"


namespace gkfs {
namespace config {

@@ -71,7 +72,10 @@ namespace rocksdb {
constexpr auto use_write_ahead_log = false;
} // namespace rocksdb

} // namespace gkfs
} // namespace config
// Names of environment variables consulted at runtime.
namespace env {
// Name of the environment variable holding the RPC protocol. The client and
// the daemon pass it to gkfs::env::get_var() together with
// gkfs::rpc::protocol::ofi_sockets as the fallback value.
constexpr auto rpc_protocol = "GKFS_RPC_PROTOCOL";
} // namespace env
} // namespace gkfs

#endif //GEKKOFS_CONFIG_HPP
+6 −4
Original line number Diff line number Diff line
@@ -19,10 +19,12 @@
#include <client/intercept.hpp>

#include <global/rpc/distributor.hpp>
#include <global/global_defs.hpp>

#include <fstream>

#include <hermes.hpp>
#include <global/env_util.hpp>

extern "C" {
#include <sys/types.h>
@@ -55,8 +57,8 @@ inline void exit_error_msg(int errcode, const string& msg) {
 * @param transport_prefix
 * @return true if succesfully initialized; false otherwise
 */
bool init_hermes_client(const std::string& transport_prefix) {

bool init_hermes_client() {
    auto rpc_protocol = gkfs::env::get_var(gkfs::env::rpc_protocol, gkfs::rpc::protocol::ofi_sockets);
    try {

        hermes::engine_options opts{};
@@ -67,7 +69,7 @@ bool init_hermes_client(const std::string& transport_prefix) {

        ld_network_service =
                std::make_unique<hermes::async_engine>(
                        hermes::get_transport_type(transport_prefix), opts);
                        hermes::get_transport_type(rpc_protocol), opts);
        ld_network_service->run();
    } catch (const std::exception& ex) {
        fmt::print(stderr, "Failed to initialize Hermes RPC client {}\n",
@@ -86,7 +88,7 @@ void init_ld_environment_() {
    // initialize Hermes interface to Mercury
    LOG(INFO, "Initializing RPC subsystem...");

    if (!init_hermes_client(RPC_PROTOCOL)) {
    if (!init_hermes_client()) {
        exit_error_msg(EXIT_FAILURE, "Unable to initialize RPC subsystem");
    }

+12 −9
Original line number Diff line number Diff line
@@ -99,7 +99,8 @@ void register_server_rpcs(margo_instance_id mid) {
                   rpc_srv_get_chunk_stat);
}

void init_rpc_server(const string& protocol_port) {
void init_rpc_server(const string& protocol_port, const string& rpc_protocol) {

    hg_addr_t addr_self;
    hg_size_t addr_self_cstring_sz = 128;
    char addr_self_cstring[128];
@@ -111,7 +112,7 @@ void init_rpc_server(const string& protocol_port) {
#endif
    hg_options.stats = HG_FALSE;
    hg_options.na_class = nullptr;
    if (gkfs::rpc::protocol::ofi_psm2 == string(RPC_PROTOCOL))
    if (gkfs::rpc::protocol::ofi_psm2 == string(rpc_protocol))
        hg_options.na_init_info.progress_mode = NA_NO_BLOCK;
    // Start Margo (this will also initialize Argobots and Mercury internally)
    auto mid = margo_init_opt(protocol_port.c_str(),
@@ -149,7 +150,7 @@ void init_rpc_server(const string& protocol_port) {
    register_server_rpcs(mid);
}

void init_environment() {
void init_environment(const string& rpc_protocol) {
    // Initialize metadata db
    std::string metadata_path = GKFS_DATA->metadir() + "/rocksdb"s;
    GKFS_DATA->spdlogger()->debug("{}() Initializing metadata DB: '{}'", __func__, metadata_path);
@@ -176,7 +177,7 @@ void init_environment() {
    GKFS_DATA->spdlogger()->debug("{}() Initializing RPC server: '{}'",
                                  __func__, GKFS_DATA->bind_addr());
    try {
        init_rpc_server(GKFS_DATA->bind_addr());
        init_rpc_server(GKFS_DATA->bind_addr(), rpc_protocol);
    } catch (const std::exception& e) {
        GKFS_DATA->spdlogger()->error("{}() Failed to initialize RPC server: {}", __func__, e.what());
        throw;
@@ -276,6 +277,8 @@ void initialize_loggers() {
}

int main(int argc, const char* argv[]) {
    // set rpc protocol
    auto rpc_protocol = gkfs::env::get_var(gkfs::env::rpc_protocol, gkfs::rpc::protocol::ofi_sockets);

    // Parse input
    po::options_description desc("Allowed options");
@@ -307,7 +310,7 @@ int main(int argc, const char* argv[]) {
#else
        cout << "Debug: OFF" << endl;
#endif
        cout << "RPC protocol: " << RPC_PROTOCOL << endl;
        cout << "RPC protocol: " << rpc_protocol << endl;
#if USE_SHM
        cout << "Shared-memory comm: ON" << endl;
#else
@@ -336,7 +339,7 @@ int main(int argc, const char* argv[]) {
    if (vm.count("listen")) {
        addr = vm["listen"].as<string>();
        // ofi+verbs requires an empty addr to bind to the ib interface
        if (RPC_PROTOCOL == string(gkfs::rpc::protocol::ofi_verbs)) {
        if (rpc_protocol == string(gkfs::rpc::protocol::ofi_verbs)) {
            /*
             * FI_VERBS_IFACE : The prefix or the full name of the network interface associated with the verbs device (default: ib)
             * Mercury does not allow binding to an address when ofi+verbs is used
@@ -346,11 +349,11 @@ int main(int argc, const char* argv[]) {
            addr = ""s;
        }
    } else {
        if (RPC_PROTOCOL != string(gkfs::rpc::protocol::ofi_verbs))
        if (rpc_protocol != string(gkfs::rpc::protocol::ofi_verbs))
            addr = gkfs::rpc::get_my_hostname(true);
    }

    GKFS_DATA->bind_addr(fmt::format("{}://{}", RPC_PROTOCOL, addr));
    GKFS_DATA->bind_addr(fmt::format("{}://{}", rpc_protocol, addr));

    string hosts_file;
    if (vm.count("hosts-file")) {
@@ -387,7 +390,7 @@ int main(int argc, const char* argv[]) {
    }

    try {
        init_environment();
        init_environment(rpc_protocol);
    } catch (const std::exception& e) {
        auto emsg = fmt::format("Failed to initialize environment: {}", e.what());
        GKFS_DATA->spdlogger()->error(emsg);