Commit f101aaa9 authored by Marc Vef's avatar Marc Vef
Browse files

Node destination hashing is now a global function

parent c3685119
Loading
Loading
Loading
Loading
+4 −0
Original line number Diff line number Diff line
@@ -51,4 +51,8 @@ hg_bool_t bool_to_merc_bool(bool state);

bool is_handle_sm(margo_instance_id mid, const hg_addr_t& addr);

size_t adafs_hash_path(const std::string& to_hash, const size_t host_size);

size_t adafs_hash_path_chunk(const std::string& path, const size_t chunk_id, const size_t host_size);

#endif //IFS_RPC_UTILS_HPP
+0 −2
Original line number Diff line number Diff line
@@ -130,8 +130,6 @@ bool read_system_hostfile();

bool get_addr_by_hostid(uint64_t hostid, hg_addr_t& svr_addr);

size_t get_rpc_node(const std::string& to_hash);

bool is_local_op(size_t recipient);

template<typename T>
+2 −12
Original line number Diff line number Diff line
@@ -6,16 +6,6 @@

using namespace std;

/**
 * Determines the node id for a given path
 * @param to_hash
 * @return
 */
size_t get_rpc_node(const string& to_hash) {
    //TODO can this be a shared function?
    return std::hash<string>{}(to_hash) % ADAFS_DATA->host_size();
}

/**
 * Free Argobots tasks and eventual constructs in a given vector until max_idx.
 * Nothing is done for a vector if nullptr is given
@@ -115,7 +105,7 @@ static hg_return_t rpc_srv_write_data(hg_handle_t handle) {
    // Start to look for a chunk that hashes to this host with the first chunk in the buffer
    for (auto chnk_id_file = in.chunk_start; chnk_id_file < in.chunk_end || chnk_id_curr < in.chunk_n; chnk_id_file++) {
        // Continue if chunk does not hash to this host
        if (get_rpc_node(in.path + fmt::FormatInt(chnk_id_file).str()) != ADAFS_DATA->host_id())
        if (adafs_hash_path_chunk(in.path, chnk_id_file, ADAFS_DATA->host_size()) != ADAFS_DATA->host_id())
            continue;
        chnk_ids_host[chnk_id_curr] = chnk_id_file; // save this id to host chunk list
        // offset case. Only relevant in the first iteration of the loop and if the chunk hashes to this host
@@ -305,7 +295,7 @@ static hg_return_t rpc_srv_read_data(hg_handle_t handle) {
    // Start to look for a chunk that hashes to this host with the first chunk in the buffer
    for (auto chnk_id_file = in.chunk_start; chnk_id_file < in.chunk_end || chnk_id_curr < in.chunk_n; chnk_id_file++) {
        // Continue if chunk does not hash to this host
        if (get_rpc_node(in.path + fmt::FormatInt(chnk_id_file).str()) != ADAFS_DATA->host_id())
        if (adafs_hash_path_chunk(in.path, chnk_id_file, ADAFS_DATA->host_size()) != ADAFS_DATA->host_id())
            continue;
        chnk_ids_host[chnk_id_curr] = chnk_id_file; // save this id to host chunk list
        // Only relevant in the first iteration of the loop and if the chunk hashes to this host
+20 −0
Original line number Diff line number Diff line

#include <global/rpc/rpc_utils.hpp>
#include <extern/spdlog/fmt/fmt.h>

using namespace std;

@@ -26,3 +27,22 @@ bool is_handle_sm(margo_instance_id mid, const hg_addr_t& addr) {
    string addr_str(addr_cstr);
    return addr_str.substr(0, 5) == "na+sm";
}

/**
 * Determines the node id for a given path
 * @param to_hash
 * @return
 */
size_t adafs_hash_path(const string& to_hash, const size_t host_size) {
    return std::hash<string>{}(to_hash) % host_size;
}

/**
 * Determines the node id for a given path and chnk id. Wraps adafs_hash_path()
 * @param path
 * @return
 */
size_t adafs_hash_path_chunk(const string& path, const size_t chunk_id, const size_t host_size) {
    auto to_hash = path + fmt::FormatInt(chunk_id).str();
    return adafs_hash_path(to_hash, host_size);
}
 No newline at end of file
+6 −5
Original line number Diff line number Diff line
#include <preload/adafs_functions.hpp>
#include <preload/rpc/ld_rpc_metadentry.hpp>
#include <preload/rpc/ld_rpc_data_ws.hpp>
#include <global/rpc/rpc_utils.hpp>

using namespace std;

@@ -189,8 +190,8 @@ ssize_t adafs_pwrite_ws(int fd, const void* buf, size_t count, off64_t offset) {
    map<uint64_t, vector<uint64_t>> dest_ids{};
    // contains the recipient ids, used to access the dest_ids map. First idx is chunk with potential offset
    vector<uint64_t> dest_idx{};
    for (auto i = chnk_start; i < chnk_end; i++) {
        auto recipient = get_rpc_node(*path + fmt::FormatInt(i).str());
    for (uint64_t i = chnk_start; i < chnk_end; i++) {
        auto recipient = adafs_hash_path_chunk(*path, i, fs_config->host_size);
        if (dest_ids.count(recipient) == 0) {
            dest_ids.insert(make_pair(recipient, vector<uint64_t>{i}));
            dest_idx.push_back(recipient);
@@ -259,11 +260,11 @@ ssize_t adafs_pread_ws(int fd, void* buf, size_t count, off64_t offset) {
    if ((offset + count) % CHUNKSIZE == 0)
        chnk_end--;
    // Collect all chunk ids within count that have the same destination so that those are send in one rpc bulk transfer
    vector<uint64_t> dest_idx{};
    // contains the recipient ids, used to access the dest_ids map. First idx is chunk with potential offset
    map<uint64_t, vector<uint64_t>> dest_ids{};
    // contains the recipient ids, used to access the dest_ids map. First idx is chunk with potential offset
    vector<uint64_t> dest_idx{};
    for (uint64_t i = chnk_start; i < chnk_end; i++) {
        auto recipient = get_rpc_node(*path + fmt::FormatInt(i).str());
        auto recipient = adafs_hash_path_chunk(*path, i, fs_config->host_size);
        if (dest_ids.count(recipient) == 0) {
            dest_ids.insert(make_pair(recipient, vector<uint64_t>{i}));
            dest_idx.push_back(recipient);
Loading