Commit 3df9a6e2 authored by Marc Vef's avatar Marc Vef
Browse files

Set -DMERCURY_USE_EAGER_BULK:BOOL=OFF in Mercury configuration.

Modified fs code to circumvent Mercury bug with Eager mode.
Migrated to proper use of margo_bulk_access and use it for memory allocation.
parent f8bf573f
Loading
Loading
Loading
Loading
+2 −1
Original line number Diff line number Diff line
@@ -93,9 +93,10 @@ echo "Installing Mercury"
CURR=$GIT/mercury
prepare_build_dir $CURR
cd $CURR/build
# XXX Note: USE_EAGER_BULK is temporarily disabled due to bugs in Mercury with smaller amounts of data
cmake -DMERCURY_USE_SELF_FORWARD:BOOL=ON -DMERCURY_USE_CHECKSUMS:BOOL=OFF -DBUILD_TESTING:BOOL=ON \
-DMERCURY_USE_BOOST_PP:BOOL=ON -DBUILD_SHARED_LIBS:BOOL=ON -DCMAKE_INSTALL_PREFIX=$INSTALL \
-DCMAKE_BUILD_TYPE:STRING=Release $USE_BMI $USE_CCI $USE_OFI ../  || exit 1
-DCMAKE_BUILD_TYPE:STRING=Release -DMERCURY_USE_EAGER_BULK:BOOL=OFF $USE_BMI $USE_CCI $USE_OFI ../  || exit 1
make -j8  || exit 1
make install  || exit 1

+1 −2
Original line number Diff line number Diff line
@@ -155,8 +155,7 @@ void rpc_send_read_abt(void* _arg) {
    in.size = arg->in_size;
    in.offset = arg->in_offset;

    // TODO bulk_access on the handler site doesn't work with HG_BULK_READWRITE, This is why we force rpc for now ...
    margo_create_wrap(ipc_read_data_id, rpc_read_data_id, in.path, handle, svr_addr, true);
    margo_create_wrap(ipc_read_data_id, rpc_read_data_id, in.path, handle, svr_addr, false);

    auto used_mid = margo_hg_handle_get_instance(handle);
    /* register local target buffer for bulk access */
+54 −117
Original line number Diff line number Diff line
@@ -52,42 +52,20 @@ static hg_return_t rpc_srv_read_data(hg_handle_t handle) {
            chnk_size += buf_sizes[i];
        }
    }
    // allocate memory for bulk transfer
    // array of pointers for bulk transfer (allocated in bulk_create)
    vector<void*> buf_ptrs(segment_count);
    // On a local operation the buffers are allocated in the client on the same node.
    // Hence no memory allocation is necessary
    if (!local_read) {
        for (size_t i = 0; i < segment_count; i++) {
            if (i < segment_count / 2)
                buf_ptrs[i] = new rpc_chnk_id_t;
            else {
                buf_ptrs[i] = new char[buf_sizes[i]];
            }
        }
    // create bulk handle for data transfer (buffers are allocated internally)
    ret = margo_bulk_create(mid, segment_count, nullptr, buf_sizes.data(), HG_BULK_READWRITE, &bulk_handle);
    if (ret != HG_SUCCESS) {
        ADAFS_DATA->spdlogger()->error("{}() Failed to create bulk handle", __func__);
        return rpc_cleanup_respond(&handle, &in, &out, static_cast<hg_bulk_t*>(nullptr));
    }
    if (local_read) {
        // TODO bulk access readwrite doesn't work for some reason ... Section unfinished
    // access the internally allocated memory buffer and put it into buf_ptrs
    uint32_t actual_count;
        // The data is not transferred. We directly access the data from the client on the same node
        ret = margo_bulk_access(in.bulk_handle, 0, bulk_size, HG_BULK_READWRITE, segment_count, buf_ptrs.data(),
    ret = margo_bulk_access(bulk_handle, 0, bulk_size, HG_BULK_READWRITE, segment_count, buf_ptrs.data(),
                            buf_sizes.data(), &actual_count);
        if (ret != HG_SUCCESS || segment_count != actual_count)
            ADAFS_DATA->spdlogger()->error("{}() margo_bulk_access failed with ret {}", __func__, ret);
        // read the data
        err = read_chunks(in.path, buf_ptrs, buf_sizes, read_size);

        if (err != 0 || in.size != read_size) {
            out.res = err;
            ADAFS_DATA->spdlogger()->error("{}() Failed to read chunks on path {}", __func__, in.path);
            return rpc_cleanup_respond(&handle, &in, &out, static_cast<hg_bulk_t*>(nullptr));
        }

    } else {
        // create bulk handle for data transfer
        ret = margo_bulk_create(mid, segment_count, buf_ptrs.data(), buf_sizes.data(), HG_BULK_READWRITE, &bulk_handle);

    if (ret != HG_SUCCESS) {
            ADAFS_DATA->spdlogger()->error("{}() Failed to create bulk handle", __func__);
        ADAFS_DATA->spdlogger()->error("{}() Failed to access allocated memory segments for bulk transfer", __func__);
        return rpc_cleanup_respond(&handle, &in, &out, static_cast<hg_bulk_t*>(nullptr));
    }

@@ -113,27 +91,14 @@ static hg_return_t rpc_srv_read_data(hg_handle_t handle) {
        ADAFS_DATA->spdlogger()->error("{}() Failed push the data to the client in read operation", __func__);
        return rpc_cleanup_respond(&handle, &in, &out, &bulk_handle);
    }
    }


    out.res = 0;
    out.io_size = read_size;

    //cleanup
    ADAFS_DATA->spdlogger()->debug(", __func__{}() Sending output response {}", out.res);
    ADAFS_DATA->spdlogger()->debug("{}() Sending output response {}", __func__, out.res);
    ret = rpc_cleanup_respond(&handle, &in, &out, &bulk_handle);
    // free memory in buf_ptrs
    // On a local operation the data is owned by the client who is responsible to free its buffers
    if (!local_read) {
        for (size_t i = 0; i < segment_count; i++) {
            if (i < segment_count / 2)
                delete static_cast<rpc_chnk_id_t*>(buf_ptrs[i]);
            else {
                delete[] static_cast<char*>(buf_ptrs[i]);
            }
        }

    }
    return ret;
}

@@ -167,7 +132,7 @@ static hg_return_t rpc_srv_write_data(hg_handle_t handle) {
                                       bulk_size, in.offset);


    // set buffer sizes
    // set buffer sizes information
    vector<hg_size_t> buf_sizes(segment_count);
    size_t chnk_size = 0;
    size_t id_size = 0;
@@ -185,36 +150,21 @@ static hg_return_t rpc_srv_write_data(hg_handle_t handle) {
            chnk_size += buf_sizes[i];
        }
    }
    // allocate memory for bulk transfer
    // array of pointers for bulk transfer (allocated in margo_bulk_create)
    vector<void*> buf_ptrs(segment_count);
    // On a local operation the buffers are allocated in the client on the same node.
    // Hence no memory allocation is necessary
    if (!local_write) {
        for (size_t i = 0; i < segment_count; i++) {
            if (i < segment_count / 2)
                buf_ptrs[i] = new rpc_chnk_id_t;
            else {
                buf_ptrs[i] = new char[buf_sizes[i]];
            }
        }
    // create bulk handle and allocated memory for buffer with buf_sizes information
    ret = margo_bulk_create(mid, segment_count, nullptr, buf_sizes.data(), HG_BULK_WRITE_ONLY, &bulk_handle);
    if (ret != HG_SUCCESS) {
        ADAFS_DATA->spdlogger()->error("{}() Failed to create bulk handle", __func__);
        return rpc_cleanup_respond(&handle, &in, &out, static_cast<hg_bulk_t*>(nullptr));
    }
    // If local operation the data does not need to be transferred. We just need access to the data ptrs
    if (local_write) {
    // access the internally allocated memory buffer and put it into buf_ptrs
    uint32_t actual_count;
        // The data is not transferred. We directly access the data from the client on the same node
        ret = margo_bulk_access(in.bulk_handle, 0, bulk_size, HG_BULK_READ_ONLY, segment_count, buf_ptrs.data(),
    ret = margo_bulk_access(bulk_handle, 0, bulk_size, HG_BULK_READWRITE, segment_count, buf_ptrs.data(),
                            buf_sizes.data(), &actual_count);
        if (ret != HG_SUCCESS || segment_count != actual_count) {
            ADAFS_DATA->spdlogger()->error("{}() margo_bulk_access failed with ret {}", __func__, ret);
            return rpc_cleanup_respond(&handle, &in, &out, static_cast<hg_bulk_t*>(nullptr));
        }
    } else {
        // create bulk handle
        ret = margo_bulk_create(mid, segment_count, buf_ptrs.data(), buf_sizes.data(), HG_BULK_WRITE_ONLY,
                                &bulk_handle);
    if (ret != HG_SUCCESS) {
            ADAFS_DATA->spdlogger()->error("{}() Failed to create bulk handle", __func__);
            return rpc_cleanup_respond(&handle, &in, &out, static_cast<hg_bulk_t*>(nullptr));
        ADAFS_DATA->spdlogger()->error("{}() Failed to access allocated buffer from bulk handle", __func__);
        return rpc_cleanup_respond(&handle, &in, &out, &bulk_handle);
    }
    // pull data from client here
    ret = margo_bulk_transfer(mid, HG_BULK_PULL, hgi->addr, in.bulk_handle, 0, bulk_handle, 0, bulk_size);
@@ -222,7 +172,6 @@ static hg_return_t rpc_srv_write_data(hg_handle_t handle) {
        ADAFS_DATA->spdlogger()->error("{}() Failed to pull data from client", __func__);
        return rpc_cleanup_respond(&handle, &in, &out, &bulk_handle);
    }
    }

    // do write operation if all is good
    out.res = write_chunks(in.path, buf_ptrs, buf_sizes, in.offset, out.io_size);
@@ -234,18 +183,6 @@ static hg_return_t rpc_srv_write_data(hg_handle_t handle) {
    ADAFS_DATA->spdlogger()->debug("{}() Sending output response {}", __func__, out.res);
    ret = rpc_cleanup_respond(&handle, &in, &out, &bulk_handle);

    // free memory in buf_ptrs
    // On a local operation the data is owned by the client who is responsible to free its buffers
    if (!local_write) {
        for (size_t i = 0; i < segment_count; i++) {
            if (i < segment_count / 2)
                delete static_cast<rpc_chnk_id_t*>(buf_ptrs[i]);
            else {
                delete[] static_cast<char*>(buf_ptrs[i]);
            }
        }

    }
    return HG_SUCCESS;
}

+9 −9
Original line number Diff line number Diff line
@@ -8,13 +8,13 @@ using namespace std;

int main(int argc, char* argv[]) {

    char buf[] = "lefthyblubber";
//    char buf[] = "13333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333311567892222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222345671\n";
//    char buf[] = "lefthyblubber";
    char buf[] = "13333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333311567892222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222345671\n";
    string p = "/tmp/mountdir/file"s;
    auto fd = open(p.c_str(), O_CREAT | O_WRONLY, 0777);
//    auto fd = open(p.c_str(), O_CREAT | O_WRONLY, 0777);
//    auto nw = write(fd, &buf, strlen(buf));
    auto nw = pwrite(fd, &buf, strlen(buf), 395);
    close(fd);
////    auto nw = pwrite(fd, &buf, strlen(buf), 395);
//    close(fd);

//    char buf_a[] = "456esta\n";
//    auto fd_a = open(p.c_str(), O_WRONLY | O_APPEND, 0777);
@@ -22,10 +22,10 @@ int main(int argc, char* argv[]) {
//    close(fd);
//
    char read_buf[] = "99999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999999\n";
//    auto fd = open(p.c_str(), O_RDONLY, 0777);
//    auto rs = read(fd, &read_buf, strlen(buf));
//    printf("buffer read: %s\n", read_buf);
//    close(fd);
    auto fd = open(p.c_str(), O_RDONLY, 0777);
    auto rs = read(fd, &read_buf, strlen(buf));
    printf("buffer read: %s\n", read_buf);
    close(fd);

    //    auto fd2 = open("/tmp/rootdir/data/chunks/file/data2", O_RDONLY, 0777);
//    char buf_read2[9] = {0};