Commit f82dc12c authored by David Auer's avatar David Auer
Browse files

Move daemon to daemon communication to new file

parent 2757b777
Loading
Loading
Loading
Loading
+24 −0
Original line number Diff line number Diff line
/*
  Copyright 2018-2020, Barcelona Supercomputing Center (BSC), Spain
  Copyright 2015-2020, Johannes Gutenberg Universitaet Mainz, Germany

  This software was partially supported by the
  EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu).

  This software was partially supported by the
  ADA-FS project under the SPPEXA project funded by the DFG.

  SPDX-License-Identifier: MIT
*/

#ifndef GEKKOFS_CONFIG_MANAGER_HPP
#define GEKKOFS_CONFIG_MANAGER_HPP

namespace gkfs::relocation {

void
test_d2d_rpc();

} // namespace gkfs::relocation

#endif // GEKKOFS_CONFIG_MANAGER_HPP
 No newline at end of file
+0 −5
Original line number Diff line number Diff line
@@ -14,9 +14,6 @@
#ifndef GEKKOFS_DAEMON_UTIL_HPP
#define GEKKOFS_DAEMON_UTIL_HPP

#include <vector>
#include <string>

namespace gkfs::utils {
void
populate_hosts_file();
@@ -24,8 +21,6 @@ populate_hosts_file();
void
destroy_hosts_file();

std::vector<std::pair<std::string, std::string>>
read_hosts_file();
} // namespace gkfs::utils

#endif // GEKKOFS_DAEMON_UTIL_HPP
+2 −0
Original line number Diff line number Diff line
@@ -13,6 +13,7 @@ set(DAEMON_SRC
    handler/srv_data.cpp
    handler/srv_metadata.cpp
    handler/srv_management.cpp
    relocation/config_manager.cpp
    )
set(DAEMON_HEADERS
    ../../include/config.hpp
@@ -30,6 +31,7 @@ set(DAEMON_HEADERS
    ../../include/daemon/classes/rpc_data.hpp
    ../../include/daemon/handler/rpc_defs.hpp
    ../../include/daemon/handler/rpc_util.hpp
    ../../include/daemon/relocation/config_manager.hpp
    )
set(DAEMON_LINK_LIBRARIES
    # internal libs
+2 −32
Original line number Diff line number Diff line
@@ -23,6 +23,7 @@
#include <daemon/ops/metadentry.hpp>
#include <daemon/backend/metadata/db.hpp>
#include <daemon/backend/data/chunk_storage.hpp>
#include <daemon/relocation/config_manager.hpp>
#include <daemon/util.hpp>

#ifdef GKFS_ENABLE_AGIOS
@@ -467,37 +468,6 @@ parse_input(const po::variables_map& vm) {
    }
}

void
test_d2d_rpc() {
    auto hosts = gkfs::util::read_hosts_file();

    cout << "number of hosts: " << hosts.size() << "\n";
    // TODO need to pull correct addr_str
    auto mid = margo_init("ofi+sockets", MARGO_CLIENT_MODE, 0, 0);
    assert(mid);


    hg_id_t shuffle_start_id =
            MARGO_REGISTER(mid, gkfs::rpc::tag::shuffle_start, void, void,
                           rpc_srv_shuffle_start);


    for(auto host : hosts) {
        cout << fmt::format("Host {} bla {}\n", host.first, host.second);
        hg_addr_t host_addr;
        auto ret = margo_addr_lookup(mid, host.second.c_str(), &host_addr);
        assert(ret == HG_SUCCESS);

        // let's do this sequential first
        hg_handle_t handle;
        ret = margo_create(mid, host_addr, shuffle_start_id, &handle);
        assert(ret == HG_SUCCESS);

        ret = margo_forward(handle, nullptr); // blocking
        assert(ret == HG_SUCCESS);
    }
}

int
main(int argc, const char* argv[]) {

@@ -585,7 +555,7 @@ main(int argc, const char* argv[]) {

    if(gkfs::config::dynamic_placement && vm.count("shuffle")) {
        cout << "Starting shuffle...\n";
        test_d2d_rpc();
        gkfs::relocation::test_d2d_rpc();
        return 0;
    }

+102 −0
Original line number Diff line number Diff line
/*
  Copyright 2018-2020, Barcelona Supercomputing Center (BSC), Spain
  Copyright 2015-2020, Johannes Gutenberg Universitaet Mainz, Germany

  This software was partially supported by the
  EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu).

  This software was partially supported by the
  ADA-FS project under the SPPEXA project funded by the DFG.

  SPDX-License-Identifier: MIT
*/


#include <daemon/relocation/config_manager.hpp>
#include <global/hostsfile_util.hpp>

#include <iostream>

#include <daemon/daemon.hpp>
#include <daemon/handler/rpc_defs.hpp>


// #include <string>
// #include <spdlog/spdlog.h>

// #include <config.hpp>
#include <global/global_defs.hpp>
#include <global/hostsfile_util.hpp>


#include <fmt/format.h>
#include <cassert>

// margo
extern "C" {
// #include <abt.h>
// #include <mercury.h>
#include <margo.h>
}

// #include <daemon/classes/fs_data.hpp>
// #include <daemon/classes/rpc_data.hpp>

using namespace std;

namespace gkfs::relocation {


vector<pair<string, string>>
read_hosts_file() {
    string hostfile = GKFS_DATA->hosts_file();

    vector<pair<string, string>> hosts;
    try {
        hosts = gkfs::util::load_hostfile(hostfile);
    } catch(const exception& e) {
        auto emsg = fmt::format("Failed to load hosts file: {}", e.what());
        throw runtime_error(emsg);
    }

    if(hosts.empty()) {
        throw runtime_error(fmt::format("Hostfile empty: '{}'", hostfile));
    }

    GKFS_DATA->spdlogger()->info("Hosts pool size: {}", hosts.size());

    return hosts;
}

void
test_d2d_rpc() {
    auto hosts = read_hosts_file();

    cout << "number of hosts: " << hosts.size() << "\n";
    auto mid =
            margo_init(GKFS_DATA->bind_addr().c_str(), MARGO_CLIENT_MODE, 0, 0);
    assert(mid);


    hg_id_t shuffle_start_id =
            MARGO_REGISTER(mid, gkfs::rpc::tag::shuffle_start, void, void,
                           rpc_srv_shuffle_start);


    for(auto host : hosts) {
        cout << fmt::format("Host {} bla {}\n", host.first, host.second);
        hg_addr_t host_addr;
        auto ret = margo_addr_lookup(mid, host.second.c_str(), &host_addr);
        assert(ret == HG_SUCCESS);

        // let's do this sequential first
        hg_handle_t handle;
        ret = margo_create(mid, host_addr, shuffle_start_id, &handle);
        assert(ret == HG_SUCCESS);

        ret = margo_forward(handle, nullptr); // blocking
        assert(ret == HG_SUCCESS);
    }
}

} // namespace gkfs::relocation
 No newline at end of file
Loading