Program Listing for File malleable_manager.cpp

Return to documentation for file (src/daemon/malleability/malleable_manager.cpp)

/*
  Copyright 2018-2025, Barcelona Supercomputing Center (BSC), Spain
  Copyright 2015-2025, Johannes Gutenberg Universitaet Mainz, Germany

  This software was partially supported by the
  EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu).

  This software was partially supported by the
  ADA-FS project under the SPPEXA project funded by the DFG.

  This software was partially supported by the
  the European Union’s Horizon 2020 JTI-EuroHPC research and
  innovation programme, by the project ADMIRE (Project ID: 956748,
  admire-eurohpc.eu)

  This project was partially promoted by the Ministry for Digital Transformation
  and the Civil Service, within the framework of the Recovery,
  Transformation and Resilience Plan - Funded by the European Union
  -NextGenerationEU.

  This file is part of GekkoFS.

  GekkoFS is free software: you can redistribute it and/or modify
  it under the terms of the GNU General Public License as published by
  the Free Software Foundation, either version 3 of the License, or
  (at your option) any later version.

  GekkoFS is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  GNU General Public License for more details.

  You should have received a copy of the GNU General Public License
  along with GekkoFS.  If not, see <https://www.gnu.org/licenses/>.

  SPDX-License-Identifier: GPL-3.0-or-later
*/

#include <daemon/malleability/malleable_manager.hpp>
#include <daemon/malleability/rpc/forward_redistribution.hpp>
#include <daemon/backend/metadata/db.hpp>
#include <daemon/backend/data/chunk_storage.hpp>

#include <common/rpc/rpc_util.hpp>

#include <filesystem>
#include <algorithm>
#include <regex>
#include <random>
#include <thread>

extern "C" {
#include <abt.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <unistd.h>
}

using namespace std;
namespace fs = std::filesystem;

namespace gkfs::malleable {

// TODO The following three functions are almost identical to the proxy code
// They should be moved to a common and shared between the proxy and the daemon
vector<pair<string, string>>
MalleableManager::load_hostfile(const std::string& path) {

    GKFS_DATA->spdlogger()->info("{}() Loading hosts file '{}'", __func__,
                                 path);

    ifstream lf(path);
    if(!lf) {
        throw runtime_error(fmt::format("Failed to open hosts file '{}': {}",
                                        path, strerror(errno)));
    }
    vector<pair<string, string>> hosts;
    const regex line_re("^(\\S+)\\s+(\\S+)\\s*(\\S*)$",
                        regex::ECMAScript | regex::optimize);
    string line;
    string host;
    string uri;
    std::smatch match;
    while(getline(lf, line)) {
        // if line starts with #, it indicates the end of current FS instance
        // It is therefore skipped
        if(line[0] == '#')
            continue;
        if(!regex_match(line, match, line_re)) {
            GKFS_DATA->spdlogger()->error(
                    "{}() Unrecognized line format: [path: '{}', line: '{}']",
                    path, line);
            throw runtime_error(
                    fmt::format("unrecognized line format: '{}'", line));
        }
        host = match[1];
        uri = match[2];
        hosts.emplace_back(host, uri);
    }
    if(hosts.empty()) {
        throw runtime_error(
                "Hosts file found but no suitable addresses could be extracted");
    }
    // sort hosts so that data always hashes to the same place
    std::sort(hosts.begin(), hosts.end());
    // remove rootdir suffix from host after sorting as no longer required
    for(auto& h : hosts) {
        auto idx = h.first.rfind("#");
        if(idx != string::npos)
            h.first.erase(idx, h.first.length());
    }
    return hosts;
}

vector<pair<string, string>>
MalleableManager::read_hosts_file() {
    auto hostfile = GKFS_DATA->hosts_file();
    GKFS_DATA->spdlogger()->info("{}() Reading hosts file...", __func__);

    vector<pair<string, string>> hosts;
    try {
        hosts = load_hostfile(hostfile);
    } catch(const exception& e) {
        auto emsg = fmt::format("Failed to load hosts file: {}", e.what());
        throw runtime_error(emsg);
    }

    if(hosts.empty()) {
        throw runtime_error(fmt::format("Hostfile empty: '{}'", hostfile));
    }
    GKFS_DATA->spdlogger()->info("{}() Number of hosts after expansion '{}'",
                                 __func__, hosts.size());
    return hosts;
}

void
MalleableManager::connect_to_hosts(
        const vector<std::pair<std::string, std::string>>& hosts) {
    auto local_hostname = gkfs::rpc::get_my_hostname(true);
    bool local_host_found = false;

    RPC_DATA->hosts_size(hosts.size());
    vector<uint64_t> host_ids(hosts.size());
    // populate vector with [0, ..., host_size - 1]
    ::iota(::begin(host_ids), ::end(host_ids), 0);
    /*
     * Shuffle hosts to balance addr lookups to all hosts
     * Too many concurrent lookups send to same host
     * could overwhelm the server,
     * returning error when addr lookup
     */
    ::random_device rd; // obtain a random number from hardware
    ::mt19937 g(rd());  // seed the random generator
    ::shuffle(host_ids.begin(), host_ids.end(), g); // Shuffle hosts vector
    // lookup addresses and put abstract server addresses into rpc_addresses
    for(const auto& id : host_ids) {
        const auto& hostname = hosts.at(id).first;
        const auto& uri = hosts.at(id).second;

        hg_addr_t svr_addr = HG_ADDR_NULL;

        // try to look up 3 times before erroring out
        hg_return_t ret;
        for(uint32_t i = 0; i < 4; i++) {
            ret = margo_addr_lookup(RPC_DATA->client_rpc_mid(), uri.c_str(),
                                    &svr_addr);
            if(ret != HG_SUCCESS) {
                // still not working after 5 tries.
                if(i == 3) {
                    auto err_msg =
                            fmt::format("{}() Unable to lookup address '{}'",
                                        __func__, uri);
                    throw runtime_error(err_msg);
                }
                // Wait a random amount of time and try again
                ::mt19937 eng(rd()); // seed the random generator
                ::uniform_int_distribution<> distr(
                        50, 50 * (i + 2)); // define the range
                ::this_thread::sleep_for(std::chrono::milliseconds(distr(eng)));
            } else {
                break;
            }
        }
        if(svr_addr == HG_ADDR_NULL) {
            auto err_msg = fmt::format(
                    "{}() looked up address is NULL for address '{}'", __func__,
                    uri);
            throw runtime_error(err_msg);
        }
        RPC_DATA->rpc_endpoints().insert(make_pair(id, svr_addr));

        if(!local_host_found && hostname == local_hostname) {
            GKFS_DATA->spdlogger()->debug("{}() Found local host: {}", __func__,
                                          hostname);
            RPC_DATA->local_host_id(id);
            local_host_found = true;
        }
        GKFS_DATA->spdlogger()->debug("{}() Found daemon: id '{}' uri '{}'",
                                      __func__, id, uri);
    }
    if(!local_host_found) {
        auto err_msg = fmt::format(
                "{}() Local host '{}' not found in hosts file. This should not happen.",
                __func__, local_hostname);
        throw runtime_error(err_msg);
    }
}

int
MalleableManager::redistribute_metadata() {
    uint64_t count = 0;
    auto estimate_db_size = GKFS_DATA->mdb()->db_size();
    auto percent_interval = estimate_db_size / 100;
    GKFS_DATA->spdlogger()->info(
            "{}() Starting metadata redistribution for '{}' estimated number of KV pairs...",
            __func__, estimate_db_size);
    int migration_err = 0;
    string key, value;
    auto iter =
            static_cast<rocksdb::Iterator*>(GKFS_DATA->mdb()->iterate_all());
    // TODO parallelize
    for(iter->SeekToFirst(); iter->Valid(); iter->Next()) {
        key = iter->key().ToString();
        value = iter->value().ToString();
        if(key == "/") {
            continue;
        }
        auto dest_id = RPC_DATA->distributor()->locate_file_metadata(key, 0);
        GKFS_DATA->spdlogger()->trace(
                "{}() Migration: key {} and value {}. From host {} to host {}",
                __func__, key, value, RPC_DATA->local_host_id(), dest_id);
        if(dest_id == RPC_DATA->local_host_id()) {
            GKFS_DATA->spdlogger()->trace("{}() SKIP", __func__);
            continue;
        }
        auto err = gkfs::malleable::rpc::forward_metadata(key, value, dest_id);
        if(err != 0) {
            GKFS_DATA->spdlogger()->error(
                    "{}() Failed to migrate metadata for key '{}'", __func__,
                    key);
            migration_err++;
        }
        GKFS_DATA->mdb()->remove(key);
        count++;
        if(percent_interval > 0 && count % percent_interval == 0) {
            GKFS_DATA->spdlogger()->info(
                    "{}() Metadata migration {}%/100% completed...", __func__,
                    count / percent_interval);
        }
    }
    GKFS_DATA->spdlogger()->info("{}() Metadata redistribution completed.",
                                 __func__);
    return migration_err;
}

void
MalleableManager::redistribute_data() {
    GKFS_DATA->spdlogger()->info("{}() Starting data redistribution...",
                                 __func__);

    auto chunk_dir = fs::path(GKFS_DATA->storage()->get_chunk_directory());
    auto dir_iterator = GKFS_DATA->storage()->get_all_chunk_files();

    // TODO this can be parallelized, e.g., async chunk I/O
    for(const auto& entry : dir_iterator) {
        if(!entry.is_regular_file()) {
            continue;
        }
        // path under chunkdir as placed in the rootdir
        auto rel_chunk_dir = fs::relative(entry, chunk_dir);
        // chunk id from this entry used for determining destination
        uint64_t chunk_id = stoul(rel_chunk_dir.filename().string());
        // mountdir gekkofs path used for determining destination
        auto gkfs_path = rel_chunk_dir.parent_path().string();
        ::replace(gkfs_path.begin(), gkfs_path.end(), ':', '/');
        gkfs_path = "/" + gkfs_path;
        auto dest_id =
                RPC_DATA->distributor()->locate_data(gkfs_path, chunk_id, 0);
        GKFS_DATA->spdlogger()->trace(
                "{}() Migrating chunkfile: {} for gkfs file {} chnkid {} destid {}",
                __func__, rel_chunk_dir.string(), gkfs_path, chunk_id, dest_id);
        if(dest_id == RPC_DATA->local_host_id()) {
            GKFS_DATA->spdlogger()->trace("{}() SKIPPERS", __func__);
            continue;
        }
        auto fd = open(entry.path().c_str(), O_RDONLY);
        if(fd < 0) {
            GKFS_DATA->spdlogger()->error("{}() Failed to open chunkfile: {}",
                                          __func__, entry.path().c_str());
            continue;
        }
        auto buf = new char[entry.file_size()];
        auto bytes_read = read(fd, buf, entry.file_size());
        if(bytes_read < 0) {
            GKFS_DATA->spdlogger()->error("{}() Failed to read chunkfile: {}",
                                          __func__, entry.path().c_str());
            continue;
        }
        auto err = gkfs::malleable::rpc::forward_data(
                gkfs_path, buf, bytes_read, chunk_id, dest_id);
        if(err != 0) {
            GKFS_DATA->spdlogger()->error(
                    "{}() Failed to migrate data for chunkfile: {}", __func__,
                    entry.path().c_str());
        }
        close(fd);
        GKFS_DATA->spdlogger()->trace(
                "{}() Data migration completed for chunkfile: {}. Removing ...",
                __func__, entry.path().c_str());
        // remove file after migration
        auto entry_dir = entry.path().parent_path();
        try {
            fs::remove(entry);
            if(fs::is_empty(entry_dir)) {
                fs::remove(entry_dir);
            }
        } catch(const fs::filesystem_error& e) {
            GKFS_DATA->spdlogger()->error("{}() Failed to remove chunkfile: {}",
                                          __func__, entry.path().c_str());
        }
        GKFS_DATA->spdlogger()->trace("{}() Done for chunkfile: {}", __func__,
                                      entry.path().c_str());
    }

    GKFS_DATA->spdlogger()->info("{}() Data redistribution completed.",
                                 __func__);
}

void
MalleableManager::expand_abt(void* _arg) {
    GKFS_DATA->spdlogger()->info("{}() Starting expansion process...",
                                 __func__);
    GKFS_DATA->redist_running(true);
    GKFS_DATA->malleable_manager()->redistribute_metadata();
    try {
        GKFS_DATA->malleable_manager()->redistribute_data();
    } catch(const gkfs::data::ChunkStorageException& e) {
        GKFS_DATA->spdlogger()->error("{}() Failed to redistribute data: '{}'",
                                      __func__, e.what());
    }
    GKFS_DATA->redist_running(false);
    GKFS_DATA->spdlogger()->info(
            "{}() Expansion process successfully finished.", __func__);
}

// PUBLIC

void
MalleableManager::expand_start(int old_server_conf, int new_server_conf) {
    auto hosts = read_hosts_file();
    if(hosts.size() != static_cast<size_t>(new_server_conf)) {
        throw runtime_error(
                fmt::format("MalleableManager::{}() Something is wrong. "
                            "Number of hosts in hosts file ({}) "
                            "does not match new server configuration ({})",
                            __func__, hosts.size(), new_server_conf));
    }
    connect_to_hosts(hosts);
    RPC_DATA->distributor()->hosts_size(hosts.size());
    auto abt_err =
            ABT_thread_create(RPC_DATA->io_pool(), expand_abt,
                              ABT_THREAD_ATTR_NULL, nullptr, &redist_thread_);
    if(abt_err != ABT_SUCCESS) {
        auto err_str = fmt::format(
                "MalleableManager::{}() Failed to create ABT thread with abt_err '{}'",
                __func__, abt_err);
        throw runtime_error(err_str);
    }
}

} // namespace gkfs::malleable