Commit d3249536 authored by Marc Vef's avatar Marc Vef
Browse files

Merge branch 'fix_IO_naming' into 'master'

Preload: Better naming in IO process + chnk_end target fix

See merge request zdvresearch_bsc/adafs!25
parents dcefd2a7 aec6afde
Loading
Loading
Loading
Loading
+92 −80
Original line number Diff line number Diff line
@@ -25,22 +25,30 @@ ssize_t rpc_send_write(const string& path, const void* buf, const bool append_fl
        chnk_end--;

    // Collect all chunk ids within count that have the same destination so that those are send in one rpc bulk transfer
    map<uint64_t, vector<uint64_t>> dest_ids{};
    // contains the recipient ids, used to access the dest_ids map. First idx is chunk with potential offset
    vector<uint64_t> dest_idx{};
    for (uint64_t i = chnk_start; i < chnk_end; i++) {
        auto recipient = adafs_hash_path_chunk(path, i, fs_config->host_size);
        if (dest_ids.count(recipient) == 0) {
            dest_ids.insert(make_pair(recipient, vector<uint64_t>{i}));
            dest_idx.push_back(recipient);
    map<uint64_t, vector<uint64_t>> target_chnks{};
    // contains the target ids, used to access the target_chnks map. First idx is chunk with potential offset
    vector<uint64_t> targets{};
    // targets for the first and last chunk as they need special treatment
    uint64_t chnk_start_target = 0;
    uint64_t chnk_end_target = 0;
    for (uint64_t chnk_id = chnk_start; chnk_id < chnk_end; chnk_id++) {
        auto target = adafs_hash_path_chunk(path, chnk_id, fs_config->host_size);
        if (target_chnks.count(target) == 0) {
            target_chnks.insert(make_pair(target, vector<uint64_t>{chnk_id}));
            targets.push_back(target);
        } else
            dest_ids[recipient].push_back(i);
            target_chnks[target].push_back(chnk_id);
        // set first and last chnk targets
        if (chnk_id == chnk_start)
            chnk_start_target = target;
        if (chnk_id == chnk_end - 1)
            chnk_end_target = target;
    }
    // some helper variables for async RPC
    auto dest_n = dest_idx.size();
    vector<hg_handle_t> rpc_handles(dest_n);
    vector<margo_request> rpc_waiters(dest_n);
    vector<rpc_write_data_in_t> rpc_in(dest_n);
    auto target_n = targets.size();
    vector<hg_handle_t> rpc_handles(target_n);
    vector<margo_request> rpc_waiters(target_n);
    vector<rpc_write_data_in_t> rpc_in(target_n);
    // register local target buffer for bulk access for IPC and RPC margo instance
    auto bulk_buf = const_cast<void*>(buf);
    hg_bulk_t ipc_bulk_handle = nullptr;
@@ -59,31 +67,29 @@ ssize_t rpc_send_write(const string& path, const void* buf, const bool append_fl
        return -1;
    }
    // Issue non-blocking RPC requests and wait for the result later
    for (uint64_t i = 0; i < dest_n; i++) {
        auto total_chunk_size = dest_ids[dest_idx[i]].size() * CHUNKSIZE;
        if (i == 0) // receiver of first chunk must subtract the offset from first chunk
    for (uint64_t target = 0; target < target_n; target++) {
        auto total_chunk_size = target_chnks[targets[target]].size() * CHUNKSIZE; // total chunk_size for target
        if (target == chnk_start_target) // receiver of first chunk must subtract the offset from first chunk
            total_chunk_size -= (offset % CHUNKSIZE);
        if (i == dest_n - 1 && ((offset + write_size) % CHUNKSIZE) != 0) // receiver of last chunk must subtract
        if (target == chnk_end_target &&
            ((offset + write_size) % CHUNKSIZE) != 0) // receiver of last chunk must subtract
            total_chunk_size -= (CHUNKSIZE - ((offset + write_size) % CHUNKSIZE));

        // RPC
        auto chnk_ids = &dest_ids[dest_idx[i]];
        // fill in
        rpc_in[i].path = path.c_str();
        rpc_in[i].offset = offset % CHUNKSIZE;// first offset in dest_idx is the chunk with a potential offset
        rpc_in[i].chunk_n = chnk_ids->size(); // number of chunks handled by that destination
        rpc_in[i].chunk_start = chnk_start; // chunk start id of this write
        rpc_in[i].chunk_end = chnk_end; // chunk end id of this write
        rpc_in[i].total_chunk_size = total_chunk_size; // total size to write
        rpc_in[i].bulk_handle = (dest_idx[i] == fs_config->host_id) ? ipc_bulk_handle : rpc_bulk_handle;
        margo_create_wrap(ipc_write_data_id, rpc_write_data_id, dest_idx[i], rpc_handles[i], false);

        ret = margo_iforward(rpc_handles[i], &rpc_in[i], &rpc_waiters[i]);
        // Fill RPC input
        rpc_in[target].path = path.c_str();
        rpc_in[target].offset = offset % CHUNKSIZE;// first offset in targets is the chunk with a potential offset
        rpc_in[target].chunk_n = target_chnks[targets[target]].size(); // number of chunks handled by that destination
        rpc_in[target].chunk_start = chnk_start; // chunk start id of this write
        rpc_in[target].chunk_end = chnk_end; // chunk end id of this write
        rpc_in[target].total_chunk_size = total_chunk_size; // total size to write
        rpc_in[target].bulk_handle = (targets[target] == fs_config->host_id) ? ipc_bulk_handle : rpc_bulk_handle;
        margo_create_wrap(ipc_write_data_id, rpc_write_data_id, targets[target], rpc_handles[target], false);
        // Send RPC
        ret = margo_iforward(rpc_handles[target], &rpc_in[target], &rpc_waiters[target]);
        if (ret != HG_SUCCESS) {
            ld_logger->error("{}() Unable to send non-blocking rpc for path {} and recipient {}", __func__, path,
                             dest_idx[i]);
                             targets[target]);
            errno = EBUSY;
            for (uint64_t j = 0; j < i + 1; j++) {
            for (uint64_t j = 0; j < target + 1; j++) {
                margo_destroy(rpc_handles[j]);
            }
            // free bulk handles for buffer
@@ -97,28 +103,28 @@ ssize_t rpc_send_write(const string& path, const void* buf, const bool append_fl
    // All potential outputs are served to free resources regardless of errors, although an errorcode is set.
    ssize_t out_size = 0;
    ssize_t err = 0;
    for (uint64_t i = 0; i < dest_n; i++) {
    for (uint64_t target = 0; target < target_n; target++) {
        // XXX We might need a timeout here to not wait forever for an output that never comes?
        ret = margo_wait(rpc_waiters[i]);
        ret = margo_wait(rpc_waiters[target]);
        if (ret != HG_SUCCESS) {
            ld_logger->error("{}() Unable to wait for margo_request handle for path {} recipient {}", __func__, path,
                             dest_idx[i]);
                             targets[target]);
            errno = EBUSY;
            err = -1;
        }
        // decode response
        rpc_data_out_t out{};
        ret = margo_get_output(rpc_handles[i], &out);
        ret = margo_get_output(rpc_handles[target], &out);
        if (ret != HG_SUCCESS) {
            ld_logger->error("{}() Failed to get rpc output for path {} recipient {}", __func__, path, dest_idx[i]);
            ld_logger->error("{}() Failed to get rpc output for path {} recipient {}", __func__, path, targets[target]);
            err = -1;
        }
        ld_logger->debug("{}() Got response {}", __func__, out.res);
        if (out.res != 0)
            errno = out.res;
        out_size += static_cast<size_t>(out.io_size);
        margo_free_output(rpc_handles[i], &out);
        margo_destroy(rpc_handles[i]);
        margo_free_output(rpc_handles[target], &out);
        margo_destroy(rpc_handles[target]);
    }
    // free bulk handles for buffer
    margo_bulk_free(rpc_bulk_handle);
@@ -138,22 +144,30 @@ ssize_t rpc_send_read(const string& path, void* buf, const off64_t offset, const
        chnk_end--;

    // Collect all chunk ids within count that have the same destination so that those are send in one rpc bulk transfer
    map<uint64_t, vector<uint64_t>> dest_ids{};
    // contains the recipient ids, used to access the dest_ids map. First idx is chunk with potential offset
    vector<uint64_t> dest_idx{};
    for (uint64_t i = chnk_start; i < chnk_end; i++) {
        auto recipient = adafs_hash_path_chunk(path, i, fs_config->host_size);
        if (dest_ids.count(recipient) == 0) {
            dest_ids.insert(make_pair(recipient, vector<uint64_t>{i}));
            dest_idx.push_back(recipient);
    map<uint64_t, vector<uint64_t>> target_chnks{};
    // contains the recipient ids, used to access the target_chnks map. First idx is chunk with potential offset
    vector<uint64_t> targets{};
    // targets for the first and last chunk as they need special treatment
    uint64_t chnk_start_target = 0;
    uint64_t chnk_end_target = 0;
    for (uint64_t chnk_id = chnk_start; chnk_id < chnk_end; chnk_id++) {
        auto target = adafs_hash_path_chunk(path, chnk_id, fs_config->host_size);
        if (target_chnks.count(target) == 0) {
            target_chnks.insert(make_pair(target, vector<uint64_t>{chnk_id}));
            targets.push_back(target);
        } else
            dest_ids[recipient].push_back(i);
            target_chnks[target].push_back(chnk_id);
        // set first and last chnk targets
        if (chnk_id == chnk_start)
            chnk_start_target = target;
        if (chnk_id == chnk_end - 1)
            chnk_end_target = target;
    }
    // some helper variables for async RPC
    auto dest_n = dest_idx.size();
    vector<hg_handle_t> rpc_handles(dest_n);
    vector<margo_request> rpc_waiters(dest_n);
    vector<rpc_read_data_in_t> rpc_in(dest_n);
    auto target_n = targets.size();
    vector<hg_handle_t> rpc_handles(target_n);
    vector<margo_request> rpc_waiters(target_n);
    vector<rpc_read_data_in_t> rpc_in(target_n);
    // register local target buffer for bulk access for IPC and RPC margo instance
    auto bulk_buf = buf;
    hg_bulk_t ipc_bulk_handle = nullptr;
@@ -172,31 +186,29 @@ ssize_t rpc_send_read(const string& path, void* buf, const off64_t offset, const
        return -1;
    }
    // Issue non-blocking RPC requests and wait for the result later
    for (uint64_t i = 0; i < dest_n; i++) {
        auto total_chunk_size = dest_ids[dest_idx[i]].size() * CHUNKSIZE;
        if (i == 0) // receiver of first chunk must subtract the offset from first chunk
    for (uint64_t target = 0; target < target_n; target++) {
        auto total_chunk_size = target_chnks[targets[target]].size() * CHUNKSIZE;
        if (target == chnk_start_target) // receiver of first chunk must subtract the offset from first chunk
            total_chunk_size -= (offset % CHUNKSIZE);
        if (i == dest_n - 1 && ((offset + read_size) % CHUNKSIZE) != 0) // receiver of last chunk must subtract
        if (target == chnk_end_target &&
            ((offset + read_size) % CHUNKSIZE) != 0) // receiver of last chunk must subtract
            total_chunk_size -= (CHUNKSIZE - ((offset + read_size) % CHUNKSIZE));

        // RPC
        auto chnk_ids = &dest_ids[dest_idx[i]];
        // fill in
        rpc_in[i].path = path.c_str();
        rpc_in[i].offset = offset % CHUNKSIZE;// first offset in dest_idx is the chunk with a potential offset
        rpc_in[i].chunk_n = chnk_ids->size(); // number of chunks handled by that destination
        rpc_in[i].chunk_start = chnk_start; // chunk start id of this write
        rpc_in[i].chunk_end = chnk_end; // chunk end id of this write
        rpc_in[i].total_chunk_size = total_chunk_size; // total size to write
        rpc_in[i].bulk_handle = (dest_idx[i] == fs_config->host_id) ? ipc_bulk_handle : rpc_bulk_handle;
        margo_create_wrap(ipc_read_data_id, rpc_read_data_id, dest_idx[i], rpc_handles[i], false);

        ret = margo_iforward(rpc_handles[i], &rpc_in[i], &rpc_waiters[i]);
        // Fill RPC input
        rpc_in[target].path = path.c_str();
        rpc_in[target].offset = offset % CHUNKSIZE;// first offset in targets is the chunk with a potential offset
        rpc_in[target].chunk_n = target_chnks[targets[target]].size(); // number of chunks handled by that destination
        rpc_in[target].chunk_start = chnk_start; // chunk start id of this write
        rpc_in[target].chunk_end = chnk_end; // chunk end id of this write
        rpc_in[target].total_chunk_size = total_chunk_size; // total size to write
        rpc_in[target].bulk_handle = (targets[target] == fs_config->host_id) ? ipc_bulk_handle : rpc_bulk_handle;
        margo_create_wrap(ipc_read_data_id, rpc_read_data_id, targets[target], rpc_handles[target], false);
        // Send RPC
        ret = margo_iforward(rpc_handles[target], &rpc_in[target], &rpc_waiters[target]);
        if (ret != HG_SUCCESS) {
            ld_logger->error("{}() Unable to send non-blocking rpc for path {} and recipient {}", __func__, path,
                             dest_idx[i]);
                             targets[target]);
            errno = EBUSY;
            for (uint64_t j = 0; j < i + 1; j++) {
            for (uint64_t j = 0; j < target + 1; j++) {
                margo_destroy(rpc_handles[j]);
            }
            // free bulk handles for buffer
@@ -210,28 +222,28 @@ ssize_t rpc_send_read(const string& path, void* buf, const off64_t offset, const
    // All potential outputs are served to free resources regardless of errors, although an errorcode is set.
    ssize_t out_size = 0;
    ssize_t err = 0;
    for (uint64_t i = 0; i < dest_n; i++) {
    for (uint64_t target = 0; target < target_n; target++) {
        // XXX We might need a timeout here to not wait forever for an output that never comes?
        ret = margo_wait(rpc_waiters[i]);
        ret = margo_wait(rpc_waiters[target]);
        if (ret != HG_SUCCESS) {
            ld_logger->error("{}() Unable to wait for margo_request handle for path {} recipient {}", __func__, path,
                             dest_idx[i]);
                             targets[target]);
            errno = EBUSY;
            err = -1;
        }
        // decode response
        rpc_data_out_t out{};
        ret = margo_get_output(rpc_handles[i], &out);
        ret = margo_get_output(rpc_handles[target], &out);
        if (ret != HG_SUCCESS) {
            ld_logger->error("{}() Failed to get rpc output for path {} recipient {}", __func__, path, dest_idx[i]);
            ld_logger->error("{}() Failed to get rpc output for path {} recipient {}", __func__, path, targets[target]);
            err = -1;
        }
        ld_logger->debug("{}() Got response {}", __func__, out.res);
        if (out.res != 0)
            errno = out.res;
        out_size += static_cast<size_t>(out.io_size);
        margo_free_output(rpc_handles[i], &out);
        margo_destroy(rpc_handles[i]);
        margo_free_output(rpc_handles[target], &out);
        margo_destroy(rpc_handles[target]);
    }
    // free bulk handles for buffer
    margo_bulk_free(rpc_bulk_handle);