Unverified Commit be9d15eb authored by Tommaso Tocci's avatar Tommaso Tocci
Browse files

bugfix: blocks offset and count calculus

The calculus (on clients) about the last chunk involved into a
write/read operation was bugged.

A new blocks_calc_util library has been introduced in order to make
easier to perform this kind of calculus.
parent d942ca36
Loading
Loading
Loading
Loading
+66 −0
Original line number Diff line number Diff line
/**
 * Compute the base2 logarithm for 64 bit integers
 */
inline int log2(uint64_t n){

    /* see http://stackoverflow.com/questions/11376288/fast-computing-of-log2-for-64-bit-integers */
    static const int table[64] = {
        0, 58, 1, 59, 47, 53, 2, 60, 39, 48, 27, 54, 33, 42, 3, 61,
        51, 37, 40, 49, 18, 28, 20, 55, 30, 34, 11, 43, 14, 22, 4, 62,
        57, 46, 52, 38, 26, 32, 41, 50, 36, 17, 19, 29, 10, 13, 21, 56,
        45, 25, 31, 35, 16, 9, 12, 44, 24, 15, 8, 23, 7, 6, 5, 63 };

    assert(n > 0);

    n |= n >> 1;
    n |= n >> 2;
    n |= n >> 4;
    n |= n >> 8;
    n |= n >> 16;
    n |= n >> 32;

    return table[(n * 0x03f6eaf2cd271461) >> 58];
}


/**
 * Align an @offset to the closest left side block boundary
 */
inline off64_t lalign(const off64_t offset, const size_t block_size) {
    return offset & ~(block_size - 1);
}


/**
 * Align an @offset to the closest right side block boundary
 */
inline off64_t ralign(const off64_t offset, const size_t block_size) {
    return lalign(offset + block_size, block_size);
}


/**
 * Given an @offset calculates the block number to which the @offset belongs
 *
 * block_num(8,4) = 2;
 * block_num(7,4) = 1;
 * block_num(2,4) = 0;
 * block_num(0,4) = 0;
 */
inline uint64_t block_num(const off64_t offset, const size_t block_size){
    return lalign(offset, block_size) >> log2(block_size);
}


/**
 * Return the number of blocks involved in an operation that operates
 * from @offset for a certain amount of bytes (@count).
 */
inline uint64_t blocks_count(const off64_t offset, const size_t count, const size_t block_size) {

    off64_t block_start = lalign(offset, block_size);
    off64_t block_end = lalign(offset + count - 1, block_size);

    return (block_end >> log2(block_size)) -
        (block_start >> log2(block_size)) + 1;
}
+1 −0
Original line number Diff line number Diff line
@@ -17,6 +17,7 @@ set(PRELOAD_HEADERS
    ../../include/global/rpc/ipc_types.hpp
    ../../include/global/rpc/rpc_types.hpp
    ../../include/global/rpc/rpc_utils.hpp
    ../../include/global/blocks_calc_util.hpp
    ../../include/preload/adafs_functions.hpp
    ../../include/preload/margo_ipc.hpp
    ../../include/preload/open_file_map.hpp
+10 −17
Original line number Diff line number Diff line

#include <preload/rpc/ld_rpc_data_ws.hpp>
#include <global/rpc/rpc_utils.hpp>

#include <global/blocks_calc_util.hpp>

using namespace std;

@@ -18,11 +18,8 @@ ssize_t rpc_send_write(const string& path, const void* buf, const bool append_fl
    if (append_flag)
        offset = updated_metadentry_size - write_size;

    auto chnk_start = static_cast<uint64_t>(offset) / CHUNKSIZE; // first chunk number
    // last chunk number (right-open) [chnk_start,chnk_end)
    auto chnk_end = static_cast<uint64_t>((offset + write_size) / CHUNKSIZE + 1);
    if ((offset + write_size) % CHUNKSIZE == 0)
        chnk_end--;
    auto chnk_start = block_num(offset, CHUNKSIZE);
    auto chnk_end = block_num((offset + write_size) - 1, CHUNKSIZE);

    // Collect all chunk ids within count that have the same destination so that those are send in one rpc bulk transfer
    map<uint64_t, vector<uint64_t>> target_chnks{};
@@ -71,9 +68,8 @@ ssize_t rpc_send_write(const string& path, const void* buf, const bool append_fl
        auto total_chunk_size = target_chnks[targets[target]].size() * CHUNKSIZE; // total chunk_size for target
        if (target == chnk_start_target) // receiver of first chunk must subtract the offset from first chunk
            total_chunk_size -= (offset % CHUNKSIZE);
        if (target == chnk_end_target &&
            ((offset + write_size) % CHUNKSIZE) != 0) // receiver of last chunk must subtract
            total_chunk_size -= (CHUNKSIZE - ((offset + write_size) % CHUNKSIZE));
        if (target == chnk_end_target) // receiver of last chunk must subtract
            total_chunk_size -= ((-(offset + write_size)) % CHUNKSIZE);
        // Fill RPC input
        rpc_in[target].path = path.c_str();
        rpc_in[target].offset = offset % CHUNKSIZE;// first offset in targets is the chunk with a potential offset
@@ -137,11 +133,8 @@ ssize_t rpc_send_write(const string& path, const void* buf, const bool append_fl
 */
ssize_t rpc_send_read(const string& path, void* buf, const off64_t offset, const size_t read_size) {
    // Calculate chunkid boundaries and numbers so that daemons know in which interval to look for chunks
    auto chnk_start = static_cast<uint64_t>(offset) / CHUNKSIZE; // first chunk number
    // last chunk number (right-open) [chnk_start,chnk_end)
    auto chnk_end = static_cast<uint64_t>((offset + read_size) / CHUNKSIZE + 1);
    if ((offset + read_size) % CHUNKSIZE == 0)
        chnk_end--;
    auto chnk_start = block_num(offset, CHUNKSIZE); // first chunk number
    auto chnk_end = block_num((offset + read_size - 1), CHUNKSIZE);

    // Collect all chunk ids within count that have the same destination so that those are send in one rpc bulk transfer
    map<uint64_t, vector<uint64_t>> target_chnks{};
@@ -190,9 +183,9 @@ ssize_t rpc_send_read(const string& path, void* buf, const off64_t offset, const
        auto total_chunk_size = target_chnks[targets[target]].size() * CHUNKSIZE;
        if (target == chnk_start_target) // receiver of first chunk must subtract the offset from first chunk
            total_chunk_size -= (offset % CHUNKSIZE);
        if (target == chnk_end_target &&
            ((offset + read_size) % CHUNKSIZE) != 0) // receiver of last chunk must subtract
            total_chunk_size -= (CHUNKSIZE - ((offset + read_size) % CHUNKSIZE));
        if (target == chnk_end_target) // receiver of last chunk must subtract
            total_chunk_size -= ((-(offset + read_size)) % CHUNKSIZE);

        // Fill RPC input
        rpc_in[target].path = path.c_str();
        rpc_in[target].offset = offset % CHUNKSIZE;// first offset in targets is the chunk with a potential offset