Commit 0521bbb6 authored by Marc Vef's avatar Marc Vef
Browse files

Write rpc format changed to read format + cleanup

parent 8a3f43b1
Loading
Loading
Loading
Loading
+3 −3
Original line number Diff line number Diff line
@@ -108,9 +108,9 @@ int write_chunks(const string& path, const vector<void*>& buf_ptrs, const vector
    auto chnk_n = buf_sizes.size() / 2;
    // TODO this can be parallized
    for (size_t i = 0; i < chnk_n; i++) {
        auto chnk_id = *(static_cast<size_t*>(buf_ptrs[i * 2]));
        auto chnk_ptr = static_cast<char*>(buf_ptrs[(i * 2) + 1]);
        auto chnk_size = buf_sizes[(i * 2) + 1];
        auto chnk_id = *(static_cast<size_t*>(buf_ptrs[i]));
        auto chnk_ptr = static_cast<char*>(buf_ptrs[i + chnk_n]);
        auto chnk_size = buf_sizes[i + chnk_n];
        size_t written_chnk_size;
        // TODO 5,6,7 params (append and offset stuff)
        if (write_file(path, chnk_ptr, chnk_id, chnk_size, 0, false, 0, written_chnk_size) != 0) {
+7 −8
Original line number Diff line number Diff line
@@ -231,6 +231,7 @@ ssize_t pwrite(int fd, const void* buf, size_t count, off_t offset) {
        auto append_flag = adafs_fd->append_flag();
        int err = 0;
        long updated_size = 0;
        auto write_size = static_cast<size_t>(0);

//        if (append_flag)
        err = rpc_send_update_metadentry_size(path, count, append_flag, updated_size);
@@ -283,7 +284,7 @@ ssize_t pwrite(int fd, const void* buf, size_t count, off_t offset) {
            thread_args[i] = &args;
            ABT_thread_create(pool, rpc_send_write_abt, &(*thread_args[i]), ABT_THREAD_ATTR_NULL, &threads[i]);
        }
        auto write_size_total = static_cast<size_t>(0);

        for (unsigned long i = 0; i < dest_n; i++) {
            size_t* thread_ret_size;
            ABT_eventual_wait(eventuals[i], (void**) &thread_ret_size);
@@ -291,7 +292,7 @@ ssize_t pwrite(int fd, const void* buf, size_t count, off_t offset) {
                // TODO error handling if write of a thread failed. all data needs to be deleted and size update reverted
                ld_logger->error("{}() Writing thread {} did not write anything. NO ACTION WAS DONE", __func__, i);
            } else
                write_size_total += *thread_ret_size;
                write_size += *thread_ret_size;
            ABT_eventual_free(&eventuals[i]);
            ret = ABT_thread_join(threads[i]);
            if (ret != 0) {
@@ -304,7 +305,7 @@ ssize_t pwrite(int fd, const void* buf, size_t count, off_t offset) {
                return -1;
            }
        }
        return write_size_total;
        return write_size;
    }
    return (reinterpret_cast<decltype(&pwrite)>(libc_pwrite))(fd, buf, count, offset);
}
@@ -324,10 +325,9 @@ ssize_t pread(int fd, void* buf, size_t count, off_t offset) {
        ld_logger->trace("{}() called with fd {}", __func__, fd);
        auto adafs_fd = file_map.get(fd);
        auto path = adafs_fd->path();
        size_t read_size = 0;
        auto read_size = static_cast<size_t>(0);
        auto err = 0;


        // Collect all chunk ids within count that have the same destination so that those are send in one rpc bulk transfer
        auto chunk_n = static_cast<size_t>(ceil(
                count / static_cast<float>(CHUNKSIZE))); // get number of chunks needed for writing
@@ -377,7 +377,6 @@ ssize_t pread(int fd, void* buf, size_t count, off_t offset) {
            ABT_thread_create(pool, rpc_send_read_abt, &(*thread_args[i]), ABT_THREAD_ATTR_NULL, &threads[i]);
        }

        auto read_size_total = static_cast<size_t>(0);
        for (unsigned long i = 0; i < dest_n; i++) {
            size_t* thread_ret_size;
            ABT_eventual_wait(eventuals[i], (void**) &thread_ret_size);
@@ -385,7 +384,7 @@ ssize_t pread(int fd, void* buf, size_t count, off_t offset) {
                err = -1;
                ld_logger->error("{}() Reading thread {} did not read anything. NO ACTION WAS DONE", __func__, i);
            } else
                read_size_total += *thread_ret_size;
                read_size += *thread_ret_size;
            ABT_eventual_free(&eventuals[i]);
            ret = ABT_thread_join(threads[i]);
            if (ret != 0) {
@@ -399,7 +398,7 @@ ssize_t pread(int fd, void* buf, size_t count, off_t offset) {
            }
        }
        // XXX check how much we need to deal with the read_size
        return err == 0 ? read_size_total : 0;;
        return err == 0 ? read_size : 0;
    }
    return (reinterpret_cast<decltype(&pread)>(libc_pread))(fd, buf, count, offset);
}
+36 −17
Original line number Diff line number Diff line
@@ -17,26 +17,27 @@ void rpc_send_write_abt(void* _arg) {
    vector<size_t> buf_sizes(recipient_size * 2);
    for (size_t i = 0; i < buf_sizes.size(); i++) {
        // even numbers contain the sizes of ids, while uneven contain the chunksize
        if (i % 2 == 0)
        if (i < buf_sizes.size() / 2)
            buf_sizes[i] = sizeof(rpc_chnk_id_t);
        else {
            if (chnk_ids[i / 2] == chnk_ids.size() - 1) {// if current chunk size is last chunk
            if (i + 1 == buf_sizes.size()) {// if current chunk size is last chunk
                // the last chunk will have the rest of the size, i.e., write size - all applied chunk sizes
                buf_sizes[i] = arg->in_size - (chnk_ids[i / 2] * CHUNKSIZE);
                buf_sizes[i] = arg->in_size - (chnk_ids[i - chnks.size()] * CHUNKSIZE);
            } else {
                buf_sizes[i] = CHUNKSIZE;
            }
            // position the pointer according to the chunk number
            chnks[i / 2] = static_cast<char*>(const_cast<void*>(arg->buf)) + (CHUNKSIZE * chnk_ids[i / 2]);
            chnks[i - chnks.size()] =
                    static_cast<char*>(const_cast<void*>(arg->buf)) + (CHUNKSIZE * chnk_ids[i - chnk_ids.size()]);
        }
    }
    // setting pointers to the ids and to the chunks
    vector<void*> buf_ptrs(recipient_size * 2);
    for (unsigned long i = 0; i < buf_ptrs.size(); i++) {
        if (i % 2 == 0) // id pointer
            buf_ptrs[i] = &chnk_ids[i / 2];
        if (i < buf_sizes.size() / 2) // id pointer
            buf_ptrs[i] = &chnk_ids[i];
        else // data pointer
            buf_ptrs[i] = chnks[i / 2];
            buf_ptrs[i] = chnks[i - chnk_ids.size()];
    }

    // RPC
@@ -46,6 +47,7 @@ void rpc_send_write_abt(void* _arg) {
    rpc_data_out_t out{};
    int err;
    hg_return_t ret;
    auto write_size = static_cast<size_t>(0);
    // fill in
    in.path = arg->path.c_str();
    in.offset = arg->in_offset;
@@ -64,8 +66,11 @@ void rpc_send_write_abt(void* _arg) {
    ret = margo_bulk_create(used_mid, static_cast<uint32_t>(buf_sizes.size()), buf_ptrs.data(), buf_sizes.data(),
                            HG_BULK_READ_ONLY, &in.bulk_handle);

    if (ret != 0)
    if (ret != HG_SUCCESS) {
        ld_logger->error("{}() failed to create bulk on client", __func__);
        ABT_eventual_set(*(arg->eventual), &write_size, sizeof(write_size));
        return;
    }

    int send_ret = HG_FALSE;
    for (int i = 0; i < RPC_TRIES; ++i) {
@@ -74,11 +79,15 @@ void rpc_send_write_abt(void* _arg) {
            break;
        }
    }
    auto write_size = static_cast<size_t>(0);
    if (send_ret == HG_SUCCESS) {

        /* decode response */
        ret = margo_get_output(handle, &out);
        if (ret != HG_SUCCESS) {
            ld_logger->error("{}() failed to get rpc output", __func__);
            ABT_eventual_set(*(arg->eventual), &write_size, sizeof(write_size));
            return;
        }
        err = out.res;
        if (err != 0)
            write_size = static_cast<size_t>(0);
@@ -90,6 +99,8 @@ void rpc_send_write_abt(void* _arg) {
        margo_free_output(handle, &out);
    } else {
        ld_logger->warn("{}() timed out", __func__);
        ABT_eventual_set(*(arg->eventual), &write_size, sizeof(write_size));
        return;
    }
    // Signal calling process that RPC is finished and put written size into return value
    ABT_eventual_set(*(arg->eventual), &write_size, sizeof(write_size));
@@ -111,7 +122,6 @@ void rpc_send_read_abt(void* _arg) {
        if (i < buf_sizes.size() / 2)
            buf_sizes[i] = sizeof(rpc_chnk_id_t);
        else {

            if (i / 2 == 0) { // First chunk might have an offset
                buf_sizes[i] = CHUNKSIZE - static_cast<unsigned long>(arg->in_offset);
            } else if (i + 1 == buf_sizes.size()) {// if current chunk size is last chunk
@@ -137,8 +147,9 @@ void rpc_send_read_abt(void* _arg) {
    hg_addr_t svr_addr = HG_ADDR_NULL;
    rpc_read_data_in_t in{};
    rpc_data_out_t out{};
    int err;
//    int err; // XXX
    hg_return_t ret;
    auto read_size = static_cast<size_t>(0);
    // fill in
    in.path = arg->path.c_str();
    in.size = arg->in_size;
@@ -151,8 +162,11 @@ void rpc_send_read_abt(void* _arg) {
    /* register local target buffer for bulk access */
    ret = margo_bulk_create(used_mid, static_cast<uint32_t>(buf_sizes.size()), buf_ptrs.data(), buf_sizes.data(),
                            HG_BULK_READWRITE, &in.bulk_handle);
    if (ret != 0)
    if (ret != HG_SUCCESS) {
        ld_logger->error("{}() failed to create bulk on client", __func__);
        ABT_eventual_set(*(arg->eventual), &read_size, sizeof(read_size));
        return;
    }

    int send_ret = HG_FALSE;
    for (int i = 0; i < RPC_TRIES; ++i) {
@@ -161,20 +175,25 @@ void rpc_send_read_abt(void* _arg) {
            break;
        }
    }
    auto read_size = static_cast<size_t>(0);
    if (send_ret == HG_SUCCESS) {
        /* decode response */
        ret = margo_get_output(handle,
                               &out); // XXX handle ret out.res can inidicate a failure with reading on the other side.
        ret = margo_get_output(handle, &out);
        if (ret != HG_SUCCESS) {
            ld_logger->error("{}() failed to get rpc output", __func__);
            ABT_eventual_set(*(arg->eventual), &read_size, sizeof(read_size));
            return;
        }
        read_size = static_cast<size_t>(out.io_size);
        err = out.res;
//        err = out.res;
        ld_logger->debug("{}() Got response {}", __func__, out.res);
        /* clean up resources consumed by this rpc */
        margo_bulk_free(in.bulk_handle);
        margo_free_output(handle, &out);
    } else {
        ld_logger->warn("{}() timed out", __func__);
        err = EAGAIN;
        ABT_eventual_set(*(arg->eventual), &read_size, sizeof(read_size));
        return;
//        err = EAGAIN;
    }
    // Signal calling process that RPC is finished and put read size into return value
    ABT_eventual_set(*(arg->eventual), &read_size, sizeof(read_size));
+58 −45
Original line number Diff line number Diff line
@@ -28,10 +28,12 @@ static hg_return_t rpc_srv_read_data(hg_handle_t handle) {
    // is write happening over shared memory on the same node?
    auto local_read = is_handle_sm(mid, hgi->addr);
    if (local_read)
        ADAFS_DATA->spdlogger()->debug("Got local read IPC with path {} size {} offset {}", in.path, bulk_size,
        ADAFS_DATA->spdlogger()->debug("{}() Got local read IPC with path {} size {} offset {}", __func__, in.path,
                                       bulk_size,
                                       in.offset);
    else
        ADAFS_DATA->spdlogger()->debug("Got read RPC with path {} size {} offset {}", in.path, bulk_size, in.offset);
        ADAFS_DATA->spdlogger()->debug("{}() Got read RPC with path {} size {} offset {}", __func__, in.path, bulk_size,
                                       in.offset);

    // set buffer sizes
    vector<hg_size_t> buf_sizes(segment_count);
@@ -64,7 +66,7 @@ static hg_return_t rpc_srv_read_data(hg_handle_t handle) {
        }
    }
    if (local_read) {
        // TODO bulk access readwrite doesn't work for some reason ...
        // TODO bulk access readwrite doesn't work for some reason ... Section unfinished
        uint32_t actual_count;
        // The data is not transferred. We directly access the data from the client on the same node
        ret = margo_bulk_access(in.bulk_handle, 0, bulk_size, HG_BULK_READWRITE, segment_count, buf_ptrs.data(),
@@ -108,7 +110,7 @@ static hg_return_t rpc_srv_read_data(hg_handle_t handle) {
        ret = margo_bulk_transfer(mid, HG_BULK_PUSH, hgi->addr, in.bulk_handle, id_size, bulk_handle, id_size,
                                  chnk_size);
        if (ret != HG_SUCCESS) {
            ADAFS_DATA->spdlogger()->error("Failed push the data to the client in read operation");
            ADAFS_DATA->spdlogger()->error("{}() Failed push the data to the client in read operation", __func__);
            return rpc_cleanup_respond(&handle, &in, &out, &bulk_handle);
        }
    }
@@ -117,11 +119,22 @@ static hg_return_t rpc_srv_read_data(hg_handle_t handle) {
    out.res = 0;
    out.io_size = read_size;

    ADAFS_DATA->spdlogger()->debug("Sending output response {}", out.res);
    if (bulk_handle)
        return rpc_cleanup_respond(&handle, &in, &out, &bulk_handle);
    else
        return rpc_cleanup_respond(&handle, &in, &out, static_cast<hg_bulk_t*>(nullptr));
    //cleanup
    ADAFS_DATA->spdlogger()->debug(", __func__{}() Sending output response {}", out.res);
    ret = rpc_cleanup_respond(&handle, &in, &out, &bulk_handle);
    // free memory in buf_ptrs
    // On a local operation the data is owned by the client who is responsible to free its buffers
    if (!local_read) {
        for (size_t i = 0; i < segment_count; i++) {
            if (i < segment_count / 2)
                delete static_cast<rpc_chnk_id_t*>(buf_ptrs[i]);
            else {
                delete[] static_cast<char*>(buf_ptrs[i]);
            }
        }

    }
    return ret;
}

DEFINE_MARGO_RPC_HANDLER(rpc_srv_read_data)
@@ -129,7 +142,10 @@ DEFINE_MARGO_RPC_HANDLER(rpc_srv_read_data)
static hg_return_t rpc_srv_write_data(hg_handle_t handle) {
    rpc_write_data_in_t in{};
    rpc_data_out_t out{};
    hg_bulk_t bulk_handle;
    hg_bulk_t bulk_handle = nullptr;
    // default out
    out.res = EIO;
    out.io_size = 0;

    auto ret = margo_get_input(handle, &in);
    assert(ret == HG_SUCCESS);
@@ -143,34 +159,39 @@ static hg_return_t rpc_srv_write_data(hg_handle_t handle) {
    // is write happening over shared memory on the same node?
    auto local_write = is_handle_sm(mid, hgi->addr);
    if (local_write)
        ADAFS_DATA->spdlogger()->debug("Got local write IPC with path {} size {} offset {}", in.path, bulk_size,
        ADAFS_DATA->spdlogger()->debug("{}() Got local write IPC with path {} size {} offset {}", __func__, in.path,
                                       bulk_size,
                                       in.offset);
    else
        ADAFS_DATA->spdlogger()->debug("Got write RPC with path {} size {} offset {}", in.path, bulk_size, in.offset);
        ADAFS_DATA->spdlogger()->debug("{}() Got write RPC with path {} size {} offset {}", __func__, in.path,
                                       bulk_size, in.offset);


    // set buffer sizes
    vector<hg_size_t> buf_sizes(segment_count);
    size_t chnk_size = 0;
    size_t id_size = 0;
    for (size_t i = 0; i < segment_count; i++) {
        if (i % 2 == 0)
        if (i < segment_count / 2) {
            buf_sizes[i] = sizeof(rpc_chnk_id_t);
        else {
            id_size += sizeof(rpc_chnk_id_t);

        } else {
            // case for last chunk size
            if ((chnk_size + CHUNKSIZE) > bulk_size)
                buf_sizes[i] = bulk_size - chnk_size;
                buf_sizes[i] = bulk_size - chnk_size - id_size;
            else
                buf_sizes[i] = CHUNKSIZE;
        }
            chnk_size += buf_sizes[i];
        }
    }
    // allocate memory for bulk transfer
    vector<void*> buf_ptrs(segment_count);
    // On a local operation the buffers are allocated in the client on the same node.
    // Hence no memory allocation is necessary
    if (!local_write) {
        for (size_t i = 0; i < segment_count; i++) {
            if (i % 2 == 0)
            if (i < segment_count / 2)
                buf_ptrs[i] = new rpc_chnk_id_t;
            else {
                buf_ptrs[i] = new char[buf_sizes[i]];
@@ -183,49 +204,41 @@ static hg_return_t rpc_srv_write_data(hg_handle_t handle) {
        // The data is not transferred. We directly access the data from the client on the same node
        ret = margo_bulk_access(in.bulk_handle, 0, bulk_size, HG_BULK_READ_ONLY, segment_count, buf_ptrs.data(),
                                buf_sizes.data(), &actual_count);
        if (ret != HG_SUCCESS || segment_count != actual_count)
        if (ret != HG_SUCCESS || segment_count != actual_count) {
            ADAFS_DATA->spdlogger()->error("{}() margo_bulk_access failed with ret {}", __func__, ret);
            return rpc_cleanup_respond(&handle, &in, &out, static_cast<hg_bulk_t*>(nullptr));
        }
    } else {
        // create bulk handle
        ret = margo_bulk_create(mid, segment_count, buf_ptrs.data(), buf_sizes.data(), HG_BULK_WRITE_ONLY,
                                &bulk_handle);
        if (ret == HG_SUCCESS) {
        if (ret != HG_SUCCESS) {
            ADAFS_DATA->spdlogger()->error("{}() Failed to create bulk handle", __func__);
            return rpc_cleanup_respond(&handle, &in, &out, static_cast<hg_bulk_t*>(nullptr));
        }
        // pull data from client here
        ret = margo_bulk_transfer(mid, HG_BULK_PULL, hgi->addr, in.bulk_handle, 0, bulk_handle, 0, bulk_size);
            if (ret != HG_SUCCESS)
                ADAFS_DATA->spdlogger()->error("Failed to pull data from client in write operation");
            margo_bulk_free(bulk_handle);
        if (ret != HG_SUCCESS) {
            ADAFS_DATA->spdlogger()->error("{}() Failed to pull data from client", __func__);
            return rpc_cleanup_respond(&handle, &in, &out, &bulk_handle);
        }
    }

    // do write operation if all is good
    if (ret == HG_SUCCESS) {
    out.res = write_chunks(in.path, buf_ptrs, buf_sizes, out.io_size);
    if (out.res != 0) {
            ADAFS_DATA->spdlogger()->error("Failed to write data to local disk.");
            out.io_size = 0;
        }
    } else {
        ADAFS_DATA->spdlogger()->error("Failed to create bulk handle in write operation");
        out.res = EIO;
        out.io_size = 0;
    }

    ADAFS_DATA->spdlogger()->debug("Sending output response {}", out.res);
    auto hret = margo_respond(handle, &out);
    if (hret != HG_SUCCESS) {
        ADAFS_DATA->spdlogger()->error("Failed to respond to write request");
        ADAFS_DATA->spdlogger()->error("{}() Failed to write data to local disk.");
        return rpc_cleanup_respond(&handle, &in, &out, &bulk_handle);
    }

    // Destroy handle when finished
    margo_free_input(handle, &in);
    margo_destroy(handle);
    // respond and cleanup
    ADAFS_DATA->spdlogger()->debug("{}() Sending output response {}", __func__, out.res);
    ret = rpc_cleanup_respond(&handle, &in, &out, &bulk_handle);

    // free memory in buf_ptrs
    // On a local operation the data is owned by the client who is responsible to free its buffers
    if (!local_write) {
        for (size_t i = 0; i < segment_count; i++) {
            if (i % 2 == 0)
            if (i < segment_count / 2)
                delete static_cast<rpc_chnk_id_t*>(buf_ptrs[i]);
            else {
                delete[] static_cast<char*>(buf_ptrs[i]);
+7 −7

File changed.

Preview size limit exceeded, changes collapsed.