Commit b9958e5f authored by Marc Vef's avatar Marc Vef
Browse files

first cut on offset write implementation

parent 3df9a6e2
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -20,7 +20,7 @@ struct write_args {
    size_t in_size;
    off_t in_offset;
    const void* buf;
    bool append;
    size_t chnk_start;
    off_t updated_size;
    std::vector<unsigned long>& chnk_ids;
    ABT_eventual* eventual;
+18 −7
Original line number Diff line number Diff line
@@ -239,13 +239,19 @@ ssize_t pwrite(int fd, const void* buf, size_t count, off_t offset) {
            ld_logger->error("{}() update_metadentry_size failed", __func__);
            return 0; // ERR
        }
        // TODO this does only work without an offset continue here

        auto chunk_n = static_cast<size_t>(ceil(
                count / static_cast<float>(CHUNKSIZE))); // get number of chunks needed for writing
        auto chnk_start = static_cast<size_t>(offset) / CHUNKSIZE; // first chunk number
        auto chnk_end = (offset + count) / CHUNKSIZE + 1; // last chunk number (right-open) [chnk_start,chnk_end)
        if ((offset + count) % CHUNKSIZE == 0)
            chnk_end--;

        auto chnk_n = chnk_end - chnk_start; // total number of chunks
        // Collect all chunk ids within count that have the same destination so that those are send in one rpc bulk transfer
        map<unsigned long, vector<unsigned long>> dest_ids{};
        vector<unsigned long> dest_idx{}; // contains the recipient ids, used to access the dest_ids map
        for (unsigned long i = 0; i < chunk_n; i++) {
        // contains the recipient ids, used to access the dest_ids map. First idx is chunk with potential offset
        vector<unsigned long> dest_idx{};
        for (auto i = chnk_start; i < chnk_end; i++) {
            auto recipient = get_rpc_node(path + fmt::FormatInt(i).str());
            if (dest_ids.count(recipient) == 0) {
                dest_ids.insert(make_pair(recipient, vector<unsigned long>{i}));
@@ -272,17 +278,22 @@ ssize_t pwrite(int fd, const void* buf, size_t count, off_t offset) {
        vector<struct write_args*> thread_args(dest_n);
        for (unsigned long i = 0; i < dest_n; i++) {
            ABT_eventual_create(sizeof(size_t), &eventuals[i]);
            auto total_chunk_size = dest_ids[dest_idx[i]].size() * CHUNKSIZE;
            if (i == 0) // receiver of first chunk must subtract the offset from first chunk
                total_chunk_size -= offset % CHUNKSIZE;
            if (i == dest_n - 1 && ((offset + count) % CHUNKSIZE) != 0) // receiver of last chunk must subtract
                total_chunk_size -= CHUNKSIZE - ((offset + count) % CHUNKSIZE);
            struct write_args args = {
                    path, // path
                    count, // total size to write
                    total_chunk_size, // total size to write
                    0, // writing offset only relevant for the first chunk that is written
                    buf, // pointer to write buffer
                    append_flag, // append flag when file was opened
                    chnk_start, // append flag when file was opened
                    updated_size, // for append truncate TODO needed?
                    dest_ids[dest_idx[i]], // pointer to list of chunk ids that all go to the same destination
                    &eventuals[i], // pointer to an eventual which has allocated memory for storing the written size
            };
            if (i == 0)
            if (i == 0) // first offset in dest_idx is the chunk with a potential offset
                args.in_offset = offset % CHUNKSIZE;
            thread_args[i] = &args;
            ABT_thread_create(pool, rpc_send_write_abt, &(*thread_args[i]), ABT_THREAD_ATTR_NULL, &threads[i]);
+16 −8
Original line number Diff line number Diff line
@@ -20,15 +20,26 @@ void rpc_send_write_abt(void* _arg) {
        if (i < buf_sizes.size() / 2)
            buf_sizes[i] = sizeof(rpc_chnk_id_t);
        else {
            if (i + 1 == buf_sizes.size()) {// if current chunk size is last chunk
                // the last chunk will have the rest of the size, i.e., write size - all applied chunk sizes
                buf_sizes[i] = arg->in_size - (chnk_ids[i - chnks.size()] * CHUNKSIZE);
            if (i == buf_sizes.size() / 2) { // first chunk which might have an offset
                if (arg->in_size + arg->in_offset > CHUNKSIZE)
                    buf_sizes[i] = static_cast<size_t>(CHUNKSIZE - arg->in_offset);
                else
                    buf_sizes[i] = static_cast<size_t>(arg->in_size);

                chnks[i - chnks.size()] =
                        static_cast<char*>(const_cast<void*>(arg->buf)) +
                        (CHUNKSIZE * (chnk_ids[i - chnk_ids.size()] - arg->chnk_start));
                continue;
            } else if (i + 1 == buf_sizes.size()) {// last chunk has remaining size
                buf_sizes[i] =
                        arg->in_size - (((chnk_ids[i - chnks.size()] - arg->chnk_start) * CHUNKSIZE) - arg->in_offset);
            } else {
                buf_sizes[i] = CHUNKSIZE;
            }
            // position the pointer according to the chunk number
            chnks[i - chnks.size()] =
                    static_cast<char*>(const_cast<void*>(arg->buf)) + (CHUNKSIZE * chnk_ids[i - chnk_ids.size()]);
                    static_cast<char*>(const_cast<void*>(arg->buf)) +
                    ((CHUNKSIZE * (chnk_ids[i - chnk_ids.size()] - arg->chnk_start)) - arg->in_offset);
        }
    }
    // setting pointers to the ids and to the chunks
@@ -52,10 +63,7 @@ void rpc_send_write_abt(void* _arg) {
    in.path = arg->path.c_str();
    in.offset = arg->in_offset;
    in.updated_size = arg->updated_size;
    if (arg->append)
        in.append = HG_TRUE;
    else
        in.append = HG_FALSE;
    in.append = HG_FALSE; // unused


    margo_create_wrap(ipc_write_data_id, rpc_write_data_id, arg->path, handle, svr_addr, false);
+9 −20
Original line number Diff line number Diff line
@@ -25,14 +25,8 @@ static hg_return_t rpc_srv_read_data(hg_handle_t handle) {

    auto segment_count = margo_bulk_get_segment_count(in.bulk_handle);
    auto bulk_size = margo_bulk_get_size(in.bulk_handle);
    // is write happening over shared memory on the same node?
    auto local_read = is_handle_sm(mid, hgi->addr);
    if (local_read)
        ADAFS_DATA->spdlogger()->debug("{}() Got local read IPC with path {} size {} offset {}", __func__, in.path,
                                       bulk_size,
                                       in.offset);
    else
        ADAFS_DATA->spdlogger()->debug("{}() Got read RPC with path {} size {} offset {}", __func__, in.path, bulk_size,
    ADAFS_DATA->spdlogger()->debug("{}() Got read RPC (local {}) with path {} size {} offset {}", __func__,
                                   (margo_get_info(handle)->target_id == ADAFS_DATA->host_id()), in.path, bulk_size,
                                   in.offset);

    // set buffer sizes
@@ -121,15 +115,9 @@ static hg_return_t rpc_srv_write_data(hg_handle_t handle) {

    auto segment_count = margo_bulk_get_segment_count(in.bulk_handle);
    auto bulk_size = margo_bulk_get_size(in.bulk_handle);
    // is write happening over shared memory on the same node?
    auto local_write = is_handle_sm(mid, hgi->addr);
    if (local_write)
        ADAFS_DATA->spdlogger()->debug("{}() Got local write IPC with path {} size {} offset {}", __func__, in.path,
                                       bulk_size,
    ADAFS_DATA->spdlogger()->debug("{}() Got write RPC (local {}) with path {} size {} offset {}", __func__,
                                   (margo_get_info(handle)->target_id == ADAFS_DATA->host_id()), in.path, bulk_size,
                                   in.offset);
    else
        ADAFS_DATA->spdlogger()->debug("{}() Got write RPC with path {} size {} offset {}", __func__, in.path,
                                       bulk_size, in.offset);


    // set buffer sizes information
@@ -142,8 +130,9 @@ static hg_return_t rpc_srv_write_data(hg_handle_t handle) {
            id_size += sizeof(rpc_chnk_id_t);

        } else {
            // case for last chunk size
            if ((chnk_size + CHUNKSIZE) > bulk_size)
            if (i == segment_count / 2) { // first chunk
                buf_sizes[i] = static_cast<unsigned long>(CHUNKSIZE - in.offset);
            } else if ((chnk_size + CHUNKSIZE + id_size) > bulk_size) // last chunk
                buf_sizes[i] = bulk_size - chnk_size - id_size;
            else
                buf_sizes[i] = CHUNKSIZE;
+12 −8
Original line number Diff line number Diff line
@@ -9,12 +9,16 @@ using namespace std;
int main(int argc, char* argv[]) {

//    char buf[] = "lefthyblubber";
    char buf[] = "13333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333311567892222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222345671\n";
//    char buf[] = "13333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333311567892222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222345671\n";
    // 1613
//    char buf[] = "1333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333331156789222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222234567113333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333311567892222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222345671\n12345blaaaa";
    char buf[] = "penis";
//    char buf[] = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
    string p = "/tmp/mountdir/file"s;
//    auto fd = open(p.c_str(), O_CREAT | O_WRONLY, 0777);
    auto fd = open(p.c_str(), O_CREAT | O_WRONLY, 0777);
//    auto nw = write(fd, &buf, strlen(buf));
////    auto nw = pwrite(fd, &buf, strlen(buf), 395);
//    close(fd);
    auto nw = pwrite(fd, &buf, strlen(buf), 790);
    close(fd);

//    char buf_a[] = "456esta\n";
//    auto fd_a = open(p.c_str(), O_WRONLY | O_APPEND, 0777);
@@ -22,10 +26,10 @@ int main(int argc, char* argv[]) {
//    close(fd);
//
    char read_buf[] = "99999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999\n";
    auto fd = open(p.c_str(), O_RDONLY, 0777);
    auto rs = read(fd, &read_buf, strlen(buf));
    printf("buffer read: %s\n", read_buf);
    close(fd);
//    auto fd = open(p.c_str(), O_RDONLY, 0777);
//    auto rs = read(fd, &read_buf, strlen(buf));
//    printf("buffer read: %s\n", read_buf);
//    close(fd);

    //    auto fd2 = open("/tmp/rootdir/data/chunks/file/data2", O_RDONLY, 0777);
//    char buf_read2[9] = {0};