Commit 7b03d7d7 authored by Marc Vef's avatar Marc Vef
Browse files

Daemon: Read/Writes to local fs is now done in parallel with RDMA

Previously, data was first pulled via RDMA and then written, or first read and then pushed back.
parent fe56f3d1
Loading
Loading
Loading
Loading
+0 −7
Original line number Diff line number Diff line
@@ -31,11 +31,4 @@ void read_file_abt(void* _arg);

void write_file_abt(void* _arg);

int
write_chunks(const std::string& path, const std::vector<char*>& bulk_buf_ptrs, const std::vector<hg_size_t>& chnk_ids,
             const std::vector<hg_size_t>& chnk_sizes, const off64_t offset, size_t& write_size);

int read_chunks(const std::string& path, const std::vector<char*>& bulk_buf_ptrs, const std::vector<uint64_t>& chnk_ids,
                const std::vector<uint64_t>& chnk_sizes, const off64_t offset, size_t& read_size);

#endif //IFS_DATA_HPP
+0 −88
Original line number Diff line number Diff line
@@ -100,50 +100,6 @@ void write_file_abt(void* _arg) {
    close(fd);
}

int write_chunks(const string& path, const vector<char*>& bulk_buf_ptrs, const vector<uint64_t>& chnk_ids,
                 const vector<uint64_t>& chnk_sizes, const off64_t offset, size_t& write_size) {
    write_size = 0;
    // buf sizes also hold chnk ids. we only want to keep calculate the actual chunks
    auto chnk_n = static_cast<unsigned int>(chnk_ids.size()); // Case-safe: There never are so many chunks at once
    vector<ABT_eventual> eventuals(chnk_n);
    vector<unique_ptr<struct write_chunk_args>> task_args(chnk_n);
    for (size_t i = 0; i < chnk_n; i++) {
        // Starting tasklets for parallel I/O
        ABT_eventual_create(sizeof(size_t), &eventuals[i]); // written file return value
        auto args = make_unique<write_chunk_args>();
        args->path = &path;
        args->buf = bulk_buf_ptrs[i];
        args->chnk_id = chnk_ids[i];
        args->size = chnk_sizes[i];
        // only the first chunk gets the offset. the chunks are sorted on the client side
        args->off = (i == 0 ? offset : 0);
        args->eventual = eventuals[i];
        task_args[i] = std::move(args);
        auto ret = ABT_task_create(RPC_DATA->io_pool(), write_file_abt, &(*task_args[i]), nullptr);
        if (ret != ABT_SUCCESS) {
            ADAFS_DATA->spdlogger()->error("{}() task create failed", __func__);
        }
    }
    for (unsigned int i = 0; i < chnk_n; i++) {
        size_t* task_written_size;
        // wait causes the calling ult to go into BLOCKED state, implicitly yielding to the pool scheduler
        ABT_eventual_wait(eventuals[i], (void**) &task_written_size);
        if (task_written_size == nullptr || *task_written_size == 0) {
            ADAFS_DATA->spdlogger()->error("{}() Writing file task {} did return nothing. NO ACTION WAS DONE",
                                           __func__, i);
//            // TODO How do we handle already written chunks? Ideally, we would need to remove them after failure.
//            ADAFS_DATA->spdlogger()->error("{}() Writing chunk failed with path {} and id {}. Aborting ...", __func__,
//                                           path, chnk_id);
            write_size = 0;
            return -1;
        } else {
            write_size += *task_written_size;
        }
        ABT_eventual_free(&eventuals[i]);
    }
    return 0;
}

/**
 * Used by an argobots threads. Argument args has the following fields:
 * const std::string* path;
@@ -181,47 +137,3 @@ void read_file_abt(void* _arg) {
    close(fd);
    ABT_eventual_set(arg->eventual, &read_size, sizeof(size_t));
}
 No newline at end of file

int read_chunks(const string& path, const vector<char*>& bulk_buf_ptrs, const vector<uint64_t>& chnk_ids,
                const vector<uint64_t>& chnk_sizes, const off64_t offset, size_t& read_size) {
    read_size = 0;
    // buf sizes also hold chnk ids. we only want to keep calculate the actual chunks
    auto chnk_n = static_cast<unsigned int>(chnk_ids.size()); // Case-safe: There never are so many chunks at once
    vector<ABT_eventual> eventuals(chnk_n);
    vector<unique_ptr<struct read_chunk_args>> task_args(chnk_n);
    for (size_t i = 0; i < chnk_n; i++) {
        // Starting tasklets for parallel I/O
        ABT_eventual_create(sizeof(size_t), &eventuals[i]); // written file return value
        auto args = make_unique<read_chunk_args>();
        args->path = &path;
        args->buf = bulk_buf_ptrs[i];
        args->chnk_id = chnk_ids[i];
        args->size = chnk_sizes[i];
        // only the first chunk gets the offset. the chunks are sorted on the client side
        args->off = (i == 0 ? offset : 0);
        args->eventual = eventuals[i];
        task_args[i] = std::move(args);
        auto ret = ABT_task_create(RPC_DATA->io_pool(), read_file_abt, &(*task_args[i]), nullptr);
        if (ret != ABT_SUCCESS) {
            ADAFS_DATA->spdlogger()->error("{}() task create failed", __func__);
        }
    }

    for (unsigned int i = 0; i < chnk_n; i++) {
        size_t* task_read_size;
        ABT_eventual_wait(eventuals[i], (void**) &task_read_size);
        if (task_read_size == nullptr || *task_read_size == 0) {
            ADAFS_DATA->spdlogger()->error("{}() Reading file task {} did return nothing. NO ACTION WAS DONE",
                                           __func__, i);
//            // TODO How do we handle errors?
//            ADAFS_DATA->spdlogger()->error("{}() read chunk failed with path {} and id {}. Aborting ...", __func__,
//                                           path, chnk_id);
            read_size = 0;
            return -1;
        } else {
            read_size += *task_read_size;
        }
        ABT_eventual_free(&eventuals[i]);
    }
    return 0;
}
 No newline at end of file
+162 −92
Original line number Diff line number Diff line
@@ -20,21 +20,14 @@ size_t get_rpc_node(const string& to_hash) {
static hg_return_t rpc_srv_read_data(hg_handle_t handle) {
    rpc_read_data_in_t in{};
    rpc_data_out_t out{};
    int err;
    hg_bulk_t bulk_handle = nullptr;
    auto read_size = static_cast<size_t>(0);
    // Set default out for error
    out.res = EIO;
    out.io_size = 0;

    // Getting some information from margo
    auto ret = margo_get_input(handle, &in);
    assert(ret == HG_SUCCESS);

    auto hgi = margo_get_info(handle);
    auto mid = margo_hg_info_get_instance(hgi);


    auto segment_count = margo_bulk_get_segment_count(in.bulk_handle);
    auto bulk_size = margo_bulk_get_size(in.bulk_handle);
    ADAFS_DATA->spdlogger()->debug("{}() Got read RPC (local {}) with path {} size {} offset {}", __func__,
                                   (margo_get_info(handle)->target_id == ADAFS_DATA->host_id()), in.path, bulk_size,
@@ -46,19 +39,20 @@ static hg_return_t rpc_srv_read_data(hg_handle_t handle) {
    // used to set pointer to offsets in bulk_buf which correspond to chunks
    vector<char*> bulk_buf_ptrs(in.chunks);
    // create bulk handle and allocated memory for buffer with buf_sizes information
    ret = margo_bulk_create(mid, segment_count, nullptr, &in.total_chunk_size, HG_BULK_READ_ONLY, &bulk_handle);
    ret = margo_bulk_create(mid, 1, nullptr, &in.total_chunk_size, HG_BULK_READ_ONLY, &bulk_handle);
    if (ret != HG_SUCCESS) {
        ADAFS_DATA->spdlogger()->error("{}() Failed to create bulk handle", __func__);
        return rpc_cleanup_respond(&handle, &in, &out, static_cast<hg_bulk_t*>(nullptr));
    }
    // access the internally allocated memory buffer and put it into buf_ptrs
    uint32_t actual_count; // XXX dont need?
    ret = margo_bulk_access(bulk_handle, 0, in.total_chunk_size, HG_BULK_READWRITE, segment_count, &bulk_buf,
    ret = margo_bulk_access(bulk_handle, 0, in.total_chunk_size, HG_BULK_READWRITE, 1, &bulk_buf,
                            &in.total_chunk_size, &actual_count);
    if (ret != HG_SUCCESS) {
        ADAFS_DATA->spdlogger()->error("{}() Failed to access allocated buffer from bulk handle", __func__);
        return rpc_cleanup_respond(&handle, &in, &out, &bulk_handle);
    }
    auto inpath = make_shared<string>(in.path);
    auto my_id = ADAFS_DATA->host_id();
    // chnk_ids used by this host
    vector<uint64_t> chnk_ids(in.chunks);
@@ -73,6 +67,9 @@ static hg_return_t rpc_srv_read_data(hg_handle_t handle) {
    auto chnk_size_left = in.total_chunk_size;
    // temporary traveling pointer
    auto chnk_ptr = static_cast<char*>(bulk_buf);
    // tasks structures
    vector<ABT_eventual> task_eventuals(in.chunks);
    vector<unique_ptr<struct read_chunk_args>> task_args(in.chunks);
    auto transfer_size = (bulk_size <= CHUNKSIZE) ? bulk_size : CHUNKSIZE;
    for (auto i = in.chunk_start; i < in.chunk_end || chnk_count < in.chunks; i++) {
        if (get_rpc_node(in.path + fmt::FormatInt(i).str()) == my_id) {
@@ -104,32 +101,61 @@ static hg_return_t rpc_srv_read_data(hg_handle_t handle) {
                chnk_ptr += transfer_size;
                chnk_size_left -= transfer_size;
            }
            chnk_count++;
            // Starting tasklets for parallel I/O
            ABT_eventual_create(sizeof(size_t), &task_eventuals[chnk_count]); // written file return value
            auto task_arg = make_unique<read_chunk_args>();
            task_arg->path = inpath.get();
            task_arg->buf = bulk_buf_ptrs[chnk_count];
            task_arg->chnk_id = chnk_ids[chnk_count];
            task_arg->size = chnk_sizes[chnk_count];
            // only the first chunk gets the offset. the chunks are sorted on the client side
            task_arg->off = (i == 0 ? in.offset : 0);
            task_arg->eventual = task_eventuals[chnk_count];
            task_args[chnk_count] = std::move(task_arg);
            auto abt_ret = ABT_task_create(RPC_DATA->io_pool(), read_file_abt, &(*task_args[chnk_count]), nullptr);
            if (abt_ret != ABT_SUCCESS) {
                ADAFS_DATA->spdlogger()->error("{}() task create failed", __func__);
                out.res = EBUSY;
                return rpc_cleanup_respond(&handle, &in, &out, &bulk_handle);
            }
            chnk_count++;
        }
    // read the data
    err = read_chunks(in.path, bulk_buf_ptrs, chnk_ids, chnk_sizes, in.offset, read_size);
    if (err != 0 || in.total_chunk_size != read_size) {
        out.res = err;
        ADAFS_DATA->spdlogger()->error("{}() Failed to read chunks on path {}", __func__, in.path);
        return rpc_cleanup_respond(&handle, &in, &out, &bulk_handle);
    }
    for (uint64_t chnk_id = 0; chnk_id < chnk_ids.size(); chnk_id++) {
        size_t* task_read_size;
        ABT_eventual_wait(task_eventuals[chnk_id], (void**) &task_read_size);
        if (task_read_size == nullptr || *task_read_size == 0) {
            ADAFS_DATA->spdlogger()->error("{}() Reading chunk id file {} did return nothing. NO ACTION WAS DONE",
                                           __func__, chnk_id);
            // TODO How do we handle errors?
            out.io_size = 0;
            out.res = EIO;
            ADAFS_DATA->spdlogger()->error("{}() Failed to read data to local disk.");
            return rpc_cleanup_respond(&handle, &in, &out, &bulk_handle);
        } else {
            ret = margo_bulk_transfer(mid, HG_BULK_PUSH, hgi->addr, in.bulk_handle, origin_offsets[chnk_id],
                                      bulk_handle, local_offsets[chnk_id], chnk_sizes[chnk_id]);
            if (ret != HG_SUCCESS) {
                ADAFS_DATA->spdlogger()->error(
                        "{}() Failed push chnkid {} on path {} to client. origin offset {} local offset {} chunk size {}",
                    __func__, chnk_id, in.path, origin_offsets[chnk_id], local_offsets[chnk_id], chnk_sizes[chnk_id]);
                        __func__, chnk_id, in.path, origin_offsets[chnk_id], local_offsets[chnk_id],
                        chnk_sizes[chnk_id]);
                out.res = EBUSY;
                return rpc_cleanup_respond(&handle, &in, &out, &bulk_handle);
            }

            out.io_size += *task_read_size;
        }
        ABT_eventual_free(&task_eventuals[chnk_id]);
    }

    if (in.total_chunk_size != out.io_size) {
        out.res = EIO;
        ADAFS_DATA->spdlogger()->error("{}() read chunk size does not match with requested size in path {}", __func__,
                                       in.path);
        return rpc_cleanup_respond(&handle, &in, &out, &bulk_handle);
    }
    // Everything is well, set result to success and send response
    out.res = 0;
    out.io_size = read_size;

    //cleanup
    ADAFS_DATA->spdlogger()->debug("{}() Sending output response {}", __func__, out.res);
    ret = rpc_cleanup_respond(&handle, &in, &out, &bulk_handle);

@@ -139,31 +165,30 @@ static hg_return_t rpc_srv_read_data(hg_handle_t handle) {
DEFINE_MARGO_RPC_HANDLER(rpc_srv_read_data)

static hg_return_t rpc_srv_write_data(hg_handle_t handle) {
    /*
     * 1. Setup
     */
    rpc_write_data_in_t in{};
    rpc_data_out_t out{};
    hg_bulk_t bulk_handle = nullptr;
    hg_return_t ret;
    // default out
    out.res = EIO;
    out.io_size = 0;

    auto ret = margo_get_input(handle, &in);
    assert(ret == HG_SUCCESS);

    // get some margo information
    margo_get_input(handle, &in);
    auto hgi = margo_get_info(handle);
    auto mid = margo_hg_info_get_instance(hgi);


    auto segment_count = margo_bulk_get_segment_count(in.bulk_handle);
    auto bulk_size = margo_bulk_get_size(in.bulk_handle);
    ADAFS_DATA->spdlogger()->debug("{}() Got write RPC (local {}) with path {} size {} offset {}", __func__,
    ADAFS_DATA->spdlogger()->info("{}() Got write RPC (local {}) with path {} size {} offset {}", __func__,
                                  (margo_get_info(handle)->target_id == ADAFS_DATA->host_id()), in.path, bulk_size,
                                  in.offset);

    // array of pointers for bulk transfer (allocated in margo_bulk_create)
    // used for bulk transfer
    void* bulk_buf;
    // used to set pointer to offsets in bulk_buf which correspond to chunks
    vector<char*> bulk_buf_ptrs(in.chunks);
    /*
     * 2. Set up buffers for pull bulk transfers
     */
    void* bulk_buf; // buffer for bulk transfer
    vector<char*> bulk_buf_ptrs(in.chunks); // buffer-chunk offsets
    // create bulk handle and allocated memory for buffer with buf_sizes information
    ret = margo_bulk_create(mid, segment_count, nullptr, &in.total_chunk_size, HG_BULK_WRITE_ONLY, &bulk_handle);
    if (ret != HG_SUCCESS) {
@@ -178,14 +203,14 @@ static hg_return_t rpc_srv_write_data(hg_handle_t handle) {
        ADAFS_DATA->spdlogger()->error("{}() Failed to access allocated buffer from bulk handle", __func__);
        return rpc_cleanup_respond(&handle, &in, &out, &bulk_handle);
    }
    auto my_id = ADAFS_DATA->host_id();
    auto inpath = make_shared<std::string>(in.path);
    // chnk_ids used by this host
    vector<uint64_t> chnk_ids(in.chunks);
    // chnk sizes per chunk for this host
    vector<uint64_t> chnk_sizes(in.chunks);
    // counter to track how many chunks have been assigned
    auto chnk_count = static_cast<uint64_t>(0);
    // how much is left to pull
    // how much is left to read
    auto chnk_size_left = in.total_chunk_size;
    // temporary traveling pointer
    auto chnk_ptr = static_cast<char*>(bulk_buf);
@@ -202,20 +227,31 @@ static hg_return_t rpc_srv_write_data(hg_handle_t handle) {
    auto transfer_size = (bulk_size <= CHUNKSIZE) ? bulk_size : CHUNKSIZE;
    uint64_t origin_offset;
    uint64_t local_offset;
    for (auto i = in.chunk_start; i < in.chunk_end || chnk_count < in.chunks; i++) {
        if (get_rpc_node(in.path + fmt::FormatInt(i).str()) == my_id) {
            chnk_ids[chnk_count] = i; // chunk id number
    // task structures
    vector<ABT_eventual> task_eventuals(in.chunks);
    vector<unique_ptr<struct write_chunk_args>> task_args(in.chunks);
    for (auto chnk_idx = in.chunk_start; chnk_idx < in.chunk_end || chnk_count < in.chunks; chnk_idx++) {
        // Continue if chunk does not hash to this node
        if (get_rpc_node(in.path + fmt::FormatInt(chnk_idx).str()) != ADAFS_DATA->host_id())
            continue;
        chnk_ids[chnk_count] = chnk_idx; // chunk id number
        // offset case
            if (i == in.chunk_start && in.offset > 0) {
        if (chnk_idx == in.chunk_start && in.offset > 0) {
            // if only 1 destination and 1 chunk (small write) the transfer_size == bulk_size
            auto offset_transfer_size = (in.offset + bulk_size <= CHUNKSIZE) ? bulk_size : static_cast<size_t>(
                    CHUNKSIZE - in.offset);
            ADAFS_DATA->spdlogger()->info(
                    "{}() BEGIN HG_BULK_PULL target_id {} origin_offset {} local_offset {} transfer_size {}",
                    __func__, hgi->target_id, 0, 0, offset_transfer_size);
            ret = margo_bulk_transfer(mid, HG_BULK_PULL, hgi->addr, in.bulk_handle, 0,
                                      bulk_handle, 0, offset_transfer_size);
            ADAFS_DATA->spdlogger()->info(
                    "{}() END   HG_BULK_PULL target_id {} origin_offset {} local_offset {} transfer_size {}\n",
                    __func__, hgi->target_id, 0, 0, offset_transfer_size);
            if (ret != HG_SUCCESS) {
                ADAFS_DATA->spdlogger()->error(
                        "{}() Failed to pull data from client for chunk {} (startchunk {}; endchunk {}", __func__,
                            i, in.chunk_start, in.chunk_end - 1);
                        chnk_idx, in.chunk_start, in.chunk_end - 1);
                return rpc_cleanup_respond(&handle, &in, &out, &bulk_handle);
            }
            bulk_buf_ptrs[chnk_count] = chnk_ptr;
@@ -225,18 +261,24 @@ static hg_return_t rpc_srv_write_data(hg_handle_t handle) {
        } else {
            local_offset = in.total_chunk_size - chnk_size_left;
            if (in.offset > 0)
                    origin_offset = (CHUNKSIZE - in.offset) + ((i - in.chunk_start) - 1) * CHUNKSIZE;
                origin_offset = (CHUNKSIZE - in.offset) + ((chnk_idx - in.chunk_start) - 1) * CHUNKSIZE;
            else
                    origin_offset = (i - in.chunk_start) * CHUNKSIZE;
                origin_offset = (chnk_idx - in.chunk_start) * CHUNKSIZE;
            // last chunk might have different transfer_size
            if (chnk_count == in.chunks - 1)
                transfer_size = chnk_size_left;
            ADAFS_DATA->spdlogger()->info(
                    "{}() BEGIN HG_BULK_PULL target_id {} origin_offset {} local_offset {} transfer_size {}",
                    __func__, hgi->target_id, origin_offset, local_offset, transfer_size);
            ret = margo_bulk_transfer(mid, HG_BULK_PULL, hgi->addr, in.bulk_handle, origin_offset,
                                      bulk_handle, local_offset, transfer_size);
            ADAFS_DATA->spdlogger()->info(
                    "{}() END   HG_BULK_PULL target_id {} origin_offset {} local_offset {} transfer_size {}\n",
                    __func__, hgi->target_id, origin_offset, local_offset, transfer_size);
            if (ret != HG_SUCCESS) {
                ADAFS_DATA->spdlogger()->error(
                        "{}() Failed to pull data from client for chunk {} (startchunk {}; endchunk {}", __func__,
                            i, in.chunk_start, in.chunk_end - 1);
                        chnk_idx, in.chunk_start, in.chunk_end - 1);
                return rpc_cleanup_respond(&handle, &in, &out, &bulk_handle);
            }
            bulk_buf_ptrs[chnk_count] = chnk_ptr;
@@ -244,17 +286,45 @@ static hg_return_t rpc_srv_write_data(hg_handle_t handle) {
            chnk_ptr += transfer_size;
            chnk_size_left -= transfer_size;
        }
            chnk_count++;
        // Starting tasklets for parallel I/O
        ABT_eventual_create(sizeof(size_t), &task_eventuals[chnk_count]); // written file return value
        auto task_arg = make_unique<struct write_chunk_args>();
        task_arg->path = inpath.get();
        task_arg->buf = bulk_buf_ptrs[chnk_count];
        task_arg->chnk_id = chnk_ids[chnk_count];
        task_arg->size = chnk_sizes[chnk_count];
        // only the first chunk gets the offset. the chunks are sorted on the client side
        task_arg->off = (chnk_idx == 0 ? in.offset : 0);
        task_arg->eventual = task_eventuals[chnk_count];
        task_args[chnk_count] = std::move(task_arg);
        auto abt_ret = ABT_task_create(RPC_DATA->io_pool(), write_file_abt, &(*task_args[chnk_count]), nullptr);
        if (abt_ret != ABT_SUCCESS) {
            ADAFS_DATA->spdlogger()->error("{}() task create failed", __func__);
            return rpc_cleanup_respond(&handle, &in, &out, &bulk_handle);
        }
        // next chunk
        chnk_count++;

    }
    // XXX check that sizes left is 0 as sanity check
    // do write operation if all is good
    out.res = write_chunks(in.path, bulk_buf_ptrs, chnk_ids, chnk_sizes, in.offset, out.io_size);
    if (out.res != 0) {
    for (unsigned int i = 0; i < in.chunks; i++) {
        size_t* task_written_size;
        // wait causes the calling ult to go into BLOCKED state, implicitly yielding to the pool scheduler
        ABT_eventual_wait(task_eventuals[i], (void**) &task_written_size);
        if (task_written_size == nullptr || *task_written_size == 0) {
            ADAFS_DATA->spdlogger()->error("{}() Writing file task {} did return nothing. NO ACTION WAS DONE",
                                           __func__, i);
//            // TODO How do we handle already written chunks? Ideally, we would need to remove them after failure.
            out.io_size = 0;
            ADAFS_DATA->spdlogger()->error("{}() Failed to write data to local disk.");
            return rpc_cleanup_respond(&handle, &in, &out, &bulk_handle);
        } else {
            out.io_size += *task_written_size;
        }
        ABT_eventual_free(&task_eventuals[i]);
    }
    // XXX check that sizes left is 0 as sanity check
    // respond and cleanup
    out.res = 0;
    ADAFS_DATA->spdlogger()->debug("{}() Sending output response {}", __func__, out.res);
    ret = rpc_cleanup_respond(&handle, &in, &out, &bulk_handle);

+0 −66
Original line number Diff line number Diff line
@@ -132,72 +132,6 @@ void rpc_send_read_abt(void* _arg) {
            break;
        }
    }

//    for (size_t i = 0; i < buf_sizes.size(); i++) {
//        // even numbers contain the sizes of ids, while uneven contain the chunksize
//        if (i < buf_sizes.size() / 2)
//            buf_sizes[i] = sizeof(rpc_chnk_id_t);
//        else {
//            if (i == buf_sizes.size() / 2) { // first chunk which might have an offset
//                if (arg->in_size + arg->in_offset < CHUNKSIZE)
//                    buf_sizes[i] = static_cast<size_t>(arg->in_size);
//                else if (chunk_offset == 0) // if the first chunk is the very first chunk in the buffer
//                    buf_sizes[i] = static_cast<size_t>(CHUNKSIZE - arg->in_offset);
//                else
//                    buf_sizes[i] = CHUNKSIZE;
//            } else if (i + 1 == buf_sizes.size()) {// last chunk has remaining size
//                buf_sizes[i] = arg->in_size - buf_size;
//            } else {
//                buf_sizes[i] = CHUNKSIZE;
//            }
//
//            // position the pointer according to the chunk number this code is executed for the second chunk+
//            chnks[i - chnks.size()] = static_cast<char*>(const_cast<void*>(arg->buf)) + chunk_offset + buf_size;
//            buf_size += buf_sizes[i];
//        }
//    }
//    // setting pointers to the ids and to the chunks
//    vector<void*> buf_ptrs(recipient_size * 2);
//    for (unsigned long i = 0; i < buf_ptrs.size(); i++) {
//        if (i < buf_sizes.size() / 2) // id pointer
//            buf_ptrs[i] = &chnk_ids[i];
//        else // data pointer
//            buf_ptrs[i] = chnks[i - chnk_ids.size()];
//    }
//
//    hg_handle_t handle;
//    hg_addr_t svr_addr = HG_ADDR_NULL;
//    rpc_read_data_in_t in{};
//    rpc_data_out_t out{};
//    hg_return_t ret;
//    auto read_size = static_cast<size_t>(0);
//    // fill in
//    in.path = arg->path->c_str();
//    in.size = arg->in_size;
//    in.offset = (chunk_offset == 0) ? arg->in_offset : 0;
//
//    margo_create_wrap(ipc_read_data_id, rpc_read_data_id, arg->recipient, handle, svr_addr, false);
//
//    auto used_mid = margo_hg_handle_get_instance(handle);
//    /* register local target buffer for bulk access */
//    ret = margo_bulk_create(used_mid, static_cast<uint32_t>(buf_sizes.size()), buf_ptrs.data(), buf_sizes.data(),
//                            HG_BULK_READWRITE, &in.bulk_handle);
//    if (ret != HG_SUCCESS) {
//        ld_logger->error("{}() failed to create bulk on client", __func__);
//        ABT_eventual_set(arg->eventual, &read_size, sizeof(read_size));
//        return;
//    }
//    // Send RPC and wait for response
//    for (int i = 0; i < RPC_TRIES; ++i) {
//        margo_request req;
//        ret = margo_iforward(handle, &in, &req);
//        if (ret == HG_SUCCESS) {
//            // Wait for the RPC response.
//            // This will call eventual_wait internally causing the calling ULT to be BLOCKED and implicitly yields
//            ret = margo_wait(req);
//            break;
//        }
//    }
    if (ret == HG_SUCCESS) {
        /* decode response */
        ret = margo_get_output(handle, &out);