Commit 69b26ab6 authored by Marc Vef's avatar Marc Vef
Browse files

Adding ABT_eventual to return written size per thread

parent d5dce0e6
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -23,7 +23,7 @@ struct write_args {
    bool append;
    off_t updated_size;
    std::vector<unsigned long>& chnk_ids;
    size_t write_size;
    ABT_eventual* eventual;
};

void rpc_send_write_abt(void* _arg);
+13 −12
Original line number Diff line number Diff line
@@ -229,7 +229,6 @@ ssize_t pwrite(int fd, const void* buf, size_t count, off_t offset) {
        auto adafs_fd = file_map.get(fd);
        auto path = adafs_fd->path();
        auto append_flag = adafs_fd->append_flag();
        size_t write_size;// XXX use after abt eventual is used
        int err = 0;
        long updated_size = 0;

@@ -261,8 +260,10 @@ ssize_t pwrite(int fd, const void* buf, size_t count, off_t offset) {
        // Create an Argobots thread per destination, fill an appropriate struct with its destination chunk ids
        auto dest_n = dest_ids.size();
        vector<ABT_thread> threads(dest_n);
        vector<ABT_eventual> eventuals(dest_n);
        vector<struct write_args*> thread_args(dest_n);
        for (unsigned long i = 0; i < dest_n; i++) {
            ABT_eventual_create(sizeof(size_t), &eventuals[i]);
            struct write_args args = {
                    path, // path
                    count, // total size to write
@@ -271,16 +272,21 @@ ssize_t pwrite(int fd, const void* buf, size_t count, off_t offset) {
                    append_flag, // append flag when file was opened
                    updated_size, // for append truncate TODO needed?
                    dest_ids[i], // pointer to list of chunk ids that all go to the same destination
                    0, // out: actual written size
                    &eventuals[i], // pointer to an eventual which has allocated memory for storing the written size
            };
            thread_args[i] = &args;
            ABT_thread_create(pool, rpc_send_write_abt, &(*thread_args[i]), ABT_THREAD_ATTR_NULL, &threads[i]);
        }
        // yield to one of the threads
//        ABT_thread_yield_to(threads[0]);
        // TODO we need abt_eventual for it to be asynchronous and to be able to return us the written size
        // TODO Errorhandling should also take place here after we implement with abt eventual
        auto write_size_total = static_cast<size_t>(0);
        for (unsigned long i = 0; i < dest_n; i++) {
            size_t* thread_ret_size;
            ABT_eventual_wait(eventuals[i], (void**) &thread_ret_size);
            if (thread_ret_size == nullptr || *thread_ret_size == 0) {
                // TODO error handling if write of a thread failed. all data needs to be deleted and size update reverted
                ld_logger->error("{}() Writing thread {} did not write anything. NO ACTION WAS DONE", __func__, i);
            } else
                write_size_total += *thread_ret_size;
            ABT_eventual_free(&eventuals[i]);
            ret = ABT_thread_join(threads[i]);
            if (ret != 0) {
                ld_logger->error("{}() Unable to ABT_thread_join()", __func__);
@@ -292,12 +298,7 @@ ssize_t pwrite(int fd, const void* buf, size_t count, off_t offset) {
                return -1;
            }
        }
//        if (err != 0) {
//            ld_logger->error("{}() write failed", __func__);
//            return 0;
//        }
//        return write_size;
        return count;
        return write_size_total;
    }
    return (reinterpret_cast<decltype(&pwrite)>(libc_pwrite))(fd, buf, count, offset);
}
+7 −1
Original line number Diff line number Diff line
@@ -77,7 +77,13 @@ void rpc_send_write_abt(void* _arg) {
        /* decode response */
        ret = margo_get_output(handle, &out);
        err = out.res;
        arg->write_size = static_cast<size_t>(out.io_size);
        size_t write_size;
        if (err != 0)
            write_size = static_cast<size_t>(0);
        else
            write_size = static_cast<size_t>(out.io_size);
        // Signal calling process that RPC is finished and put written size into return value
        ABT_eventual_set(*(arg->eventual), &write_size, sizeof(write_size));
        ld_logger->debug("{}() Got response {}", __func__, out.res);
        /* clean up resources consumed by this rpc */
        margo_bulk_free(in.bulk_handle);
+8 −6
Original line number Diff line number Diff line
@@ -102,12 +102,13 @@ static hg_return_t rpc_srv_write_data(hg_handle_t handle) {
    for (size_t i = 0; i < segment_count; i++) {
        if (i % 2 == 0)
            buf_ptrs[i] = new rpc_chnk_id_t;
        else
        else {
            // On a local operation the buffers are allocated in the client on the same node.
            // Hence no memory allocation is necessary
            if (!local_write)
                buf_ptrs[i] = new char[buf_sizes[i]];
        }
    }
    // If local operation the data does not need to be transferred. We just need access to the data ptrs
    if (local_write) {
        uint32_t actual_count;
@@ -157,11 +158,12 @@ static hg_return_t rpc_srv_write_data(hg_handle_t handle) {
    for (size_t i = 0; i < segment_count; i++) {
        if (i % 2 == 0)
            delete static_cast<rpc_chnk_id_t*>(buf_ptrs[i]);
        else
        else {
            // On a local operation the data is owned by the client who is responsible to free its buffers
            if (!local_write)
                delete[] static_cast<char*>(buf_ptrs[i]);
        }
    }
    return HG_SUCCESS;
}