Commit 5803647e authored by Marc Vef's avatar Marc Vef
Browse files

Merge branch 'client_io_cleanup' into 'master'

Client io cleanup

See merge request zdvresearch_bsc/adafs!24
parents fb57de99 585c49d2
Loading
Loading
Loading
Loading
+0 −5
Original line number Diff line number Diff line
@@ -52,11 +52,6 @@
 * The value is directly mapped to created Argobots xstreams, controlled in a single pool with ABT_snoozer scheduler
 */
#define DAEMON_IO_XSTREAMS 8
/*
 * Sets the number of execution streams that concurrently drive sending I/O-related RPCs to daemons.
 * The value is mapped directly to the number of Argobots xstreams created; they are controlled in a single pool
 * with the ABT_snoozer scheduler.
 */
#define PRELOAD_IORPC_XSTREAMS 8
// Number of threads used for RPC and IPC handlers at the daemon
#define DAEMON_RPC_HANDLER_XSTREAMS 8
#define DAEMON_IPC_HANDLER_XSTREAMS 8
+1 −5
Original line number Diff line number Diff line
@@ -102,9 +102,6 @@ extern std::map<uint64_t, hg_addr_t> rpc_address_cache;
extern ABT_mutex rpc_address_cache_mutex;
// file descriptor index validation flag
extern std::atomic<bool> fd_validation_needed;
// thread pool
extern ABT_pool io_pool;
extern std::vector<ABT_xstream> io_streams;

// function definitions

@@ -132,8 +129,7 @@ bool get_addr_by_hostid(uint64_t hostid, hg_addr_t& svr_addr);
bool is_local_op(size_t recipient);

template<typename T>
hg_return margo_create_wrap(hg_id_t ipc_id, hg_id_t rpc_id, const T&, hg_handle_t& handle,
                            hg_addr_t& svr_addr, bool force_rpc);
hg_return margo_create_wrap(hg_id_t ipc_id, hg_id_t rpc_id, const T&, hg_handle_t& handle, bool force_rpc);


#endif //IFS_PRELOAD_UTIL_HPP
+3 −29
Original line number Diff line number Diff line
@@ -15,35 +15,9 @@ extern "C" {

#include <iostream>

// XXX these two structs can be merged. How to deal with const void* then?
// Argument bundle for one write destination. adafs_pwrite_ws() fills one instance per destination
// and passes its address to rpc_send_write_abt() via ABT_thread_create().
struct write_args {
    std::shared_ptr<std::string> path; // path of the file being written
    size_t total_chunk_size; // bytes destined for this recipient (chunk count * CHUNKSIZE, trimmed for a partial first/last chunk)
    size_t in_size; // byte count of the whole write request
    off_t in_offset; // offset inside the first chunk (request offset % CHUNKSIZE)
    const void* buf; // caller's write buffer
    size_t chnk_start; // first chunk number of the request
    size_t chnk_end; // end of the request's chunk range, right-open: [chnk_start, chnk_end)
    std::vector<unsigned long>* chnk_ids; // non-owning pointer to the chunk ids that all go to this recipient
    size_t recipient; // destination id the chunks hash to
    ABT_eventual eventual; // eventual created with room for a size_t; the caller waits on it for the written size
};
ssize_t rpc_send_write(const std::string& path, const void* buf, const bool append_flag, const off64_t in_offset,
                       const size_t write_size, const int64_t updated_metadentry_size);

// Argument bundle for one read destination. adafs_pread_ws() fills one instance per destination
// and passes its address to rpc_send_read_abt() via ABT_thread_create().
struct read_args {
    std::shared_ptr<std::string> path; // path of the file being read
    size_t total_chunk_size; // bytes expected from this recipient (chunk count * CHUNKSIZE, trimmed for a partial first/last chunk)
    size_t in_size; // byte count of the whole read request
    off_t in_offset; // offset inside the first chunk (request offset % CHUNKSIZE)
    void* buf; // caller's read buffer
    size_t chnk_start; // first chunk number of the request
    size_t chnk_end; // end of the request's chunk range, right-open: [chnk_start, chnk_end)
    std::vector<unsigned long>* chnk_ids; // non-owning pointer to the chunk ids that all go to this recipient
    size_t recipient; // destination id the chunks hash to
    ABT_eventual eventual; // eventual created with room for a size_t; the caller waits on it for the read size
};

void rpc_send_write_abt(void* _arg);

void rpc_send_read_abt(void* _arg);
ssize_t rpc_send_read(const std::string& path, void* buf, const off64_t offset, const size_t read_size);

#endif //IFS_PRELOAD_C_DATA_WS_HPP
+12 −119
Original line number Diff line number Diff line
#include <preload/adafs_functions.hpp>
#include <preload/rpc/ld_rpc_metadentry.hpp>
#include <preload/rpc/ld_rpc_data_ws.hpp>
#include <global/rpc/rpc_utils.hpp>

using namespace std;

@@ -169,136 +168,30 @@ ssize_t adafs_pwrite_ws(int fd, const void* buf, size_t count, off64_t offset) {
    // NOTE(review): this span is a rendered diff hunk without +/- markers. Lines of the removed
    // implementation (one Argobots thread per destination) and of the added implementation
    // (a single rpc_send_write() call) appear interleaved — confirm against the actual file.
    auto adafs_fd = file_map.get(fd);
    auto path = make_shared<string>(adafs_fd->path());
    auto append_flag = adafs_fd->get_flag(OpenFile_flags::append);
    int err = 0;
    ssize_t ret = 0;
    long updated_size = 0;
    auto write_size = static_cast<size_t>(0);

    // Update the metadentry size first; if that fails nothing is written and 0 is returned
    err = rpc_send_update_metadentry_size(*path, count, offset, append_flag, updated_size);
    if (err != 0) {
        ld_logger->error("{}() update_metadentry_size failed with err {}", __func__, err);
    ret = rpc_send_update_metadentry_size(*path, count, offset, append_flag, updated_size);
    if (ret != 0) {
        ld_logger->error("{}() update_metadentry_size failed with ret {}", __func__, ret);
        return 0; // ERR
    }
    // When appending, the write starts count bytes before the updated size
    if (append_flag)
        offset = updated_size - count;

    auto chnk_start = static_cast<uint64_t>(offset) / CHUNKSIZE; // first chunk number
    auto chnk_end = (offset + count) / CHUNKSIZE + 1; // last chunk number (right-open) [chnk_start,chnk_end)
    // A request ending exactly on a chunk boundary does not touch the following chunk
    if ((offset + count) % CHUNKSIZE == 0)
        chnk_end--;

    // Collect all chunk ids within count that have the same destination so that those are sent in one rpc bulk transfer
    map<uint64_t, vector<uint64_t>> dest_ids{};
    // contains the recipient ids, used to access the dest_ids map. First idx is chunk with potential offset
    vector<uint64_t> dest_idx{};
    for (uint64_t i = chnk_start; i < chnk_end; i++) {
        auto recipient = adafs_hash_path_chunk(*path, i, fs_config->host_size);
        if (dest_ids.count(recipient) == 0) {
            dest_ids.insert(make_pair(recipient, vector<uint64_t>{i}));
            dest_idx.push_back(recipient);
        } else
            dest_ids[recipient].push_back(i);
    }
    // Create an Argobots thread per destination, fill an appropriate struct with its destination chunk ids
    auto dest_n = dest_idx.size();
    vector<ABT_eventual> eventuals(dest_n);
    vector<unique_ptr<struct write_args>> thread_args(dest_n);
    for (uint64_t i = 0; i < dest_n; i++) {
        ABT_eventual_create(sizeof(size_t), &eventuals[i]);
        auto total_chunk_size = dest_ids[dest_idx[i]].size() * CHUNKSIZE;
        if (i == 0) // receiver of first chunk must subtract the offset from first chunk
            total_chunk_size -= (offset % CHUNKSIZE);
        if (i == dest_n - 1 && ((offset + count) % CHUNKSIZE) != 0) // receiver of last chunk must subtract the unused tail of the last chunk
            total_chunk_size -= (CHUNKSIZE - ((offset + count) % CHUNKSIZE));
        auto args = make_unique<write_args>();
        args->path = path; // path
        args->total_chunk_size = total_chunk_size; // total size to write
        args->in_size = count; // size of the whole write request
        args->in_offset = offset % CHUNKSIZE;// first offset in dest_idx is the chunk with a potential offset
        args->buf = buf;// pointer to write buffer
        args->chnk_start = chnk_start;// first chunk number of the request
        args->chnk_end = chnk_end;// end of the chunk range (right-open)
        args->chnk_ids = &dest_ids[dest_idx[i]];// pointer to list of chunk ids that all go to the same destination
        args->recipient = dest_idx[i];// recipient
        args->eventual = eventuals[i];// eventual which has allocated memory for storing the written size
        thread_args[i] = std::move(args);
        ABT_thread_create(io_pool, rpc_send_write_abt, &(*thread_args[i]), ABT_THREAD_ATTR_NULL, nullptr);
    ret = rpc_send_write(*path, buf, append_flag, offset, count, updated_size);
    if (ret < 0) {
        ld_logger->warn("{}() rpc_send_write failed with ret {}", __func__, ret);
    }
    // Sum written sizes
    for (uint64_t i = 0; i < dest_n; i++) {
        size_t* thread_ret_size;
        ABT_eventual_wait(eventuals[i], (void**) &thread_ret_size);
        if (thread_ret_size == nullptr || *thread_ret_size == 0) {
            // TODO error handling if write of a thread failed. all data needs to be deleted and size update reverted
            ld_logger->error("{}() Writing thread {} did not write anything. NO ACTION WAS DONE", __func__, i);
        } else
            write_size += *thread_ret_size;
        ABT_eventual_free(&eventuals[i]);
    }
    return write_size;
    return ret; // return written size or -1 as error
}

// Reads count bytes starting at offset from the file behind fd into buf.
// NOTE(review): this span is a rendered diff hunk without +/- markers. Lines of the removed
// implementation (one Argobots thread per destination) and of the added implementation
// (a single rpc_send_read() call) appear interleaved — confirm against the actual file.
ssize_t adafs_pread_ws(int fd, void* buf, size_t count, off64_t offset) {
    init_ld_env_if_needed();
    auto adafs_fd = file_map.get(fd);
    auto path = make_shared<string>(adafs_fd->path());
    auto read_size = static_cast<size_t>(0);
    auto err = 0;

    // Determine the chunk range [chnk_start, chnk_end) that this read touches
    auto chnk_start = static_cast<uint64_t>(offset) / CHUNKSIZE; // first chunk number
    auto chnk_end = (offset + count) / CHUNKSIZE + 1; // last chunk number (right-open) [chnk_start,chnk_end)
    // A request ending exactly on a chunk boundary does not touch the following chunk
    if ((offset + count) % CHUNKSIZE == 0)
        chnk_end--;
    // Collect all chunk ids within count that have the same destination so that those are sent in one rpc bulk transfer
    map<uint64_t, vector<uint64_t>> dest_ids{};
    // contains the recipient ids, used to access the dest_ids map. First idx is chunk with potential offset
    vector<uint64_t> dest_idx{};
    for (uint64_t i = chnk_start; i < chnk_end; i++) {
        auto recipient = adafs_hash_path_chunk(*path, i, fs_config->host_size);
        if (dest_ids.count(recipient) == 0) {
            dest_ids.insert(make_pair(recipient, vector<uint64_t>{i}));
            dest_idx.push_back(recipient);
        } else
            dest_ids[recipient].push_back(i);
    }
    // Create an Argobots thread per destination, fill an appropriate struct with its destination chunk ids
    auto dest_n = dest_idx.size();
    vector<ABT_eventual> eventuals(dest_n);
    vector<unique_ptr<struct read_args>> thread_args(dest_n);
    for (uint64_t i = 0; i < dest_n; i++) {
        ABT_eventual_create(sizeof(size_t), &eventuals[i]);
        auto total_chunk_size = dest_ids[dest_idx[i]].size() * CHUNKSIZE;
        if (i == 0) // receiver of first chunk must subtract the offset from first chunk
            total_chunk_size -= (offset % CHUNKSIZE);
        if (i == dest_n - 1 && ((offset + count) % CHUNKSIZE) != 0) // receiver of last chunk must subtract the unused tail of the last chunk
            total_chunk_size -= (CHUNKSIZE - ((offset + count) % CHUNKSIZE));
        auto args = make_unique<read_args>();
        args->path = path;
        args->total_chunk_size = total_chunk_size;
        args->in_size = count;// total size to read
        args->in_offset = offset % CHUNKSIZE;// reading offset only for the first chunk
        args->buf = buf;
        args->chnk_start = chnk_start;
        args->chnk_end = chnk_end;
        args->chnk_ids = &dest_ids[dest_idx[i]]; // pointer to list of chunk ids that all go to the same destination
        args->recipient = dest_idx[i];// recipient
        args->eventual = eventuals[i];// eventual which has allocated memory for storing the read size
        thread_args[i] = std::move(args);
        // Threads are implicitly released once calling function finishes
        ABT_thread_create(io_pool, rpc_send_read_abt, &(*thread_args[i]), ABT_THREAD_ATTR_NULL, nullptr);
    }
    // Sum read sizes
    for (uint64_t i = 0; i < dest_n; i++) {
        size_t* thread_ret_size;
        ABT_eventual_wait(eventuals[i], (void**) &thread_ret_size);
        if (thread_ret_size == nullptr || *thread_ret_size == 0) {
            err = -1;
            ld_logger->error("{}() Reading thread {} did not read anything. NO ACTION WAS DONE", __func__, i);
        } else
            read_size += *thread_ret_size;
        ABT_eventual_free(&eventuals[i]);
    auto ret = rpc_send_read(*path, buf, offset, count);
    if (ret < 0) {
        ld_logger->warn("{}() rpc_send_read failed with ret {}", __func__, ret);
    }
    // XXX check how much we need to deal with the read_size
    // XXX check that we don't try to read past end of the file
    return err == 0 ? read_size : 0;
    return ret; // return read size or -1 as error
}
 No newline at end of file
+2 −17
Original line number Diff line number Diff line
@@ -47,9 +47,6 @@ std::map<uint64_t, hg_addr_t> rpc_address_cache;
ABT_mutex rpc_address_cache_mutex;
// local daemon IPC address
hg_addr_t daemon_svr_addr = HG_ADDR_NULL;
// IO RPC driver
ABT_pool io_pool;
std::vector<ABT_xstream> io_streams;

/**
 * Initializes the Argobots environment
@@ -78,14 +75,6 @@ bool init_ld_argobots() {
     * See for reference: https://xgitlab.cels.anl.gov/sds/margo/issues/40
     */
    putenv(const_cast<char*>("ABT_MEM_MAX_NUM_STACKS=8"));
    // Creating pool for driving IO RPCs
    vector<ABT_xstream> io_streams_tmp(PRELOAD_IORPC_XSTREAMS);
    argo_err = ABT_snoozer_xstream_create(PRELOAD_IORPC_XSTREAMS, &io_pool, io_streams_tmp.data());
    if (argo_err != ABT_SUCCESS) {
        ld_logger->error("{}() ABT_snoozer_xstream_create()  (client)", __func__);
        return false;
    }
    io_streams = io_streams_tmp;
    ld_logger->debug("{}() Argobots initialization successful.", __func__);
    return true;
}
@@ -295,10 +284,6 @@ void destroy_preload() {
#endif
    if (services_used) {
        ld_logger->debug("{}() Freeing ABT constructs ...", __func__);
        for (auto& io_stream : io_streams) {
            ABT_xstream_join(io_stream);
            ABT_xstream_free(&io_stream);
        }
        ABT_mutex_free(&rpc_address_cache_mutex);
        ld_logger->debug("{}() Freeing ABT constructs successful", __func__);
    }
@@ -313,7 +298,7 @@ void destroy_preload() {
            }
        }
        ld_logger->debug("{}() About to finalize the margo RPC client. Actually not doing it XXX", __func__);
        margo_finalize(ld_margo_rpc_id);
//        margo_finalize(ld_margo_rpc_id); // XXX Sometimes this hangs on the cluster. Investigate
        ld_logger->debug("{}() Shut down Margo RPC client successful", __func__);
    }
    // Shut down IPC client if used
@@ -322,7 +307,7 @@ void destroy_preload() {
        if (margo_addr_free(ld_margo_ipc_id, daemon_svr_addr) != HG_SUCCESS)
            ld_logger->warn("{}() Unable to free IPC client's daemon svr address.", __func__);
        ld_logger->debug("{}() About to finalize the margo IPC client. Actually not doing it XXX", __func__);
        margo_finalize(ld_margo_ipc_id);
//        margo_finalize(ld_margo_ipc_id);
        ld_logger->debug("{}() Shut down Margo IPC client successful", __func__);
    }
    if (services_used) {
Loading