Verified commit bbf31409 authored by Marc Vef
Browse files

Preload: Use dedicated thread pool to drive IO related RPCs

Previously, an ES (execution stream) was created for each destination in each
read or write. Creating and freeing these ESs is costly. Also, each ES had its
own separate pool, which is bad practice: multiple ESs should share a single
pool, with the ABT_sched deciding which ULT or tasklet is run by which ES at
any time.

This change also reduces CPU load, as library IO RPC calls previously consumed
CPU time while waiting for the response.
parent 73506a6b
Loading
Loading
Loading
Loading
+2 −1
Original line number Diff line number Diff line
@@ -47,8 +47,9 @@

// Margo configuration

// Number of threads used for concurrent I/O
// Number of threads used for concurrent I/O in the daemon and preload library per process
#define IO_THREADS 8
#define IO_LIBRARY_THREADS 8
// Number of threads used for RPC and IPC handlers at the daemon
#define RPC_HANDLER_THREADS 8
#define IPC_HANDLER_THREADS 8
+3 −0
Original line number Diff line number Diff line
@@ -103,6 +103,9 @@ typedef lru11::Cache<uint64_t, hg_addr_t> KVCache;
extern KVCache rpc_address_cache;
// file descriptor index validation flag
extern std::atomic<bool> fd_validation_needed;
// thread pool
extern ABT_pool io_pool;
extern std::vector<ABT_xstream> io_streams;

// function definitions

+5 −52
Original line number Diff line number Diff line
@@ -4,7 +4,6 @@

using namespace std;


int adafs_open(const std::string& path, mode_t mode, int flags) {
    init_ld_env_if_needed();
    auto err = 1;
@@ -187,24 +186,6 @@ ssize_t adafs_pread_ws(int fd, void* buf, size_t count, off64_t offset) {
    }

    auto dest_n = dest_idx.size();
    // Create an Argobots thread per destination, fill an appropriate struct with its destination chunk ids
    vector<ABT_xstream> xstreams(dest_n);
    vector<ABT_pool> pools(dest_n);
    int ret;
    for(unsigned long i = 0; i < dest_n; i++) {
        ret = ABT_xstream_create(ABT_SCHED_NULL, &xstreams[i]);
        if (ret != 0) {
            ld_logger->error("{}() Unable to create xstreams, for parallel read", __func__);
            errno = EAGAIN;
            return -1;
        }
        ret = ABT_xstream_get_main_pools(xstreams[i], 1, &pools[i]);
        if (ret != 0) {
            ld_logger->error("{}() Unable to get main pool from xstream", __func__);
            errno = EAGAIN;
            return -1;
        }
    }
    vector<ABT_thread> threads(dest_n);
    vector<ABT_eventual> eventuals(dest_n);
    vector<unique_ptr<struct read_args>> thread_args(dest_n);
@@ -225,7 +206,7 @@ ssize_t adafs_pread_ws(int fd, void* buf, size_t count, off64_t offset) {
        args->recipient = dest_idx[i];// recipient
        args->eventual = &eventuals[i];// pointer to an eventual which has allocated memory for storing the written size
        thread_args[i] = std::move(args);
        ABT_thread_create(pools[i], rpc_send_read_abt, &(*thread_args[i]), ABT_THREAD_ATTR_NULL, &threads[i]);
        ABT_thread_create(io_pool, rpc_send_read_abt, &(*thread_args[i]), ABT_THREAD_ATTR_NULL, &threads[i]);
    }

    for (unsigned long i = 0; i < dest_n; i++) {
@@ -237,7 +218,7 @@ ssize_t adafs_pread_ws(int fd, void* buf, size_t count, off64_t offset) {
        } else
            read_size += *thread_ret_size;
        ABT_eventual_free(&eventuals[i]);
        ret = ABT_thread_join(threads[i]);
        auto ret = ABT_thread_join(threads[i]);
        if (ret != 0) {
            ld_logger->error("{}() Unable to ABT_thread_join()", __func__);
            err = -1;
@@ -246,10 +227,6 @@ ssize_t adafs_pread_ws(int fd, void* buf, size_t count, off64_t offset) {
        if (ret != 0) {
            ld_logger->warn("{}() Unable to ABT_thread_free()", __func__);
        }
        ret = ABT_xstream_free(&xstreams[i]);
        if (ret != 0) {
            ld_logger->warn("{}() Unable to free xstreams", __func__);
        }
    }
    // XXX check how much we need to deal with the read_size
    // XXX check that we don't try to read past end of the file
@@ -292,23 +269,6 @@ ssize_t adafs_pwrite_ws(int fd, const void* buf, size_t count, off64_t offset) {
    }
    // Create an Argobots thread per destination, fill an appropriate struct with its destination chunk ids
    auto dest_n = dest_idx.size();
    vector<ABT_xstream> xstreams(dest_n);
    vector<ABT_pool> pools(dest_n);
    int ret;
    for(unsigned long i = 0; i < dest_n; i++) {
        ret = ABT_xstream_create(ABT_SCHED_NULL, &xstreams[i]);
        if (ret != 0) {
            ld_logger->error("{}() Unable to create xstreams, for parallel read", __func__);
            errno = EAGAIN;
            return -1;
        }
        ret = ABT_xstream_get_main_pools(xstreams[i], 1, &pools[i]);
        if (ret != 0) {
            ld_logger->error("{}() Unable to get main pool from xstream", __func__);
            errno = EAGAIN;
            return -1;
        }
    }
    vector<ABT_thread> threads(dest_n);
    vector<ABT_eventual> eventuals(dest_n);
    vector<unique_ptr<struct write_args>> thread_args(dest_n);
@@ -329,9 +289,9 @@ ssize_t adafs_pwrite_ws(int fd, const void* buf, size_t count, off64_t offset) {
        args->recipient = dest_idx[i];// recipient
        args->eventual = &eventuals[i];// pointer to an eventual which has allocated memory for storing the written size
        thread_args[i] = std::move(args);
        ABT_thread_create(pools[i], rpc_send_write_abt, &(*thread_args[i]), ABT_THREAD_ATTR_NULL, &threads[i]);
        ABT_thread_create(io_pool, rpc_send_write_abt, &(*thread_args[i]), ABT_THREAD_ATTR_NULL, &threads[i]);
    }

    // Sum written sizes
    for (unsigned long i = 0; i < dest_n; i++) {
        size_t* thread_ret_size;
        ABT_eventual_wait(eventuals[i], (void**) &thread_ret_size);
@@ -341,7 +301,7 @@ ssize_t adafs_pwrite_ws(int fd, const void* buf, size_t count, off64_t offset) {
        } else
            write_size += *thread_ret_size;
        ABT_eventual_free(&eventuals[i]);
        ret = ABT_thread_join(threads[i]);
        auto ret = ABT_thread_join(threads[i]);
        if (ret != 0) {
            ld_logger->error("{}() Unable to ABT_thread_join()", __func__);
            return -1;
@@ -350,13 +310,6 @@ ssize_t adafs_pwrite_ws(int fd, const void* buf, size_t count, off64_t offset) {
        if (ret != 0) {
            ld_logger->warn("{}() Unable to ABT_thread_free()", __func__);
        }
        ret = ABT_xstream_free(&xstreams[i]);
        if (ret != 0) {
            ld_logger->warn("{}() Unable to free xstreams", __func__);
        }
    }
//    ld_logger->info("{}() destn {} count {} offset {} updated size {} written size {} XXX", __func__, dest_n, count,
//                     offset, updated_size, write_size);

    return write_size;
}
 No newline at end of file
+16 −0
Original line number Diff line number Diff line
@@ -46,6 +46,9 @@ margo_instance_id ld_margo_rpc_id;
KVCache rpc_address_cache{32768, 4096}; // XXX Set values are not based on anything...
// local daemon IPC address
hg_addr_t daemon_svr_addr = HG_ADDR_NULL;
// IO RPC driver
ABT_pool io_pool;
std::vector<ABT_xstream> io_streams;

/**
 * Initializes the Argobots environment
@@ -74,6 +77,14 @@ bool init_ld_argobots() {
     * See for reference: https://xgitlab.cels.anl.gov/sds/margo/issues/40
     */
    putenv(const_cast<char*>("ABT_MEM_MAX_NUM_STACKS=8"));
    // Creating pool for driving IO RPCs
    vector<ABT_xstream> io_streams_tmp(IO_LIBRARY_THREADS);
    argo_err = ABT_snoozer_xstream_create(IO_LIBRARY_THREADS, &io_pool, io_streams_tmp.data());
    if (argo_err != ABT_SUCCESS) {
        ld_logger->error("{}() ABT_snoozer_xstream_create()  (client)", __func__);
        return false;
    }
    io_streams = io_streams_tmp;
    ld_logger->debug("{}() Argobots initialization successful.", __func__);
    return true;
}
@@ -277,6 +288,11 @@ void destroy_preload() {
        margo_diag_dump(ld_margo_rpc_id, "-", 0);
    }
#endif
    for (auto& io_stream : io_streams) {
        ABT_xstream_join(io_stream);
        ABT_xstream_free(&io_stream);
    }
    ld_logger->debug("{}() Freeing IO execution streams successful", __func__);
    // Shut down RPC client if used
    if (ld_margo_rpc_id != nullptr) {
        // free all rpc addresses in LRU map and finalize margo rpc