Commit beffafc1 authored by Marc Vef's avatar Marc Vef
Browse files

Adding documentation

parent 69b26ab6
Loading
Loading
Loading
Loading
+0 −1
Original line number Diff line number Diff line
@@ -71,7 +71,6 @@ MERCURY_GEN_PROC(rpc_data_out_t,

MERCURY_GEN_PROC(rpc_write_data_in_t,
                 ((hg_const_string_t) (path))\
((hg_size_t) (size))\
((int64_t) (offset))\
((hg_bool_t) (append))\
((hg_bulk_t) (bulk_handle))\
+20 −10
Original line number Diff line number Diff line

#include <preload/rpc/ld_rpc_data.hpp>
#include <numeric>


using namespace std;

/**
 * Called by an argobots thread in pwrite() and sends all chunks that go to the same destination at once
 * @param _arg <struct write_args*>
 */
void rpc_send_write_abt(void* _arg) {
    auto* arg = static_cast<struct write_args*>(_arg);

@@ -27,7 +30,7 @@ void rpc_send_write_abt(void* _arg) {
            chnks[i / 2] = static_cast<char*>(const_cast<void*>(arg->buf)) + (CHUNKSIZE * chnk_ids[i / 2]);
        }
    }
    auto buf_sum = std::accumulate(buf_sizes.begin(), buf_sizes.end(), static_cast<rpc_chnk_id_t>(0));
    // setting pointers to the ids and to the chunks
    vector<void*> buf_ptrs(recipient_size * 2);
    for (unsigned long i = 0; i < buf_ptrs.size(); i++) {
        if (i % 2 == 0) // id pointer
@@ -41,11 +44,10 @@ void rpc_send_write_abt(void* _arg) {
    hg_addr_t svr_addr = HG_ADDR_NULL;
    rpc_write_data_in_t in{};
    rpc_data_out_t out{};
    int err; // TODO use err as return val
    int err;
    hg_return_t ret;
    // fill in
    in.path = arg->path.c_str();
    in.size = buf_sum;
    in.offset = arg->in_offset;
    in.updated_size = arg->updated_size;
    if (arg->append)
@@ -72,31 +74,40 @@ void rpc_send_write_abt(void* _arg) {
            break;
        }
    }
    auto write_size = static_cast<size_t>(0);
    if (send_ret == HG_SUCCESS) {

        /* decode response */
        ret = margo_get_output(handle, &out);
        err = out.res;
        size_t write_size;
        if (err != 0)
            write_size = static_cast<size_t>(0);
        else
            write_size = static_cast<size_t>(out.io_size);
        // Signal calling process that RPC is finished and put written size into return value
        ABT_eventual_set(*(arg->eventual), &write_size, sizeof(write_size));
        ld_logger->debug("{}() Got response {}", __func__, out.res);
        /* clean up resources consumed by this rpc */
        margo_bulk_free(in.bulk_handle);
        margo_free_output(handle, &out);
    } else {
        ld_logger->warn("{}() timed out", __func__);
        err = EAGAIN;
    }

    // Signal calling process that RPC is finished and put written size into return value
    ABT_eventual_set(*(arg->eventual), &write_size, sizeof(write_size));
    margo_destroy(handle);

}

/**
 * XXX Currently unused.
 * @param path
 * @param in_size
 * @param in_offset
 * @param buf
 * @param write_size
 * @param append
 * @param updated_size
 * @return
 */
int rpc_send_write(const string& path, const size_t in_size, const off_t in_offset, void* buf, size_t& write_size,
                   const bool append, const off_t updated_size) {

@@ -108,7 +119,6 @@ int rpc_send_write(const string& path, const size_t in_size, const off_t in_offs
    hg_return_t ret;
    // fill in
    in.path = path.c_str();
    in.size = in_size;
    in.offset = in_offset;
    in.updated_size = updated_size;
    if (append)
+6 −1
Original line number Diff line number Diff line
@@ -71,7 +71,6 @@ static hg_return_t rpc_srv_write_data(hg_handle_t handle) {

    auto ret = margo_get_input(handle, &in);
    assert(ret == HG_SUCCESS);
    ADAFS_DATA->spdlogger()->debug("Got write RPC with path {} size {} offset {}", in.path, in.size, in.offset);

    auto hgi = margo_get_info(handle);
    auto mid = margo_hg_info_get_instance(hgi);
@@ -81,6 +80,12 @@ static hg_return_t rpc_srv_write_data(hg_handle_t handle) {
    auto bulk_size = margo_bulk_get_size(in.bulk_handle);
    // is write happening over shared memory on the same node?
    auto local_write = is_handle_sm(mid, hgi->addr);
    if (local_write)
        ADAFS_DATA->spdlogger()->debug("Got local write IPC with path {} size {} offset {}", in.path, bulk_size,
                                       in.offset);
    else
        ADAFS_DATA->spdlogger()->debug("Got write RPC with path {} size {} offset {}", in.path, bulk_size, in.offset);


    // set buffer sizes
    vector<hg_size_t> buf_sizes(segment_count);