Commit bbedfa37 authored by Marc Vef's avatar Marc Vef
Browse files

Working metadata redistribution on demand

parent 91679852
Loading
Loading
Loading
Loading
+15 −0
Original line number Diff line number Diff line
@@ -56,6 +56,9 @@ public:
    virtual unsigned int
    hosts_size() const = 0;

    virtual void
    hosts_size(unsigned int size) = 0;

    virtual host_t
    locate_data(const std::string& path, const chunkid_t& chnk_id,
                unsigned int hosts_size, const int num_copy) = 0;
@@ -83,6 +86,9 @@ public:
    unsigned int
    hosts_size() const override;

    void
    hosts_size(unsigned int size) override;

    host_t
    localhost() const override;

@@ -116,6 +122,9 @@ public:
    unsigned int
    hosts_size() const override;

    void
    hosts_size(unsigned int size) override;

    host_t
    locate_data(const std::string& path, const chunkid_t& chnk_id,
                const int num_copy) const override;
@@ -144,6 +153,9 @@ public:
    unsigned int
    hosts_size() const override;

    void
    hosts_size(unsigned int size) override;

    host_t
    locate_data(const std::string& path, const chunkid_t& chnk_id,
                const int num_copy) const override final;
@@ -197,6 +209,9 @@ public:
    unsigned int
    hosts_size() const override;

    void
    hosts_size(unsigned int size) override;

    host_t
    locate_data(const std::string& path, const chunkid_t& chnk_id,
                const int num_copy) const override;
+20 −0
Original line number Diff line number Diff line
@@ -52,6 +52,11 @@ SimpleHashDistributor::hosts_size() const {
    return hosts_size_;
}

void
SimpleHashDistributor::hosts_size(unsigned int size) {
    hosts_size_ = size;
}

host_t
SimpleHashDistributor::locate_data(const string& path, const chunkid_t& chnk_id,
                                   const int num_copy) const {
@@ -95,6 +100,11 @@ LocalOnlyDistributor::hosts_size() const {
    return hosts_size_;
}

void
LocalOnlyDistributor::hosts_size(unsigned int size) {
    hosts_size_ = size;
}

host_t
LocalOnlyDistributor::locate_data(const string& path, const chunkid_t& chnk_id,
                                  const int num_copy) const {
@@ -128,6 +138,11 @@ ForwarderDistributor::hosts_size() const {
    return hosts_size_;
}

void
ForwarderDistributor::hosts_size(unsigned int size) {
    hosts_size_ = size;
}

host_t
ForwarderDistributor::locate_data(const std::string& path,
                                  const chunkid_t& chnk_id,
@@ -239,6 +254,11 @@ GuidedDistributor::hosts_size() const {
    return hosts_size_;
}

void
GuidedDistributor::hosts_size(unsigned int size) {
    hosts_size_ = size;
}

host_t
GuidedDistributor::locate_data(const string& path, const chunkid_t& chnk_id,
                               unsigned int hosts_size, const int num_copy) {
+21 −17
Original line number Diff line number Diff line
@@ -278,9 +278,12 @@ init_rpc_client() {
    if(gkfs::rpc::protocol::ofi_psm2 == GKFS_DATA->rpc_protocol())
        hg_options.na_init_info.progress_mode = NA_NO_BLOCK;
    // Start Margo (this will also initialize Argobots and Mercury internally)
    auto margo_config = "{}";
    auto margo_config = fmt::format(
            R"({{ "use_progress_thread" : true, "rpc_thread_count" : {} }})",
            0);
    //    auto margo_config = "{}";
    struct margo_init_info args = {nullptr};
    args.json_config = margo_config;
    args.json_config = margo_config.c_str();
    args.hg_init_info = &hg_options;
    auto* mid = margo_init_ext(GKFS_DATA->bind_addr().c_str(),
                               MARGO_CLIENT_MODE, &args);
@@ -465,20 +468,6 @@ init_environment() {
    // init margo for proxy RPC

    if(!GKFS_DATA->bind_proxy_addr().empty()) {
        GKFS_DATA->spdlogger()->debug("{}() Initializing Distributor ... ",
                                      __func__);
        try {
            auto distributor =
                    std::make_shared<gkfs::rpc::SimpleHashDistributor>();
            RPC_DATA->distributor(distributor);
        } catch(const std::exception& e) {
            GKFS_DATA->spdlogger()->error(
                    "{}() Failed to initialize Distributor: {}", __func__,
                    e.what());
            throw;
        }
        GKFS_DATA->spdlogger()->debug("{}() Distributed running.", __func__);

        GKFS_DATA->spdlogger()->debug(
                "{}() Initializing proxy RPC server: '{}'", __func__,
                GKFS_DATA->bind_proxy_addr());
@@ -538,12 +527,28 @@ init_environment() {
        throw;
    }
    GKFS_DATA->spdlogger()->debug("{}() RPC client running.", __func__);

    // Needed for client
    GKFS_DATA->spdlogger()->debug("{}() Initializing Distributor ... ",
                                  __func__);
    try {
        auto distributor = std::make_shared<gkfs::rpc::SimpleHashDistributor>();
        RPC_DATA->distributor(distributor);
    } catch(const std::exception& e) {
        GKFS_DATA->spdlogger()->error(
                "{}() Failed to initialize Distributor: {}", __func__,
                e.what());
        throw;
    }
    GKFS_DATA->spdlogger()->debug("{}() Distributed running.", __func__);

    GKFS_DATA->spdlogger()->debug("{}() Initializing MalleableManager...",
                                  __func__);
    try {
        auto malleable_manager =
                std::make_shared<gkfs::malleable::MalleableManager>();
        GKFS_DATA->malleable_manager(malleable_manager);

    } catch(const std::exception& e) {
        GKFS_DATA->spdlogger()->error(
                "{}() Failed to initialize MalleableManager: {}", __func__,
@@ -552,7 +557,6 @@ init_environment() {
    }
    GKFS_DATA->spdlogger()->debug("{}() MalleableManager running.", __func__);


    GKFS_DATA->spdlogger()->info("Startup successful. Daemon is ready.");
}

+8 −1
Original line number Diff line number Diff line
@@ -208,7 +208,10 @@ MalleableManager::expand_start(int old_server_conf, int new_server_conf) {
                            __func__, hosts.size(), new_server_conf));
    }
    connect_to_hosts(hosts);

    RPC_DATA->distributor()->hosts_size(hosts.size());
    GKFS_DATA->spdlogger()->info(
            "{}() Total number of hosts after expansion: {}", __func__,
            RPC_DATA->distributor()->hosts_size());
    auto abt_err =
            ABT_thread_create(RPC_DATA->io_pool(), expand_abt,
                              ABT_THREAD_ATTR_NULL, nullptr, &redist_thread_);
@@ -239,7 +242,11 @@ MalleableManager::redistribute_metadata() {
            continue;
        }
        auto dest_id = RPC_DATA->distributor()->locate_file_metadata(key, 0);
        GKFS_DATA->spdlogger()->info(
                "{}() Migration: key {} and value {}. From host {} to host {}",
                __func__, key, value, RPC_DATA->local_host_id(), dest_id);
        if(dest_id == RPC_DATA->local_host_id()) {
            GKFS_DATA->spdlogger()->info("{}() SKIPPERS", __func__);
            continue;
        }
        auto err = gkfs::malleable::rpc::forward_metadata(key, value, dest_id);
+3 −0
Original line number Diff line number Diff line
@@ -38,6 +38,9 @@ forward_metadata(std::string& key, std::string& value, unsigned int dest_id) {
    rpc_migrate_metadata_in_t in{};
    rpc_err_out_t out{};
    int err;
    // set input
    in.key = key.c_str();
    in.value = value.c_str();
    // Create handle
    GKFS_DATA->spdlogger()->debug("{}() Creating Margo handle ...", __func__);
    auto endp = RPC_DATA->rpc_endpoints().at(dest_id);
Loading