Commit 24573c2a authored by Marc Vef's avatar Marc Vef
Browse files

Making AbtIO optional for now, added Margo client, added minimal RPC example

parent d7c14039
Loading
Loading
Loading
Loading
+10 −2
Original line number Diff line number Diff line
@@ -30,8 +30,16 @@ find_package(Mercury REQUIRED)
find_package(MercuryUtil REQUIRED)
find_package(Abt REQUIRED)
find_package(Abt-Snoozer REQUIRED)
find_package(AbtIO REQUIRED)
find_package(Margo REQUIRED)
# Optional dependencies
find_package(AbtIO)
option(WITH_AbtIO "AbtIO enabled" ON)
if (WITH_AbtIO)
    add_definitions(-DUSE_AbtIO=1)
else ()
    add_definitions(-DUSE_AbtIO=0)
endif ()


# boost dependencies, system is required for filesystem #TODO VERSION UNTESTED. I USE 1.62
find_package(Boost 1.58 REQUIRED COMPONENTS system filesystem serialization)
@@ -56,7 +64,7 @@ set(SOURCE_FILES src/main.cpp src/main.hpp src/fuse_ops.hpp src/configure.hpp
        # db src
        src/db/db_ops.cpp src/db/db_txn_ops.cpp src/db/db_util.cpp
        # rpc src
        src/rpc/rpc_util.cpp src/rpc/my_rpc.cpp
        src/rpc/rpc_util.cpp src/rpc/rpcs.cpp
        )
add_executable(adafs ${SOURCE_FILES} src/main.cpp)
target_link_libraries(adafs ${FUSE3_LIBRARIES} ${ROCKSDB_LIBRARIES}
+1 −0
Original line number Diff line number Diff line
@@ -41,6 +41,7 @@ void adafs_ll_init(void* pdata, struct fuse_conn_info* conn) {
    assert(err);
    // Initialize margo server
//    err = init_margo_server();
//    err = init_margo_client();
//    assert(err);

    // Check if fs already has some data and read the inode count
+198 −8
Original line number Diff line number Diff line
@@ -7,6 +7,13 @@

using namespace std;

// TODO I need 1 argobots instance (i.e. ABT_pool to create threads)
// TODO I need 1 hg_class and 1 hg_context instances for the SERVER
// TODO I need 1 hg_class and 1 hg_context instances for the CLIENT
// TODO I must store all client RPC IDs somewhere to be able to send rpcs (i.e., to use margo_forward())
// TODO I need a translation for modulo operation of keys to the actual IPs


bool init_margo_server() {
    auto protocol_port = "cci+tcp://3344"s;

@@ -14,19 +21,19 @@ bool init_margo_server() {
    hg_size_t addr_self_cstring_sz = 128;
    char addr_self_cstring[128];

    ADAFS_DATA->spdlogger()->info("Mercury: Initializing Mercury ...");
    ADAFS_DATA->spdlogger()->info("Mercury: Initializing Mercury server ...");
    /* MERCURY PART */
    // Init Mercury layer (must be finalized when finished)
    hg_class_t* hg_class;
    hg_class = HG_Init(protocol_port.c_str(), HG_TRUE);
    if (hg_class == nullptr) {
        ADAFS_DATA->spdlogger()->info("[ERR]: HG_Init() Failed to init Mercury layer");
        ADAFS_DATA->spdlogger()->info("[ERR]: HG_Init() Failed to init Mercury server layer");
        return false;
    }
    // Create a new Mercury context (must be destroyed when finished)
    auto hg_context = HG_Context_create(hg_class);
    if (hg_context == nullptr) {
        ADAFS_DATA->spdlogger()->info("[ERR]: HG_Context_create() Failed to create Mercury context");
        ADAFS_DATA->spdlogger()->info("[ERR]: HG_Context_create() Failed to create Mercury server context");
        HG_Finalize(hg_class);
        return false;
    }
@@ -78,7 +85,7 @@ bool init_margo_server() {
    ADAFS_DATA->spdlogger()->info("Margo successfully initialized");

    // register RPCs
    register_rpcs(hg_class);
    register_server_rpcs(hg_class);

    margo_wait_for_finalize(margo_id);

@@ -95,13 +102,196 @@ bool init_margo_server() {
    return true;
}

void register_rpcs(hg_class_t* hg_class) {
    MERCURY_REGISTER(hg_class, "my_rpc", my_rpc_in_t, my_rpc_out_t,
                     my_rpc_ult_handler);
void register_server_rpcs(hg_class_t* hg_class) {
    MERCURY_REGISTER(hg_class, "my_rpc", my_rpc_in_t, my_rpc_out_t, my_rpc_ult_handler);
    MERCURY_REGISTER(hg_class, "my_shutdown_rpc", void, void, my_rpc_shutdown_ult_handler);
    MERCURY_REGISTER(hg_class, "my_rpc_minimal", my_rpc_minimal_in_t, my_rpc_minimal_out_t, my_rpc_minimal_handler);
}

bool init_margo_client() {
    auto protocol_port = "cci+tcp"s;
    ADAFS_DATA->spdlogger()->info("Mercury: Initializing Mercury client ...");
    /* MERCURY PART */
    // Init Mercury layer (must be finalized when finished)
    hg_class_t* hg_class;
    hg_class = HG_Init(protocol_port.c_str(), HG_FALSE);
    if (hg_class == nullptr) {
        ADAFS_DATA->spdlogger()->info("[ERR]: HG_Init() Failed to init Mercury client layer");
        return false;
    }
    // Create a new Mercury context (must be destroyed when finished)
    auto hg_context = HG_Context_create(hg_class);
    if (hg_context == nullptr) {
        ADAFS_DATA->spdlogger()->info("[ERR]: HG_Context_create() Failed to create Mercury client context");
        HG_Finalize(hg_class);
        return false;
    }

    ADAFS_DATA->spdlogger()->info("Initializing Argobots ...");
    /* ARGOBOTS PART */

    ABT_xstream xstream;
    ABT_pool pool;
    // We need no arguments to init
    auto argo_ret = ABT_init(0, nullptr);
    if (argo_ret != 0) {
        ADAFS_DATA->spdlogger()->info("[ERR]: ABT_init() Failed to init Argobots (client)");
        HG_Context_destroy(hg_context);
        HG_Finalize(hg_class);
        return false;
    }
    // Set primary ES to idle without polling XXX (Not sure yet what that does)
    argo_ret = ABT_snoozer_xstream_self_set();
    if (argo_ret != 0) {
        ADAFS_DATA->spdlogger()->info("[ERR]: ABT_snoozer_xstream_self_set()  (client)");
        HG_Context_destroy(hg_context);
        HG_Finalize(hg_class);
        return false;
    }
    argo_ret = ABT_xstream_self(&xstream);
    if (argo_ret != 0) {
        ADAFS_DATA->spdlogger()->info("[ERR]: ABT_xstream_self()  (client)");
        HG_Context_destroy(hg_context);
        HG_Finalize(hg_class);
        return false;
    }
    argo_ret = ABT_xstream_get_main_pools(xstream, 1, &pool);
    if (argo_ret != 0) {
        ADAFS_DATA->spdlogger()->info("[ERR]: ABT_xstream_get_main_pools()  (client)");
        HG_Context_destroy(hg_context);
        HG_Finalize(hg_class);
        return false;
    }

    ADAFS_DATA->spdlogger()->info("Initializing Margo ...");
    /* MARGO PART */
    // Start Margo TODO set first two parameters later.
    auto margo_id = margo_init(0, 0, hg_context);
    assert(margo_id);
    ADAFS_DATA->spdlogger()->info("Margo successfully initialized");

    // register RPCs
    auto rpc_id = register_client_rpcs(hg_class);

    // below should be the actual rpc calls
    run_my_rpc(margo_id, hg_class, hg_context, pool, rpc_id);

    margo_finalize(margo_id);

    ABT_finalize();
    HG_Context_destroy(hg_context);
    HG_Finalize(hg_class);


    return true;
}

void run_my_rpc(margo_instance_id& margo_id, hg_class_t* hg_class, hg_context_t* hg_context, ABT_pool& pool,
                hg_id_t rpc_id) {
    hg_addr_t svr_addr = HG_ADDR_NULL;
//    ABT_thread thread;
    hg_handle_t handle;

    my_rpc_in_t in;
    my_rpc_out_t out;

    auto protocol_port = "cci+tcp://localhost:3344"s;
    /* find addr for server */
    auto margo_ret = margo_addr_lookup(margo_id, protocol_port.c_str(), &svr_addr);
    assert(margo_ret == HG_SUCCESS);

    struct run_my_rpc_args args;
    args.mid = margo_id;
    args.svr_addr = svr_addr;
    args.val = 42;
    args.hg_class = hg_class;
    args.hg_context = hg_context;

    ADAFS_DATA->spdlogger()->info("Sending RPC...");

    margo_ret = HG_Create(args.hg_context, args.svr_addr, rpc_id, &handle);
    assert(margo_ret == HG_SUCCESS);

    const struct hg_info* hgi;
    hg_size_t size;
    void* buffer;
    size = 512;
    buffer = calloc(1, 512);
    hgi = HG_Get_info(handle);
    margo_ret = HG_Bulk_create(hgi->hg_class, 1, &buffer, &size, HG_BULK_READ_ONLY, &in.bulk_handle);


    in.input_val = args.val;
    // send rpc
    margo_forward(margo_id, handle, &in);

    // get response
    margo_ret = HG_Get_output(handle, &out);
    assert(margo_ret == HG_SUCCESS);

    ADAFS_DATA->spdlogger()->info("Got RPC response {}", out.ret);


    HG_Free_output(handle, &out);
    HG_Destroy(handle);

    HG_Addr_free(hg_class, args.svr_addr);
    ADAFS_DATA->spdlogger()->info("RPC sending done.");

    return;
}

void run_my_minimal_rpc(margo_instance_id& margo_id, hg_class_t* hg_class, hg_context_t* hg_context, hg_id_t rpc_id) {
    hg_addr_t svr_addr = HG_ADDR_NULL;
    hg_handle_t handle;

    my_rpc_minimal_in_t in;
    my_rpc_minimal_out_t out;

    auto protocol_port = "cci+tcp://localhost:3344"s;
    /* find addr for server */
    auto margo_ret = margo_addr_lookup(margo_id, protocol_port.c_str(), &svr_addr);
    assert(margo_ret == HG_SUCCESS);

    ADAFS_DATA->spdlogger()->info("Sending RPC...");

    margo_ret = HG_Create(hg_context, svr_addr, rpc_id, &handle);
    assert(margo_ret == HG_SUCCESS);


    in.input = 42;
    // send rpc
    margo_forward(margo_id, handle, &in);

    // get response
    margo_ret = HG_Get_output(handle, &out);
    assert(margo_ret == HG_SUCCESS);

    ADAFS_DATA->spdlogger()->info("Got RPC response {}", out.output);


    HG_Free_output(handle, &out);
    HG_Destroy(handle);

    HG_Addr_free(hg_class, svr_addr);
    ADAFS_DATA->spdlogger()->info("RPC sending done.");

    return;
}


hg_id_t register_client_rpcs(hg_class_t* hg_class) {
    // TODO just for testing. all rpc_ids need to be stored somewhere.
    hg_id_t rpc_id = MERCURY_REGISTER(hg_class, "my_rpc", my_rpc_in_t, my_rpc_out_t,
                                      nullptr);
    MERCURY_REGISTER(hg_class, "my_shutdown_rpc", void, void,
                     my_rpc_shutdown_ult_handler);
                     nullptr);
    MERCURY_REGISTER(hg_class, "my_rpc_minimal", my_rpc_minimal_in_t, my_rpc_minimal_out_t,
                     nullptr);
    return rpc_id;
}


bool destroy_margo_server(hg_class_t* hg_class, hg_context_t* hg_context) {

    HG_Context_destroy(hg_context);
+18 −1
Original line number Diff line number Diff line
@@ -7,9 +7,26 @@

#include "../main.hpp"

struct run_my_rpc_args {
    int val;
    margo_instance_id mid;
    hg_context_t* hg_context;
    hg_class_t* hg_class;
    hg_addr_t svr_addr;
};


bool init_margo_server();

void register_rpcs(hg_class_t* hg_class);
void register_server_rpcs(hg_class_t* hg_class);

bool init_margo_client();

void run_my_rpc(margo_instance_id& margo_id, hg_class_t* hg_class, hg_context_t* hg_context, ABT_pool& pool,
                hg_id_t rpc_id);

void run_my_minimal_rpc(margo_instance_id& margo_id, hg_class_t* hg_class, hg_context_t* hg_context, hg_id_t rpc_id);

hg_id_t register_client_rpcs(hg_class_t* hg_class);

#endif //LFS_INIT_HPP
+28 −3
Original line number Diff line number Diff line
@@ -47,12 +47,12 @@ static void my_rpc_ult(hg_handle_t handle) {
    // do bulk transfer from client to server
    ret = margo_bulk_transfer(margo_id, HG_BULK_PULL, hgi->addr, in.bulk_handle, 0, bulk_handle, 0, size);
    assert(ret == 0);
#if 1
#ifdef USE_AbtIO
    int fd;
    char filename[256];
#endif
    /* write to a file; would be done with abt-io if we enabled it */
#if 1
#ifdef USE_AbtIO
    auto aid = abt_io_init(0);
    sprintf(filename, "/tmp/margo-%d.txt", in.input_val);
    fd = abt_io_open(aid, filename, O_WRONLY | O_CREAT, S_IWUSR | S_IRUSR);
@@ -104,3 +104,28 @@ static void my_rpc_shutdown_ult(hg_handle_t handle) {
}

DEFINE_MARGO_RPC_HANDLER(my_rpc_shutdown_ult)

static void my_rpc_minimal(hg_handle_t handle) {
    my_rpc_minimal_in_t in;
    my_rpc_minimal_out_t out;
    // Get input
    auto ret = HG_Get_input(handle, &in);
    assert(ret == HG_SUCCESS);

    ADAFS_DATA->spdlogger()->info("Got simple RPC with input {}", in.input);
    // Get hg_info handle
    auto hgi = HG_Get_info(handle);
    // extract margo id from hg_info (needed to know where to send response)
    auto mid = margo_hg_class_to_instance(hgi->hg_class);

    // Create output and send it
    out.output = in.input * 2;
    ADAFS_DATA->spdlogger()->info("Sending output {}", out.output);
    auto hret = margo_respond(mid, handle, &out);
    assert(hret == HG_SUCCESS);
    // Destroy handle when finished
    HG_Destroy(handle);

}

DEFINE_MARGO_RPC_HANDLER(my_rpc_minimal)
 No newline at end of file
Loading