Unverified Commit 66905b81 authored by Tommaso Tocci
Browse files

client passes information about host size

parent 32ee1a2e
Loading
Loading
Loading
Loading
+0 −7
Original line number Diff line number Diff line
@@ -20,7 +20,6 @@
/* Forward declarations */
class MetadataDB;
class ChunkStorage;
class Distributor;

#include <unordered_map>
#include <map>
@@ -60,8 +59,6 @@ private:
    std::shared_ptr<MetadataDB> mdb_;
    // Storage backend
    std::shared_ptr<ChunkStorage> storage_;
    // Distributor
    std::shared_ptr<Distributor> distributor_;

    // configurable metadata
    bool atime_state_;
@@ -124,10 +121,6 @@ public:

    void storage(const std::shared_ptr<ChunkStorage>& storage);

    void distributor(std::shared_ptr<Distributor> d);

    std::shared_ptr<Distributor> distributor() const;

    const std::string& hosts_raw() const;

    void hosts_raw(const std::string& hosts_raw);
+4 −0
Original line number Diff line number Diff line
@@ -82,6 +82,8 @@ MERCURY_GEN_PROC(rpc_mk_symlink_in_t,
MERCURY_GEN_PROC(rpc_read_data_in_t,
                 ((hg_const_string_t) (path))\
((int64_t) (offset))\
    ((hg_uint64_t) (host_id))\
    ((hg_uint64_t) (host_size))\
((hg_uint64_t) (chunk_n))\
((hg_uint64_t) (chunk_start))\
((hg_uint64_t) (chunk_end))\
@@ -95,6 +97,8 @@ MERCURY_GEN_PROC(rpc_data_out_t,
MERCURY_GEN_PROC(rpc_write_data_in_t,
                 ((hg_const_string_t) (path))\
((int64_t) (offset))\
    ((hg_uint64_t) (host_id))\
    ((hg_uint64_t) (host_size))\
((hg_uint64_t) (chunk_n))\
((hg_uint64_t) (chunk_start))\
((hg_uint64_t) (chunk_end))\
+4 −0
Original line number Diff line number Diff line
@@ -89,6 +89,8 @@ ssize_t write(const string& path, const void* buf, const bool append_flag, const
            total_chunk_size -= chnk_rpad(offset + write_size, CHUNKSIZE);
        // Fill RPC input
        rpc_in[i].path = path.c_str();
        rpc_in[i].host_id = target;
        rpc_in[i].host_size = CTX->fs_conf()->host_size;
        rpc_in[i].offset = chnk_lpad(offset, CHUNKSIZE);// first offset in targets is the chunk with a potential offset
        rpc_in[i].chunk_n = target_chnks[target].size(); // number of chunks handled by that destination
        rpc_in[i].chunk_start = chnk_start; // chunk start id of this write
@@ -199,6 +201,8 @@ ssize_t read(const string& path, void* buf, const off64_t offset, const size_t r

        // Fill RPC input
        rpc_in[i].path = path.c_str();
        rpc_in[i].host_id = target;
        rpc_in[i].host_size = CTX->fs_conf()->host_size;
        rpc_in[i].offset = chnk_lpad(offset, CHUNKSIZE);// first offset in targets is the chunk with a potential offset
        rpc_in[i].chunk_n = target_chnks[target].size(); // number of chunks handled by that destination
        rpc_in[i].chunk_start = chnk_start; // chunk start id of this write
+0 −8
Original line number Diff line number Diff line
@@ -75,14 +75,6 @@ void FsData::storage(const std::shared_ptr<ChunkStorage>& storage) {
    storage_ = storage;
}

void FsData::distributor(std::shared_ptr<Distributor> d) {
    distributor_ = d;
}

std::shared_ptr<Distributor> FsData::distributor() const {
    return distributor_;
}

const std::string& FsData::rootdir() const {
    return rootdir_;
}
+11 −3
Original line number Diff line number Diff line
@@ -160,6 +160,10 @@ static hg_return_t rpc_srv_write_data(hg_handle_t handle) {
        ADAFS_DATA->spdlogger()->error("{}() Failed to access allocated buffer from bulk handle", __func__);
        return rpc_cleanup_respond(&handle, &in, &out, &bulk_handle);
    }
    auto const host_id = in.host_id;
    auto const host_size = in.host_size;
    SimpleHashDistributor distributor(host_id, host_size);

    auto path = make_shared<string>(in.path);
    // chnk_ids used by this host
    vector<uint64_t> chnk_ids_host(in.chunk_n);
@@ -194,7 +198,7 @@ static hg_return_t rpc_srv_write_data(hg_handle_t handle) {
    // Start to look for a chunk that hashes to this host with the first chunk in the buffer
    for (auto chnk_id_file = in.chunk_start; chnk_id_file < in.chunk_end || chnk_id_curr < in.chunk_n; chnk_id_file++) {
        // Continue if chunk does not hash to this host
        if (ADAFS_DATA->distributor()->locate_data(in.path, chnk_id_file) != ADAFS_DATA->host_id())
        if (distributor.locate_data(in.path, chnk_id_file) != host_id)
            continue;
        chnk_ids_host[chnk_id_curr] = chnk_id_file; // save this id to host chunk list
        // offset case. Only relevant in the first iteration of the loop and if the chunk hashes to this host
@@ -227,7 +231,7 @@ static hg_return_t rpc_srv_write_data(hg_handle_t handle) {
                transfer_size = chnk_size_left_host;
            ADAFS_DATA->spdlogger()->trace(
                    "{}() BULK_TRANSFER hostid {} file {} chnkid {} total_Csize {} Csize_left {} origin offset {} local offset {} transfersize {}",
                    __func__, ADAFS_DATA->host_id(), in.path, chnk_id_file, in.total_chunk_size, chnk_size_left_host,
                    __func__, host_id, in.path, chnk_id_file, in.total_chunk_size, chnk_size_left_host,
                    origin_offset, local_offset, transfer_size);
            // RDMA the data to here
            ret = margo_bulk_transfer(mid, HG_BULK_PULL, hgi->addr, in.bulk_handle, origin_offset,
@@ -353,6 +357,10 @@ static hg_return_t rpc_srv_read_data(hg_handle_t handle) {
        ADAFS_DATA->spdlogger()->error("{}() Failed to access allocated buffer from bulk handle", __func__);
        return rpc_cleanup_respond(&handle, &in, &out, &bulk_handle);
    }
    auto const host_id = in.host_id;
    auto const host_size = in.host_size;
    SimpleHashDistributor distributor(host_id, host_size);
    
    auto path = make_shared<string>(in.path);
    // chnk_ids used by this host
    vector<uint64_t> chnk_ids_host(in.chunk_n);
@@ -379,7 +387,7 @@ static hg_return_t rpc_srv_read_data(hg_handle_t handle) {
    // Start to look for a chunk that hashes to this host with the first chunk in the buffer
    for (auto chnk_id_file = in.chunk_start; chnk_id_file < in.chunk_end || chnk_id_curr < in.chunk_n; chnk_id_file++) {
        // Continue if chunk does not hash to this host
        if (ADAFS_DATA->distributor()->locate_data(in.path, chnk_id_file) != ADAFS_DATA->host_id())
        if (distributor.locate_data(in.path, chnk_id_file) != host_id)
            continue;
        chnk_ids_host[chnk_id_curr] = chnk_id_file; // save this id to host chunk list
        // Only relevant in the first iteration of the loop and if the chunk hashes to this host
Loading