Verified Commit 4955e1d2 authored by Marc Vef's avatar Marc Vef
Browse files

first working proxy version

parent f0bb38c7
Loading
Loading
Loading
Loading
+6 −0
Original line number Diff line number Diff line
@@ -62,6 +62,7 @@ private:

    // RPC management
    std::string rpc_protocol_;
    std::string proxy_rpc_protocol_;
    std::string bind_addr_;
    std::string bind_proxy_addr_; // optional when used with running proxy.
                                  // Remains empty if unused
@@ -139,6 +140,11 @@ public:
    void
    rpc_protocol(const std::string& rpc_protocol);

    const std::string&
    proxy_rpc_protocol() const;
    void
    proxy_rpc_protocol(const std::string& proxy_rpc_protocol);

    const std::string&
    bind_addr() const;

+7 −0
Original line number Diff line number Diff line
@@ -42,6 +42,8 @@ private:
    margo_instance_id server_ipc_mid_{};
    std::string server_self_addr_{};

    bool use_auto_sm_{false};

    std::map<uint64_t, hg_addr_t> rpc_endpoints_;
    uint64_t local_host_id_;

@@ -91,6 +93,11 @@ public:
    void
    server_self_addr(const std::string& server_self_addr);

    bool
    use_auto_sm() const;
    void
    use_auto_sm(bool use_auto_sm);

    std::map<uint64_t, hg_addr_t>&
    rpc_endpoints();

+4 −2
Original line number Diff line number Diff line
@@ -15,6 +15,7 @@
#define GEKKOFS_PROXY_UTIL_HPP

#include <string>
#include <vector>

namespace gkfs {
namespace util {
@@ -31,8 +32,9 @@ remove_proxy_pid_file();
bool
check_for_hosts_file(const std::string& hostfile);

void
load_hosts(const std::string& hostfile);
std::vector<std::pair<std::string, std::string>> read_hosts_file(const std::string& hostfile);

void connect_to_hosts(const std::vector<std::pair<std::string, std::string>>& hosts);

} // namespace util
} // namespace gkfs
+9 −0
Original line number Diff line number Diff line
@@ -110,6 +110,15 @@ FsData::rpc_protocol(const std::string& rpc_protocol) {
    rpc_protocol_ = rpc_protocol;
}

const std::string&
FsData::proxy_rpc_protocol() const {
    return proxy_rpc_protocol_;
}
void
FsData::proxy_rpc_protocol(const std::string& proxy_rpc_protocol) {
    proxy_rpc_protocol_ = proxy_rpc_protocol;
}

const std::string&
FsData::bind_addr() const {
    return bind_addr_;
+13 −9
Original line number Diff line number Diff line
@@ -198,23 +198,19 @@ register_proxy_server_rpcs(margo_instance_id mid) {


void
init_proxy_rpc_server(const string& protocol_port) {
init_proxy_rpc_server() {
    // TODO currently copy-paste. redundant function. fix.
    hg_addr_t addr_self;
    hg_size_t addr_self_cstring_sz = 128;
    char addr_self_cstring[128];
    struct hg_init_info hg_options = HG_INIT_INFO_INITIALIZER;
#if USE_PROXY_SHM
    hg_options.auto_sm = HG_TRUE;
#else
    hg_options.auto_sm = HG_FALSE;
#endif
    hg_options.auto_sm = GKFS_DATA->use_auto_sm() ? HG_TRUE : HG_FALSE;
    hg_options.stats = HG_FALSE;
    hg_options.na_class = nullptr;
    if(protocol_port.find(gkfs::rpc::protocol::ofi_psm2) != string::npos)
    if(gkfs::rpc::protocol::ofi_psm2 == GKFS_DATA->proxy_rpc_protocol())
        hg_options.na_init_info.progress_mode = NA_NO_BLOCK;
    // Start Margo (this will also initialize Argobots and Mercury internally)
    auto mid = margo_init_opt(protocol_port.c_str(), MARGO_SERVER_MODE,
    auto mid = margo_init_opt(GKFS_DATA->bind_proxy_addr().c_str(), MARGO_SERVER_MODE,
                              &hg_options, HG_TRUE,
                              gkfs::config::rpc::proxy_handler_xstreams);
    if(mid == MARGO_INSTANCE_NULL) {
@@ -317,7 +313,7 @@ init_environment() {
                "{}() Initializing proxy RPC server: '{}'", __func__,
                GKFS_DATA->bind_proxy_addr());
        try {
            init_proxy_rpc_server(GKFS_DATA->bind_proxy_addr());
            init_proxy_rpc_server();
        } catch(const std::exception& e) {
            GKFS_DATA->spdlogger()->error(
                    "{}() Failed to initialize proxy RPC server: {}", __func__,
@@ -505,6 +501,13 @@ parse_input(const po::variables_map& vm) {
    string proxy_protocol{};
    if(vm.count("proxy-protocol")) {
        proxy_protocol = vm["proxy-protocol"].as<string>();
        if(proxy_protocol != gkfs::rpc::protocol::ofi_verbs &&
                proxy_protocol != gkfs::rpc::protocol::ofi_sockets &&
                proxy_protocol != gkfs::rpc::protocol::ofi_psm2) {
            throw runtime_error(fmt::format(
                    "Given RPC protocol  for proxy '{}' not supported. Check --help for supported protocols.",
                    proxy_protocol));
        }
        if(vm.count("proxy-listen")) {
            proxy_addr = vm["proxy-listen"].as<string>();
            // ofi+verbs requires an empty proxy_addr to bind to the ib
@@ -524,6 +527,7 @@ parse_input(const po::variables_map& vm) {
            if(proxy_protocol != string(gkfs::rpc::protocol::ofi_verbs))
                proxy_addr = gkfs::rpc::get_my_hostname(true);
        }
        GKFS_DATA->proxy_rpc_protocol(proxy_protocol);
        GKFS_DATA->bind_proxy_addr(
                fmt::format("{}://{}", proxy_protocol, proxy_addr));
    }
Loading