Unverified Commit 4a868f35 authored by Tommaso Tocci's avatar Tommaso Tocci
Browse files

bugfix: wrong usage of cycle index

target was the index of the cycle and target[target] was the actual
identifier of the target. In some part of the code target was considered
as the identifier directly.

Now the index of the cycle is i and target is the actual identifier of
the target.
parent 8c91b9cf
Loading
Loading
Loading
Loading
+42 −40
Original line number Diff line number Diff line
@@ -64,28 +64,29 @@ ssize_t rpc_send_write(const string& path, const void* buf, const bool append_fl
        return -1;
    }
    // Issue non-blocking RPC requests and wait for the result later
    for (uint64_t target = 0; target < target_n; target++) {
        auto total_chunk_size = target_chnks[targets[target]].size() * CHUNKSIZE; // total chunk_size for target
    for (uint64_t i = 0; i < target_n; i++) {
        auto target = targets[i];
        auto total_chunk_size = target_chnks[target].size() * CHUNKSIZE; // total chunk_size for target
        if (target == chnk_start_target) // receiver of first chunk must subtract the offset from first chunk
            total_chunk_size -= (offset % CHUNKSIZE);
        if (target == chnk_end_target) // receiver of last chunk must subtract
            total_chunk_size -= ((-(offset + write_size)) % CHUNKSIZE);
        // Fill RPC input
        rpc_in[target].path = path.c_str();
        rpc_in[target].offset = offset % CHUNKSIZE;// first offset in targets is the chunk with a potential offset
        rpc_in[target].chunk_n = target_chnks[targets[target]].size(); // number of chunks handled by that destination
        rpc_in[target].chunk_start = chnk_start; // chunk start id of this write
        rpc_in[target].chunk_end = chnk_end; // chunk end id of this write
        rpc_in[target].total_chunk_size = total_chunk_size; // total size to write
        rpc_in[target].bulk_handle = (targets[target] == fs_config->host_id) ? ipc_bulk_handle : rpc_bulk_handle;
        margo_create_wrap(ipc_write_data_id, rpc_write_data_id, targets[target], rpc_handles[target], false);
        rpc_in[i].path = path.c_str();
        rpc_in[i].offset = offset % CHUNKSIZE;// first offset in targets is the chunk with a potential offset
        rpc_in[i].chunk_n = target_chnks[target].size(); // number of chunks handled by that destination
        rpc_in[i].chunk_start = chnk_start; // chunk start id of this write
        rpc_in[i].chunk_end = chnk_end; // chunk end id of this write
        rpc_in[i].total_chunk_size = total_chunk_size; // total size to write
        rpc_in[i].bulk_handle = (target == fs_config->host_id) ? ipc_bulk_handle : rpc_bulk_handle;
        margo_create_wrap(ipc_write_data_id, rpc_write_data_id, target, rpc_handles[i], false);
        // Send RPC
        ret = margo_iforward(rpc_handles[target], &rpc_in[target], &rpc_waiters[target]);
        ret = margo_iforward(rpc_handles[i], &rpc_in[i], &rpc_waiters[i]);
        if (ret != HG_SUCCESS) {
            ld_logger->error("{}() Unable to send non-blocking rpc for path {} and recipient {}", __func__, path,
                             targets[target]);
                             target);
            errno = EBUSY;
            for (uint64_t j = 0; j < target + 1; j++) {
            for (uint64_t j = 0; j < i + 1; j++) {
                margo_destroy(rpc_handles[j]);
            }
            // free bulk handles for buffer
@@ -99,28 +100,28 @@ ssize_t rpc_send_write(const string& path, const void* buf, const bool append_fl
    // All potential outputs are served to free resources regardless of errors, although an errorcode is set.
    ssize_t out_size = 0;
    ssize_t err = 0;
    for (uint64_t target = 0; target < target_n; target++) {
    for (unsigned int i = 0; i < target_n; i++) {
        // XXX We might need a timeout here to not wait forever for an output that never comes?
        ret = margo_wait(rpc_waiters[target]);
        ret = margo_wait(rpc_waiters[i]);
        if (ret != HG_SUCCESS) {
            ld_logger->error("{}() Unable to wait for margo_request handle for path {} recipient {}", __func__, path,
                             targets[target]);
                             targets[i]);
            errno = EBUSY;
            err = -1;
        }
        // decode response
        rpc_data_out_t out{};
        ret = margo_get_output(rpc_handles[target], &out);
        ret = margo_get_output(rpc_handles[i], &out);
        if (ret != HG_SUCCESS) {
            ld_logger->error("{}() Failed to get rpc output for path {} recipient {}", __func__, path, targets[target]);
            ld_logger->error("{}() Failed to get rpc output for path {} recipient {}", __func__, path, targets[i]);
            err = -1;
        }
        ld_logger->debug("{}() Got response {}", __func__, out.res);
        if (out.res != 0)
            errno = out.res;
        out_size += static_cast<size_t>(out.io_size);
        margo_free_output(rpc_handles[target], &out);
        margo_destroy(rpc_handles[target]);
        margo_free_output(rpc_handles[i], &out);
        margo_destroy(rpc_handles[i]);
    }
    // free bulk handles for buffer
    margo_bulk_free(rpc_bulk_handle);
@@ -179,29 +180,30 @@ ssize_t rpc_send_read(const string& path, void* buf, const off64_t offset, const
        return -1;
    }
    // Issue non-blocking RPC requests and wait for the result later
    for (uint64_t target = 0; target < target_n; target++) {
        auto total_chunk_size = target_chnks[targets[target]].size() * CHUNKSIZE;
    for (unsigned int i = 0; i < target_n; i++) {
        auto target = targets[i];
        auto total_chunk_size = target_chnks[target].size() * CHUNKSIZE;
        if (target == chnk_start_target) // receiver of first chunk must subtract the offset from first chunk
            total_chunk_size -= (offset % CHUNKSIZE);
        if (target == chnk_end_target) // receiver of last chunk must subtract
            total_chunk_size -= ((-(offset + read_size)) % CHUNKSIZE);

        // Fill RPC input
        rpc_in[target].path = path.c_str();
        rpc_in[target].offset = offset % CHUNKSIZE;// first offset in targets is the chunk with a potential offset
        rpc_in[target].chunk_n = target_chnks[targets[target]].size(); // number of chunks handled by that destination
        rpc_in[target].chunk_start = chnk_start; // chunk start id of this write
        rpc_in[target].chunk_end = chnk_end; // chunk end id of this write
        rpc_in[target].total_chunk_size = total_chunk_size; // total size to write
        rpc_in[target].bulk_handle = (targets[target] == fs_config->host_id) ? ipc_bulk_handle : rpc_bulk_handle;
        margo_create_wrap(ipc_read_data_id, rpc_read_data_id, targets[target], rpc_handles[target], false);
        rpc_in[i].path = path.c_str();
        rpc_in[i].offset = offset % CHUNKSIZE;// first offset in targets is the chunk with a potential offset
        rpc_in[i].chunk_n = target_chnks[target].size(); // number of chunks handled by that destination
        rpc_in[i].chunk_start = chnk_start; // chunk start id of this write
        rpc_in[i].chunk_end = chnk_end; // chunk end id of this write
        rpc_in[i].total_chunk_size = total_chunk_size; // total size to write
        rpc_in[i].bulk_handle = (target == fs_config->host_id) ? ipc_bulk_handle : rpc_bulk_handle;
        margo_create_wrap(ipc_read_data_id, rpc_read_data_id, target, rpc_handles[i], false);
        // Send RPC
        ret = margo_iforward(rpc_handles[target], &rpc_in[target], &rpc_waiters[target]);
        ret = margo_iforward(rpc_handles[i], &rpc_in[i], &rpc_waiters[i]);
        if (ret != HG_SUCCESS) {
            ld_logger->error("{}() Unable to send non-blocking rpc for path {} and recipient {}", __func__, path,
                             targets[target]);
                             target);
            errno = EBUSY;
            for (uint64_t j = 0; j < target + 1; j++) {
            for (uint64_t j = 0; j < i + 1; j++) {
                margo_destroy(rpc_handles[j]);
            }
            // free bulk handles for buffer
@@ -215,28 +217,28 @@ ssize_t rpc_send_read(const string& path, void* buf, const off64_t offset, const
    // All potential outputs are served to free resources regardless of errors, although an errorcode is set.
    ssize_t out_size = 0;
    ssize_t err = 0;
    for (uint64_t target = 0; target < target_n; target++) {
    for (unsigned int i = 0; i < target_n; i++) {
        // XXX We might need a timeout here to not wait forever for an output that never comes?
        ret = margo_wait(rpc_waiters[target]);
        ret = margo_wait(rpc_waiters[i]);
        if (ret != HG_SUCCESS) {
            ld_logger->error("{}() Unable to wait for margo_request handle for path {} recipient {}", __func__, path,
                             targets[target]);
                             targets[i]);
            errno = EBUSY;
            err = -1;
        }
        // decode response
        rpc_data_out_t out{};
        ret = margo_get_output(rpc_handles[target], &out);
        ret = margo_get_output(rpc_handles[i], &out);
        if (ret != HG_SUCCESS) {
            ld_logger->error("{}() Failed to get rpc output for path {} recipient {}", __func__, path, targets[target]);
            ld_logger->error("{}() Failed to get rpc output for path {} recipient {}", __func__, path, targets[i]);
            err = -1;
        }
        ld_logger->debug("{}() Got response {}", __func__, out.res);
        if (out.res != 0)
            errno = out.res;
        out_size += static_cast<size_t>(out.io_size);
        margo_free_output(rpc_handles[target], &out);
        margo_destroy(rpc_handles[target]);
        margo_free_output(rpc_handles[i], &out);
        margo_destroy(rpc_handles[i]);
    }
    // free bulk handles for buffer
    margo_bulk_free(rpc_bulk_handle);