Program Listing for File distributor.cpp

Return to documentation for file (src/common/rpc/distributor.cpp)

/*
  Copyright 2018-2024, Barcelona Supercomputing Center (BSC), Spain
  Copyright 2015-2024, Johannes Gutenberg Universitaet Mainz, Germany

  This software was partially supported by the
  EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu).

  This software was partially supported by the
  ADA-FS project under the SPPEXA project funded by the DFG.

  This file is part of GekkoFS.

  GekkoFS is free software: you can redistribute it and/or modify
  it under the terms of the GNU General Public License as published by
  the Free Software Foundation, either version 3 of the License, or
  (at your option) any later version.

  GekkoFS is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  GNU General Public License for more details.

  You should have received a copy of the GNU General Public License
  along with GekkoFS.  If not, see <https://www.gnu.org/licenses/>.

  SPDX-License-Identifier: GPL-3.0-or-later
*/

#include <common/rpc/distributor.hpp>

using namespace std;

namespace gkfs {

namespace rpc {

/**
 * Creates a hash-based distributor.
 *
 * @param localhost  host id stored as this node's own id
 * @param hosts_size number of hosts; all_hosts_ is filled with the ids
 *                   0 .. hosts_size - 1
 */
SimpleHashDistributor::SimpleHashDistributor(host_t localhost,
                                             unsigned int hosts_size)
    : localhost_(localhost), hosts_size_(hosts_size), all_hosts_(hosts_size) {
    // The vector was sized above; number its entries consecutively from 0.
    std::iota(std::begin(all_hosts_), std::end(all_hosts_), 0);
}

// Default constructor: performs no explicit member initialization here.
// NOTE(review): the locate_* methods compute `% hosts_size_`; presumably a
// host count must be supplied (e.g. through the three-argument locate_data)
// before they are used on a default-constructed object -- confirm.
SimpleHashDistributor::SimpleHashDistributor() = default;

/// Returns the host id stored in localhost_.
host_t
SimpleHashDistributor::localhost() const {
    return this->localhost_;
}

/**
 * Selects the host responsible for one chunk of a file.
 *
 * The path and the decimal chunk id are concatenated, hashed with str_hash
 * and reduced modulo hosts_size_.
 *
 * @param path    file path
 * @param chnk_id chunk id within the file
 * @return host id in [0, hosts_size_)
 */
host_t
SimpleHashDistributor::locate_data(const string& path,
                                   const chunkid_t& chnk_id) const {
    const auto chunk_key = path + std::to_string(chnk_id);
    return str_hash(chunk_key) % hosts_size_;
}

/**
 * Same as the two-argument locate_data, but first adopts `hosts_size` as the
 * current host count when it differs from the stored one.
 *
 * @param path       file path
 * @param chnk_id    chunk id within the file
 * @param hosts_size host count to use for this and subsequent lookups
 * @return host id in [0, hosts_size)
 */
host_t
SimpleHashDistributor::locate_data(const string& path, const chunkid_t& chnk_id,
                                   unsigned int hosts_size) {
    const bool size_changed = (hosts_size != hosts_size_);
    if(size_changed) {
        // Rebuild the host list as 0 .. hosts_size - 1 for the new size.
        hosts_size_ = hosts_size;
        all_hosts_.resize(hosts_size);
        std::iota(all_hosts_.begin(), all_hosts_.end(), 0);
    }

    const auto chunk_key = path + std::to_string(chnk_id);
    return str_hash(chunk_key) % hosts_size_;
}

/**
 * Selects the host holding a file's metadata.
 *
 * @param path file path
 * @return str_hash(path) reduced modulo hosts_size_
 */
host_t
SimpleHashDistributor::locate_file_metadata(const string& path) const {
    const auto path_hash = str_hash(path);
    return path_hash % hosts_size_;
}

::vector<host_t>
SimpleHashDistributor::locate_directory_metadata(const string& path) const {
    return all_hosts_;
}

/**
 * Creates a distributor that resolves every lookup to a single host.
 *
 * @param localhost host id returned by all locate_* methods
 */
LocalOnlyDistributor::LocalOnlyDistributor(host_t localhost)
    : localhost_(localhost) {
}

/// Returns the host id stored in localhost_.
host_t
LocalOnlyDistributor::localhost() const {
    return this->localhost_;
}

/**
 * Locates a chunk. Both arguments are ignored; the stored local host id is
 * always returned.
 */
host_t
LocalOnlyDistributor::locate_data(const string& /*path*/,
                                  const chunkid_t& /*chnk_id*/) const {
    return this->localhost_;
}

/**
 * Locates file metadata. The path is ignored; the stored local host id is
 * always returned.
 */
host_t
LocalOnlyDistributor::locate_file_metadata(const string& /*path*/) const {
    return this->localhost_;
}

::vector<host_t>
LocalOnlyDistributor::locate_directory_metadata(const string& path) const {
    return {localhost_};
}

/**
 * Creates a distributor that sends all data to one forwarding host.
 *
 * @param fwhost     host id returned for every data lookup
 * @param hosts_size number of hosts; all_hosts_ is filled with the ids
 *                   0 .. hosts_size - 1
 */
ForwarderDistributor::ForwarderDistributor(host_t fwhost,
                                           unsigned int hosts_size)
    : fwd_host_(fwhost), hosts_size_(hosts_size), all_hosts_(hosts_size) {
    // The vector was sized above; number its entries consecutively from 0.
    std::iota(std::begin(all_hosts_), std::end(all_hosts_), 0);
}

/// Returns the forwarding host id (fwd_host_), which this distributor
/// reports as its local host.
host_t
ForwarderDistributor::localhost() const {
    return this->fwd_host_;
}

/**
 * Locates a chunk. Both arguments are ignored; the forwarding host id is
 * always returned.
 */
host_t
ForwarderDistributor::locate_data(const std::string& /*path*/,
                                  const chunkid_t& /*chnk_id*/) const {
    return this->fwd_host_;
}

/**
 * Locates a chunk. All arguments are ignored, including the host count: the
 * stored hosts_size_ and all_hosts_ are left untouched and the forwarding
 * host id is returned.
 */
host_t
ForwarderDistributor::locate_data(const std::string& /*path*/,
                                  const chunkid_t& /*chnk_id*/,
                                  unsigned int /*host_size*/) {
    return this->fwd_host_;
}

/**
 * Selects the host holding a file's metadata. Unlike data lookups this is
 * not pinned to the forwarding host.
 *
 * @param path file path
 * @return str_hash(path) reduced modulo hosts_size_
 */
host_t
ForwarderDistributor::locate_file_metadata(const std::string& path) const {
    const auto path_hash = str_hash(path);
    return path_hash % hosts_size_;
}

/**
 * Returns the hosts to contact for directory metadata.
 *
 * The path is not inspected: a copy of the full host list is returned.
 */
std::vector<host_t>
ForwarderDistributor::locate_directory_metadata(
        const std::string& /*path*/) const {
    return all_hosts_;
}

/**
 * Inserts the closed interval [smaller, bigger] into the set, merging it with
 * every stored interval it overlaps or directly touches (i.e. intervals whose
 * bounds differ by at most one from the new bounds).
 *
 * _intervals maps each interval's start to its end.
 *
 * Compared with a single-neighbour merge, this version
 *  - never shrinks an existing interval when the new one is contained in it,
 *  - merges with following intervals even when no predecessor exists,
 *  - absorbs all following intervals that the new one reaches, not just one.
 *
 * @param smaller first value of the interval
 * @param bigger  last value of the interval (expected to be >= smaller)
 */
void
IntervalSet::Add(chunkid_t smaller, chunkid_t bigger) {
    // First stored interval that starts strictly after `smaller`.
    auto next = _intervals.upper_bound(smaller);

    // Merge with the predecessor (start <= smaller) if it reaches at least
    // smaller - 1. Written without `+ 1` so the check cannot wrap around.
    if(next != _intervals.begin()) {
        const auto prev = std::prev(next);
        if(smaller == 0 || prev->second >= smaller - 1) {
            smaller = prev->first;
            if(prev->second > bigger) {
                bigger = prev->second; // keep the larger end, never shrink
            }
            _intervals.erase(prev); // map erase leaves `next` valid
        }
    }

    // Absorb every following interval that starts at or before bigger + 1.
    // `next->first` is strictly greater than the original `smaller`, so
    // `next->first - 1` cannot underflow.
    while(next != _intervals.end() && next->first - 1 <= bigger) {
        if(next->second > bigger) {
            bigger = next->second;
        }
        next = _intervals.erase(next);
    }

    _intervals[smaller] = bigger;
}

/**
 * Tells whether `v` lies inside one of the stored closed intervals.
 *
 * @param v value to test
 * @return true if some stored interval [first, second] satisfies
 *         first <= v <= second; false otherwise, including when the set is
 *         empty or `v` is smaller than every interval start.
 */
bool
IntervalSet::IsInsideInterval(unsigned int v) const {
    // First interval starting strictly after v; the only possible match is
    // the interval right before it.
    const auto after = _intervals.upper_bound(v);
    if(after == _intervals.cbegin()) {
        // No interval starts at or before v (or the set is empty).
        // Stepping back from begin() would be undefined behaviour.
        return false;
    }
    const auto candidate = std::prev(after);
    return candidate->first <= v && v <= candidate->second;
}

bool
GuidedDistributor::init_guided() {
    unsigned int destination_host;
    chunkid_t chunk_id;
    string path;
    std::ifstream mapfile;
    mapfile.open(GKFS_USE_GUIDED_DISTRIBUTION_PATH);
    if((mapfile.rdstate() & std::ifstream::failbit) != 0)
        return false; // If it fails, the mapping will be as the SimpleHash

    while(mapfile >> path >> chunk_id >> destination_host) {
        // We need destination+1, as 0 has an special meaning in the interval
        // map.
        if(path.size() > 0 and path[0] == '#') {
            // Path that has this prefix will have metadata and data in the same
            // place  i.e. #/mdtest-hard/ 10 10 chunk_id and destination_host
            // are not used
            prefix_list.emplace_back(path.substr(1, path.size()));
            continue;
        }

        auto I = map_interval.find(path);
        if(I == map_interval.end()) {
            auto tmp = IntervalSet();
            tmp.Add(chunk_id, chunk_id + 1);
            map_interval[path] = make_pair(tmp, destination_host + 1);
        } else if(I->second.first.IsInsideInterval(chunk_id)) {
            auto is = I->second.first;
            is.Add(chunk_id, chunk_id + 1);
            I->second = (make_pair(is, destination_host + 1));
        }
    }
    mapfile.close();
    return true;
}

// Default constructor: only loads the guided-distribution map.
// A missing map file is not treated as an error, so the result of
// init_guided() is deliberately discarded.
GuidedDistributor::GuidedDistributor() {
    (void) init_guided();
}

/**
 * Creates a guided distributor and loads the guided-distribution map.
 *
 * The members are set unconditionally: a freshly constructed object has no
 * meaningful previous host count to compare against, and localhost_ must be
 * stored regardless of the value of `hosts_size`.
 *
 * @param localhost  host id stored as this node's own id
 * @param hosts_size number of hosts; all_hosts_ is filled with the ids
 *                   0 .. hosts_size - 1
 */
GuidedDistributor::GuidedDistributor(host_t localhost,
                                     unsigned int hosts_size) {
    localhost_ = localhost;
    hosts_size_ = hosts_size;
    all_hosts_ = std::vector<unsigned int>(hosts_size);
    ::iota(all_hosts_.begin(), all_hosts_.end(), 0);

    // A missing map file is not an error; lookups then use plain hashing.
    init_guided();
}

/// Returns the host id stored in localhost_.
host_t
GuidedDistributor::localhost() const {
    return this->localhost_;
}

/**
 * Same as the two-argument locate_data, but first adopts `hosts_size` as the
 * current host count when it differs from the stored one.
 *
 * @param path       file path
 * @param chnk_id    chunk id within the file
 * @param hosts_size host count to use for this and subsequent lookups
 * @return host id chosen by the two-argument overload
 */
host_t
GuidedDistributor::locate_data(const string& path, const chunkid_t& chnk_id,
                               unsigned int hosts_size) {
    const bool size_changed = (hosts_size != hosts_size_);
    if(size_changed) {
        // Rebuild the host list as 0 .. hosts_size - 1 for the new size.
        hosts_size_ = hosts_size;
        all_hosts_.resize(hosts_size);
        std::iota(all_hosts_.begin(), all_hosts_.end(), 0);
    }

    const host_t target = locate_data(path, chnk_id);
    return target;
}

/**
 * Selects the host responsible for one chunk of a file.
 *
 * Resolution order:
 *  1. If `path` has an entry in map_interval and `chnk_id` lies inside its
 *     interval set, the host stored for that entry is returned.
 *  2. Otherwise, if `path` matches an entry of prefix_list, the chunk is
 *     placed by hashing the path alone, i.e. on the same host that
 *     locate_file_metadata() yields for the path.
 *  3. Otherwise the path concatenated with the decimal chunk id is hashed.
 *
 * Previously step 2 returned on the first loop iteration whether or not the
 * prefix matched, so a non-empty prefix_list disabled step 3 entirely.
 *
 * @param path    file path
 * @param chnk_id chunk id within the file
 * @return destination host id
 */
host_t
GuidedDistributor::locate_data(const string& path,
                               const chunkid_t& chnk_id) const {
    const auto entry = map_interval.find(path);
    if(entry != map_interval.end() &&
       entry->second.first.IsInsideInterval(chnk_id)) {
        // Hosts are stored as destination + 1; undo the offset.
        return entry->second.second - 1;
    }

    for(const auto& prefix : prefix_list) {
        // Only the common leading part of both strings is compared.
        // NOTE(review): a path shorter than the prefix therefore also
        // matches when it equals the start of the prefix -- confirm intended.
        const auto cmp_len = min(prefix.length(), path.length());
        if(path.compare(0, cmp_len, prefix, 0, cmp_len) == 0) {
            return str_hash(path) % hosts_size_;
        }
    }

    const auto chunk_key = path + ::to_string(chnk_id);
    return str_hash(chunk_key) % hosts_size_;
}

/**
 * Selects the host holding a file's metadata. The guided map is not
 * consulted here.
 *
 * @param path file path
 * @return str_hash(path) reduced modulo hosts_size_
 */
host_t
GuidedDistributor::locate_file_metadata(const string& path) const {
    const auto path_hash = str_hash(path);
    return path_hash % hosts_size_;
}

::vector<host_t>
GuidedDistributor::locate_directory_metadata(const string& path) const {
    return all_hosts_;
}

} // namespace rpc
} // namespace gkfs