Verified Commit 710cbe92 authored by Marc Vef's avatar Marc Vef
Browse files

Preload: Removed barriers from the I/O path; use asynchronous RPC sends instead

We need to be explicit about I/O, as it cannot simply be repeated. A response
from the daemon will arrive eventually, indicating either success or failure.
parent 47240619
Loading
Loading
Loading
Loading
+0 −2
Original line number Diff line number Diff line
@@ -25,7 +25,6 @@ struct write_args {
    std::vector<unsigned long>* chnk_ids;
    size_t recipient;
    ABT_eventual eventual;
    ABT_barrier barrier;
};

struct read_args {
@@ -37,7 +36,6 @@ struct read_args {
    std::vector<unsigned long>* chnk_ids;
    size_t recipient;
    ABT_eventual eventual;
    ABT_barrier barrier;
};

void rpc_send_write_abt(void* _arg);
+0 −8
Original line number Diff line number Diff line
@@ -188,8 +188,6 @@ ssize_t adafs_pread_ws(int fd, void* buf, size_t count, off64_t offset) {
    auto dest_n = static_cast<unsigned int>(dest_idx.size());
    vector<ABT_eventual> eventuals(dest_n);
    vector<unique_ptr<struct read_args>> thread_args(dest_n);
    ABT_barrier barrier;
    ABT_barrier_create(dest_n, &barrier);
    for (unsigned int i = 0; i < dest_n; i++) {
        ABT_eventual_create(sizeof(size_t), &eventuals[i]);
        auto total_chunk_size = dest_ids[dest_idx[i]].size() * CHUNKSIZE;
@@ -206,7 +204,6 @@ ssize_t adafs_pread_ws(int fd, void* buf, size_t count, off64_t offset) {
        args->chnk_start = chnk_start;
        args->recipient = dest_idx[i];// recipient
        args->eventual = eventuals[i];// pointer to an eventual which has allocated memory for storing the written size
        args->barrier = barrier;
        thread_args[i] = std::move(args);
        // Threads are implicitly released once calling function finishes
        ABT_thread_create(io_pool, rpc_send_read_abt, &(*thread_args[i]), ABT_THREAD_ATTR_NULL, nullptr);
@@ -222,7 +219,6 @@ ssize_t adafs_pread_ws(int fd, void* buf, size_t count, off64_t offset) {
            read_size += *thread_ret_size;
        ABT_eventual_free(&eventuals[i]);
    }
    ABT_barrier_free(&barrier);
    // XXX check how much we need to deal with the read_size
    // XXX check that we don't try to read past end of the file
    return err == 0 ? read_size : 0;
@@ -266,8 +262,6 @@ ssize_t adafs_pwrite_ws(int fd, const void* buf, size_t count, off64_t offset) {
    auto dest_n = static_cast<unsigned int>(dest_idx.size());
    vector<ABT_eventual> eventuals(dest_n);
    vector<unique_ptr<struct write_args>> thread_args(dest_n);
    ABT_barrier barrier;
    ABT_barrier_create(dest_n, &barrier);
    for (unsigned int i = 0; i < dest_n; i++) {
        ABT_eventual_create(sizeof(size_t), &eventuals[i]);
        auto total_chunk_size = dest_ids[dest_idx[i]].size() * CHUNKSIZE;
@@ -284,7 +278,6 @@ ssize_t adafs_pwrite_ws(int fd, const void* buf, size_t count, off64_t offset) {
        args->chnk_ids = &dest_ids[dest_idx[i]];// pointer to list of chunk ids that all go to the same destination
        args->recipient = dest_idx[i];// recipient
        args->eventual = eventuals[i];// pointer to an eventual which has allocated memory for storing the written size
        args->barrier = barrier;
        thread_args[i] = std::move(args);
        ABT_thread_create(io_pool, rpc_send_write_abt, &(*thread_args[i]), ABT_THREAD_ATTR_NULL, nullptr);
    }
@@ -299,6 +292,5 @@ ssize_t adafs_pwrite_ws(int fd, const void* buf, size_t count, off64_t offset) {
            write_size += *thread_ret_size;
        ABT_eventual_free(&eventuals[i]);
    }
    ABT_barrier_free(&barrier);
    return write_size;
}
 No newline at end of file
+19 −21
Original line number Diff line number Diff line
@@ -78,16 +78,17 @@ void rpc_send_write_abt(void* _arg) {
        return;
    }

    int send_ret = HG_FALSE;
    for (int i = 0; i < RPC_TRIES; ++i) {
        send_ret = margo_forward_timed(handle, &in, RPC_TIMEOUT);
        if (send_ret == HG_SUCCESS) {
        margo_request req;
        ret = margo_iforward(handle, &in, &req);
        if (ret == HG_SUCCESS) {
            // Wait for the RPC response.
            // This will call eventual_wait internally causing the calling ULT to be BLOCKED and implicitly yields
            margo_wait(req);
            break;
        }
    }
    if (send_ret == HG_SUCCESS) {
        // Make sure all ULTs from a write()'s chunk destination send their request, This yields to the scheduler
        ABT_barrier_wait(arg->barrier);
    if (ret == HG_SUCCESS) {
        /* decode response */
        ret = margo_get_output(handle, &out);
        if (ret != HG_SUCCESS) {
@@ -101,6 +102,8 @@ void rpc_send_write_abt(void* _arg) {
        else
            write_size = static_cast<size_t>(out.io_size);
        ld_logger->debug("{}() Got response {}", __func__, out.res);
        // Signal calling process that RPC is finished and put written size into return value
        ABT_eventual_set(arg->eventual, &write_size, sizeof(write_size));
        /* clean up resources consumed by this rpc */
        margo_bulk_free(in.bulk_handle);
        margo_free_output(handle, &out);
@@ -109,8 +112,6 @@ void rpc_send_write_abt(void* _arg) {
        ABT_eventual_set(arg->eventual, &write_size, sizeof(write_size));
        return;
    }
    // Signal calling process that RPC is finished and put written size into return value
    ABT_eventual_set(arg->eventual, &write_size, sizeof(write_size));
    margo_destroy(handle);
}

@@ -163,7 +164,6 @@ void rpc_send_read_abt(void* _arg) {
    hg_addr_t svr_addr = HG_ADDR_NULL;
    rpc_read_data_in_t in{};
    rpc_data_out_t out{};
//    int err; // XXX
    hg_return_t ret;
    auto read_size = static_cast<size_t>(0);
    // fill in
@@ -182,17 +182,18 @@ void rpc_send_read_abt(void* _arg) {
        ABT_eventual_set(arg->eventual, &read_size, sizeof(read_size));
        return;
    }

    int send_ret = HG_FALSE;
    // Send RPC and wait for response
    for (int i = 0; i < RPC_TRIES; ++i) {
        send_ret = margo_forward_timed(handle, &in, RPC_TIMEOUT);
        if (send_ret == HG_SUCCESS) {
        margo_request req;
        ret = margo_iforward(handle, &in, &req);
        if (ret == HG_SUCCESS) {
            // Wait for the RPC response.
            // This will call eventual_wait internally causing the calling ULT to be BLOCKED and implicitly yields
            ret = margo_wait(req);
            break;
        }
    }
    if (send_ret == HG_SUCCESS) {
        // Make sure all ULTs from a write()'s chunk destination send their request, This yields to the scheduler
        ABT_barrier_wait(arg->barrier);
    if (ret == HG_SUCCESS) {
        /* decode response */
        ret = margo_get_output(handle, &out);
        if (ret != HG_SUCCESS) {
@@ -201,8 +202,9 @@ void rpc_send_read_abt(void* _arg) {
            return;
        }
        read_size = static_cast<size_t>(out.io_size);
//        err = out.res;
        ld_logger->debug("{}() Got response {}", __func__, out.res);
        // Signal calling process that RPC is finished and put read size into return value
        ABT_eventual_set(arg->eventual, &read_size, sizeof(read_size));
        /* clean up resources consumed by this rpc */
        margo_bulk_free(in.bulk_handle);
        margo_free_output(handle, &out);
@@ -210,10 +212,6 @@ void rpc_send_read_abt(void* _arg) {
        ld_logger->warn("{}() timed out", __func__);
        ABT_eventual_set(arg->eventual, &read_size, sizeof(read_size));
        return;
//        err = EAGAIN;
    }
    // Signal calling process that RPC is finished and put read size into return value
    ABT_eventual_set(arg->eventual, &read_size, sizeof(read_size));

    margo_destroy(handle);
}
 No newline at end of file