Commit 8f4a2424 authored by Marc Vef's avatar Marc Vef
Browse files

Rewrite of pread() for clarity and bug fixing, adding offset support for read ops

parent 977f054f
Loading
Loading
Loading
Loading
+2 −2
Original line number Diff line number Diff line
@@ -19,7 +19,7 @@ int write_file(const std::string& path, const char* buf, rpc_chnk_id_t chnk_id,
int write_chunks(const std::string& path, const std::vector<void*>& buf_ptrs, const std::vector<hg_size_t>& buf_sizes,
                 const off_t offset, size_t& write_size);

int read_chunks(const std::string& path, const std::vector<void*>& buf_ptrs, const std::vector<hg_size_t>& buf_sizes,
                size_t& read_size);
int read_chunks(const std::string& path, const off_t offset, const std::vector<void*>& buf_ptrs,
                const std::vector<hg_size_t>& buf_sizes, size_t& read_size);

#endif //IFS_DATA_HPP
+1 −0
Original line number Diff line number Diff line
@@ -32,6 +32,7 @@ struct read_args {
    size_t in_size;
    off_t in_offset;
    void* buf;
    size_t chnk_start;
    std::vector<unsigned long>* chnk_ids;
    size_t recipient;
    ABT_eventual* eventual;
+4 −3
Original line number Diff line number Diff line
@@ -157,7 +157,8 @@ int read_file(const string& path, const rpc_chnk_id_t chnk_id, const size_t size
    return 0;
}

int read_chunks(const string& path, const vector<void*>& buf_ptrs, const vector<hg_size_t>& buf_sizes,
int read_chunks(const string& path, const off_t offset, const vector<void*>& buf_ptrs,
                const vector<hg_size_t>& buf_sizes,
                size_t& read_size) {
    read_size = 0;
    // buf sizes also hold chnk ids. We only want to calculate the actual chunks
@@ -168,8 +169,8 @@ int read_chunks(const string& path, const vector<void*>& buf_ptrs, const vector<
        auto chnk_ptr = static_cast<char*>(buf_ptrs[i + chnk_n]);
        auto chnk_size = buf_sizes[i + chnk_n];
        size_t read_chnk_size;
        // TODO append
        if (read_file(path, chnk_id, chnk_size, 0, chnk_ptr, read_chnk_size) != 0) {
        // read_file but only first chunk can have an offset
        if (read_file(path, chnk_id, chnk_size, (i == 0) ? offset : 0, chnk_ptr, read_chnk_size) != 0) {
            // TODO How do we handle errors?
            ADAFS_DATA->spdlogger()->error("{}() read chunk failed with path {} and id {}. Aborting ...", __func__,
                                           path, chnk_id);
+17 −11
Original line number Diff line number Diff line
@@ -43,19 +43,19 @@ ssize_t adafs_pread_ws(int fd, void* buf, size_t count, off_t offset) {
    auto err = 0;

    // Collect all chunk ids within count that have the same destination so that those are sent in one rpc bulk transfer
    auto chunk_n = static_cast<size_t>(ceil(
            count / static_cast<float>(CHUNKSIZE))); // get number of chunks needed for writing
    auto chnk_id_start = offset / CHUNKSIZE;
    auto chnk_start = static_cast<size_t>(offset) / CHUNKSIZE; // first chunk number
    auto chnk_end = (offset + count) / CHUNKSIZE + 1; // last chunk number (right-open) [chnk_start,chnk_end)
    if ((offset + count) % CHUNKSIZE == 0)
        chnk_end--;
    vector<unsigned long> dest_idx{}; // contains the recipient ids, used to access the dest_ids map
    map<unsigned long, vector<unsigned long>> dest_ids{}; // contains the chnk ids (value list) per recipient (key)
    for (unsigned long i = 0; i < chunk_n; i++) {
        auto chnk_id = i + chnk_id_start;
        auto recipient = get_rpc_node(*path + fmt::FormatInt(chnk_id).str());
    for (unsigned long i = chnk_start; i < chnk_end; i++) {
        auto recipient = get_rpc_node(*path + fmt::FormatInt(i).str());
        if (dest_ids.count(recipient) == 0) {
            dest_ids.insert(make_pair(recipient, vector<unsigned long>{chnk_id}));
            dest_ids.insert(make_pair(recipient, vector<unsigned long>{i}));
            dest_idx.push_back(recipient);
        } else
            dest_ids[recipient].push_back(chnk_id);
            dest_ids[recipient].push_back(i);
    }

    // Create an Argobots thread per destination, fill an appropriate struct with its destination chunk ids
@@ -77,12 +77,18 @@ ssize_t adafs_pread_ws(int fd, void* buf, size_t count, off_t offset) {
    vector<unique_ptr<struct read_args>> thread_args(dest_n);
    for (unsigned long i = 0; i < dest_n; i++) {
        ABT_eventual_create(sizeof(size_t), &eventuals[i]);

        auto total_chunk_size = dest_ids[dest_idx[i]].size() * CHUNKSIZE;
        if (i == 0) // receiver of first chunk must subtract the offset from first chunk
            total_chunk_size -= offset % CHUNKSIZE;
        if (i == dest_n - 1 && ((offset + count) % CHUNKSIZE) != 0) // receiver of last chunk must subtract
            total_chunk_size -= CHUNKSIZE - ((offset + count) % CHUNKSIZE);
        auto args = make_unique<read_args>();
        args->path = path;
        args->in_size = count;// total size to read
        args->in_offset = (i == 0) ? offset % CHUNKSIZE : 0;// reading offset only for the first chunk
        args->in_size = total_chunk_size;// total size to read
        args->in_offset = offset % CHUNKSIZE;// reading offset only for the first chunk
        args->buf = buf;
        args->chnk_ids = &dest_ids[dest_idx[i]]; // pointer to list of chunk ids that all go to the same destination
        args->chnk_start = chnk_start;
        args->recipient = dest_idx[i];// recipient
        args->eventual = &eventuals[i];// pointer to an eventual which has allocated memory for storing the written size
        thread_args[i] = std::move(args);
+19 −9
Original line number Diff line number Diff line
@@ -124,22 +124,32 @@ void rpc_send_read_abt(void* _arg) {
    auto chnk_ids = *arg->chnk_ids;
    vector<char*> chnks(recipient_size);
    vector<size_t> buf_sizes(recipient_size * 2);

    auto buf_size = 0; // counter for how much of the buffer is already mapped into chunks
    size_t chunk_offset = 0;
    // if the first chunk is not the very first chunk in the buffer, the previous chunksizes have to be set as an offset
    if (chnk_ids[0] != arg->chnk_start)
        chunk_offset = ((chnk_ids[0] - arg->chnk_start) * CHUNKSIZE) - arg->in_offset;
    for (size_t i = 0; i < buf_sizes.size(); i++) {
        // the first half of buf_sizes holds the sizes of the chunk ids, the second half holds the chunk sizes
        if (i < buf_sizes.size() / 2)
            buf_sizes[i] = sizeof(rpc_chnk_id_t);
        else {
            if (i / 2 == 0) { // First chunk might have an offset
                buf_sizes[i] = CHUNKSIZE - static_cast<unsigned long>(arg->in_offset);
            } else if (i + 1 == buf_sizes.size()) {// if current chunk size is last chunk
                // the last chunk will have the rest of the size, i.e., write size - all applied chunk sizes
                buf_sizes[i] = arg->in_size - (chnk_ids[i / 2] * CHUNKSIZE);
            if (i == buf_sizes.size() / 2) { // first chunk which might have an offset
                if (arg->in_size + arg->in_offset < CHUNKSIZE)
                    buf_sizes[i] = static_cast<size_t>(arg->in_size);
                else if (chunk_offset == 0) // if the first chunk is the very first chunk in the buffer
                    buf_sizes[i] = static_cast<size_t>(CHUNKSIZE - arg->in_offset);
                else
                    buf_sizes[i] = CHUNKSIZE;
            } else if (i + 1 == buf_sizes.size()) {// last chunk has remaining size
                buf_sizes[i] = arg->in_size - buf_size;
            } else {
                buf_sizes[i] = CHUNKSIZE;
            }
            // position the pointer according to the chunk number
            chnks[i - chnks.size()] = static_cast<char*>(arg->buf) + (CHUNKSIZE * chnk_ids[i - chnk_ids.size()]);

            // position the pointer according to the chunk number this code is executed for the second chunk+
            chnks[i - chnks.size()] = static_cast<char*>(const_cast<void*>(arg->buf)) + chunk_offset + buf_size;
            buf_size += buf_sizes[i];
        }
    }
    // setting pointers to the ids and to the chunks
@@ -161,7 +171,7 @@ void rpc_send_read_abt(void* _arg) {
    // fill in
    in.path = arg->path->c_str();
    in.size = arg->in_size;
    in.offset = arg->in_offset;
    in.offset = (chunk_offset == 0) ? arg->in_offset : 0;

    margo_create_wrap(ipc_read_data_id, rpc_read_data_id, arg->recipient, handle, svr_addr, false);

Loading