Commit d2139c87 authored by Marc Vef's avatar Marc Vef
Browse files

ifs: preload read ipc/rpc (without chunking)

parent e6b1e9ac
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -28,6 +28,7 @@

// RPC configuration
#define RPCPORT 4433
#define RPC_TRIES 3
#define RPC_TIMEOUT 150000
#define RPC_PROTOCOL "bmi+tcp"

+64 −0
Original line number Diff line number Diff line
@@ -7,6 +7,10 @@

#include <preload/preload.hpp>
#include <preload/ipc_types.hpp>
#include <rpc/rpc_types.hpp>

using namespace std;


void send_minimal_ipc(const hg_id_t minimal_id);

@@ -21,4 +25,64 @@ int ipc_send_unlink(const string& path, const hg_id_t ipc_unlink_id);
int ipc_send_write(const string& path, const size_t in_size, const off_t in_offset,
                   const void* buf, size_t& write_size, const bool append, const hg_id_t ipc_write_id);

template<typename T>
int ipc_send_read(const string& path, const size_t in_size, const off_t in_offset,
                  T* tar_buf, size_t& read_size, const hg_id_t ipc_read_id) {

    hg_handle_t handle;
    ipc_read_data_in_t in;
    ipc_data_out_t out;
    int err;
    // fill in
    in.path = path.c_str();
    in.size = in_size;
    in.offset = in_offset;

    auto ret = HG_Create(margo_get_context(ld_margo_ipc_id()), daemon_addr(), ipc_read_id, &handle);
    if (ret != HG_SUCCESS) {
        LD_LOG_ERROR0(debug_fd, "creating handle FAILED\n");
        return 1;
    }

    auto hgi = HG_Get_info(handle);
    /* register local target buffer for bulk access */
    auto b_buf = static_cast<void*>(tar_buf);
    ret = HG_Bulk_create(hgi->hg_class, 1, &b_buf, &in_size, HG_BULK_WRITE_ONLY, &in.bulk_handle);
    if (ret != 0) {
        LD_LOG_ERROR0(debug_fd, "failed to create bulkd on client when reading\n");
        return 1;
    }

    int send_ret = HG_FALSE;
    for (int i = 0; i < RPC_TRIES; ++i) {
        send_ret = margo_forward_timed(ld_margo_ipc_id(), handle, &in, RPC_TIMEOUT);
        if (send_ret == HG_SUCCESS) {
            break;
        }
    }
    if (send_ret == HG_SUCCESS) {
        /* decode response */
        ret = HG_Get_output(handle,
                            &out); // XXX handle ret out.res can inidicate a failure with reading on the other side.
        tar_buf = static_cast<T*>(b_buf); // XXX wtf am I doing here?
        read_size = static_cast<size_t>(out.io_size);
        err = out.res;
        LD_LOG_DEBUG(debug_fd, "Got response %d\n", out.res);

        /* clean up resources consumed by this rpc */
        HG_Free_output(handle, &out);
    } else {
        LD_LOG_ERROR0(debug_fd, "ipc_send_read (timed out)\n");
        err = EAGAIN;
    }

    in.path = nullptr;

    HG_Bulk_free(in.bulk_handle);
    HG_Free_input(handle, &in);
    HG_Destroy(handle);

    return err;
}

#endif //IFS_MARGO_IPC_HPP
+58 −58
Original line number Diff line number Diff line
@@ -14,69 +14,69 @@ extern "C" {
}

#include <rpc/rpc_types.hpp>
#include <preload/preload.hpp>

#include <iostream>

static int max_retries = 3;

template<typename T>
int rpc_send_read(const size_t recipient, const std::string& path, const size_t in_size, const off_t in_offset,
                  T* tar_buf, size_t& read_size) {
//    hg_handle_t handle;
//    hg_addr_t svr_addr = HG_ADDR_NULL;
//    rpc_read_data_in_t in;
//    rpc_data_out_t out;
//    int err;
//    // fill in
//    in.path = path.c_str();
//    in.size = in_size;
//    in.offset = in_offset;
//    // TODO HG_ADDR_T is never freed atm. Need to change LRUCache
//    if (!RPC_DATA->get_addr_by_hostid(recipient, svr_addr)) {
//        ADAFS_DATA->spdlogger()->error("server address not resolvable for host id {}", recipient);
//        return 1;
//    }
//    auto ret = HG_Create(RPC_DATA->client_hg_context(), svr_addr, RPC_DATA->rpc_srv_read_data_id(), &handle);
//    if (ret != HG_SUCCESS) {
//        ADAFS_DATA->spdlogger()->error("creating handle FAILED");
//        return 1;
//    }
//
//    auto b_buf = static_cast<void*>(tar_buf);
//    /* register local target buffer for bulk access */
//    auto hgi = HG_Get_info(handle);
//    ret = HG_Bulk_create(hgi->hg_class, 1, &b_buf, &in_size, HG_BULK_WRITE_ONLY, &in.bulk_handle);
//    if (ret != 0)
//        ADAFS_DATA->spdlogger()->error("failed to create bulk on client");
//
//    int send_ret = HG_FALSE;
//    for (int i = 0; i < max_retries; ++i) {
//        send_ret = margo_forward_timed(RPC_DATA->client_mid(), handle, &in, RPC_TIMEOUT);
//        if (send_ret == HG_SUCCESS) {
//            break;
//        }
//    }
//    if (send_ret == HG_SUCCESS) {
//        /* decode response */
//        ret = HG_Get_output(handle, &out);
//        tar_buf = static_cast<T*>(b_buf);
//        read_size = static_cast<size_t>(out.io_size);
//        err = out.res;
//        ADAFS_DATA->spdlogger()->debug("Got response {}", out.res);
//        /* clean up resources consumed by this rpc */
//        HG_Free_output(handle, &out);
//    } else {
//        ADAFS_DATA->spdlogger()->error("RPC rpc_send_read (timed out)");
//        err = EAGAIN;
//    }
//
//    in.path = nullptr;
//
//    HG_Bulk_free(in.bulk_handle);
//    HG_Free_input(handle, &in);
//    HG_Destroy(handle);
//
//    return err;
                  T* tar_buf, size_t& read_size, const hg_id_t rpc_read_data_id) {
    hg_handle_t handle;
    hg_addr_t svr_addr = HG_ADDR_NULL;
    rpc_read_data_in_t in;
    rpc_data_out_t out;
    int err;
    // fill in
    in.path = path.c_str();
    in.size = in_size;
    in.offset = in_offset;
    // TODO HG_ADDR_T is never freed atm. Need to change LRUCache
    if (!get_addr_by_hostid(recipient, svr_addr)) {
        LD_LOG_ERROR(debug_fd, "server address not resolvable for host id %lu\n", recipient);
        return 1;
    }
    auto ret = HG_Create(margo_get_context(ld_margo_rpc_id()), svr_addr, rpc_read_data_id, &handle);
    if (ret != HG_SUCCESS) {
        LD_LOG_ERROR0(debug_fd, "creating handle FAILED\n");
        return 1;
    }

    auto hgi = HG_Get_info(handle);
    /* register local target buffer for bulk access */
    auto b_buf = static_cast<void*>(tar_buf);
    ret = HG_Bulk_create(hgi->hg_class, 1, &b_buf, &in_size, HG_BULK_WRITE_ONLY, &in.bulk_handle);
    if (ret != 0)
        LD_LOG_ERROR0(debug_fd, "failed to create bulk on client\n");

    int send_ret = HG_FALSE;
    for (int i = 0; i < RPC_TRIES; ++i) {
        send_ret = margo_forward_timed(ld_margo_rpc_id(), handle, &in, RPC_TIMEOUT);
        if (send_ret == HG_SUCCESS) {
            break;
        }
    }
    if (send_ret == HG_SUCCESS) {
        /* decode response */
        ret = HG_Get_output(handle,
                            &out); // XXX handle ret out.res can inidicate a failure with reading on the other side.
        tar_buf = static_cast<T*>(b_buf);
        read_size = static_cast<size_t>(out.io_size);
        err = out.res;
        LD_LOG_TRACE(debug_fd, "Got response %d\n", out.res);
        /* clean up resources consumed by this rpc */
        HG_Free_output(handle, &out);
    } else {
        LD_LOG_ERROR0(debug_fd, "RPC rpc_send_read (timed out)");
        err = EAGAIN;
    }

    in.path = nullptr;

    HG_Bulk_free(in.bulk_handle);
    HG_Free_input(handle, &in);
    HG_Destroy(handle);

    return err;

    return 0;
}
+2 −0
Original line number Diff line number Diff line
@@ -38,6 +38,8 @@ DECLARE_MARGO_RPC_HANDLER(ipc_srv_unlink)

DECLARE_MARGO_RPC_HANDLER(ipc_srv_write_data)

DECLARE_MARGO_RPC_HANDLER(ipc_srv_read_data)


/** OLD BELOW
// mdata ops
+0 −1
Original line number Diff line number Diff line
@@ -10,7 +10,6 @@ extern "C" {
#include <mercury_proc_string.h>
#include <margo.h>
}
//#include "../../main.hpp"

/* visible API for RPC data types used in RPCS */

Loading