Commit f8bf573f authored by Marc Vef
Browse files

Unfinished offset handling for write

parent 0521bbb6
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -17,7 +17,7 @@ int write_file(const std::string& path, const char* buf, rpc_chnk_id_t chnk_id,
               bool append, off_t updated_size, size_t& write_size);

int write_chunks(const std::string& path, const std::vector<void*>& buf_ptrs, const std::vector<hg_size_t>& buf_sizes,
                 size_t& write_size);
                 const off_t offset, size_t& write_size);

int read_chunks(const std::string& path, const std::vector<void*>& buf_ptrs, const std::vector<hg_size_t>& buf_sizes,
                size_t& read_size);
+12 −5
Original line number Diff line number Diff line
@@ -94,16 +94,19 @@ int write_file(const string& path, const char* buf, const rpc_chnk_id_t chnk_id,
        // Metadata was already updated by the client before the write operation
        // truncating file
        truncate(chnk_path.c_str(), updated_size);
    } else {
        truncate(chnk_path.c_str(), size); // file is rewritten, thus, only written size is kept
    }
//    else {
//        truncate(chnk_path.c_str(), size); // file is rewritten, thus, only written size is kept
//    }

    return 0;
}

int write_chunks(const string& path, const vector<void*>& buf_ptrs, const vector<hg_size_t>& buf_sizes,
int
write_chunks(const string& path, const vector<void*>& buf_ptrs, const vector<hg_size_t>& buf_sizes, const off_t offset,
             size_t& write_size) {
    write_size = 0;
    auto err = static_cast<int>(0);
    // buf sizes also hold chnk ids. we only want to calculate the number of actual chunks
    auto chnk_n = buf_sizes.size() / 2;
    // TODO this can be parallelized
@@ -113,7 +116,11 @@ int write_chunks(const string& path, const vector<void*>& buf_ptrs, const vector
        auto chnk_size = buf_sizes[i + chnk_n];
        size_t written_chnk_size;
        // TODO 5,6,7 params (append and offset stuff)
        if (write_file(path, chnk_ptr, chnk_id, chnk_size, 0, false, 0, written_chnk_size) != 0) {
        if (i == 0) // only the first chunk gets the offset. the chunks are sorted on the client side
            err = write_file(path, chnk_ptr, chnk_id, chnk_size, offset, false, 0, written_chnk_size);
        else
            err = write_file(path, chnk_ptr, chnk_id, chnk_size, 0, false, 0, written_chnk_size);
        if (err != 0) {
            // TODO How do we handle already written chunks? Ideally, we would need to remove them after failure.
            ADAFS_DATA->spdlogger()->error("{}() Writing chunk failed with path {} and id {}. Aborting ...", __func__,
                                           path, chnk_id);
+17 −14
Original line number Diff line number Diff line
@@ -240,9 +240,20 @@ ssize_t pwrite(int fd, const void* buf, size_t count, off_t offset) {
            return 0; // ERR
        }

        // started here // TODO handle offset
        auto chunk_n = static_cast<size_t>(ceil(
                count / static_cast<float>(CHUNKSIZE))); // get number of chunks needed for writing
        // Collect all chunk ids within count that have the same destination so that those are send in one rpc bulk transfer
        map<unsigned long, vector<unsigned long>> dest_ids{};
        vector<unsigned long> dest_idx{}; // contains the recipient ids, used to access the dest_ids map
        for (unsigned long i = 0; i < chunk_n; i++) {
            auto recipient = get_rpc_node(path + fmt::FormatInt(i).str());
            if (dest_ids.count(recipient) == 0) {
                dest_ids.insert(make_pair(recipient, vector<unsigned long>{i}));
                dest_idx.push_back(recipient);
            } else
                dest_ids[recipient].push_back(i);
        }
        // Create an Argobots thread per destination, fill an appropriate struct with its destination chunk ids
        ABT_xstream xstream;
        ABT_pool pool;
        auto ret = ABT_xstream_self(&xstream);
@@ -255,17 +266,7 @@ ssize_t pwrite(int fd, const void* buf, size_t count, off_t offset) {
            ld_logger->error("{}() Unable to get main pools from ABT xstream", __func__);
            return -1;
        }
        // Collect all chunk ids within count that have the same destination so that those are send in one rpc bulk transfer
        map<unsigned long, vector<unsigned long>> dest_ids{};
        for (unsigned long i = 0; i < chunk_n; i++) {
            auto recipient = get_rpc_node(path + fmt::FormatInt(i).str());
            if (dest_ids.count(recipient) == 0)
                dest_ids.insert(make_pair(recipient, vector<unsigned long>{i}));
            else
                dest_ids[recipient].push_back(i);
        }
        // Create an Argobots thread per destination, fill an appropriate struct with its destination chunk ids
        auto dest_n = dest_ids.size();
        auto dest_n = dest_idx.size();
        vector<ABT_thread> threads(dest_n);
        vector<ABT_eventual> eventuals(dest_n);
        vector<struct write_args*> thread_args(dest_n);
@@ -274,13 +275,15 @@ ssize_t pwrite(int fd, const void* buf, size_t count, off_t offset) {
            struct write_args args = {
                    path, // path
                    count, // total size to write
                    offset, // writing offset
                    0, // writing offset only relevant for the first chunk that is written
                    buf, // pointer to write buffer
                    append_flag, // append flag when file was opened
                    updated_size, // for append truncate TODO needed?
                    dest_ids[i], // pointer to list of chunk ids that all go to the same destination
                    dest_ids[dest_idx[i]], // pointer to list of chunk ids that all go to the same destination
                    &eventuals[i], // pointer to an eventual which has allocated memory for storing the written size
            };
            if (i == 0)
                args.in_offset = offset % CHUNKSIZE;
            thread_args[i] = &args;
            ABT_thread_create(pool, rpc_send_write_abt, &(*thread_args[i]), ABT_THREAD_ATTR_NULL, &threads[i]);
        }
+1 −1
Original line number Diff line number Diff line
@@ -225,7 +225,7 @@ static hg_return_t rpc_srv_write_data(hg_handle_t handle) {
    }

    // do write operation if all is good
    out.res = write_chunks(in.path, buf_ptrs, buf_sizes, out.io_size);
    out.res = write_chunks(in.path, buf_ptrs, buf_sizes, in.offset, out.io_size);
    if (out.res != 0) {
        ADAFS_DATA->spdlogger()->error("{}() Failed to write data to local disk.");
        return rpc_cleanup_respond(&handle, &in, &out, &bulk_handle);
+4 −4

File changed.

Preview size limit exceeded, changes collapsed.