Commit 4cfad336 authored by Marc Vef's avatar Marc Vef
Browse files

Read/Write destination structs moved to Heap for thread compatibility

parent b88ac598
Loading
Loading
Loading
Loading
+5 −4
Original line number Diff line number Diff line
@@ -16,22 +16,23 @@ extern "C" {
#include <iostream>

struct write_args {
    std::string& path;
    std::shared_ptr<std::string> path;
    size_t in_size;
    off_t in_offset;
    const void* buf;
    size_t chnk_start;
    off_t updated_size;
    std::vector<unsigned long>& chnk_ids;
    std::vector<unsigned long>* chnk_ids;
    size_t recipient;
    ABT_eventual* eventual;
};

struct read_args {
    std::string& path;
    std::shared_ptr<std::string> path;
    size_t in_size;
    off_t in_offset;
    void* buf;
    std::vector<unsigned long>& chnk_ids;
    std::vector<unsigned long>* chnk_ids;
    size_t recipient;
    ABT_eventual* eventual;
};
+28 −33
Original line number Diff line number Diff line
@@ -38,7 +38,7 @@ int adafs_stat64(const std::string& path, struct stat64* buf) {

ssize_t adafs_pread_ws(int fd, void* buf, size_t count, off_t offset) {
    auto adafs_fd = file_map.get(fd);
    auto path = adafs_fd->path();
    auto path = make_shared<string>(adafs_fd->path());
    auto read_size = static_cast<size_t>(0);
    auto err = 0;

@@ -50,7 +50,7 @@ ssize_t adafs_pread_ws(int fd, void* buf, size_t count, off_t offset) {
    map<unsigned long, vector<unsigned long>> dest_ids{}; // contains the chnk ids (value list) per recipient (key)
    for (unsigned long i = 0; i < chunk_n; i++) {
        auto chnk_id = i + chnk_id_start;
        auto recipient = get_rpc_node(path + fmt::FormatInt(chnk_id).str());
        auto recipient = get_rpc_node(*path + fmt::FormatInt(chnk_id).str());
        if (dest_ids.count(recipient) == 0) {
            dest_ids.insert(make_pair(recipient, vector<unsigned long>{chnk_id}));
            dest_idx.push_back(recipient);
@@ -74,21 +74,18 @@ ssize_t adafs_pread_ws(int fd, void* buf, size_t count, off_t offset) {
    auto dest_n = dest_idx.size();
    vector<ABT_thread> threads(dest_n);
    vector<ABT_eventual> eventuals(dest_n);
    vector<struct read_args*> thread_args(dest_n);
    vector<unique_ptr<struct read_args>> thread_args(dest_n);
    for (unsigned long i = 0; i < dest_n; i++) {
        ABT_eventual_create(sizeof(size_t), &eventuals[i]);
        struct read_args args = {
                path, // path
                count, // total size to read
                0, // reading offset only for the first chunk
                buf, // pointer to write buffer
                dest_ids[dest_idx[i]], // pointer to list of chunk ids that all go to the same destination
                dest_idx[i], // recipient
                &eventuals[i], // pointer to an eventual which has allocated memory for storing the written size
        };
        if (i == 0)
            args.in_offset = offset % CHUNKSIZE;
        thread_args[i] = &args;

        auto args = make_unique<read_args>();
        args->path = path;
        args->in_size = count;// total size to read
        args->in_offset = (i == 0) ? offset % CHUNKSIZE : 0;// reading offset only for the first chunk
        args->chnk_ids = &dest_ids[dest_idx[i]]; // pointer to list of chunk ids that all go to the same destination
        args->recipient = dest_idx[i];// recipient
        args->eventual = &eventuals[i];// pointer to an eventual which has allocated memory for storing the written size
        thread_args[i] = std::move(args);
        ABT_thread_create(pool, rpc_send_read_abt, &(*thread_args[i]), ABT_THREAD_ATTR_NULL, &threads[i]);
    }

@@ -118,13 +115,13 @@ ssize_t adafs_pread_ws(int fd, void* buf, size_t count, off_t offset) {

ssize_t adafs_pwrite_ws(int fd, const void* buf, size_t count, off_t offset) {
    auto adafs_fd = file_map.get(fd);
    auto path = adafs_fd->path();
    auto path = make_shared<string>(adafs_fd->path());
    auto append_flag = adafs_fd->append_flag();
    int err = 0;
    long updated_size = 0;
    auto write_size = static_cast<size_t>(0);

    err = rpc_send_update_metadentry_size(path, count, offset, append_flag, updated_size);
    err = rpc_send_update_metadentry_size(*path, count, offset, append_flag, updated_size);
    if (err != 0) {
        ld_logger->error("{}() update_metadentry_size failed with err {}", __func__, err);
        return 0; // ERR
@@ -142,7 +139,7 @@ ssize_t adafs_pwrite_ws(int fd, const void* buf, size_t count, off_t offset) {
    // contains the recipient ids, used to access the dest_ids map. First idx is chunk with potential offset
    vector<unsigned long> dest_idx{};
    for (auto i = chnk_start; i < chnk_end; i++) {
        auto recipient = get_rpc_node(path + fmt::FormatInt(i).str());
        auto recipient = get_rpc_node(*path + fmt::FormatInt(i).str());
        if (dest_ids.count(recipient) == 0) {
            dest_ids.insert(make_pair(recipient, vector<unsigned long>{i}));
            dest_idx.push_back(recipient);
@@ -165,7 +162,7 @@ ssize_t adafs_pwrite_ws(int fd, const void* buf, size_t count, off_t offset) {
    auto dest_n = dest_idx.size();
    vector<ABT_thread> threads(dest_n);
    vector<ABT_eventual> eventuals(dest_n);
    vector<struct write_args*> thread_args(dest_n);
    vector<unique_ptr<struct write_args>> thread_args(dest_n);
    for (unsigned long i = 0; i < dest_n; i++) {
        ABT_eventual_create(sizeof(size_t), &eventuals[i]);
        auto total_chunk_size = dest_ids[dest_idx[i]].size() * CHUNKSIZE;
@@ -173,20 +170,18 @@ ssize_t adafs_pwrite_ws(int fd, const void* buf, size_t count, off_t offset) {
            total_chunk_size -= offset % CHUNKSIZE;
        if (i == dest_n - 1 && ((offset + count) % CHUNKSIZE) != 0) // receiver of last chunk must subtract
            total_chunk_size -= CHUNKSIZE - ((offset + count) % CHUNKSIZE);
        struct write_args args = {
                path, // path
                total_chunk_size, // total size to write
                0, // writing offset only relevant for the first chunk that is written
                buf, // pointer to write buffer
                chnk_start, // append flag when file was opened
                updated_size, // for append truncate TODO needed?
                dest_ids[dest_idx[i]], // pointer to list of chunk ids that all go to the same destination
                dest_idx[i], // recipient
                &eventuals[i], // pointer to an eventual which has allocated memory for storing the written size
        };
        if (i == 0) // first offset in dest_idx is the chunk with a potential offset
            args.in_offset = offset % CHUNKSIZE;
        thread_args[i] = &args;
        auto args = make_unique<write_args>();
        args->path = path; // path
        args->in_size = total_chunk_size;// total size to write
        args->in_offset = (i == 0) ? offset % CHUNKSIZE
                                   : 0;// first offset in dest_idx is the chunk with a potential offset
        args->buf = buf;// pointer to write buffer
        args->chnk_start = chnk_start;// append flag when file was opened
        args->updated_size = updated_size;// for append truncate TODO needed?
        args->chnk_ids = &dest_ids[dest_idx[i]];// pointer to list of chunk ids that all go to the same destination
        args->recipient = dest_idx[i];// recipient
        args->eventual = &eventuals[i];// pointer to an eventual which has allocated memory for storing the written size
        thread_args[i] = std::move(args);
        ABT_thread_create(pool, rpc_send_write_abt, &(*thread_args[i]), ABT_THREAD_ATTR_NULL, &threads[i]);
    }

+8 −6
Original line number Diff line number Diff line
@@ -11,8 +11,9 @@ using namespace std;
void rpc_send_write_abt(void* _arg) {
    auto* arg = static_cast<struct write_args*>(_arg);

    auto recipient_size = arg->chnk_ids.size();
    auto chnk_ids = arg->chnk_ids;
    ld_logger->info("{}() recipient {}", __func__, arg->recipient);
    auto recipient_size = arg->chnk_ids->size();
    auto chnk_ids = *arg->chnk_ids;
    vector<char*> chnks(recipient_size);
    vector<size_t> buf_sizes(recipient_size * 2);
    for (size_t i = 0; i < buf_sizes.size(); i++) {
@@ -60,7 +61,8 @@ void rpc_send_write_abt(void* _arg) {
    hg_return_t ret;
    auto write_size = static_cast<size_t>(0);
    // fill in
    in.path = arg->path.c_str();
    arg->path->c_str();
    in.path = arg->path->c_str();
    in.offset = arg->in_offset;
    in.updated_size = arg->updated_size;
    in.append = HG_FALSE; // unused
@@ -121,8 +123,8 @@ void rpc_send_read_abt(void* _arg) {

    // Prepare buffers
    auto* arg = static_cast<struct read_args*>(_arg);
    auto recipient_size = arg->chnk_ids.size();
    auto chnk_ids = arg->chnk_ids;
    auto recipient_size = arg->chnk_ids->size();
    auto chnk_ids = *arg->chnk_ids;
    vector<char*> chnks(recipient_size);
    vector<size_t> buf_sizes(recipient_size * 2);

@@ -160,7 +162,7 @@ void rpc_send_read_abt(void* _arg) {
    hg_return_t ret;
    auto read_size = static_cast<size_t>(0);
    // fill in
    in.path = arg->path.c_str();
    in.path = arg->path->c_str();
    in.size = arg->in_size;
    in.offset = arg->in_offset;