Commit d0170dc9 authored by Marc Vef's avatar Marc Vef
Browse files

LD_Preload migrated to margo 0.2

parent 4413059e
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -81,6 +81,7 @@ struct MetadentryUpdateFlags {
// fs_config is set ONCE in the beginning. It shall not be modified afterwards
extern shared_ptr<struct FsConfig> fs_config;

// TODO get rid of that horrible, disgusting logging X.x
extern FILE* debug_fd;

#define LD_LOG_DEBUG(fd, fmt, ...) \
+14 −14
Original line number Diff line number Diff line
@@ -23,11 +23,11 @@ int rpc_send_read(const hg_id_t ipc_read_data_id, const hg_id_t rpc_read_data_id
                  const size_t in_size, const off_t in_offset, T* tar_buf, size_t& read_size) {
    hg_handle_t handle;
    hg_addr_t svr_addr = HG_ADDR_NULL;
    bool local_op = true;
    rpc_read_data_in_t in;
    rpc_data_out_t out;
    rpc_read_data_in_t in{};
    rpc_data_out_t out{};
    int err;
    hg_return_t ret;
    margo_instance_id used_mid;
    // fill in
    in.path = path.c_str();
    in.size = in_size;
@@ -35,26 +35,26 @@ int rpc_send_read(const hg_id_t ipc_read_data_id, const hg_id_t rpc_read_data_id

    auto recipient = get_rpc_node(path);
    if (is_local_op(recipient)) { // local
        ret = HG_Create(margo_get_context(ld_margo_ipc_id()), daemon_addr(), ipc_read_data_id, &handle);
        ret = margo_create(ld_margo_ipc_id(), daemon_addr(), ipc_read_data_id, &handle);
        LD_LOG_TRACE0(debug_fd, "rpc_send_read to local daemon (IPC)\n");
        used_mid = ld_margo_ipc_id();
    } else { // remote
        local_op = false;
        // TODO HG_ADDR_T is never freed atm. Need to change LRUCache
        if (!get_addr_by_hostid(recipient, svr_addr)) {
            LD_LOG_ERROR(debug_fd, "server address not resolvable for host id %lu\n", recipient);
            return 1;
        }
        ret = HG_Create(margo_get_context(ld_margo_rpc_id()), svr_addr, rpc_read_data_id, &handle);
        ret = margo_create(ld_margo_rpc_id(), svr_addr, rpc_read_data_id, &handle);
        LD_LOG_TRACE0(debug_fd, "rpc_send_read to remote daemon (RPC)\n");
        used_mid = ld_margo_rpc_id();
    }
    if (ret != HG_SUCCESS) {
        LD_LOG_ERROR0(debug_fd, "creating handle FAILED\n");
        return 1;
    }
    auto hgi = HG_Get_info(handle);
    /* register local target buffer for bulk access */
    auto b_buf = static_cast<void*>(tar_buf);
    ret = HG_Bulk_create(hgi->hg_class, 1, &b_buf, &in_size, HG_BULK_WRITE_ONLY, &in.bulk_handle);
    ret = margo_bulk_create(used_mid, 1, &b_buf, &in_size, HG_BULK_WRITE_ONLY, &in.bulk_handle);
    if (ret != 0)
        LD_LOG_ERROR0(debug_fd, "failed to create bulk on client\n");

@@ -67,14 +67,14 @@ int rpc_send_read(const hg_id_t ipc_read_data_id, const hg_id_t rpc_read_data_id
    }
    if (send_ret == HG_SUCCESS) {
        /* decode response */
        ret = HG_Get_output(handle,
        ret = margo_get_output(handle,
                               &out); // XXX handle ret out.res can inidicate a failure with reading on the other side.
        tar_buf = static_cast<T*>(b_buf);
        read_size = static_cast<size_t>(out.io_size);
        err = out.res;
        LD_LOG_TRACE(debug_fd, "Got response %d\n", out.res);
        /* clean up resources consumed by this rpc */
        HG_Free_output(handle, &out);
        margo_free_output(handle, &out);
    } else {
        LD_LOG_ERROR0(debug_fd, "RPC rpc_send_read (timed out)");
        err = EAGAIN;
@@ -82,9 +82,9 @@ int rpc_send_read(const hg_id_t ipc_read_data_id, const hg_id_t rpc_read_data_id

    in.path = nullptr;

    HG_Bulk_free(in.bulk_handle);
    HG_Free_input(handle, &in);
    HG_Destroy(handle);
    margo_bulk_free(in.bulk_handle);
    margo_free_input(handle, &in);
    margo_destroy(handle);

    return err;
}
+40 −40
Original line number Diff line number Diff line
@@ -9,13 +9,13 @@
void send_minimal_ipc(const hg_id_t minimal_id) {

    hg_handle_t handle;
    rpc_minimal_in_t in;
    rpc_minimal_out_t out;
    rpc_minimal_in_t in{};
    rpc_minimal_out_t out{};

    printf("minimal RPC is running...\n");

    /* create handle */
    auto ret = HG_Create(margo_get_context(ld_margo_ipc_id()), daemon_addr(), minimal_id, &handle);
    auto ret = margo_create(ld_margo_ipc_id(), daemon_addr(), minimal_id, &handle);
    assert(ret == HG_SUCCESS);

    /* Send rpc. Note that we are also transmitting the bulk handle in the
@@ -23,36 +23,36 @@ void send_minimal_ipc(const hg_id_t minimal_id) {
     */
    in.input = 42;
    printf("About to send RPC\n");
    margo_forward(ld_margo_ipc_id(), handle, &in);
    margo_forward(handle, &in);
    printf("Waiting for response\n");
    /* decode response */
    ret = HG_Get_output(handle, &out);
    ret = margo_get_output(handle, &out);
    assert(ret == HG_SUCCESS);

    printf("Got response ret: %d\n", out.output);

    /* clean up resources consumed by this rpc */
    HG_Free_output(handle, &out);
    HG_Destroy(handle);
    margo_free_output(handle, &out);
    margo_destroy(handle);

    printf("minimal RPC is done.\n");
}

bool ipc_send_get_fs_config(const hg_id_t ipc_get_config_id) {
    hg_handle_t handle;
    ipc_config_in_t in;
    ipc_config_out_t out;
    ipc_config_in_t in{};
    ipc_config_out_t out{};
    // fill in
    in.dummy = 0; // XXX should be removed. havent checked yet how empty input with margo works
    auto ret = HG_Create(margo_get_context(ld_margo_ipc_id()), daemon_addr(), ipc_get_config_id, &handle);
    auto ret = margo_create(ld_margo_ipc_id(), daemon_addr(), ipc_get_config_id, &handle);
    if (ret != HG_SUCCESS) {
        LD_LOG_DEBUG0(debug_fd, "creating handle FAILED\n");
        return 1;
        return false;
    }
    LD_LOG_DEBUG0(debug_fd, "About to send get config IPC to daemon\n");
    int send_ret = HG_FALSE;
    for (int i = 0; i < RPC_TRIES; ++i) {
        send_ret = margo_forward_timed(ld_margo_ipc_id(), handle, &in, RPC_TIMEOUT);
        send_ret = margo_forward_timed(handle, &in, RPC_TIMEOUT);
        if (send_ret == HG_SUCCESS) {
            break;
        }
@@ -60,7 +60,7 @@ bool ipc_send_get_fs_config(const hg_id_t ipc_get_config_id) {
    if (send_ret == HG_SUCCESS) {
        /* decode response */
        LD_LOG_DEBUG0(debug_fd, "Waiting for response\n");
        ret = HG_Get_output(handle, &out);
        ret = margo_get_output(handle, &out);
        if (ret == HG_SUCCESS) {
            fs_config->mountdir = out.mountdir;
            fs_config->rootdir = out.rootdir;
@@ -96,26 +96,26 @@ bool ipc_send_get_fs_config(const hg_id_t ipc_get_config_id) {
        out.rootdir = nullptr;
        out.mountdir = nullptr;
        out.hosts_raw = nullptr;
        HG_Free_output(handle, &out);
        margo_free_output(handle, &out);
    } else {
        LD_LOG_ERROR0(debug_fd, "IPC send_get_config (timed out)\n");
    }

    HG_Free_input(handle, &in);
    HG_Destroy(handle);
    margo_free_input(handle, &in);
    margo_destroy(handle);
    return ret == HG_SUCCESS;
}

int ipc_send_open(const string& path, int flags, const mode_t mode, const hg_id_t ipc_open_id) {
    hg_handle_t handle;
    ipc_open_in_t in;
    ipc_err_out_t out;
    ipc_open_in_t in{};
    ipc_err_out_t out{};
    // fill in
    in.mode = mode;
    in.flags = flags;
    in.path = path.c_str();
    int err = EUNKNOWN;
    auto ret = HG_Create(margo_get_context(ld_margo_ipc_id()), daemon_addr(), ipc_open_id, &handle);
    auto ret = margo_create(ld_margo_ipc_id(), daemon_addr(), ipc_open_id, &handle);
    if (ret != HG_SUCCESS) {
        LD_LOG_DEBUG0(debug_fd, "creating handle FAILED\n");
        return 1;
@@ -123,7 +123,7 @@ int ipc_send_open(const string& path, int flags, const mode_t mode, const hg_id_
    LD_LOG_DEBUG0(debug_fd, "About to send open IPC to daemon\n");
    int send_ret = HG_FALSE;
    for (int i = 0; i < RPC_TRIES; ++i) {
        send_ret = margo_forward_timed(ld_margo_ipc_id(), handle, &in, RPC_TIMEOUT);
        send_ret = margo_forward_timed(handle, &in, RPC_TIMEOUT);
        if (send_ret == HG_SUCCESS) {
            break;
        }
@@ -131,31 +131,31 @@ int ipc_send_open(const string& path, int flags, const mode_t mode, const hg_id_
    if (send_ret == HG_SUCCESS) {
        /* decode response */
        LD_LOG_DEBUG0(debug_fd, "Waiting for response\n");
        ret = HG_Get_output(handle, &out);
        ret = margo_get_output(handle, &out);

        LD_LOG_DEBUG(debug_fd, "Got response success: %d\n", out.err);
        err = out.err;
        /* clean up resources consumed by this rpc */
        HG_Free_output(handle, &out);
        margo_free_output(handle, &out);
    } else {
        LD_LOG_ERROR0(debug_fd, "IPC send_open (timed out)\n");
    }

    in.path = nullptr; // XXX temporary. If this is not done free input crashes because of invalid pointer?!

    HG_Free_input(handle, &in);
    HG_Destroy(handle);
    margo_free_input(handle, &in);
    margo_destroy(handle);
    return err;
}

int ipc_send_stat(const string& path, string& attr, const hg_id_t ipc_stat_id) {
    hg_handle_t handle;
    ipc_stat_in_t in;
    ipc_stat_out_t out;
    ipc_stat_in_t in{};
    ipc_stat_out_t out{};
    // fill in
    in.path = path.c_str();
    int err;
    auto ret = HG_Create(margo_get_context(ld_margo_ipc_id()), daemon_addr(), ipc_stat_id, &handle);
    auto ret = margo_create(ld_margo_ipc_id(), daemon_addr(), ipc_stat_id, &handle);
    if (ret != HG_SUCCESS) {
        LD_LOG_DEBUG0(debug_fd, "creating handle FAILED\n");
        return 1;
@@ -163,7 +163,7 @@ int ipc_send_stat(const string& path, string& attr, const hg_id_t ipc_stat_id) {
    LD_LOG_DEBUG0(debug_fd, "About to send stat IPC to daemon\n");
    int send_ret = HG_FALSE;
    for (int i = 0; i < RPC_TRIES; ++i) {
        send_ret = margo_forward_timed(ld_margo_ipc_id(), handle, &in, RPC_TIMEOUT);
        send_ret = margo_forward_timed(handle, &in, RPC_TIMEOUT);
        if (send_ret == HG_SUCCESS) {
            break;
        }
@@ -171,7 +171,7 @@ int ipc_send_stat(const string& path, string& attr, const hg_id_t ipc_stat_id) {
    if (send_ret == HG_SUCCESS) {
        /* decode response */
        LD_LOG_DEBUG0(debug_fd, "Waiting for response\n");
        ret = HG_Get_output(handle, &out);
        ret = margo_get_output(handle, &out);

        LD_LOG_DEBUG(debug_fd, "Got response success: %d\n", out.err);
        err = out.err;
@@ -180,7 +180,7 @@ int ipc_send_stat(const string& path, string& attr, const hg_id_t ipc_stat_id) {
        }
        /* clean up resources consumed by this rpc */
        out.db_val = nullptr;
        HG_Free_output(handle, &out);
        margo_free_output(handle, &out);
    } else {
        LD_LOG_ERROR0(debug_fd, "IPC send_stat (timed out)\n");
        err = 1;
@@ -188,19 +188,19 @@ int ipc_send_stat(const string& path, string& attr, const hg_id_t ipc_stat_id) {

    in.path = nullptr; // XXX temporary. If this is not done free input crashes because of invalid pointer?!

    HG_Free_input(handle, &in);
    HG_Destroy(handle);
    margo_free_input(handle, &in);
    margo_destroy(handle);
    return err;
}

int ipc_send_unlink(const string& path, const hg_id_t ipc_unlink_id) {
    hg_handle_t handle;
    ipc_unlink_in_t in;
    ipc_err_out_t out;
    ipc_unlink_in_t in{};
    ipc_err_out_t out{};
    // fill in
    in.path = path.c_str();
    int err = EUNKNOWN;
    auto ret = HG_Create(margo_get_context(ld_margo_ipc_id()), daemon_addr(), ipc_unlink_id, &handle);
    auto ret = margo_create(ld_margo_ipc_id(), daemon_addr(), ipc_unlink_id, &handle);
    if (ret != HG_SUCCESS) {
        LD_LOG_DEBUG0(debug_fd, "creating handle FAILED\n");
        return 1;
@@ -208,7 +208,7 @@ int ipc_send_unlink(const string& path, const hg_id_t ipc_unlink_id) {
    LD_LOG_DEBUG0(debug_fd, "About to send unlink IPC to daemon\n");
    int send_ret = HG_FALSE;
    for (int i = 0; i < RPC_TRIES; ++i) {
        send_ret = margo_forward_timed(ld_margo_ipc_id(), handle, &in, RPC_TIMEOUT);
        send_ret = margo_forward_timed(handle, &in, RPC_TIMEOUT);
        if (send_ret == HG_SUCCESS) {
            break;
        }
@@ -216,19 +216,19 @@ int ipc_send_unlink(const string& path, const hg_id_t ipc_unlink_id) {
    if (send_ret == HG_SUCCESS) {
        /* decode response */
        LD_LOG_DEBUG0(debug_fd, "Waiting for response\n");
        ret = HG_Get_output(handle, &out);
        ret = margo_get_output(handle, &out);

        LD_LOG_DEBUG(debug_fd, "Got response success: %d\n", out.err);
        err = out.err;
        /* clean up resources consumed by this rpc */
        HG_Free_output(handle, &out);
        margo_free_output(handle, &out);
    } else {
        LD_LOG_ERROR0(debug_fd, "IPC send_unlink (timed out)\n");
    }

    in.path = nullptr; // XXX temporary. If this is not done free input crashes because of invalid pointer?!

    HG_Free_input(handle, &in);
    HG_Destroy(handle);
    margo_free_input(handle, &in);
    margo_destroy(handle);
    return err;
}
 No newline at end of file
+49 −114
Original line number Diff line number Diff line
@@ -124,7 +124,7 @@ int open(const char* path, int flags, ...) {
                    err = rpc_send_create_node(rpc_create_node_id, recipient, path,
                                               mode);
                }
            } else { // single node operation
            } else { // single node operationHG_Destroy
                err = ipc_send_open(path, flags, mode, ipc_open_id);
            }
        } else {
@@ -460,29 +460,6 @@ int dup2(int oldfd, int newfd) __THROW {
    return (reinterpret_cast<decltype(&dup2)>(libc_dup2))(oldfd, newfd);
}

/**
 * Initializes the Argobots environment
 * @return
 */
bool init_ld_argobots() {
    LD_LOG_DEBUG0(debug_fd, "Initializing Argobots ...\n");

    // We need no arguments to init
    auto argo_err = ABT_init(0, nullptr);
    if (argo_err != 0) {
        LD_LOG_DEBUG0(debug_fd, "ABT_init() Failed to init Argobots (client)\n");
        return false;
    }
    // Set primary execution stream to idle without polling. Normally xstreams cannot sleep. This is what ABT_snoozer does
    argo_err = ABT_snoozer_xstream_self_set();
    if (argo_err != 0) {
        LD_LOG_DEBUG0(debug_fd, "ABT_snoozer_xstream_self_set()  (client)\n");
        return false;
    }
    LD_LOG_DEBUG0(debug_fd, "Success.\n");
    return true;
}

bool get_addr_by_hostid(const uint64_t hostid, hg_addr_t& svr_addr) {

    if (rpc_address_cache_.tryGet(hostid, svr_addr)) {
@@ -512,76 +489,56 @@ bool is_local_op(const size_t recipient) {
    return recipient == fs_config->host_id;
}

void register_client_ipcs(hg_class_t* hg_class) {
    minimal_id = MERCURY_REGISTER(hg_class, "rpc_minimal", rpc_minimal_in_t, rpc_minimal_out_t, nullptr);
    ipc_open_id = MERCURY_REGISTER(hg_class, "ipc_srv_open", ipc_open_in_t, ipc_err_out_t, nullptr);
    ipc_stat_id = MERCURY_REGISTER(hg_class, "ipc_srv_stat", ipc_stat_in_t, ipc_stat_out_t, nullptr);
    ipc_unlink_id = MERCURY_REGISTER(hg_class, "ipc_srv_unlink", ipc_unlink_in_t, ipc_err_out_t, nullptr);
    ipc_update_metadentry_id = MERCURY_REGISTER(hg_class, "rpc_srv_update_metadentry", rpc_update_metadentry_in_t,
                                                rpc_err_out_t, nullptr);
    ipc_update_metadentry_size_id = MERCURY_REGISTER(hg_class, "rpc_srv_update_metadentry_size",
void register_client_ipcs(margo_instance_id mid) {
    minimal_id = MARGO_REGISTER(mid, "rpc_minimal", rpc_minimal_in_t, rpc_minimal_out_t, NULL);
    ipc_open_id = MARGO_REGISTER(mid, "ipc_srv_open", ipc_open_in_t, ipc_err_out_t, NULL);
    ipc_stat_id = MARGO_REGISTER(mid, "ipc_srv_stat", ipc_stat_in_t, ipc_stat_out_t, NULL);
    ipc_unlink_id = MARGO_REGISTER(mid, "ipc_srv_unlink", ipc_unlink_in_t, ipc_err_out_t, NULL);
    ipc_update_metadentry_id = MARGO_REGISTER(mid, "rpc_srv_update_metadentry", rpc_update_metadentry_in_t,
                                              rpc_err_out_t, NULL);
    ipc_update_metadentry_size_id = MARGO_REGISTER(mid, "rpc_srv_update_metadentry_size",
                                                   rpc_update_metadentry_size_in_t, rpc_update_metadentry_size_out_t,
                                                     nullptr);
    ipc_config_id = MERCURY_REGISTER(hg_class, "ipc_srv_fs_config", ipc_config_in_t, ipc_config_out_t,
                                     nullptr);
    ipc_write_data_id = MERCURY_REGISTER(hg_class, "rpc_srv_write_data", rpc_write_data_in_t, rpc_data_out_t,
                                         nullptr);
    ipc_read_data_id = MERCURY_REGISTER(hg_class, "rpc_srv_read_data", rpc_read_data_in_t, rpc_data_out_t,
                                        nullptr);
}

void register_client_rpcs(hg_class_t* hg_class) {
    rpc_minimal_id = MERCURY_REGISTER(hg_class, "rpc_minimal", rpc_minimal_in_t, rpc_minimal_out_t, nullptr);
    rpc_create_node_id = MERCURY_REGISTER(hg_class, "rpc_srv_create_node", rpc_create_node_in_t,
                                          rpc_err_out_t, nullptr);
    rpc_attr_id = MERCURY_REGISTER(hg_class, "rpc_srv_attr", rpc_get_attr_in_t, rpc_get_attr_out_t, nullptr);
    rpc_remove_node_id = MERCURY_REGISTER(hg_class, "rpc_srv_remove_node", rpc_remove_node_in_t,
                                          rpc_err_out_t, nullptr);
    rpc_update_metadentry_id = MERCURY_REGISTER(hg_class, "rpc_srv_update_metadentry", rpc_update_metadentry_in_t,
                                                rpc_err_out_t, nullptr);
    rpc_update_metadentry_size_id = MERCURY_REGISTER(hg_class, "rpc_srv_update_metadentry_size",
                                                   NULL);
    ipc_config_id = MARGO_REGISTER(mid, "ipc_srv_fs_config", ipc_config_in_t, ipc_config_out_t,
                                   NULL);
    ipc_write_data_id = MARGO_REGISTER(mid, "rpc_srv_write_data", rpc_write_data_in_t, rpc_data_out_t,
                                       NULL);
    ipc_read_data_id = MARGO_REGISTER(mid, "rpc_srv_read_data", rpc_read_data_in_t, rpc_data_out_t,
                                      NULL);
}

void register_client_rpcs(margo_instance_id mid) {
    rpc_minimal_id = MARGO_REGISTER(mid, "rpc_minimal", rpc_minimal_in_t, rpc_minimal_out_t, NULL);
    rpc_create_node_id = MARGO_REGISTER(mid, "rpc_srv_create_node", rpc_create_node_in_t,
                                        rpc_err_out_t, NULL);
    rpc_attr_id = MARGO_REGISTER(mid, "rpc_srv_attr", rpc_get_attr_in_t, rpc_get_attr_out_t, NULL);
    rpc_remove_node_id = MARGO_REGISTER(mid, "rpc_srv_remove_node", rpc_remove_node_in_t,
                                        rpc_err_out_t, NULL);
    rpc_update_metadentry_id = MARGO_REGISTER(mid, "rpc_srv_update_metadentry", rpc_update_metadentry_in_t,
                                              rpc_err_out_t, NULL);
    rpc_update_metadentry_size_id = MARGO_REGISTER(mid, "rpc_srv_update_metadentry_size",
                                                   rpc_update_metadentry_size_in_t, rpc_update_metadentry_size_out_t,
                                                     nullptr);
    rpc_write_data_id = MERCURY_REGISTER(hg_class, "rpc_srv_write_data", rpc_write_data_in_t, rpc_data_out_t,
                                         nullptr);
    rpc_read_data_id = MERCURY_REGISTER(hg_class, "rpc_srv_read_data", rpc_read_data_in_t, rpc_data_out_t,
                                        nullptr);
                                                   NULL);
    rpc_write_data_id = MARGO_REGISTER(mid, "rpc_srv_write_data", rpc_write_data_in_t, rpc_data_out_t,
                                       NULL);
    rpc_read_data_id = MARGO_REGISTER(mid, "rpc_srv_read_data", rpc_read_data_in_t, rpc_data_out_t,
                                      NULL);
}

bool init_ipc_client() {
    auto protocol_port = "na+sm"s;
    LD_LOG_DEBUG0(debug_fd, "Initializing Mercury IPC client ...\n");
    /* MERCURY PART */
    // Init Mercury layer (must be finalized when finished)
    hg_class_t* hg_class;
    hg_context_t* hg_context;
    hg_class = HG_Init(protocol_port.c_str(), HG_FALSE);
    if (hg_class == nullptr) {
        LD_LOG_DEBUG0(debug_fd, "HG_Init() Failed to init Mercury IPC client layer\n");
        return false;
    }
    // Create a new Mercury context (must be destroyed when finished)
    hg_context = HG_Context_create(hg_class);
    if (hg_context == nullptr) {
        LD_LOG_DEBUG0(debug_fd, "HG_Context_create() Failed to create Mercury IPC client context\n");
        HG_Finalize(hg_class);
        return false;
    }
    LD_LOG_DEBUG0(debug_fd, "Success.\n");

    /* MARGO PART */
    LD_LOG_DEBUG0(debug_fd, "Initializing Margo IPC client ...\n");
    // Start Margo
    auto mid = margo_init(0, 0,
                          hg_context);

    // Start Margo (this will also initialize Argobots and Mercury internally)
    auto mid = margo_init(protocol_port.c_str(), MARGO_CLIENT_MODE, 0, 0);

    if (mid == MARGO_INSTANCE_NULL) {
        LD_LOG_DEBUG0(debug_fd, "[ERR]: margo_init failed to initialize the Margo IPC client\n");
        HG_Context_destroy(hg_context);
        HG_Finalize(hg_class);
        return false;
    }
    LD_LOG_DEBUG0(debug_fd, "Success.\n");


    margo_ipc_id_ = mid;

    auto adafs_daemon_pid = getProcIdByName("adafs_daemon"s);
@@ -594,7 +551,7 @@ bool init_ipc_client() {
    string sm_addr_str = "na+sm://"s + to_string(adafs_daemon_pid) + "/0";
    margo_addr_lookup(margo_ipc_id_, sm_addr_str.c_str(), &daemon_svr_addr_);

    register_client_ipcs(hg_class);
    register_client_ipcs(mid);

//    for (int i = 0; i < 10; ++i) {
//        printf("Running %d iteration\n", i);
@@ -606,41 +563,21 @@ bool init_ipc_client() {

bool init_rpc_client() {
    string protocol_port = RPC_PROTOCOL;
    LD_LOG_DEBUG0(debug_fd, "Initializing Mercury RPC client ...\n");
    /* MERCURY PART */
    // Init Mercury layer (must be finalized when finished)
    hg_class_t* hg_class;
    hg_context_t* hg_context;
    hg_class = HG_Init(protocol_port.c_str(), HG_FALSE);
    if (hg_class == nullptr) {
        LD_LOG_DEBUG0(debug_fd, "HG_Init() Failed to init Mercury RPC client layer\n");
        return false;
    }
    // Create a new Mercury context (must be destroyed when finished)
    hg_context = HG_Context_create(hg_class);
    if (hg_context == nullptr) {
        LD_LOG_DEBUG0(debug_fd, "HG_Context_create() Failed to create Mercury RPC client context\n");
        HG_Finalize(hg_class);
        return false;
    }
    LD_LOG_DEBUG0(debug_fd, "Success.\n");

    /* MARGO PART */
    LD_LOG_DEBUG0(debug_fd, "Initializing Margo RPC client ...\n");
    // Start Margo
    auto mid = margo_init(0, 0,
                          hg_context);

    // Start Margo (this will also initialize Argobots and Mercury internally)
    auto mid = margo_init(protocol_port.c_str(), MARGO_CLIENT_MODE, 0, 0);

    if (mid == MARGO_INSTANCE_NULL) {
        LD_LOG_DEBUG0(debug_fd, "[ERR]: margo_init failed to initialize the Margo RPC client\n");
        HG_Context_destroy(hg_context);
        HG_Finalize(hg_class);
        return false;
    }
    LD_LOG_DEBUG0(debug_fd, "Success.\n");

    margo_rpc_id_ = mid;

    register_client_rpcs(hg_class);
    register_client_rpcs(mid);

//    for (int i = 0; i < 10000; ++i) {
//        printf("Running %d iteration\n", i);
@@ -668,9 +605,7 @@ hg_addr_t daemon_addr() {
 */
void init_environment() {
    // init margo client for IPC
    auto err = init_ld_argobots();
    assert(err);
    err = init_ipc_client();
    auto err = init_ipc_client();
    assert(err);
    err = ipc_send_get_fs_config(ipc_config_id); // get fs configurations the daemon was started with.
    assert(err);
@@ -730,7 +665,7 @@ void init_passthrough_if_needed() {
/**
 * Called initially when preload library is used with the LD_PRELOAD environment variable
 */
void init_preload(void) {
void init_preload() {
    init_passthrough_if_needed();
    init_environment();
    printf("[INFO] preload init successful.\n");
@@ -739,7 +674,7 @@ void init_preload(void) {
/**
 * Called last when preload library is used with the LD_PRELOAD environment variable
 */
void destroy_preload(void) {
void destroy_preload() {

    LD_LOG_DEBUG0(debug_fd, "Freeing Mercury daemon addr ...\n");
    HG_Addr_free(margo_get_class(margo_ipc_id_), daemon_svr_addr_);
+13 −13
Original line number Diff line number Diff line
@@ -13,11 +13,11 @@ int rpc_send_write(const hg_id_t ipc_write_data_id, const hg_id_t rpc_write_data

    hg_handle_t handle;
    hg_addr_t svr_addr = HG_ADDR_NULL;
    bool local_op = true;
    rpc_write_data_in_t in;
    rpc_data_out_t out;
    rpc_write_data_in_t in{};
    rpc_data_out_t out{};
    int err;
    hg_return_t ret;
    margo_instance_id used_mid;
    // fill in
    in.path = path.c_str();
    in.size = in_size;
@@ -30,27 +30,27 @@ int rpc_send_write(const hg_id_t ipc_write_data_id, const hg_id_t rpc_write_data

    auto recipient = get_rpc_node(path);
    if (is_local_op(recipient)) { // local
        ret = HG_Create(margo_get_context(ld_margo_ipc_id()), daemon_addr(), ipc_write_data_id, &handle);
        ret = margo_create(ld_margo_ipc_id(), daemon_addr(), ipc_write_data_id, &handle);
        LD_LOG_TRACE0(debug_fd, "rpc_send_write to local daemon (IPC)\n");
        used_mid = ld_margo_ipc_id();
    } else { // remote
        local_op = false;
        // TODO HG_ADDR_T is never freed atm. Need to change LRUCache
        if (!get_addr_by_hostid(recipient, svr_addr)) {
            LD_LOG_ERROR(debug_fd, "server address not resolvable for host id %lu\n", recipient);
            return 1;
        }
        ret = HG_Create(margo_get_context(ld_margo_rpc_id()), svr_addr, rpc_write_data_id, &handle);
        ret = margo_create(ld_margo_rpc_id(), svr_addr, rpc_write_data_id, &handle);
        LD_LOG_TRACE0(debug_fd, "rpc_send_write to remote daemon (RPC)\n");
        used_mid = ld_margo_ipc_id();
    }
    if (ret != HG_SUCCESS) {
        LD_LOG_ERROR0(debug_fd, "creating handle FAILED\n");
        return 1;
    }
    auto hgi = HG_Get_info(handle);
    /* register local target buffer for bulk access */
    // remove constness from buffer for transfer
    void* b_buf = const_cast<void*>(buf);
    ret = HG_Bulk_create(hgi->hg_class, 1, &b_buf, &in_size, HG_BULK_READ_ONLY, &in.bulk_handle);
    ret = margo_bulk_create(used_mid, 1, &b_buf, &in_size, HG_BULK_READ_ONLY, &in.bulk_handle);
    if (ret != 0)
        LD_LOG_ERROR0(debug_fd, "failed to create bulk on client\n");

@@ -64,12 +64,12 @@ int rpc_send_write(const hg_id_t ipc_write_data_id, const hg_id_t rpc_write_data
    if (send_ret == HG_SUCCESS) {

        /* decode response */
        ret = HG_Get_output(handle, &out);
        ret = margo_get_output(handle, &out);
        err = out.res;
        write_size = static_cast<size_t>(out.io_size);
        LD_LOG_TRACE(debug_fd, "Got response %d\n", out.res);
        /* clean up resources consumed by this rpc */
        HG_Free_output(handle, &out);
        margo_free_output(handle, &out);
    } else {
        LD_LOG_ERROR0(debug_fd, "RPC rpc_send_write (timed out)");
        err = EAGAIN;
@@ -77,9 +77,9 @@ int rpc_send_write(const hg_id_t ipc_write_data_id, const hg_id_t rpc_write_data

    in.path = nullptr;

    HG_Bulk_free(in.bulk_handle);
    HG_Free_input(handle, &in);
    HG_Destroy(handle);
    margo_bulk_free(in.bulk_handle);
    margo_free_input(handle, &in);
    margo_destroy(handle);

    return err;
}
 No newline at end of file
Loading