Commit f165612d authored by Marc Vef's avatar Marc Vef
Browse files

Preload: Preventing potential seg faults in write/read

parent 7128c21a
Loading
Loading
Loading
Loading
+18 −20
Original line number Diff line number Diff line
@@ -40,6 +40,7 @@ ssize_t rpc_send_write(const string& path, const void* buf, const bool append_fl
    auto dest_n = dest_idx.size();
    vector<hg_handle_t> rpc_handles(dest_n);
    vector<margo_request> rpc_waiters(dest_n);
    vector<rpc_write_data_in_t> rpc_in(dest_n);
    // register local target buffer for bulk access for IPC and RPC margo instance
    auto bulk_buf = const_cast<void*>(buf);
    hg_bulk_t ipc_bulk_handle = nullptr;
@@ -67,19 +68,17 @@ ssize_t rpc_send_write(const string& path, const void* buf, const bool append_fl

        // RPC
        auto chnk_ids = &dest_ids[dest_idx[i]];
        rpc_write_data_in_t in{};
        // fill in
        in.path = path.c_str();
        in.offset = offset % CHUNKSIZE;// first offset in dest_idx is the chunk with a potential offset
        in.chunk_n = chnk_ids->size(); // number of chunks handled by that destination
        in.chunk_start = chnk_start; // chunk start id of this write
        in.chunk_end = chnk_end; // chunk end id of this write
        in.total_chunk_size = total_chunk_size; // total size to write
        in.bulk_handle = (dest_idx[i] == fs_config->host_id) ? ipc_bulk_handle : rpc_bulk_handle;
        // TODO remove svr_addr from this call
        rpc_in[i].path = path.c_str();
        rpc_in[i].offset = offset % CHUNKSIZE;// first offset in dest_idx is the chunk with a potential offset
        rpc_in[i].chunk_n = chnk_ids->size(); // number of chunks handled by that destination
        rpc_in[i].chunk_start = chnk_start; // chunk start id of this write
        rpc_in[i].chunk_end = chnk_end; // chunk end id of this write
        rpc_in[i].total_chunk_size = total_chunk_size; // total size to write
        rpc_in[i].bulk_handle = (dest_idx[i] == fs_config->host_id) ? ipc_bulk_handle : rpc_bulk_handle;
        margo_create_wrap(ipc_write_data_id, rpc_write_data_id, dest_idx[i], rpc_handles[i], false);

        ret = margo_iforward(rpc_handles[i], &in, &rpc_waiters[i]);
        ret = margo_iforward(rpc_handles[i], &rpc_in[i], &rpc_waiters[i]);
        if (ret != HG_SUCCESS) {
            ld_logger->error("{}() Unable to send non-blocking rpc for path {} and recipient {}", __func__, path,
                             dest_idx[i]);
@@ -154,6 +153,7 @@ ssize_t rpc_send_read(const string& path, void* buf, const off64_t offset, const
    auto dest_n = dest_idx.size();
    vector<hg_handle_t> rpc_handles(dest_n);
    vector<margo_request> rpc_waiters(dest_n);
    vector<rpc_read_data_in_t> rpc_in(dest_n);
    // register local target buffer for bulk access for IPC and RPC margo instance
    auto bulk_buf = buf;
    hg_bulk_t ipc_bulk_handle = nullptr;
@@ -181,19 +181,17 @@ ssize_t rpc_send_read(const string& path, void* buf, const off64_t offset, const

        // RPC
        auto chnk_ids = &dest_ids[dest_idx[i]];
        rpc_read_data_in_t in{};
        // fill in
        in.path = path.c_str();
        in.offset = offset % CHUNKSIZE;// first offset in dest_idx is the chunk with a potential offset
        in.chunk_n = chnk_ids->size(); // number of chunks handled by that destination
        in.chunk_start = chnk_start; // chunk start id of this write
        in.chunk_end = chnk_end; // chunk end id of this write
        in.total_chunk_size = total_chunk_size; // total size to write
        in.bulk_handle = (dest_idx[i] == fs_config->host_id) ? ipc_bulk_handle : rpc_bulk_handle;
        // TODO remove svr_addr from this call
        rpc_in[i].path = path.c_str();
        rpc_in[i].offset = offset % CHUNKSIZE;// first offset in dest_idx is the chunk with a potential offset
        rpc_in[i].chunk_n = chnk_ids->size(); // number of chunks handled by that destination
        rpc_in[i].chunk_start = chnk_start; // chunk start id of this write
        rpc_in[i].chunk_end = chnk_end; // chunk end id of this write
        rpc_in[i].total_chunk_size = total_chunk_size; // total size to write
        rpc_in[i].bulk_handle = (dest_idx[i] == fs_config->host_id) ? ipc_bulk_handle : rpc_bulk_handle;
        margo_create_wrap(ipc_read_data_id, rpc_read_data_id, dest_idx[i], rpc_handles[i], false);

        ret = margo_iforward(rpc_handles[i], &in, &rpc_waiters[i]);
        ret = margo_iforward(rpc_handles[i], &rpc_in[i], &rpc_waiters[i]);
        if (ret != HG_SUCCESS) {
            ld_logger->error("{}() Unable to send non-blocking rpc for path {} and recipient {}", __func__, path,
                             dest_idx[i]);