Commit 585c49d2 authored by Marc Vef's avatar Marc Vef
Browse files

Preload: Remove IO pool and ESs, cleanup

parent d268de9d
Loading
Loading
Loading
Loading
+0 −5
Original line number Diff line number Diff line
@@ -52,11 +52,6 @@
 * The value is directly mapped to created Argobots xstreams, controlled in a single pool with ABT_snoozer scheduler
 */
#define DAEMON_IO_XSTREAMS 8
/*
 * Sets the number of concurrent progress for sending I/O related RPCs to daemons
 * The value is directly mapped to created Argobots xstreams, controlled in a single pool with ABT_snoozer scheduler
 */
#define PRELOAD_IORPC_XSTREAMS 8
// Number of threads used for RPC and IPC handlers at the daemon
#define DAEMON_RPC_HANDLER_XSTREAMS 8
#define DAEMON_IPC_HANDLER_XSTREAMS 8
+0 −3
Original line number Diff line number Diff line
@@ -102,9 +102,6 @@ extern std::map<uint64_t, hg_addr_t> rpc_address_cache;
extern ABT_mutex rpc_address_cache_mutex;
// file descriptor index validation flag
extern std::atomic<bool> fd_validation_needed;
// thread pool
extern ABT_pool io_pool;
extern std::vector<ABT_xstream> io_streams;

// function definitions

+0 −30
Original line number Diff line number Diff line
@@ -15,39 +15,9 @@ extern "C" {

#include <iostream>

struct write_args {
    std::shared_ptr<std::string> path;
    size_t total_chunk_size;
    size_t in_size;
    off_t in_offset;
    const void* buf;
    size_t chnk_start;
    size_t chnk_end;
    std::vector<unsigned long>* chnk_ids;
    size_t recipient;
    ABT_eventual eventual;
};

struct read_args {
    std::shared_ptr<std::string> path;
    size_t total_chunk_size;
    size_t in_size;
    off_t in_offset;
    void* buf;
    size_t chnk_start;
    size_t chnk_end;
    std::vector<unsigned long>* chnk_ids;
    size_t recipient;
    ABT_eventual eventual;
};

ssize_t rpc_send_write(const std::string& path, const void* buf, const bool append_flag, const off64_t in_offset,
                       const size_t write_size, const int64_t updated_metadentry_size);

ssize_t rpc_send_read(const std::string& path, void* buf, const off64_t offset, const size_t read_size);

void rpc_send_write_abt(void* _arg);

void rpc_send_read_abt(void* _arg);

#endif //IFS_PRELOAD_C_DATA_WS_HPP
+2 −17
Original line number Diff line number Diff line
@@ -47,9 +47,6 @@ std::map<uint64_t, hg_addr_t> rpc_address_cache;
ABT_mutex rpc_address_cache_mutex;
// local daemon IPC address
hg_addr_t daemon_svr_addr = HG_ADDR_NULL;
// IO RPC driver
ABT_pool io_pool;
std::vector<ABT_xstream> io_streams;

/**
 * Initializes the Argobots environment
@@ -78,14 +75,6 @@ bool init_ld_argobots() {
     * See for reference: https://xgitlab.cels.anl.gov/sds/margo/issues/40
     */
    putenv(const_cast<char*>("ABT_MEM_MAX_NUM_STACKS=8"));
    // Creating pool for driving IO RPCs
    vector<ABT_xstream> io_streams_tmp(PRELOAD_IORPC_XSTREAMS);
    argo_err = ABT_snoozer_xstream_create(PRELOAD_IORPC_XSTREAMS, &io_pool, io_streams_tmp.data());
    if (argo_err != ABT_SUCCESS) {
        ld_logger->error("{}() ABT_snoozer_xstream_create()  (client)", __func__);
        return false;
    }
    io_streams = io_streams_tmp;
    ld_logger->debug("{}() Argobots initialization successful.", __func__);
    return true;
}
@@ -295,10 +284,6 @@ void destroy_preload() {
#endif
    if (services_used) {
        ld_logger->debug("{}() Freeing ABT constructs ...", __func__);
        for (auto& io_stream : io_streams) {
            ABT_xstream_join(io_stream);
            ABT_xstream_free(&io_stream);
        }
        ABT_mutex_free(&rpc_address_cache_mutex);
        ld_logger->debug("{}() Freeing ABT constructs successful", __func__);
    }
@@ -313,7 +298,7 @@ void destroy_preload() {
            }
        }
        ld_logger->debug("{}() About to finalize the margo RPC client. Actually not doing it XXX", __func__);
        margo_finalize(ld_margo_rpc_id);
//        margo_finalize(ld_margo_rpc_id); // XXX Sometimes this hangs on the cluster. Investigate
        ld_logger->debug("{}() Shut down Margo RPC client successful", __func__);
    }
    // Shut down IPC client if used
@@ -322,7 +307,7 @@ void destroy_preload() {
        if (margo_addr_free(ld_margo_ipc_id, daemon_svr_addr) != HG_SUCCESS)
            ld_logger->warn("{}() Unable to free IPC client's daemon svr address.", __func__);
        ld_logger->debug("{}() About to finalize the margo IPC client. Actually not doing it XXX", __func__);
        margo_finalize(ld_margo_ipc_id);
//        margo_finalize(ld_margo_ipc_id);
        ld_logger->debug("{}() Shut down Margo IPC client successful", __func__);
    }
    if (services_used) {
+0 −137
Original line number Diff line number Diff line
@@ -66,7 +66,6 @@ ssize_t rpc_send_write(const string& path, const void* buf, const bool append_fl
            total_chunk_size -= (CHUNKSIZE - ((offset + write_size) % CHUNKSIZE));

        // RPC
        hg_addr_t svr_addr = HG_ADDR_NULL;
        auto chnk_ids = &dest_ids[dest_idx[i]];
        rpc_write_data_in_t in{};
        // fill in
@@ -181,7 +180,6 @@ ssize_t rpc_send_read(const string& path, void* buf, const off64_t offset, const
            total_chunk_size -= (CHUNKSIZE - ((offset + read_size) % CHUNKSIZE));

        // RPC
        hg_addr_t svr_addr = HG_ADDR_NULL;
        auto chnk_ids = &dest_ids[dest_idx[i]];
        rpc_read_data_in_t in{};
        // fill in
@@ -242,138 +240,3 @@ ssize_t rpc_send_read(const string& path, void* buf, const off64_t offset, const
    margo_bulk_free(ipc_bulk_handle);
    return (err < 0) ? err : out_size;
}
 No newline at end of file

/**
 * Called by an argobots thread in pwrite() and sends all chunks that go to the same destination at once
 * @param _arg <struct write_args*>
 */
void rpc_send_write_abt(void* _arg) {
    // Unpack
    auto* arg = static_cast<struct write_args*>(_arg);
    auto chnk_ids = *arg->chnk_ids;
    // RPC
    hg_handle_t handle;
    hg_addr_t svr_addr = HG_ADDR_NULL;
    rpc_write_data_in_t in{};
    rpc_data_out_t out{};
    hg_return_t ret;
    auto write_size = static_cast<size_t>(0);
    // fill in
    in.path = arg->path->c_str();
    in.offset = arg->in_offset;
    in.chunk_n = chnk_ids.size();
    in.chunk_start = arg->chnk_start;
    in.chunk_end = arg->chnk_end;
    in.total_chunk_size = arg->total_chunk_size;

    margo_create_wrap(ipc_write_data_id, rpc_write_data_id, arg->recipient, handle, false);

    auto used_mid = margo_hg_handle_get_instance(handle);

    // register local target buffer for bulk access
    auto bulk_buf = const_cast<void*>(arg->buf);
    ret = margo_bulk_create(used_mid, 1, &bulk_buf, &arg->in_size, HG_BULK_READ_ONLY, &in.bulk_handle);

    if (ret != HG_SUCCESS) {
        ld_logger->error("{}() failed to create bulk on client", __func__);
        ABT_eventual_set(arg->eventual, &write_size, sizeof(write_size));
        return;
    }

    for (int i = 0; i < RPC_TRIES; ++i) {
        // Wait for the RPC response.
        // This will call eventual_wait internally causing the calling ULT to be BLOCKED and implicitly yields
        ret = margo_forward_timed(handle, &in, RPC_TIMEOUT);
        if (ret == HG_SUCCESS) {
            break;
        }
    }
    if (ret == HG_SUCCESS) {
        /* decode response */
        ret = margo_get_output(handle, &out);
        if (ret != HG_SUCCESS) {
            ld_logger->error("{}() failed to get rpc output", __func__);
            ABT_eventual_set(arg->eventual, &write_size, sizeof(write_size));
            return;
        }
        ld_logger->debug("{}() Got response {}", __func__, out.res);
        if (out.res != 0)
            errno = out.res;
        write_size = static_cast<size_t>(out.io_size);
        // Signal calling process that RPC is finished and put written size into return value
        ABT_eventual_set(arg->eventual, &write_size, sizeof(write_size));
        // clean up resources consumed by this rpc
        margo_bulk_free(in.bulk_handle);
        margo_free_output(handle, &out);
    } else {
        ld_logger->warn("{}() timed out", __func__);
        ABT_eventual_set(arg->eventual, &write_size, sizeof(write_size));
        return;
    }
    margo_destroy(handle);
}

void rpc_send_read_abt(void* _arg) {
    // Unpack
    auto* arg = static_cast<struct read_args*>(_arg);
    auto chnk_ids = *arg->chnk_ids;
    // RPC
    hg_handle_t handle;
    hg_addr_t svr_addr = HG_ADDR_NULL;
    rpc_read_data_in_t in{};
    rpc_data_out_t out{};
    hg_return_t ret;
    auto read_size = static_cast<size_t>(0);
    // fill in
    in.path = arg->path->c_str();
    in.offset = arg->in_offset;
    in.chunk_n = chnk_ids.size();
    in.chunk_start = arg->chnk_start;
    in.chunk_end = arg->chnk_end;
    in.total_chunk_size = arg->total_chunk_size;

    margo_create_wrap(ipc_read_data_id, rpc_read_data_id, arg->recipient, handle, false);

    auto used_mid = margo_hg_handle_get_instance(handle);

    // register local target buffer for bulk access
    ret = margo_bulk_create(used_mid, 1, &arg->buf, &arg->in_size, HG_BULK_WRITE_ONLY, &in.bulk_handle);

    if (ret != HG_SUCCESS) {
        ld_logger->error("{}() failed to create bulk on client", __func__);
        ABT_eventual_set(arg->eventual, &read_size, sizeof(read_size));
        return;
    }
    // Send RPC and wait for response
    for (int i = 0; i < RPC_TRIES; ++i) {
        // Wait for the RPC response.
        // This will call eventual_wait internally causing the calling ULT to be BLOCKED and implicitly yields
        ret = margo_forward_timed(handle, &in, RPC_TIMEOUT);
        if (ret == HG_SUCCESS) {
            break;
        }
    }
    if (ret == HG_SUCCESS) {
        /* decode response */
        ret = margo_get_output(handle, &out);
        if (ret != HG_SUCCESS) {
            ld_logger->error("{}() failed to get rpc output", __func__);
            ABT_eventual_set(arg->eventual, &read_size, sizeof(read_size));
            return;
        }
        ld_logger->debug("{}() Got response {}", __func__, out.res);
        if (out.res != 0)
            errno = out.res;
        read_size = static_cast<size_t>(out.io_size);
        // Signal calling process that RPC is finished and put read size into return value
        ABT_eventual_set(arg->eventual, &read_size, sizeof(read_size));
        // clean up resources consumed by this rpc
        margo_bulk_free(in.bulk_handle);
        margo_free_output(handle, &out);
    } else {
        ld_logger->warn("{}() timed out", __func__);
        ABT_eventual_set(arg->eventual, &read_size, sizeof(read_size));
        return;
    }
    margo_destroy(handle);
}
 No newline at end of file