Commit 4c5825c2 authored by Marc Vef's avatar Marc Vef
Browse files

ifs: minor modifications

parent 7c1b261e
Loading
Loading
Loading
Loading
+3 −4
Original line number Diff line number Diff line
@@ -15,16 +15,15 @@ void destroy_enviroment();

bool init_argobots();
void destroy_argobots();
bool init_rpc_server();

bool init_ipc_server();

void register_server_ipcs();
void register_server_rpcs();
bool init_rpc_server();

bool init_rpc_client();

void register_server_ipcs();
void register_server_rpcs();
void register_client_rpcs();


#endif //IFS_ADAFS_DAEMON_HPP
+1 −0
Original line number Diff line number Diff line
@@ -96,6 +96,7 @@ int main(int argc, const char* argv[]) {
        }
        ADAFS_DATA->hosts(hostmap);
        ADAFS_DATA->host_size(hostmap.size());
        ADAFS_DATA->rpc_port(fmt::FormatInt(RPCPORT).str());
    }

    //set all paths
+50 −50
Original line number Diff line number Diff line
@@ -147,8 +147,8 @@ void destroy_argobots() {
    }
}

bool init_rpc_server() {
    auto protocol_port = RPC_PROTOCOL + "://localhost:"s + to_string(RPCPORT);
bool init_ipc_server() {
    auto protocol_port = "na+sm://"s;
    hg_addr_t addr_self;
    hg_size_t addr_self_cstring_sz = 128;
    char addr_self_cstring[128];
@@ -157,18 +157,18 @@ bool init_rpc_server() {
    hg_class_t* hg_class;
    hg_context_t* hg_context;

    ADAFS_DATA->spdlogger()->info("Initializing Mercury RPC server ...");
    ADAFS_DATA->spdlogger()->info("Initializing Mercury IPC server ...");
    /* MERCURY PART */
    // Init Mercury layer (must be finalized when finished)
    hg_class = HG_Init(protocol_port.c_str(), HG_TRUE);
    if (hg_class == nullptr) {
        ADAFS_DATA->spdlogger()->error("HG_Init() Failed to init Mercury RPC server layer");
        ADAFS_DATA->spdlogger()->error("HG_Init() Failed to init Mercury IPC server layer");
        return false;
    }
    // Create a new Mercury context (must be destroyed when finished)
    hg_context = HG_Context_create(hg_class);
    if (hg_context == nullptr) {
        ADAFS_DATA->spdlogger()->error("HG_Context_create() Failed to create Mercury RPC server context");
        ADAFS_DATA->spdlogger()->error("HG_Context_create() Failed to create Mercury IPC server context");
        HG_Finalize(hg_class);
        return false;
    }
@@ -176,7 +176,7 @@ bool init_rpc_server() {
    // Figure out what address this server is listening on (must be freed when finished)
    auto hg_ret = HG_Addr_self(hg_class, &addr_self);
    if (hg_ret != HG_SUCCESS) {
        ADAFS_DATA->spdlogger()->error("HG_Addr_self() Failed to retrieve server RPC address");
        ADAFS_DATA->spdlogger()->error("HG_Addr_self() Failed to retrieve IPC server address");
        HG_Context_destroy(hg_context);
        HG_Finalize(hg_class);
        return false;
@@ -192,14 +192,14 @@ bool init_rpc_server() {
    }
    HG_Addr_free(hg_class, addr_self);

    ADAFS_DATA->spdlogger()->info("Success. Accepting RPCs on address {}", addr_self_cstring);
    ADAFS_DATA->spdlogger()->info("Success. Accepting IPCs on PID {}", addr_self_cstring);

    /* MARGO PART */
    ADAFS_DATA->spdlogger()->info("Initializing Margo RPC server...");
    ADAFS_DATA->spdlogger()->info("Initializing Margo IPC server...");
    // Start Margo
    auto mid = margo_init(1, 16, hg_context);
    if (mid == MARGO_INSTANCE_NULL) {
        ADAFS_DATA->spdlogger()->error("margo_init failed to initialize the Margo RPC server");
        ADAFS_DATA->spdlogger()->error("margo_init failed to initialize the Margo IPC server");
        HG_Context_destroy(hg_context);
        HG_Finalize(hg_class);
        return false;
@@ -207,18 +207,18 @@ bool init_rpc_server() {
    ADAFS_DATA->spdlogger()->info("Success.");

    // Put context and class into RPC_data object
    RPC_DATA->server_rpc_hg_class(hg_class);
    RPC_DATA->server_rpc_hg_context(hg_context);
    RPC_DATA->server_rpc_mid(mid);
    RPC_DATA->server_ipc_hg_class(hg_class);
    RPC_DATA->server_ipc_hg_context(hg_context);
    RPC_DATA->server_ipc_mid(mid);

    // register RPCs
    register_server_rpcs();
    register_server_ipcs();

    return true;
}

bool init_ipc_server() {
    auto protocol_port = "na+sm://"s;
bool init_rpc_server() {
    auto protocol_port = RPC_PROTOCOL + "://localhost:"s + to_string(RPCPORT);
    hg_addr_t addr_self;
    hg_size_t addr_self_cstring_sz = 128;
    char addr_self_cstring[128];
@@ -227,18 +227,18 @@ bool init_ipc_server() {
    hg_class_t* hg_class;
    hg_context_t* hg_context;

    ADAFS_DATA->spdlogger()->info("Initializing Mercury IPC server ...");
    ADAFS_DATA->spdlogger()->info("Initializing Mercury RPC server ...");
    /* MERCURY PART */
    // Init Mercury layer (must be finalized when finished)
    hg_class = HG_Init(protocol_port.c_str(), HG_TRUE);
    if (hg_class == nullptr) {
        ADAFS_DATA->spdlogger()->error("HG_Init() Failed to init Mercury IPC server layer");
        ADAFS_DATA->spdlogger()->error("HG_Init() Failed to init Mercury RPC server layer");
        return false;
    }
    // Create a new Mercury context (must be destroyed when finished)
    hg_context = HG_Context_create(hg_class);
    if (hg_context == nullptr) {
        ADAFS_DATA->spdlogger()->error("HG_Context_create() Failed to create Mercury IPC server context");
        ADAFS_DATA->spdlogger()->error("HG_Context_create() Failed to create Mercury RPC server context");
        HG_Finalize(hg_class);
        return false;
    }
@@ -246,7 +246,7 @@ bool init_ipc_server() {
    // Figure out what address this server is listening on (must be freed when finished)
    auto hg_ret = HG_Addr_self(hg_class, &addr_self);
    if (hg_ret != HG_SUCCESS) {
        ADAFS_DATA->spdlogger()->error("HG_Addr_self() Failed to retrieve IPC server address");
        ADAFS_DATA->spdlogger()->error("HG_Addr_self() Failed to retrieve server RPC address");
        HG_Context_destroy(hg_context);
        HG_Finalize(hg_class);
        return false;
@@ -262,14 +262,14 @@ bool init_ipc_server() {
    }
    HG_Addr_free(hg_class, addr_self);

    ADAFS_DATA->spdlogger()->info("Success. Accepting IPCs on PID {}", addr_self_cstring);
    ADAFS_DATA->spdlogger()->info("Success. Accepting RPCs on address {}", addr_self_cstring);

    /* MARGO PART */
    ADAFS_DATA->spdlogger()->info("Initializing Margo IPC server...");
    ADAFS_DATA->spdlogger()->info("Initializing Margo RPC server...");
    // Start Margo
    auto mid = margo_init(1, 16, hg_context);
    if (mid == MARGO_INSTANCE_NULL) {
        ADAFS_DATA->spdlogger()->error("margo_init failed to initialize the Margo IPC server");
        ADAFS_DATA->spdlogger()->error("margo_init failed to initialize the Margo RPC server");
        HG_Context_destroy(hg_context);
        HG_Finalize(hg_class);
        return false;
@@ -277,39 +277,16 @@ bool init_ipc_server() {
    ADAFS_DATA->spdlogger()->info("Success.");

    // Put context and class into RPC_data object
    RPC_DATA->server_ipc_hg_class(hg_class);
    RPC_DATA->server_ipc_hg_context(hg_context);
    RPC_DATA->server_ipc_mid(mid);
    RPC_DATA->server_rpc_hg_class(hg_class);
    RPC_DATA->server_rpc_hg_context(hg_context);
    RPC_DATA->server_rpc_mid(mid);

    // register RPCs
    register_server_ipcs();
    register_server_rpcs();

    return true;
}

/**
 * Register the rpcs for the server. There is no need to store rpc ids for the server
 * @param hg_class
 */
void register_server_rpcs() {
    auto hg_class = RPC_DATA->server_rpc_hg_class();
    MERCURY_REGISTER(hg_class, "rpc_minimal", rpc_minimal_in_t, rpc_minimal_out_t, rpc_minimal_handler);
    MERCURY_REGISTER(hg_class, "rpc_srv_create_node", rpc_create_node_in_t, rpc_res_out_t, rpc_srv_create_node_handler);
    MERCURY_REGISTER(hg_class, "rpc_srv_attr", rpc_get_attr_in_t, rpc_get_attr_out_t, rpc_srv_attr_handler);
    MERCURY_REGISTER(hg_class, "rpc_srv_remove_node", rpc_remove_node_in_t, rpc_res_out_t, rpc_srv_remove_node_handler);
    MERCURY_REGISTER(hg_class, "rpc_srv_write_data", rpc_write_data_in_t, rpc_data_out_t, rpc_srv_write_data_handler);
    MERCURY_REGISTER(hg_class, "rpc_srv_read_data", rpc_read_data_in_t, rpc_data_out_t, rpc_srv_read_data_handler);
}

void register_server_ipcs() {
    auto hg_class = RPC_DATA->server_ipc_hg_class();
    // preload IPCs
    MERCURY_REGISTER(hg_class, "ipc_srv_fs_config", ipc_config_in_t, ipc_config_out_t, ipc_srv_fs_config_handler);
    MERCURY_REGISTER(hg_class, "ipc_srv_open", ipc_open_in_t, ipc_res_out_t, ipc_srv_open_handler);
    MERCURY_REGISTER(hg_class, "ipc_srv_stat", ipc_stat_in_t, ipc_stat_out_t, ipc_srv_stat_handler);
    MERCURY_REGISTER(hg_class, "ipc_srv_unlink", ipc_unlink_in_t, ipc_res_out_t, ipc_srv_unlink_handler);
}

bool init_rpc_client() {
    string protocol_port = RPC_PROTOCOL;
    ADAFS_DATA->spdlogger()->info("Initializing Mercury client ...");
@@ -334,7 +311,7 @@ bool init_rpc_client() {
    /* MARGO PART */
    ADAFS_DATA->spdlogger()->info("Initializing Margo client ...");
    // Start Margo
    auto mid = margo_init(0, 0,
    auto mid = margo_init(1, 16,
                          hg_context);
    if (mid == MARGO_INSTANCE_NULL) {
        ADAFS_DATA->spdlogger()->info("[ERR]: margo_init failed to initialize the Margo client");
@@ -354,6 +331,29 @@ bool init_rpc_client() {
    return true;
}

void register_server_ipcs() {
    auto hg_class = RPC_DATA->server_ipc_hg_class();
    // preload IPCs
    MERCURY_REGISTER(hg_class, "ipc_srv_fs_config", ipc_config_in_t, ipc_config_out_t, ipc_srv_fs_config_handler);
    MERCURY_REGISTER(hg_class, "ipc_srv_open", ipc_open_in_t, ipc_res_out_t, ipc_srv_open_handler);
    MERCURY_REGISTER(hg_class, "ipc_srv_stat", ipc_stat_in_t, ipc_stat_out_t, ipc_srv_stat_handler);
    MERCURY_REGISTER(hg_class, "ipc_srv_unlink", ipc_unlink_in_t, ipc_res_out_t, ipc_srv_unlink_handler);
}

/**
 * Register the rpcs for the server. There is no need to store rpc ids for the server
 * @param hg_class
 */
void register_server_rpcs() {
    auto hg_class = RPC_DATA->server_rpc_hg_class();
    MERCURY_REGISTER(hg_class, "rpc_minimal", rpc_minimal_in_t, rpc_minimal_out_t, rpc_minimal_handler);
    MERCURY_REGISTER(hg_class, "rpc_srv_create_node", rpc_create_node_in_t, rpc_res_out_t, rpc_srv_create_node_handler);
    MERCURY_REGISTER(hg_class, "rpc_srv_attr", rpc_get_attr_in_t, rpc_get_attr_out_t, rpc_srv_attr_handler);
    MERCURY_REGISTER(hg_class, "rpc_srv_remove_node", rpc_remove_node_in_t, rpc_res_out_t, rpc_srv_remove_node_handler);
    MERCURY_REGISTER(hg_class, "rpc_srv_write_data", rpc_write_data_in_t, rpc_data_out_t, rpc_srv_write_data_handler);
    MERCURY_REGISTER(hg_class, "rpc_srv_read_data", rpc_read_data_in_t, rpc_data_out_t, rpc_srv_read_data_handler);
}

/**
 * Register rpcs for the client and add the rpc id to rpc_data
 * @param hg_class
+2 −1
Original line number Diff line number Diff line
@@ -19,7 +19,7 @@ void send_minimal_rpc(const hg_id_t minimal_id) {

    ADAFS_DATA->spdlogger()->debug("Looking up address");

    margo_addr_lookup(RPC_DATA->client_mid(), "cci+tcp://134.93.182.11:1234"s.c_str(), &svr_addr);
    margo_addr_lookup(RPC_DATA->client_mid(), "bmi+tcp://134.93.182.11:1234"s.c_str(), &svr_addr);

    ADAFS_DATA->spdlogger()->debug("minimal RPC is running...");

@@ -82,6 +82,7 @@ int rpc_send_create_node(const size_t recipient, const std::string& path, const
        return 1;
    }
    int send_ret = HG_FALSE;
    ADAFS_DATA->spdlogger()->trace("About to send create_node RPC ...");
    for (int i = 0; i < max_retries; ++i) {
        send_ret = margo_forward_timed(RPC_DATA->client_mid(), handle, &in, RPC_TIMEOUT);
        if (send_ret == HG_SUCCESS) {