Commit fe56f3d1 authored by Marc Vef's avatar Marc Vef
Browse files

Refactored read/write RPC algorithm and protocol

Only use 1 segment now but with multiple transfers
parent 7f74be02
Loading
Loading
Loading
Loading
+5 −4
Original line number Diff line number Diff line
@@ -31,10 +31,11 @@ void read_file_abt(void* _arg);

void write_file_abt(void* _arg);

int write_chunks(const std::string& path, const std::vector<void*>& buf_ptrs, const std::vector<hg_size_t>& buf_sizes,
                 off_t offset, size_t& write_size);
/**
 * Writes a set of chunks belonging to one file.
 *
 * The definition shown in this change resets write_size to 0 and then starts one
 * write_file_abt task per entry of chnk_ids on RPC_DATA->io_pool(). Entry i of
 * bulk_buf_ptrs, chnk_ids and chnk_sizes together describe one chunk.
 *
 * @param path          file path the chunks belong to
 * @param bulk_buf_ptrs per-chunk pointers to the data to write
 * @param chnk_ids      per-chunk ids; its length determines how many tasks are started
 * @param chnk_sizes    per-chunk sizes in bytes
 * @param offset        handed to the first chunk only; all other chunks get offset 0
 * @param write_size    out-parameter, reset to 0 on entry
 *                      (presumably the total number of bytes written - TODO confirm)
 * @return the server handler treats any non-zero value as failure
 *         (the return statement itself is not part of the visible excerpt)
 */
int
write_chunks(const std::string& path, const std::vector<char*>& bulk_buf_ptrs, const std::vector<hg_size_t>& chnk_ids,
             const std::vector<hg_size_t>& chnk_sizes, const off64_t offset, size_t& write_size);

int read_chunks(const std::string& path, off_t offset, const std::vector<void*>& buf_ptrs,
                const std::vector<hg_size_t>& buf_sizes, size_t& read_size);
/**
 * Reads a set of chunks belonging to one file.
 *
 * The definition shown in this change resets read_size to 0 and then starts one
 * read_file_abt task per entry of chnk_ids on RPC_DATA->io_pool(). Entry i of
 * bulk_buf_ptrs, chnk_ids and chnk_sizes together describe one chunk.
 *
 * @param path          file path the chunks belong to
 * @param bulk_buf_ptrs per-chunk pointers to the buffers handed to the read tasks
 * @param chnk_ids      per-chunk ids; its length determines how many tasks are started
 * @param chnk_sizes    per-chunk sizes in bytes
 * @param offset        handed to the first chunk only; all other chunks get offset 0
 * @param read_size     out-parameter, reset to 0 on entry; the server handler compares it
 *                      against the request's total_chunk_size
 * @return the server handler treats any non-zero value as failure
 *         (the return statement itself is not part of the visible excerpt)
 */
int read_chunks(const std::string& path, const std::vector<char*>& bulk_buf_ptrs, const std::vector<uint64_t>& chnk_ids,
                const std::vector<uint64_t>& chnk_sizes, const off64_t offset, size_t& read_size);

#endif //IFS_DATA_HPP
+8 −1
Original line number Diff line number Diff line
@@ -68,8 +68,11 @@ MERCURY_GEN_PROC(rpc_get_metadentry_size_out_t, ((hg_int32_t) (err))
// data
/*
 * Input of the read-data RPC, as consumed by rpc_srv_read_data:
 *  - path:             file path; concatenated with a chunk id it is hashed to pick the owning host
 *  - size:             request size read by the older handler code
 *                      NOTE(review): looks superseded by total_chunk_size - confirm
 *  - offset:           when > 0, the chunk with id chunk_start is shortened to at most CHUNKSIZE - offset bytes
 *  - chunks:           number of chunks the receiving host handles; sizes the handler's per-chunk vectors
 *  - chunk_start/end:  chunk id range the handler scans for ids that hash to itself
 *                      (chunk_end appears to be exclusive - TODO confirm)
 *  - total_chunk_size: size of the single local bulk buffer created by the handler
 *  - bulk_handle:      client-side bulk handle the handler pushes the read data to
 */
MERCURY_GEN_PROC(rpc_read_data_in_t,
                 ((hg_const_string_t) (path))\
((hg_size_t) (size))\
((int64_t) (offset))\
((hg_uint64_t) (chunks))\
((hg_uint64_t) (chunk_start))\
((hg_uint64_t) (chunk_end))\
((hg_uint64_t) (total_chunk_size))\
((hg_bulk_t) (bulk_handle)))

MERCURY_GEN_PROC(rpc_data_out_t,
@@ -79,6 +82,10 @@ MERCURY_GEN_PROC(rpc_data_out_t,
/*
 * Input of the write-data RPC, as consumed by rpc_srv_write_data:
 *  - path:             file path; concatenated with a chunk id it is hashed to pick the owning host
 *  - offset:           when > 0, the chunk with id chunk_start is shortened to at most CHUNKSIZE - offset bytes
 *  - chunks:           number of chunks the receiving host handles; sizes the handler's per-chunk vectors
 *  - chunk_start/end:  chunk id range the handler scans for ids that hash to itself
 *                      (chunk_end appears to be exclusive - TODO confirm)
 *  - total_chunk_size: size of the single local bulk buffer created by the handler
 *  - bulk_handle:      client-side bulk handle the handler pulls the data to write from
 */
MERCURY_GEN_PROC(rpc_write_data_in_t,
                 ((hg_const_string_t) (path))\
((int64_t) (offset))\
((hg_uint64_t) (chunks))\
((hg_uint64_t) (chunk_start))\
((hg_uint64_t) (chunk_end))\
((hg_uint64_t) (total_chunk_size))\
((hg_bulk_t) (bulk_handle)))

#endif //LFS_RPC_TYPES_HPP
+4 −0
Original line number Diff line number Diff line
@@ -18,10 +18,12 @@ extern "C" {
// XXX these two structs can be merged. How to deal with const void* then?
struct write_args {
    std::shared_ptr<std::string> path;
    size_t total_chunk_size;
    size_t in_size;
    off_t in_offset;
    const void* buf;
    size_t chnk_start;
    size_t chnk_end;
    std::vector<unsigned long>* chnk_ids;
    size_t recipient;
    ABT_eventual eventual;
@@ -29,10 +31,12 @@ struct write_args {

struct read_args {
    std::shared_ptr<std::string> path;
    size_t total_chunk_size;
    size_t in_size;
    off_t in_offset;
    void* buf;
    size_t chnk_start;
    size_t chnk_end;
    std::vector<unsigned long>* chnk_ids;
    size_t recipient;
    ABT_eventual eventual;
+19 −26
Original line number Diff line number Diff line
@@ -100,29 +100,26 @@ void write_file_abt(void* _arg) {
    close(fd);
}

int write_chunks(const string& path, const vector<void*>& buf_ptrs, const vector<hg_size_t>& buf_sizes,
                 const off64_t offset, size_t& write_size) {
int write_chunks(const string& path, const vector<char*>& bulk_buf_ptrs, const vector<uint64_t>& chnk_ids,
                 const vector<uint64_t>& chnk_sizes, const off64_t offset, size_t& write_size) {
    write_size = 0;
    // buf sizes also hold chnk ids. we only want to keep calculate the actual chunks
    auto chnk_n = static_cast<unsigned int>(buf_sizes.size() / 2); // Case-safe: There never are so many chunks at once
    auto chnk_n = static_cast<unsigned int>(chnk_ids.size()); // Case-safe: There never are so many chunks at once
    vector<ABT_eventual> eventuals(chnk_n);
    vector<unique_ptr<struct write_chunk_args>> thread_args(chnk_n);
    for (unsigned int i = 0; i < chnk_n; i++) {
        auto chnk_id = *(static_cast<size_t*>(buf_ptrs[i]));
        auto chnk_ptr = static_cast<char*>(buf_ptrs[i + chnk_n]);
        auto chnk_size = buf_sizes[i + chnk_n];
    vector<unique_ptr<struct write_chunk_args>> task_args(chnk_n);
    for (size_t i = 0; i < chnk_n; i++) {
        // Starting tasklets for parallel I/O
        ABT_eventual_create(sizeof(size_t), &eventuals[i]); // written file return value
        auto args = make_unique<write_chunk_args>();
        args->path = &path;
        args->buf = chnk_ptr;
        args->chnk_id = chnk_id;
        args->size = chnk_size;
        args->buf = bulk_buf_ptrs[i];
        args->chnk_id = chnk_ids[i];
        args->size = chnk_sizes[i];
        // only the first chunk gets the offset. the chunks are sorted on the client side
        args->off = (i == 0 ? offset : 0);
        args->eventual = eventuals[i];
        thread_args[i] = std::move(args);
        auto ret = ABT_task_create(RPC_DATA->io_pool(), write_file_abt, &(*thread_args[i]), nullptr);
        task_args[i] = std::move(args);
        auto ret = ABT_task_create(RPC_DATA->io_pool(), write_file_abt, &(*task_args[i]), nullptr);
        if (ret != ABT_SUCCESS) {
            ADAFS_DATA->spdlogger()->error("{}() task create failed", __func__);
        }
@@ -185,30 +182,26 @@ void read_file_abt(void* _arg) {
    ABT_eventual_set(arg->eventual, &read_size, sizeof(size_t));
}

int read_chunks(const string& path, const off64_t offset, const vector<void*>& buf_ptrs,
                const vector<hg_size_t>& buf_sizes,
                size_t& read_size) {
int read_chunks(const string& path, const vector<char*>& bulk_buf_ptrs, const vector<uint64_t>& chnk_ids,
                const vector<uint64_t>& chnk_sizes, const off64_t offset, size_t& read_size) {
    read_size = 0;
    // buf sizes also hold chnk ids. we only want to keep calculate the actual chunks
    auto chnk_n = static_cast<unsigned int>(buf_sizes.size() / 2); // Case-safe: There never are so many chunks at once
    auto chnk_n = static_cast<unsigned int>(chnk_ids.size()); // Case-safe: There never are so many chunks at once
    vector<ABT_eventual> eventuals(chnk_n);
    vector<unique_ptr<struct read_chunk_args>> thread_args(chnk_n);
    vector<unique_ptr<struct read_chunk_args>> task_args(chnk_n);
    for (size_t i = 0; i < chnk_n; i++) {
        auto chnk_id = *(static_cast<size_t*>(buf_ptrs[i]));
        auto chnk_ptr = static_cast<char*>(buf_ptrs[i + chnk_n]);
        auto chnk_size = buf_sizes[i + chnk_n];
        // Starting tasklets for parallel I/O
        ABT_eventual_create(sizeof(size_t), &eventuals[i]); // written file return value
        auto args = make_unique<read_chunk_args>();
        args->path = &path;
        args->buf = chnk_ptr;
        args->chnk_id = chnk_id;
        args->size = chnk_size;
        args->buf = bulk_buf_ptrs[i];
        args->chnk_id = chnk_ids[i];
        args->size = chnk_sizes[i];
        // only the first chunk gets the offset. the chunks are sorted on the client side
        args->off = (i == 0 ? offset : 0);
        args->eventual = eventuals[i];
        thread_args[i] = std::move(args);
        auto ret = ABT_task_create(RPC_DATA->io_pool(), read_file_abt, &(*thread_args[i]), nullptr);
        task_args[i] = std::move(args);
        auto ret = ABT_task_create(RPC_DATA->io_pool(), read_file_abt, &(*task_args[i]), nullptr);
        if (ret != ABT_SUCCESS) {
            ADAFS_DATA->spdlogger()->error("{}() task create failed", __func__);
        }
+160 −81
Original line number Diff line number Diff line
@@ -6,6 +6,17 @@

using namespace std;

/**
 * Maps a string key onto a host id by hashing it.
 * @param to_hash key to hash; the handlers in this file pass the file path with the chunk id appended
 * @return std::hash of to_hash modulo ADAFS_DATA->host_size()
 */
size_t get_rpc_node(const string& to_hash) {
    //TODO can this be a shared function?
    const auto hashed_key = std::hash<string>{}(to_hash);
    const auto host_count = ADAFS_DATA->host_size();
    return hashed_key % host_count;
}


static hg_return_t rpc_srv_read_data(hg_handle_t handle) {
    rpc_read_data_in_t in{};
    rpc_data_out_t out{};
@@ -29,70 +40,92 @@ static hg_return_t rpc_srv_read_data(hg_handle_t handle) {
                                   (margo_get_info(handle)->target_id == ADAFS_DATA->host_id()), in.path, bulk_size,
                                   in.offset);

    // set buffer sizes
    vector<hg_size_t> buf_sizes(segment_count);
    size_t buf_size = 0;
    size_t id_size = 0;
    for (size_t i = 0; i < segment_count; i++) {
        if (i < segment_count / 2) {
            buf_sizes[i] = sizeof(rpc_chnk_id_t);
            id_size += sizeof(rpc_chnk_id_t);
        } else {
            if (i == segment_count / 2) { // first chunk which might have an offset
                if (in.size + in.offset < CHUNKSIZE)
                    buf_sizes[i] = static_cast<size_t>(in.size);
                else if (in.offset > 0) // if the first chunk is the very first chunk in the buffer
                    buf_sizes[i] = static_cast<size_t>(CHUNKSIZE - in.offset);
                else
                    buf_sizes[i] = CHUNKSIZE;
            } else if (i + 1 == buf_sizes.size()) {// last chunk has remaining size
                buf_sizes[i] = in.size - buf_size;
            } else {
                buf_sizes[i] = CHUNKSIZE;
            }
            buf_size += buf_sizes[i];
        }
    }
    // array of pointers for bulk transfer (allocated in bulk_create)
    vector<void*> buf_ptrs(segment_count);
    // create bulk handle for data transfer (buffers are allocated internally)
    ret = margo_bulk_create(mid, segment_count, nullptr, buf_sizes.data(), HG_BULK_READWRITE, &bulk_handle);
    // array of pointers for bulk transfer (allocated in margo_bulk_create)
    // used for bulk transfer
    void* bulk_buf;
    // used to set pointer to offsets in bulk_buf which correspond to chunks
    vector<char*> bulk_buf_ptrs(in.chunks);
    // create bulk handle and allocate memory for the buffer, sized by in.total_chunk_size
    ret = margo_bulk_create(mid, segment_count, nullptr, &in.total_chunk_size, HG_BULK_READ_ONLY, &bulk_handle);
    if (ret != HG_SUCCESS) {
        ADAFS_DATA->spdlogger()->error("{}() Failed to create bulk handle", __func__);
        return rpc_cleanup_respond(&handle, &in, &out, static_cast<hg_bulk_t*>(nullptr));
    }
    // access the internally allocated memory buffer and put it into buf_ptrs
    uint32_t actual_count;
    ret = margo_bulk_access(bulk_handle, 0, bulk_size, HG_BULK_READWRITE, segment_count, buf_ptrs.data(),
                            buf_sizes.data(), &actual_count);
    if (ret != HG_SUCCESS) {
        ADAFS_DATA->spdlogger()->error("{}() Failed to access allocated memory segments for bulk transfer", __func__);
        return rpc_cleanup_respond(&handle, &in, &out, static_cast<hg_bulk_t*>(nullptr));
    }

    // Get the id numbers on the offset 0
    ret = margo_bulk_transfer(mid, HG_BULK_PULL, hgi->addr, in.bulk_handle, 0, bulk_handle, 0, id_size);
    uint32_t actual_count; // XXX dont need?
    ret = margo_bulk_access(bulk_handle, 0, in.total_chunk_size, HG_BULK_READWRITE, segment_count, &bulk_buf,
                            &in.total_chunk_size, &actual_count);
    if (ret != HG_SUCCESS) {
        ADAFS_DATA->spdlogger()->error("{}() Failed to fetch data IDs", __func__);
        ADAFS_DATA->spdlogger()->error("{}() Failed to access allocated buffer from bulk handle", __func__);
        return rpc_cleanup_respond(&handle, &in, &out, &bulk_handle);
    }

    auto my_id = ADAFS_DATA->host_id();
    // chnk_ids used by this host
    vector<uint64_t> chnk_ids(in.chunks);
    // chnk sizes per chunk for this host
    vector<uint64_t> chnk_sizes(in.chunks);
    // local and origin offsets
    vector<uint64_t> local_offsets(in.chunks);
    vector<uint64_t> origin_offsets(in.chunks);
    // counter to track how many chunks have been assigned
    auto chnk_count = static_cast<uint64_t>(0);
    // how much is left to pull
    auto chnk_size_left = in.total_chunk_size;
    // temporary traveling pointer
    auto chnk_ptr = static_cast<char*>(bulk_buf);
    auto transfer_size = (bulk_size <= CHUNKSIZE) ? bulk_size : CHUNKSIZE;
    for (auto i = in.chunk_start; i < in.chunk_end || chnk_count < in.chunks; i++) {
        if (get_rpc_node(in.path + fmt::FormatInt(i).str()) == my_id) {
            chnk_ids[chnk_count] = i; // chunk id number
            // offset case
            if (i == in.chunk_start && in.offset > 0) {
                // if only 1 destination and 1 chunk (small read) the transfer_size == bulk_size
                auto offset_transfer_size = (in.offset + bulk_size <= CHUNKSIZE) ? bulk_size : static_cast<size_t>(
                        CHUNKSIZE - in.offset);
                local_offsets[chnk_count] = 0;
                origin_offsets[chnk_count] = 0;
                bulk_buf_ptrs[chnk_count] = chnk_ptr;
                chnk_sizes[chnk_count] = offset_transfer_size;
                // util variables
                chnk_ptr += offset_transfer_size;
                chnk_size_left -= offset_transfer_size;
            } else {
                local_offsets[chnk_count] = in.total_chunk_size - chnk_size_left;
                if (in.offset > 0)
                    origin_offsets[chnk_count] = (CHUNKSIZE - in.offset) + ((i - in.chunk_start) - 1) * CHUNKSIZE;
                else
                    origin_offsets[chnk_count] = (i - in.chunk_start) * CHUNKSIZE;
                // last chunk might have different transfer_size
                if (chnk_count == in.chunks - 1)
                    transfer_size = chnk_size_left;
                bulk_buf_ptrs[chnk_count] = chnk_ptr;
                chnk_sizes[chnk_count] = transfer_size;
                // util variables
                chnk_ptr += transfer_size;
                chnk_size_left -= transfer_size;
            }
            chnk_count++;
        }
    }
    // read the data
    err = read_chunks(in.path, in.offset, buf_ptrs, buf_sizes, read_size);

    if (err != 0 || in.size != read_size) {
    err = read_chunks(in.path, bulk_buf_ptrs, chnk_ids, chnk_sizes, in.offset, read_size);
    if (err != 0 || in.total_chunk_size != read_size) {
        out.res = err;
        ADAFS_DATA->spdlogger()->error("{}() Failed to read chunks on path {}", __func__, in.path);
        return rpc_cleanup_respond(&handle, &in, &out, &bulk_handle);
    }
    // get the data on the offset after the ids
    ret = margo_bulk_transfer(mid, HG_BULK_PUSH, hgi->addr, in.bulk_handle, id_size, bulk_handle, id_size,
                              buf_size);
    for (uint64_t chnk_id = 0; chnk_id < chnk_ids.size(); chnk_id++) {
        ret = margo_bulk_transfer(mid, HG_BULK_PUSH, hgi->addr, in.bulk_handle, origin_offsets[chnk_id],
                                  bulk_handle, local_offsets[chnk_id], chnk_sizes[chnk_id]);
        if (ret != HG_SUCCESS) {
        ADAFS_DATA->spdlogger()->error("{}() Failed push the data to the client in read operation", __func__);
            ADAFS_DATA->spdlogger()->error(
                    "{}() Failed push chnkid {} on path {} to client. origin offset {} local offset {} chunk size {}",
                    __func__, chnk_id, in.path, origin_offsets[chnk_id], local_offsets[chnk_id], chnk_sizes[chnk_id]);
            return rpc_cleanup_respond(&handle, &in, &out, &bulk_handle);
        }

    }

    out.res = 0;
    out.io_size = read_size;

@@ -126,51 +159,97 @@ static hg_return_t rpc_srv_write_data(hg_handle_t handle) {
                                   (margo_get_info(handle)->target_id == ADAFS_DATA->host_id()), in.path, bulk_size,
                                   in.offset);


    // set buffer sizes information
    vector<hg_size_t> buf_sizes(segment_count);
    size_t chnk_size = 0;
    size_t id_size = 0;
    for (size_t i = 0; i < segment_count; i++) {
        if (i < segment_count / 2) {
            buf_sizes[i] = sizeof(rpc_chnk_id_t);
            id_size += sizeof(rpc_chnk_id_t);

        } else {
            if (i == segment_count / 2) { // first chunk
                buf_sizes[i] = static_cast<unsigned long>(CHUNKSIZE - in.offset);
            } else if ((chnk_size + CHUNKSIZE + id_size) > bulk_size) // last chunk
                buf_sizes[i] = bulk_size - chnk_size - id_size;
            else
                buf_sizes[i] = CHUNKSIZE;
            chnk_size += buf_sizes[i];
        }
    }
    // array of pointers for bulk transfer (allocated in margo_bulk_create)
    vector<void*> buf_ptrs(segment_count);
    // used for bulk transfer
    void* bulk_buf;
    // used to set pointer to offsets in bulk_buf which correspond to chunks
    vector<char*> bulk_buf_ptrs(in.chunks);
    // create bulk handle and allocate memory for the buffer, sized by in.total_chunk_size
    ret = margo_bulk_create(mid, segment_count, nullptr, buf_sizes.data(), HG_BULK_WRITE_ONLY, &bulk_handle);
    ret = margo_bulk_create(mid, segment_count, nullptr, &in.total_chunk_size, HG_BULK_WRITE_ONLY, &bulk_handle);
    if (ret != HG_SUCCESS) {
        ADAFS_DATA->spdlogger()->error("{}() Failed to create bulk handle", __func__);
        return rpc_cleanup_respond(&handle, &in, &out, static_cast<hg_bulk_t*>(nullptr));
    }
    // access the internally allocated memory buffer and put it into buf_ptrs
    uint32_t actual_count;
    ret = margo_bulk_access(bulk_handle, 0, bulk_size, HG_BULK_READWRITE, segment_count, buf_ptrs.data(),
                            buf_sizes.data(), &actual_count);
    uint32_t actual_count; // XXX dont need?
    ret = margo_bulk_access(bulk_handle, 0, in.total_chunk_size, HG_BULK_READWRITE, segment_count, &bulk_buf,
                            &in.total_chunk_size, &actual_count);
    if (ret != HG_SUCCESS) {
        ADAFS_DATA->spdlogger()->error("{}() Failed to access allocated buffer from bulk handle", __func__);
        return rpc_cleanup_respond(&handle, &in, &out, &bulk_handle);
    }
    // pull data from client here
    ret = margo_bulk_transfer(mid, HG_BULK_PULL, hgi->addr, in.bulk_handle, 0, bulk_handle, 0, bulk_size);
    auto my_id = ADAFS_DATA->host_id();
    // chnk_ids used by this host
    vector<uint64_t> chnk_ids(in.chunks);
    // chnk sizes per chunk for this host
    vector<uint64_t> chnk_sizes(in.chunks);
    // counter to track how many chunks have been assigned
    auto chnk_count = static_cast<uint64_t>(0);
    // how much is left to pull
    auto chnk_size_left = in.total_chunk_size;
    // temporary traveling pointer
    auto chnk_ptr = static_cast<char*>(bulk_buf);
    /*
     * consider the following cases:
     * 1. Very first chunk has offset or not and is serviced by this node
     * 2. If offset, will still be only 1 chunk written (small IO): (offset + bulk_size <= CHUNKSIZE) ? bulk_size
     * 3. If no offset, will only be 1 chunk written (small IO): (bulk_size <= CHUNKSIZE) ? bulk_size
     * 4. Chunks between start and end chunk have size of the CHUNKSIZE
     * 5. Last chunk (if multiple chunks are written): Don't write CHUNKSIZE but chnk_size_left for this destination
     *    Last chunk can also happen if only one chunk is written. This is covered by 2 and 3.
     */
    // get chunk ids that hash to this node
    auto transfer_size = (bulk_size <= CHUNKSIZE) ? bulk_size : CHUNKSIZE;
    uint64_t origin_offset;
    uint64_t local_offset;
    for (auto i = in.chunk_start; i < in.chunk_end || chnk_count < in.chunks; i++) {
        if (get_rpc_node(in.path + fmt::FormatInt(i).str()) == my_id) {
            chnk_ids[chnk_count] = i; // chunk id number
            // offset case
            if (i == in.chunk_start && in.offset > 0) {
                // if only 1 destination and 1 chunk (small write) the transfer_size == bulk_size
                auto offset_transfer_size = (in.offset + bulk_size <= CHUNKSIZE) ? bulk_size : static_cast<size_t>(
                        CHUNKSIZE - in.offset);
                ret = margo_bulk_transfer(mid, HG_BULK_PULL, hgi->addr, in.bulk_handle, 0,
                                          bulk_handle, 0, offset_transfer_size);
                if (ret != HG_SUCCESS) {
        ADAFS_DATA->spdlogger()->error("{}() Failed to pull data from client", __func__);
                    ADAFS_DATA->spdlogger()->error(
                            "{}() Failed to pull data from client for chunk {} (startchunk {}; endchunk {}", __func__,
                            i, in.chunk_start, in.chunk_end - 1);
                    return rpc_cleanup_respond(&handle, &in, &out, &bulk_handle);
                }

                bulk_buf_ptrs[chnk_count] = chnk_ptr;
                chnk_sizes[chnk_count] = offset_transfer_size;
                chnk_ptr += offset_transfer_size;
                chnk_size_left -= offset_transfer_size;
            } else {
                local_offset = in.total_chunk_size - chnk_size_left;
                if (in.offset > 0)
                    origin_offset = (CHUNKSIZE - in.offset) + ((i - in.chunk_start) - 1) * CHUNKSIZE;
                else
                    origin_offset = (i - in.chunk_start) * CHUNKSIZE;
                // last chunk might have different transfer_size
                if (chnk_count == in.chunks - 1)
                    transfer_size = chnk_size_left;
                ret = margo_bulk_transfer(mid, HG_BULK_PULL, hgi->addr, in.bulk_handle, origin_offset,
                                          bulk_handle, local_offset, transfer_size);
                if (ret != HG_SUCCESS) {
                    ADAFS_DATA->spdlogger()->error(
                            "{}() Failed to pull data from client for chunk {} (startchunk {}; endchunk {}", __func__,
                            i, in.chunk_start, in.chunk_end - 1);
                    return rpc_cleanup_respond(&handle, &in, &out, &bulk_handle);
                }
                bulk_buf_ptrs[chnk_count] = chnk_ptr;
                chnk_sizes[chnk_count] = transfer_size;
                chnk_ptr += transfer_size;
                chnk_size_left -= transfer_size;
            }
            chnk_count++;
        }
    }
    // XXX check that sizes left is 0 as sanity check
    // do write operation if all is good
    out.res = write_chunks(in.path, buf_ptrs, buf_sizes, in.offset, out.io_size);
    out.res = write_chunks(in.path, bulk_buf_ptrs, chnk_ids, chnk_sizes, in.offset, out.io_size);
    if (out.res != 0) {
        ADAFS_DATA->spdlogger()->error("{}() Failed to write data to local disk.");
        return rpc_cleanup_respond(&handle, &in, &out, &bulk_handle);
Loading