Commit 977f054f authored by Marc Vef's avatar Marc Vef
Browse files

Rewrite of pwrite() for clarity and bug fixing

parent 4cfad336
Loading
Loading
Loading
Loading
+4 −3
Original line number Diff line number Diff line
@@ -173,8 +173,7 @@ ssize_t adafs_pwrite_ws(int fd, const void* buf, size_t count, off_t offset) {
        auto args = make_unique<write_args>();
        args->path = path; // path
        args->in_size = total_chunk_size; // total size to write
        args->in_offset = (i == 0) ? offset % CHUNKSIZE
                                   : 0;// first offset in dest_idx is the chunk with a potential offset
        args->in_offset = offset % CHUNKSIZE;// first offset in dest_idx is the chunk with a potential offset
        args->buf = buf;// pointer to write buffer
        args->chnk_start = chnk_start;// append flag when file was opened
        args->updated_size = updated_size;// for append truncate TODO needed?
@@ -182,6 +181,8 @@ ssize_t adafs_pwrite_ws(int fd, const void* buf, size_t count, off_t offset) {
        args->recipient = dest_idx[i];// recipient
        args->eventual = &eventuals[i];// pointer to an eventual which has allocated memory for storing the written size
        thread_args[i] = std::move(args);
        ld_logger->info("{}() Starting thread with recipient {} and chnk_ids_n {}", __func__, dest_idx[i],
                        dest_ids[dest_idx[i]].size());
        ABT_thread_create(pool, rpc_send_write_abt, &(*thread_args[i]), ABT_THREAD_ATTR_NULL, &threads[i]);
    }

+15 −18
Original line number Diff line number Diff line
@@ -11,36 +11,36 @@ using namespace std;
void rpc_send_write_abt(void* _arg) {
    auto* arg = static_cast<struct write_args*>(_arg);

    ld_logger->info("{}() recipient {}", __func__, arg->recipient);
    auto recipient_size = arg->chnk_ids->size();
    auto chnk_ids = *arg->chnk_ids;
    vector<char*> chnks(recipient_size);
    vector<size_t> buf_sizes(recipient_size * 2);
    auto buf_size = 0; // counter for how much of the buffer is already mapped into chunks
    size_t chunk_offset = 0;
    // if the first chunk is not the very first chunk in the buffer, the previous chunksizes have to be set as an offset
    if (chnk_ids[0] != arg->chnk_start)
        chunk_offset = ((chnk_ids[0] - arg->chnk_start) * CHUNKSIZE) - arg->in_offset;
    for (size_t i = 0; i < buf_sizes.size(); i++) {
        // even numbers contain the sizes of ids, while uneven contain the chunksize
        if (i < buf_sizes.size() / 2)
            buf_sizes[i] = sizeof(rpc_chnk_id_t);
        else {
            if (i == buf_sizes.size() / 2) { // first chunk which might have an offset
                if (arg->in_size + arg->in_offset > CHUNKSIZE)
                if (arg->in_size < CHUNKSIZE)
                    buf_sizes[i] = static_cast<size_t>(arg->in_size);
                else if (chunk_offset == 0) // if the first chunk is the very first chunk in the buffer
                    buf_sizes[i] = static_cast<size_t>(CHUNKSIZE - arg->in_offset);
                else
                    buf_sizes[i] = static_cast<size_t>(arg->in_size);

                chnks[i - chnks.size()] =
                        static_cast<char*>(const_cast<void*>(arg->buf)) +
                        (CHUNKSIZE * (chnk_ids[i - chnk_ids.size()] - arg->chnk_start));
                continue;
                    buf_sizes[i] = CHUNKSIZE;
            } else if (i + 1 == buf_sizes.size()) {// last chunk has remaining size
                buf_sizes[i] =
                        arg->in_size - (((chnk_ids[i - chnks.size()] - arg->chnk_start) * CHUNKSIZE) - arg->in_offset);
                buf_sizes[i] = arg->in_size - buf_size;
            } else {
                buf_sizes[i] = CHUNKSIZE;
            }
            // position the pointer according to the chunk number
            chnks[i - chnks.size()] =
                    static_cast<char*>(const_cast<void*>(arg->buf)) +
                    ((CHUNKSIZE * (chnk_ids[i - chnk_ids.size()] - arg->chnk_start)) - arg->in_offset);

            // position the pointer according to the chunk number this code is executed for the second chunk+
            chnks[i - chnks.size()] = static_cast<char*>(const_cast<void*>(arg->buf)) + chunk_offset + buf_size;
            buf_size += buf_sizes[i];
        }
    }
    // setting pointers to the ids and to the chunks
@@ -51,7 +51,6 @@ void rpc_send_write_abt(void* _arg) {
        else // data pointer
            buf_ptrs[i] = chnks[i - chnk_ids.size()];
    }

    // RPC
    hg_handle_t handle;
    hg_addr_t svr_addr = HG_ADDR_NULL;
@@ -63,14 +62,12 @@ void rpc_send_write_abt(void* _arg) {
    // fill in
    arg->path->c_str();
    in.path = arg->path->c_str();
    in.offset = arg->in_offset;
    in.offset = (chunk_offset == 0) ? arg->in_offset : 0;
    in.updated_size = arg->updated_size;
    in.append = HG_FALSE; // unused


    margo_create_wrap(ipc_write_data_id, rpc_write_data_id, arg->recipient, handle, svr_addr, false);


    auto used_mid = margo_hg_handle_get_instance(handle);

    /* register local target buffer for bulk access */