Commit 818d6896 authored by Marc Vef's avatar Marc Vef
Browse files

read/write threads use an exclusive ES now each

parent 3e11fba3
Loading
Loading
Loading
Loading
+46 −30
Original line number Diff line number Diff line
@@ -152,20 +152,25 @@ ssize_t adafs_pread_ws(int fd, void* buf, size_t count, off64_t offset) {
            dest_ids[recipient].push_back(i);
    }

    auto dest_n = dest_idx.size();
    // Create an Argobots thread per destination, fill an appropriate struct with its destination chunk ids
    ABT_xstream xstream;
    ABT_pool pool;
    auto ret = ABT_xstream_self(&xstream);
    vector<ABT_xstream> xstreams(dest_n);
    vector<ABT_pool> pools(dest_n);
    int ret;
    for(unsigned long i = 0; i < dest_n; i++) {
        ret = ABT_xstream_create(ABT_SCHED_NULL, &xstreams[i]);
        if (ret != 0) {
        ld_logger->error("{}() Unable to get self xstream. Is Argobots initialized?", __func__);
            ld_logger->error("{}() Unable to create xstreams, for parallel read", __func__);
            errno = EAGAIN;
            return -1;
        }
    ret = ABT_xstream_get_main_pools(xstream, 1, &pool);
        ret = ABT_xstream_get_main_pools(xstreams[i], 1, &pools[i]);
        if (ret != 0) {
        ld_logger->error("{}() Unable to get main pools from ABT xstream", __func__);
            ld_logger->error("{}() Unable to get main pool from xstream", __func__);
            errno = EAGAIN;
            return -1;
        }
    auto dest_n = dest_idx.size();
    }
    vector<ABT_thread> threads(dest_n);
    vector<ABT_eventual> eventuals(dest_n);
    vector<unique_ptr<struct read_args>> thread_args(dest_n);
@@ -186,7 +191,7 @@ ssize_t adafs_pread_ws(int fd, void* buf, size_t count, off64_t offset) {
        args->recipient = dest_idx[i];// recipient
        args->eventual = &eventuals[i];// pointer to an eventual which has allocated memory for storing the written size
        thread_args[i] = std::move(args);
        ABT_thread_create(pool, rpc_send_read_abt, &(*thread_args[i]), ABT_THREAD_ATTR_NULL, &threads[i]);
        ABT_thread_create(pools[i], rpc_send_read_abt, &(*thread_args[i]), ABT_THREAD_ATTR_NULL, &threads[i]);
    }

    for (unsigned long i = 0; i < dest_n; i++) {
@@ -205,8 +210,11 @@ ssize_t adafs_pread_ws(int fd, void* buf, size_t count, off64_t offset) {
        }
        ret = ABT_thread_free(&threads[i]);
        if (ret != 0) {
            ld_logger->error("{}() Unable to ABT_thread_free()", __func__);
            err = -1;
            ld_logger->warn("{}() Unable to ABT_thread_free()", __func__);
        }
        ret = ABT_xstream_free(&xstreams[i]);
        if (ret != 0) {
            ld_logger->warn("{}() Unable to free xstreams", __func__);
        }
    }
    // XXX check how much we need to deal with the read_size
@@ -249,19 +257,24 @@ ssize_t adafs_pwrite_ws(int fd, const void* buf, size_t count, off64_t offset) {
            dest_ids[recipient].push_back(i);
    }
    // Create an Argobots thread per destination, fill an appropriate struct with its destination chunk ids
    ABT_xstream xstream;
    ABT_pool pool;
    auto ret = ABT_xstream_self(&xstream);
    auto dest_n = dest_idx.size();
    vector<ABT_xstream> xstreams(dest_n);
    vector<ABT_pool> pools(dest_n);
    int ret;
    for(unsigned long i = 0; i < dest_n; i++) {
        ret = ABT_xstream_create(ABT_SCHED_NULL, &xstreams[i]);
        if (ret != 0) {
        ld_logger->error("{}() Unable to get self xstream. Is Argobots initialized?", __func__);
            ld_logger->error("{}() Unable to create xstreams, for parallel read", __func__);
            errno = EAGAIN;
            return -1;
        }
    ret = ABT_xstream_get_main_pools(xstream, 1, &pool);
        ret = ABT_xstream_get_main_pools(xstreams[i], 1, &pools[i]);
        if (ret != 0) {
        ld_logger->error("{}() Unable to get main pools from ABT xstream", __func__);
            ld_logger->error("{}() Unable to get main pool from xstream", __func__);
            errno = EAGAIN;
            return -1;
        }
    auto dest_n = dest_idx.size();
    }
    vector<ABT_thread> threads(dest_n);
    vector<ABT_eventual> eventuals(dest_n);
    vector<unique_ptr<struct write_args>> thread_args(dest_n);
@@ -284,7 +297,7 @@ ssize_t adafs_pwrite_ws(int fd, const void* buf, size_t count, off64_t offset) {
        thread_args[i] = std::move(args);
        ld_logger->info("{}() Starting thread with recipient {} and chnk_ids_n {}", __func__, dest_idx[i],
                        dest_ids[dest_idx[i]].size());
        ABT_thread_create(pool, rpc_send_write_abt, &(*thread_args[i]), ABT_THREAD_ATTR_NULL, &threads[i]);
        ABT_thread_create(pools[i], rpc_send_write_abt, &(*thread_args[i]), ABT_THREAD_ATTR_NULL, &threads[i]);
    }

    for (unsigned long i = 0; i < dest_n; i++) {
@@ -303,8 +316,11 @@ ssize_t adafs_pwrite_ws(int fd, const void* buf, size_t count, off64_t offset) {
        }
        ret = ABT_thread_free(&threads[i]);
        if (ret != 0) {
            ld_logger->error("{}() Unable to ABT_thread_free()", __func__);
            return -1;
            ld_logger->warn("{}() Unable to ABT_thread_free()", __func__);
        }
        ret = ABT_xstream_free(&xstreams[i]);
        if (ret != 0) {
            ld_logger->warn("{}() Unable to free xstreams", __func__);
        }
    }
    return write_size;