Commit b57e6328 authored by Marc Vef's avatar Marc Vef
Browse files

Merge branch 'misaligned_ops' into 'master'

Fix misaligned operations

See merge request zdvresearch_bsc/adafs!28
parents d942ca36 98abc985
Loading
Loading
Loading
Loading
+88 −0
Original line number Diff line number Diff line
/**
 * Compute the base2 logarithm for 64 bit integers
 */
inline int log2(uint64_t n){

    /* see http://stackoverflow.com/questions/11376288/fast-computing-of-log2-for-64-bit-integers */
    static const int table[64] = {
        0, 58, 1, 59, 47, 53, 2, 60, 39, 48, 27, 54, 33, 42, 3, 61,
        51, 37, 40, 49, 18, 28, 20, 55, 30, 34, 11, 43, 14, 22, 4, 62,
        57, 46, 52, 38, 26, 32, 41, 50, 36, 17, 19, 29, 10, 13, 21, 56,
        45, 25, 31, 35, 16, 9, 12, 44, 24, 15, 8, 23, 7, 6, 5, 63 };

    assert(n > 0);

    n |= n >> 1;
    n |= n >> 2;
    n |= n >> 4;
    n |= n >> 8;
    n |= n >> 16;
    n |= n >> 32;

    return table[(n * 0x03f6eaf2cd271461) >> 58];
}


/**
 * Align an @offset to the closest left side block boundary
 */
inline off64_t lalign(const off64_t offset, const size_t block_size) {
    return offset & ~(block_size - 1);
}


/**
 * Align an @offset to the closest right side block boundary
 */
inline off64_t ralign(const off64_t offset, const size_t block_size) {
    return lalign(offset + block_size, block_size);
}


/**
 * Return the padding (bytes) that separates the @offset from the closest
 * left side block boundary
 *
 * If @offset is a boundary the resulting padding will be 0
 */
inline size_t lpad(const off64_t offset, const size_t block_size) {
    return offset % block_size;
}


/**
 * Return the padding (bytes) that separates the @offset from the closest
 * right side block boundary
 *
 * If @offset is a boundary the resulting padding will be 0
 */
inline size_t rpad(const off64_t offset, const size_t block_size) {
    return (-offset) % block_size;
}


/**
 * Given an @offset calculates the block number to which the @offset belongs
 *
 * block_num(8,4) = 2;
 * block_num(7,4) = 1;
 * block_num(2,4) = 0;
 * block_num(0,4) = 0;
 */
inline uint64_t block_num(const off64_t offset, const size_t block_size){
    return lalign(offset, block_size) >> log2(block_size);
}


/**
 * Return the number of blocks involved in an operation that operates
 * from @offset for a certain amount of bytes (@count).
 */
inline uint64_t blocks_count(const off64_t offset, const size_t count, const size_t block_size) {

    off64_t block_start = lalign(offset, block_size);
    off64_t block_end = lalign(offset + count - 1, block_size);

    return (block_end >> log2(block_size)) -
        (block_start >> log2(block_size)) + 1;
}
+1 −0
Original line number Diff line number Diff line
@@ -17,6 +17,7 @@ set(PRELOAD_HEADERS
    ../../include/global/rpc/ipc_types.hpp
    ../../include/global/rpc/rpc_types.hpp
    ../../include/global/rpc/rpc_utils.hpp
    ../../include/global/blocks_calc_util.hpp
    ../../include/preload/adafs_functions.hpp
    ../../include/preload/margo_ipc.hpp
    ../../include/preload/open_file_map.hpp
+58 −63
Original line number Diff line number Diff line

#include <preload/rpc/ld_rpc_data_ws.hpp>
#include <global/rpc/rpc_utils.hpp>

#include <global/blocks_calc_util.hpp>

using namespace std;

@@ -18,11 +18,8 @@ ssize_t rpc_send_write(const string& path, const void* buf, const bool append_fl
    if (append_flag)
        offset = updated_metadentry_size - write_size;

    auto chnk_start = static_cast<uint64_t>(offset) / CHUNKSIZE; // first chunk number
    // last chunk number (right-open) [chnk_start,chnk_end)
    auto chnk_end = static_cast<uint64_t>((offset + write_size) / CHUNKSIZE + 1);
    if ((offset + write_size) % CHUNKSIZE == 0)
        chnk_end--;
    auto chnk_start = block_num(offset, CHUNKSIZE);
    auto chnk_end = block_num((offset + write_size) - 1, CHUNKSIZE);

    // Collect all chunk ids within count that have the same destination so that those are send in one rpc bulk transfer
    map<uint64_t, vector<uint64_t>> target_chnks{};
@@ -31,7 +28,7 @@ ssize_t rpc_send_write(const string& path, const void* buf, const bool append_fl
    // targets for the first and last chunk as they need special treatment
    uint64_t chnk_start_target = 0;
    uint64_t chnk_end_target = 0;
    for (uint64_t chnk_id = chnk_start; chnk_id < chnk_end; chnk_id++) {
    for (uint64_t chnk_id = chnk_start; chnk_id <= chnk_end; chnk_id++) {
        auto target = adafs_hash_path_chunk(path, chnk_id, fs_config->host_size);
        if (target_chnks.count(target) == 0) {
            target_chnks.insert(make_pair(target, vector<uint64_t>{chnk_id}));
@@ -41,7 +38,7 @@ ssize_t rpc_send_write(const string& path, const void* buf, const bool append_fl
        // set first and last chnk targets
        if (chnk_id == chnk_start)
            chnk_start_target = target;
        if (chnk_id == chnk_end - 1)
        if (chnk_id == chnk_end)
            chnk_end_target = target;
    }
    // some helper variables for async RPC
@@ -67,29 +64,29 @@ ssize_t rpc_send_write(const string& path, const void* buf, const bool append_fl
        return -1;
    }
    // Issue non-blocking RPC requests and wait for the result later
    for (uint64_t target = 0; target < target_n; target++) {
        auto total_chunk_size = target_chnks[targets[target]].size() * CHUNKSIZE; // total chunk_size for target
    for (uint64_t i = 0; i < target_n; i++) {
        auto target = targets[i];
        auto total_chunk_size = target_chnks[target].size() * CHUNKSIZE; // total chunk_size for target
        if (target == chnk_start_target) // receiver of first chunk must subtract the offset from first chunk
            total_chunk_size -= (offset % CHUNKSIZE);
        if (target == chnk_end_target &&
            ((offset + write_size) % CHUNKSIZE) != 0) // receiver of last chunk must subtract
            total_chunk_size -= (CHUNKSIZE - ((offset + write_size) % CHUNKSIZE));
            total_chunk_size -= lpad(offset, CHUNKSIZE);
        if (target == chnk_end_target) // receiver of last chunk must subtract
            total_chunk_size -= rpad(offset + write_size, CHUNKSIZE);
        // Fill RPC input
        rpc_in[target].path = path.c_str();
        rpc_in[target].offset = offset % CHUNKSIZE;// first offset in targets is the chunk with a potential offset
        rpc_in[target].chunk_n = target_chnks[targets[target]].size(); // number of chunks handled by that destination
        rpc_in[target].chunk_start = chnk_start; // chunk start id of this write
        rpc_in[target].chunk_end = chnk_end; // chunk end id of this write
        rpc_in[target].total_chunk_size = total_chunk_size; // total size to write
        rpc_in[target].bulk_handle = (targets[target] == fs_config->host_id) ? ipc_bulk_handle : rpc_bulk_handle;
        margo_create_wrap(ipc_write_data_id, rpc_write_data_id, targets[target], rpc_handles[target], false);
        rpc_in[i].path = path.c_str();
        rpc_in[i].offset = lpad(offset, CHUNKSIZE);// first offset in targets is the chunk with a potential offset
        rpc_in[i].chunk_n = target_chnks[target].size(); // number of chunks handled by that destination
        rpc_in[i].chunk_start = chnk_start; // chunk start id of this write
        rpc_in[i].chunk_end = chnk_end; // chunk end id of this write
        rpc_in[i].total_chunk_size = total_chunk_size; // total size to write
        rpc_in[i].bulk_handle = (target == fs_config->host_id) ? ipc_bulk_handle : rpc_bulk_handle;
        margo_create_wrap(ipc_write_data_id, rpc_write_data_id, target, rpc_handles[i], false);
        // Send RPC
        ret = margo_iforward(rpc_handles[target], &rpc_in[target], &rpc_waiters[target]);
        ret = margo_iforward(rpc_handles[i], &rpc_in[i], &rpc_waiters[i]);
        if (ret != HG_SUCCESS) {
            ld_logger->error("{}() Unable to send non-blocking rpc for path {} and recipient {}", __func__, path,
                             targets[target]);
                             target);
            errno = EBUSY;
            for (uint64_t j = 0; j < target + 1; j++) {
            for (uint64_t j = 0; j < i + 1; j++) {
                margo_destroy(rpc_handles[j]);
            }
            // free bulk handles for buffer
@@ -103,28 +100,28 @@ ssize_t rpc_send_write(const string& path, const void* buf, const bool append_fl
    // All potential outputs are served to free resources regardless of errors, although an errorcode is set.
    ssize_t out_size = 0;
    ssize_t err = 0;
    for (uint64_t target = 0; target < target_n; target++) {
    for (unsigned int i = 0; i < target_n; i++) {
        // XXX We might need a timeout here to not wait forever for an output that never comes?
        ret = margo_wait(rpc_waiters[target]);
        ret = margo_wait(rpc_waiters[i]);
        if (ret != HG_SUCCESS) {
            ld_logger->error("{}() Unable to wait for margo_request handle for path {} recipient {}", __func__, path,
                             targets[target]);
                             targets[i]);
            errno = EBUSY;
            err = -1;
        }
        // decode response
        rpc_data_out_t out{};
        ret = margo_get_output(rpc_handles[target], &out);
        ret = margo_get_output(rpc_handles[i], &out);
        if (ret != HG_SUCCESS) {
            ld_logger->error("{}() Failed to get rpc output for path {} recipient {}", __func__, path, targets[target]);
            ld_logger->error("{}() Failed to get rpc output for path {} recipient {}", __func__, path, targets[i]);
            err = -1;
        }
        ld_logger->debug("{}() Got response {}", __func__, out.res);
        if (out.res != 0)
            errno = out.res;
        out_size += static_cast<size_t>(out.io_size);
        margo_free_output(rpc_handles[target], &out);
        margo_destroy(rpc_handles[target]);
        margo_free_output(rpc_handles[i], &out);
        margo_destroy(rpc_handles[i]);
    }
    // free bulk handles for buffer
    margo_bulk_free(rpc_bulk_handle);
@@ -137,11 +134,8 @@ ssize_t rpc_send_write(const string& path, const void* buf, const bool append_fl
 */
ssize_t rpc_send_read(const string& path, void* buf, const off64_t offset, const size_t read_size) {
    // Calculate chunkid boundaries and numbers so that daemons know in which interval to look for chunks
    auto chnk_start = static_cast<uint64_t>(offset) / CHUNKSIZE; // first chunk number
    // last chunk number (right-open) [chnk_start,chnk_end)
    auto chnk_end = static_cast<uint64_t>((offset + read_size) / CHUNKSIZE + 1);
    if ((offset + read_size) % CHUNKSIZE == 0)
        chnk_end--;
    auto chnk_start = block_num(offset, CHUNKSIZE); // first chunk number
    auto chnk_end = block_num((offset + read_size - 1), CHUNKSIZE);

    // Collect all chunk ids within count that have the same destination so that those are send in one rpc bulk transfer
    map<uint64_t, vector<uint64_t>> target_chnks{};
@@ -150,7 +144,7 @@ ssize_t rpc_send_read(const string& path, void* buf, const off64_t offset, const
    // targets for the first and last chunk as they need special treatment
    uint64_t chnk_start_target = 0;
    uint64_t chnk_end_target = 0;
    for (uint64_t chnk_id = chnk_start; chnk_id < chnk_end; chnk_id++) {
    for (uint64_t chnk_id = chnk_start; chnk_id <= chnk_end; chnk_id++) {
        auto target = adafs_hash_path_chunk(path, chnk_id, fs_config->host_size);
        if (target_chnks.count(target) == 0) {
            target_chnks.insert(make_pair(target, vector<uint64_t>{chnk_id}));
@@ -160,7 +154,7 @@ ssize_t rpc_send_read(const string& path, void* buf, const off64_t offset, const
        // set first and last chnk targets
        if (chnk_id == chnk_start)
            chnk_start_target = target;
        if (chnk_id == chnk_end - 1)
        if (chnk_id == chnk_end)
            chnk_end_target = target;
    }
    // some helper variables for async RPC
@@ -186,29 +180,30 @@ ssize_t rpc_send_read(const string& path, void* buf, const off64_t offset, const
        return -1;
    }
    // Issue non-blocking RPC requests and wait for the result later
    for (uint64_t target = 0; target < target_n; target++) {
        auto total_chunk_size = target_chnks[targets[target]].size() * CHUNKSIZE;
    for (unsigned int i = 0; i < target_n; i++) {
        auto target = targets[i];
        auto total_chunk_size = target_chnks[target].size() * CHUNKSIZE;
        if (target == chnk_start_target) // receiver of first chunk must subtract the offset from first chunk
            total_chunk_size -= (offset % CHUNKSIZE);
        if (target == chnk_end_target &&
            ((offset + read_size) % CHUNKSIZE) != 0) // receiver of last chunk must subtract
            total_chunk_size -= (CHUNKSIZE - ((offset + read_size) % CHUNKSIZE));
            total_chunk_size -= lpad(offset, CHUNKSIZE);
        if (target == chnk_end_target) // receiver of last chunk must subtract
            total_chunk_size -= rpad(offset + read_size, CHUNKSIZE);

        // Fill RPC input
        rpc_in[target].path = path.c_str();
        rpc_in[target].offset = offset % CHUNKSIZE;// first offset in targets is the chunk with a potential offset
        rpc_in[target].chunk_n = target_chnks[targets[target]].size(); // number of chunks handled by that destination
        rpc_in[target].chunk_start = chnk_start; // chunk start id of this write
        rpc_in[target].chunk_end = chnk_end; // chunk end id of this write
        rpc_in[target].total_chunk_size = total_chunk_size; // total size to write
        rpc_in[target].bulk_handle = (targets[target] == fs_config->host_id) ? ipc_bulk_handle : rpc_bulk_handle;
        margo_create_wrap(ipc_read_data_id, rpc_read_data_id, targets[target], rpc_handles[target], false);
        rpc_in[i].path = path.c_str();
        rpc_in[i].offset = lpad(offset, CHUNKSIZE);// first offset in targets is the chunk with a potential offset
        rpc_in[i].chunk_n = target_chnks[target].size(); // number of chunks handled by that destination
        rpc_in[i].chunk_start = chnk_start; // chunk start id of this write
        rpc_in[i].chunk_end = chnk_end; // chunk end id of this write
        rpc_in[i].total_chunk_size = total_chunk_size; // total size to write
        rpc_in[i].bulk_handle = (target == fs_config->host_id) ? ipc_bulk_handle : rpc_bulk_handle;
        margo_create_wrap(ipc_read_data_id, rpc_read_data_id, target, rpc_handles[i], false);
        // Send RPC
        ret = margo_iforward(rpc_handles[target], &rpc_in[target], &rpc_waiters[target]);
        ret = margo_iforward(rpc_handles[i], &rpc_in[i], &rpc_waiters[i]);
        if (ret != HG_SUCCESS) {
            ld_logger->error("{}() Unable to send non-blocking rpc for path {} and recipient {}", __func__, path,
                             targets[target]);
                             target);
            errno = EBUSY;
            for (uint64_t j = 0; j < target + 1; j++) {
            for (uint64_t j = 0; j < i + 1; j++) {
                margo_destroy(rpc_handles[j]);
            }
            // free bulk handles for buffer
@@ -222,28 +217,28 @@ ssize_t rpc_send_read(const string& path, void* buf, const off64_t offset, const
    // All potential outputs are served to free resources regardless of errors, although an errorcode is set.
    ssize_t out_size = 0;
    ssize_t err = 0;
    for (uint64_t target = 0; target < target_n; target++) {
    for (unsigned int i = 0; i < target_n; i++) {
        // XXX We might need a timeout here to not wait forever for an output that never comes?
        ret = margo_wait(rpc_waiters[target]);
        ret = margo_wait(rpc_waiters[i]);
        if (ret != HG_SUCCESS) {
            ld_logger->error("{}() Unable to wait for margo_request handle for path {} recipient {}", __func__, path,
                             targets[target]);
                             targets[i]);
            errno = EBUSY;
            err = -1;
        }
        // decode response
        rpc_data_out_t out{};
        ret = margo_get_output(rpc_handles[target], &out);
        ret = margo_get_output(rpc_handles[i], &out);
        if (ret != HG_SUCCESS) {
            ld_logger->error("{}() Failed to get rpc output for path {} recipient {}", __func__, path, targets[target]);
            ld_logger->error("{}() Failed to get rpc output for path {} recipient {}", __func__, path, targets[i]);
            err = -1;
        }
        ld_logger->debug("{}() Got response {}", __func__, out.res);
        if (out.res != 0)
            errno = out.res;
        out_size += static_cast<size_t>(out.io_size);
        margo_free_output(rpc_handles[target], &out);
        margo_destroy(rpc_handles[target]);
        margo_free_output(rpc_handles[i], &out);
        margo_destroy(rpc_handles[i]);
    }
    // free bulk handles for buffer
    margo_bulk_free(rpc_bulk_handle);