Commit c3685119 authored by Marc Vef
Browse files

Cleanup of read/write protocol

parent 7b03d7d7
Loading
Loading
Loading
Loading
+2 −2
Original line number Diff line number Diff line
@@ -69,7 +69,7 @@ MERCURY_GEN_PROC(rpc_get_metadentry_size_out_t, ((hg_int32_t) (err))
MERCURY_GEN_PROC(rpc_read_data_in_t,
                 ((hg_const_string_t) (path))\
((int64_t) (offset))\
((hg_uint64_t) (chunks))\
((hg_uint64_t) (chunk_n))\
((hg_uint64_t) (chunk_start))\
((hg_uint64_t) (chunk_end))\
((hg_uint64_t) (total_chunk_size))\
@@ -82,7 +82,7 @@ MERCURY_GEN_PROC(rpc_data_out_t,
MERCURY_GEN_PROC(rpc_write_data_in_t,
                 ((hg_const_string_t) (path))\
((int64_t) (offset))\
((hg_uint64_t) (chunks))\
((hg_uint64_t) (chunk_n))\
((hg_uint64_t) (chunk_start))\
((hg_uint64_t) (chunk_end))\
((hg_uint64_t) (total_chunk_size))\
+1 −2
Original line number Diff line number Diff line
@@ -48,9 +48,8 @@ int adafs_dup(int oldfd);

// Duplicates `oldfd` onto `newfd`; the definition forwards to file_map.dup2(oldfd, newfd).
int adafs_dup2(int oldfd, int newfd);

// Positional read: reads `count` bytes starting at byte `offset` of the file behind `fd` into `buf`.
// The range is split into CHUNKSIZE-sized chunks which are read via one RPC thread per destination.
// Returns the summed size reported by the read threads, or 0 if any of them reported nothing.
ssize_t adafs_pread_ws(int fd, void* buf, size_t count, off64_t offset);

// Positional write: writes `count` bytes from `buf` starting at byte `offset` of the file behind `fd`.
// The range is split into CHUNKSIZE-sized chunks which are written via one RPC thread per destination.
// Returns the summed size reported by the write threads.
ssize_t adafs_pwrite_ws(int fd, const void* buf, size_t count, off64_t offset);

// Repeated declaration of adafs_pread_ws (signature identical to the one above).
ssize_t adafs_pread_ws(int fd, void* buf, size_t count, off64_t offset);

#endif //IFS_ADAFS_FUNCTIONS_HPP
+298 −218

File changed.

Preview size limit exceeded, changes collapsed.

+85 −72
Original line number Diff line number Diff line
@@ -162,69 +162,6 @@ int adafs_dup2(const int oldfd, const int newfd) {
    return file_map.dup2(oldfd, newfd);
}

/**
 * Reads `count` bytes starting at byte `offset` of the file referenced by `fd` into `buf`.
 *
 * The requested range is split into CHUNKSIZE-sized chunks [chnk_start, chnk_end). Chunks are grouped by the
 * destination that get_rpc_node() returns for "<path><chunk id>", and one Argobots thread running
 * rpc_send_read_abt is created per destination. Every thread reports its read size through an ABT_eventual;
 * the reported sizes are summed up once all threads have been waited for.
 *
 * @param fd     file descriptor, resolved through file_map
 * @param buf    buffer handed to every read thread
 * @param count  number of bytes to read
 * @param offset byte offset at which reading starts
 * @return the summed size reported by all read threads, or 0 if at least one thread reported nothing
 */
ssize_t adafs_pread_ws(int fd, void* buf, size_t count, off64_t offset) {
    init_ld_env_if_needed();
    // Nothing to transfer: avoids a zero-sized RPC (and the resulting error log) for unaligned offsets
    if (count == 0)
        return 0;
    auto adafs_fd = file_map.get(fd);
    auto path = make_shared<string>(adafs_fd->path());
    auto read_size = static_cast<size_t>(0);
    auto err = 0;

    // Chunk range touched by [offset, offset + count)
    auto chnk_start = static_cast<size_t>(offset) / CHUNKSIZE; // first chunk number
    auto chnk_end = (offset + count) / CHUNKSIZE + 1; // last chunk number (right-open) [chnk_start,chnk_end)
    if ((offset + count) % CHUNKSIZE == 0)
        chnk_end--;

    // Collect all chunk ids that have the same destination so that those are sent in one rpc bulk transfer
    vector<unsigned long> dest_idx{}; // recipient ids in order of first appearance; dest_idx[0] owns the first chunk
    map<unsigned long, vector<unsigned long>> dest_ids{}; // chunk ids (ascending value list) per recipient (key)
    for (unsigned long i = chnk_start; i < chnk_end; i++) {
        auto recipient = get_rpc_node(*path + fmt::FormatInt(i).str());
        if (dest_ids.count(recipient) == 0) {
            dest_ids.insert(make_pair(recipient, vector<unsigned long>{i}));
            dest_idx.push_back(recipient);
        } else
            dest_ids[recipient].push_back(i);
    }

    // Create an Argobots thread per destination, fill an appropriate struct with its destination chunk ids
    auto dest_n = static_cast<unsigned int>(dest_idx.size());
    vector<ABT_eventual> eventuals(dest_n);
    vector<unique_ptr<struct read_args>> thread_args(dest_n);
    for (unsigned int i = 0; i < dest_n; i++) {
        ABT_eventual_create(sizeof(size_t), &eventuals[i]);
        auto& chnk_ids = dest_ids[dest_idx[i]]; // references into a std::map stay valid while the map lives
        auto total_chunk_size = chnk_ids.size() * CHUNKSIZE;
        if (i == 0) // receiver of first chunk must subtract the offset from first chunk
            total_chunk_size -= offset % CHUNKSIZE;
        // The receiver of the last chunk must subtract the unused tail of that chunk. The last chunk's receiver is
        // NOT necessarily the last entry of dest_idx (e.g. chunks mapped A, B, A), so it is identified by its
        // highest chunk id instead; ids were appended in ascending order, hence back() is the highest one.
        if (chnk_ids.back() == chnk_end - 1 && ((offset + count) % CHUNKSIZE) != 0)
            total_chunk_size -= CHUNKSIZE - ((offset + count) % CHUNKSIZE);
        auto args = make_unique<read_args>();
        args->path = path;
        args->total_chunk_size = total_chunk_size;
        args->in_size = count;// total size to read
        args->in_offset = offset % CHUNKSIZE;// reading offset only for the first chunk
        args->buf = buf;
        args->chnk_ids = &chnk_ids; // pointer to list of chunk ids that all go to the same destination
        args->chnk_start = chnk_start;
        args->chnk_end = chnk_end;
        args->recipient = dest_idx[i];// recipient
        args->eventual = eventuals[i];// eventual through which the thread reports its read size
        thread_args[i] = std::move(args);
        // Threads are implicitly released once calling function finishes
        ABT_thread_create(io_pool, rpc_send_read_abt, thread_args[i].get(), ABT_THREAD_ATTR_NULL, nullptr);
    }

    // Sum read sizes
    for (unsigned int i = 0; i < dest_n; i++) {
        size_t* thread_ret_size = nullptr; // stays nullptr (=> error path) if the wait does not set it
        ABT_eventual_wait(eventuals[i], reinterpret_cast<void**>(&thread_ret_size));
        if (thread_ret_size == nullptr || *thread_ret_size == 0) {
            err = -1;
            ld_logger->error("{}() Reading thread {} did not read anything. NO ACTION WAS DONE", __func__, i);
        } else
            read_size += *thread_ret_size;
        ABT_eventual_free(&eventuals[i]);
    }
    // XXX check how much we need to deal with the read_size
    // XXX check that we don't try to read past end of the file
    return err == 0 ? read_size : 0;
}

ssize_t adafs_pwrite_ws(int fd, const void* buf, size_t count, off64_t offset) {
    init_ld_env_if_needed();
@@ -243,35 +180,45 @@ ssize_t adafs_pwrite_ws(int fd, const void* buf, size_t count, off64_t offset) {
    if (append_flag)
        offset = updated_size - count;

    auto chnk_start = static_cast<size_t>(offset) / CHUNKSIZE; // first chunk number
    auto chnk_start = static_cast<uint64_t>(offset) / CHUNKSIZE; // first chunk number
    auto chnk_end = (offset + count) / CHUNKSIZE + 1; // last chunk number (right-open) [chnk_start,chnk_end)
    if ((offset + count) % CHUNKSIZE == 0)
        chnk_end--;

    // Collect all chunk ids within count that have the same destination so that those are sent in one rpc bulk transfer
    map<unsigned long, vector<unsigned long>> dest_ids{};
    map<uint64_t, vector<uint64_t>> dest_ids{};
    // contains the recipient ids, used to access the dest_ids map. First idx is chunk with potential offset
    vector<unsigned long> dest_idx{};
    vector<uint64_t> dest_idx{};
    for (auto i = chnk_start; i < chnk_end; i++) {
        auto recipient = get_rpc_node(*path + fmt::FormatInt(i).str());
        if (dest_ids.count(recipient) == 0) {
            dest_ids.insert(make_pair(recipient, vector<unsigned long>{i}));
            dest_ids.insert(make_pair(recipient, vector<uint64_t>{i}));
            dest_idx.push_back(recipient);
        } else
            dest_ids[recipient].push_back(i);
    }
    // Create an Argobots thread per destination, fill an appropriate struct with its destination chunk ids
    auto dest_n = static_cast<unsigned int>(dest_idx.size());
    auto dest_n = dest_idx.size();
    vector<ABT_eventual> eventuals(dest_n);
    vector<unique_ptr<struct write_args>> thread_args(dest_n);
    for (unsigned int i = 0; i < dest_n; i++) {
    for (uint64_t i = 0; i < dest_n; i++) {
        ABT_eventual_create(sizeof(size_t), &eventuals[i]);
        auto total_chunk_size = dest_ids[dest_idx[i]].size() * CHUNKSIZE;
        if (i == 0) // receiver of first chunk must subtract the offset from first chunk
            total_chunk_size -= offset % CHUNKSIZE;
            total_chunk_size -= (offset % CHUNKSIZE);
        if (i == dest_n - 1 && ((offset + count) % CHUNKSIZE) != 0) // receiver of last chunk must subtract
            total_chunk_size -= CHUNKSIZE - ((offset + count) % CHUNKSIZE);
            total_chunk_size -= (CHUNKSIZE - ((offset + count) % CHUNKSIZE));
        auto args = make_unique<write_args>();
        // DEL BEGIN
        string ids = ""s;
        for (auto&& id : dest_ids[dest_idx[i]]) {
            ids += fmt::FormatInt(id).str() + "  "s;
        }
        ld_logger->info(
                "{}() destination {} chnk_offset {} size {} total_chnksize {} div {} mod {} chnkids\n{}",
                __func__, dest_idx[i], offset % CHUNKSIZE, count, total_chunk_size, total_chunk_size / CHUNKSIZE,
                total_chunk_size % CHUNKSIZE, ids);
        // DEL END
        args->path = path; // path
        args->total_chunk_size = total_chunk_size; // total size to write
        args->in_size = count;
@@ -286,7 +233,7 @@ ssize_t adafs_pwrite_ws(int fd, const void* buf, size_t count, off64_t offset) {
        ABT_thread_create(io_pool, rpc_send_write_abt, &(*thread_args[i]), ABT_THREAD_ATTR_NULL, nullptr);
    }
    // Sum written sizes
    for (unsigned int i = 0; i < dest_n; i++) {
    for (uint64_t i = 0; i < dest_n; i++) {
        size_t* thread_ret_size;
        ABT_eventual_wait(eventuals[i], (void**) &thread_ret_size);
        if (thread_ret_size == nullptr || *thread_ret_size == 0) {
@@ -298,3 +245,69 @@ ssize_t adafs_pwrite_ws(int fd, const void* buf, size_t count, off64_t offset) {
    }
    return write_size;
}

/**
 * Reads `count` bytes starting at byte `offset` of the file referenced by `fd` into `buf`.
 *
 * The requested range is split into CHUNKSIZE-sized chunks [chnk_start, chnk_end). Chunks are grouped by the
 * destination that get_rpc_node() returns for "<path><chunk id>", and one Argobots thread running
 * rpc_send_read_abt is created per destination. Every thread reports its read size through an ABT_eventual;
 * the reported sizes are summed up once all threads have been waited for.
 *
 * @param fd     file descriptor, resolved through file_map
 * @param buf    buffer handed to every read thread
 * @param count  number of bytes to read
 * @param offset byte offset at which reading starts
 * @return the summed size reported by all read threads, or 0 if at least one thread reported nothing
 */
ssize_t adafs_pread_ws(int fd, void* buf, size_t count, off64_t offset) {
    init_ld_env_if_needed();
    // Nothing to transfer: avoids a zero-sized RPC (and the resulting error log) for unaligned offsets
    if (count == 0)
        return 0;
    auto adafs_fd = file_map.get(fd);
    auto path = make_shared<string>(adafs_fd->path());
    auto read_size = static_cast<size_t>(0);
    auto err = 0;

    // Chunk range touched by [offset, offset + count)
    auto chnk_start = static_cast<uint64_t>(offset) / CHUNKSIZE; // first chunk number
    auto chnk_end = (offset + count) / CHUNKSIZE + 1; // last chunk number (right-open) [chnk_start,chnk_end)
    if ((offset + count) % CHUNKSIZE == 0)
        chnk_end--;
    // contains the recipient ids, used to access the dest_ids map. First idx is chunk with potential offset
    vector<uint64_t> dest_idx{};
    // Collect all chunk ids within count that have the same destination so that those are sent in one rpc bulk
    // transfer. Key: recipient id, value: its chunk ids in ascending order
    map<uint64_t, vector<uint64_t>> dest_ids{};
    for (uint64_t i = chnk_start; i < chnk_end; i++) {
        auto recipient = get_rpc_node(*path + fmt::FormatInt(i).str());
        if (dest_ids.count(recipient) == 0) {
            dest_ids.insert(make_pair(recipient, vector<uint64_t>{i}));
            dest_idx.push_back(recipient);
        } else
            dest_ids[recipient].push_back(i);
    }
    // Create an Argobots thread per destination, fill an appropriate struct with its destination chunk ids
    auto dest_n = dest_idx.size();
    vector<ABT_eventual> eventuals(dest_n);
    vector<unique_ptr<struct read_args>> thread_args(dest_n);
    for (uint64_t i = 0; i < dest_n; i++) {
        ABT_eventual_create(sizeof(size_t), &eventuals[i]);
        auto& chnk_ids = dest_ids[dest_idx[i]]; // references into a std::map stay valid while the map lives
        auto total_chunk_size = chnk_ids.size() * CHUNKSIZE;
        if (i == 0) // receiver of first chunk must subtract the offset from first chunk
            total_chunk_size -= (offset % CHUNKSIZE);
        // The receiver of the last chunk must subtract the unused tail of that chunk. The last chunk's receiver is
        // NOT necessarily the last entry of dest_idx (e.g. chunks mapped A, B, A), so it is identified by its
        // highest chunk id instead; ids were appended in ascending order, hence back() is the highest one.
        if (chnk_ids.back() == chnk_end - 1 && ((offset + count) % CHUNKSIZE) != 0)
            total_chunk_size -= (CHUNKSIZE - ((offset + count) % CHUNKSIZE));
        auto args = make_unique<read_args>();
        args->path = path;
        args->total_chunk_size = total_chunk_size;
        args->in_size = count;// total size to read
        args->in_offset = offset % CHUNKSIZE;// reading offset only for the first chunk
        args->buf = buf;
        args->chnk_start = chnk_start;
        args->chnk_end = chnk_end;
        args->chnk_ids = &chnk_ids; // pointer to list of chunk ids that all go to the same destination
        args->recipient = dest_idx[i];// recipient
        args->eventual = eventuals[i];// eventual through which the thread reports its read size
        thread_args[i] = std::move(args);
        // Threads are implicitly released once calling function finishes
        ABT_thread_create(io_pool, rpc_send_read_abt, thread_args[i].get(), ABT_THREAD_ATTR_NULL, nullptr);
    }
    // Sum read sizes
    for (uint64_t i = 0; i < dest_n; i++) {
        size_t* thread_ret_size = nullptr; // stays nullptr (=> error path) if the wait does not set it
        ABT_eventual_wait(eventuals[i], reinterpret_cast<void**>(&thread_ret_size));
        if (thread_ret_size == nullptr || *thread_ret_size == 0) {
            err = -1;
            ld_logger->error("{}() Reading thread {} did not read anything. NO ACTION WAS DONE", __func__, i);
        } else
            read_size += *thread_ret_size;
        ABT_eventual_free(&eventuals[i]);
    }
    // XXX check how much we need to deal with the read_size
    // XXX check that we don't try to read past end of the file
    return err == 0 ? read_size : 0;
}
 No newline at end of file
+4 −2
Original line number Diff line number Diff line
@@ -288,7 +288,7 @@ void destroy_preload() {
        margo_diag_dump(ld_margo_rpc_id, "-", 0);
    }
#endif
    if (ld_margo_ipc_id != nullptr || ld_margo_rpc_id != nullptr) {
    if (services_used) {
        for (auto& io_stream : io_streams) {
            ABT_xstream_join(io_stream);
            ABT_xstream_free(&io_stream);
@@ -319,8 +319,10 @@ void destroy_preload() {
//        margo_finalize(ld_margo_ipc_id);
        ld_logger->debug("{}() Shut down Margo IPC client successful", __func__);
    }
    if (services_used)
    if (services_used) {
        rpc_address_cache.clear();
        ld_logger->info("All services shut down. Client shutdown complete.");
    }
    else
        ld_logger->debug("{}() No services in preload library used. Nothing to shut down.", __func__);
}
 No newline at end of file
Loading