Commit f5b0b9ed authored by Ramon Nou's avatar Ramon Nou
Browse files

First prototype FTIO - ADHOC - CARGO integration

parent 5e8efca3
Loading
Loading
Loading
Loading
Loading
+197 −29
Original line number Diff line number Diff line
@@ -177,18 +177,58 @@ master_server::mpi_listener_ult() {
void
master_server::ftio_scheduling_ult() {


    while(!m_shutting_down) {

        if(!m_pending_transfer.m_work or m_period < 0.0f) {
            std::this_thread::sleep_for(1000ms);
        // thallium::thread::self().sleep(m_network_engine, 10);
        } 


        // Do something with the confidence and probability

        if(m_ftio_changed) {
            m_ftio_changed = false;
        LOGGER_INFO("Confidence is {}, probability is {} and period is {}", m_confidence,
                    m_probability, m_period);
            LOGGER_INFO("Confidence is {}, probability is {} and period is {}",
                        m_confidence, m_probability, m_period);
        }

        if(!m_pending_transfer.m_work)
            continue;

        LOGGER_INFO("Waiting period : {}", m_period);
            std::this_thread::sleep_for(
                    std::chrono::seconds((int)(m_period )));

        LOGGER_INFO("Checking if there is work to do in {}",
                    m_pending_transfer.m_sources);
        transfer_dataset_internal(m_pending_transfer);
        // This launches the workers to do the work...
        // We wait until this transfer is finished
        LOGGER_INFO("Transferring : {}", m_pending_transfer.m_expanded_sources);
        bool finished = false;
        while(!finished) {
            std::this_thread::sleep_for(10ms);
            m_request_manager.lookup(m_pending_transfer.m_p.tid())
                    .or_else([&](auto&& ec) {
                        LOGGER_ERROR("Failed to lookup request: {}", ec);
                    })
                    .map([&](auto&& rs) {
                        if(rs.state() == transfer_state::completed) {
                            finished = true;
                        }
                    });
        }

        if(finished) {
            // Delete all source files
            LOGGER_INFO("Transfer finished for {}",
                    m_pending_transfer.m_expanded_sources);
            auto fs = FSPlugin::make_fs(cargo::FSPlugin::type::gekkofs);
            for(auto& file : m_pending_transfer.m_expanded_sources) {
                LOGGER_INFO("Deleting {}", file.path());
                // We need to use gekkofs to delete
                fs->unlink(file.path());
            }
        }
    }

@@ -252,6 +292,120 @@ master_server::shutdown(const network::request& req) {
    server::shutdown();
}

// Takes a pending_transfer, expands its sources, updates the request in the
// request_manager and sends the MPI messages for the transfer. Only files
// whose mtime is older than (now - k_mtime_threshold_secs) are included —
// intended for stage-out and FTIO-driven transfers, where recently written
// files may still be in flight.
void
master_server::transfer_dataset_internal(pending_transfer& pt) {

    mpi::communicator world;
    std::vector<cargo::dataset> v_s_new;
    std::vector<cargo::dataset> v_d_new;

    // Files modified less than this many seconds ago are considered still
    // being written and are skipped on this pass.
    constexpr time_t k_mtime_threshold_secs = 5;
    const time_t now = time(nullptr) - k_mtime_threshold_secs;

    for(auto i = 0u; i < pt.m_sources.size(); ++i) {

        const auto& s = pt.m_sources[i];
        const auto& d = pt.m_targets[i];

        // We need to expand directories to single files on the source.
        // Then create a new message for each file and append the
        // file to the target prefix.
        // We will assume that the path is the original absolute path.
        // The prefix selects the method of transfer,
        // and if not specified then we will use none,
        // i.e. ("xxxx:/xyyy/bbb -> gekko:/cccc/ttt ) then
        // bbb/xxx -> ttt/xxx
        const auto& p = s.path();

        // Check stat of p using the FSPlugin class.
        auto fs = FSPlugin::make_fs(
                static_cast<cargo::FSPlugin::type>(s.get_type()));
        // Zero-initialize so a failed stat() (return value is not checked
        // by this path) does not leave us reading garbage: a zeroed
        // st_mode is simply treated as a non-directory.
        struct stat buf {};
        fs->stat(p, &buf);

        // S_ISDIR() instead of `st_mode & S_IFDIR`: the bitwise test also
        // matches other file types whose code contains the S_IFDIR bit
        // (e.g. S_IFSOCK).
        if(S_ISDIR(buf.st_mode)) {
            LOGGER_INFO("Expanding input directory {}", p);
            const std::vector<std::string> files = fs->readdir(p);

            /*
            We have all the files expanded. Now create a new
            cargo::dataset for each file as source and a new
            cargo::dataset appending the base directory in d to the
            file name.
            */
            for(const auto& f : files) {
                cargo::dataset s_new(s);
                cargo::dataset d_new(d);
                s_new.path(f);
                // Build the target name from the original root path
                // (d.path()) plus the path of f with the initial prefix p
                // removed (taking care of a possible trailing '/').
                auto leading = p.size();
                if(leading > 0 and p.back() == '/') {
                    leading--;
                }

                d_new.path(d.path() /
                           std::filesystem::path(f.substr(leading + 1)));

                LOGGER_DEBUG("Expanded file {} -> {}", s_new.path(),
                             d_new.path());
                fs->stat(s_new.path(), &buf);
                if(buf.st_mtime < now) {
                    v_s_new.push_back(s_new);
                    v_d_new.push_back(d_new);
                }
            }
        } else {
            fs->stat(s.path(), &buf);
            if(buf.st_mtime < now) {
                v_s_new.push_back(s);
                v_d_new.push_back(d);
            }
        }
    }

    // Replace any previously expanded sources/targets with the new set.
    pt.m_expanded_sources.assign(v_s_new.begin(), v_s_new.end());
    pt.m_expanded_targets.assign(v_d_new.begin(), v_d_new.end());

    // We have two vectors, so we process the transfer:
    // [1] Update request_manager with the real number of files
    // [2] Send one message per file to every worker

    const auto ec = m_request_manager.update(pt.m_p.tid(), v_s_new.size(),
                                             pt.m_p.nworkers());
    if(ec != error_code::success) {
        LOGGER_ERROR("Failed to update request: {}", ec);
        return;
    }

    assert(v_s_new.size() == v_d_new.size());

    // For all the transfers
    for(std::size_t i = 0; i < v_s_new.size(); ++i) {
        const auto& s = v_s_new[i];
        const auto& d = v_d_new[i];

        // Create the directory if it does not exist (only in
        // parallel transfer)
        if(!std::filesystem::path(d.path()).parent_path().empty() and
           d.supports_parallel_transfer()) {
            std::filesystem::create_directories(
                    std::filesystem::path(d.path()).parent_path());
        }

        // Send message to worker
        for(std::size_t rank = 1; rank <= pt.m_p.nworkers(); ++rank) {
            const auto [t, m] = make_message(pt.m_p.tid(), i, s, d);
            LOGGER_INFO("msg <= to: {} body: {}", rank, m);
            world.send(static_cast<int>(rank), t, m);
        }
    }
}

void
master_server::transfer_datasets(const network::request& req,
                                 const std::vector<dataset>& sources,
@@ -268,8 +422,8 @@ master_server::transfer_datasets(const network::request& req,
                targets);


    // As we accept directories expanding directories should be done before and
    // update sources and targets.
    // As we accept directories expanding directories should be done before
    // and update sources and targets.

    std::vector<cargo::dataset> v_s_new;
    std::vector<cargo::dataset> v_d_new;
@@ -292,7 +446,8 @@ master_server::transfer_datasets(const network::request& req,

        std::vector<std::string> files;
        // Check stat of p using FSPlugin class
        auto fs = FSPlugin::make_fs(static_cast<cargo::FSPlugin::type>(s.get_type()));
        auto fs = FSPlugin::make_fs(
                static_cast<cargo::FSPlugin::type>(s.get_type()));
        struct stat buf;
        fs->stat(p, &buf);
        if(buf.st_mode & S_IFDIR) {
@@ -318,8 +473,8 @@ master_server::transfer_datasets(const network::request& req,
                    leading--;
                }

                d_new.path(d.path() / std::filesystem::path(
                                              f.substr(leading + 1)));
                d_new.path(d.path() /
                           std::filesystem::path(f.substr(leading + 1)));

                LOGGER_DEBUG("Expanded file {} -> {}", s_new.path(),
                             d_new.path());
@@ -342,7 +497,7 @@ master_server::transfer_datasets(const network::request& req,
            .map([&](auto&& r) {
                assert(v_s_new.size() == v_d_new.size());

                // For all the files
                // For all the transfers
                for(std::size_t i = 0; i < v_s_new.size(); ++i) {
                    const auto& s = v_s_new[i];
                    const auto& d = v_d_new[i];
@@ -357,12 +512,30 @@ master_server::transfer_datasets(const network::request& req,
                                std::filesystem::path(d.path()).parent_path());
                    }

                    for(std::size_t rank = 1; rank <= r.nworkers(); ++rank) {
                    // If we are not using ftio start transfer if we are on
                    // stage-out
                    if(m_ftio) {
                        // If we are on stage-out
                        if(s.get_type() == cargo::dataset::type::gekkofs) {

                            // We have only one pendingTransfer for FTIO
                            // that can be updated, the issue is that we
                            // need the tid.
                            m_pending_transfer.m_p = r;
                            m_pending_transfer.m_sources = sources;
                            m_pending_transfer.m_targets = targets;
                            m_pending_transfer.m_work = true;
                            LOGGER_INFO("Stored stage-out information");
                        }
                    } else {
                        for(std::size_t rank = 1; rank <= r.nworkers();
                            ++rank) {
                            const auto [t, m] = make_message(r.tid(), i, s, d);
                            LOGGER_INFO("msg <= to: {} body: {}", rank, m);
                            world.send(static_cast<int>(rank), t, m);
                        }
                    }
                }
                LOGGER_INFO("rpc {:<} body: {{retval: {}, tid: {}}}", rpc,
                            error_code::success, r.tid());
                req.respond(response_with_id{rpc.id(), error_code::success,
@@ -451,8 +624,8 @@ master_server::transfer_statuses(const network::request& req,


void
master_server::ftio_int(const network::request& req, float conf,
                        float prob, float period) {
master_server::ftio_int(const network::request& req, float conf, float prob,
                        float period) {
    using network::get_address;
    using network::rpc_info;
    using proto::generic_response;
@@ -463,16 +636,11 @@ master_server::ftio_int(const network::request& req, float conf,
    m_probability = prob;
    m_period = period;
    m_ftio_changed = true;
    LOGGER_INFO("rpc {:>} body: {{confidence: {}, probability: {}, period: {}}}", rpc,
                conf, prob, period);

    // do the magic here
    m_ftio = true;

    // 1. Update the confidence and probability values inside cargo

    // Scheduling thread should be running and waiting for them

    //
    LOGGER_INFO(
            "rpc {:>} body: {{confidence: {}, probability: {}, period: {}}}",
            rpc, conf, prob, period);

    const auto resp = generic_response{rpc.id(), error_code::success};

+31 −4
Original line number Diff line number Diff line
@@ -28,9 +28,25 @@
#include "net/server.hpp"
#include "cargo.hpp"
#include "request_manager.hpp"
#include "parallel_request.hpp"

namespace cargo {

// Holds the single FTIO stage-out transfer that the scheduling thread
// processes: the original request plus the source/target datasets, both as
// supplied by the client and after directory expansion.
class pending_transfer {
public:
    // Starts out idle with a null parallel request; the fields are filled
    // in later when an FTIO stage-out transfer is registered.
    pending_transfer() : m_p(cargo::parallel_request(0, 0, 0)) {}

    // True while there is pending work to be scheduled.
    // (in-class initializer instead of assignment in the ctor body)
    bool m_work = false;
    // Parallel request associated with the transfer (carries tid/nworkers).
    cargo::parallel_request m_p;
    // Sources and targets as originally supplied by the client.
    std::vector<cargo::dataset> m_sources;
    std::vector<cargo::dataset> m_targets;
    // Expanded sources and targets (those that are being processed by the
    // workers after directory expansion).
    std::vector<cargo::dataset> m_expanded_sources;
    std::vector<cargo::dataset> m_expanded_targets;
};

class master_server : public network::server,
                      public network::provider<master_server> {
public:
@@ -67,11 +83,13 @@ private:
    // Receives a request to increase or decrease BW
    // -1 faster, 0 , +1 slower
    void
    bw_control(const network::request& req, std::uint64_t tid, std::int16_t shaping);
    bw_control(const network::request& req, std::uint64_t tid,
               std::int16_t shaping);


    void
    ftio_int(const network::request& req, float confidence, float probability, float period);
    ftio_int(const network::request& req, float confidence, float probability,
             float period);

private:
    // Dedicated execution stream for the MPI listener ULT
@@ -87,6 +105,15 @@ private:
    float m_probability = -1.0f;
    float m_period = -1.0f;
    bool m_ftio_changed = true;
    // FTIO enabled flag, we need to call ftio once.
    bool m_ftio = false;


    pending_transfer m_pending_transfer;


    void
    transfer_dataset_internal(pending_transfer& pt);
    // Request manager
    request_manager m_request_manager;
};
+3 −2
Original line number Diff line number Diff line
@@ -159,8 +159,9 @@ public:
        m_fs_plugin = cargo::FSPlugin::make_fs(t);
    };

    explicit file(std::filesystem::path filepath) noexcept
        : m_path(std::move(filepath)) {}
    explicit file(std::filesystem::path filepath,
                  cargo::FSPlugin::type t) noexcept
        : m_path(std::move(filepath)), m_fs_plugin(cargo::FSPlugin::make_fs(t)) {}

    file(std::filesystem::path filepath, int fd,
         std::shared_ptr<cargo::FSPlugin> fs_plugin) noexcept
+11 −4
Original line number Diff line number Diff line
@@ -77,17 +77,24 @@ gekko_plugin::readdir(const std::string& path) {
    files = gkfs::syscall::gkfs_get_file_list(path);

    for(auto& file : files) {
        file = "/" + file;

        struct stat buf;
        stat(file, &buf);
        stat("/" + file, &buf);
        if(S_ISDIR(buf.st_mode)) {
            std::vector<std::string> subfiles = readdir(file);

            std::vector<std::string> subfiles = readdir("/" + file);
            final_list.insert(final_list.end(), subfiles.begin(),
                              subfiles.end());
        } else {
            final_list.push_back(file);

            if(path.size() != 1) {
                final_list.push_back(path + "/" + file);
            } else {
                final_list.push_back("/" + file);
            }
        }
    }

    return final_list;
}

+22 −0
Original line number Diff line number Diff line
@@ -55,6 +55,28 @@ request_manager::create(std::size_t nfiles, std::size_t nworkers) {
    return parallel_request{tid, nfiles, nworkers};
}

/**
 * @brief Update an existing request, resizing its per-file/per-worker
 * status matrix. Used by the FTIO processing path, where the final number
 * of files is only known after directory expansion (readdir).
 *
 * @param tid transfer id of the request to update
 * @param nfiles new number of files tracked by the request
 * @param nworkers number of workers assigned to each file
 * @return error_code::success if the request exists,
 *         error_code::no_such_transfer otherwise
 */
error_code
request_manager::update(std::uint64_t tid, std::size_t nfiles,
                        std::size_t nworkers) {
    abt::unique_lock lock(m_mutex);

    if(const auto it = m_requests.find(tid); it != m_requests.end()) {
        it->second.resize(nfiles, std::vector<part_status>{nworkers});
        return error_code::success;
    }

    LOGGER_ERROR("{}: Request {} not found", __FUNCTION__, tid);
    return error_code::no_such_transfer;
}


error_code
request_manager::update(std::uint64_t tid, std::uint32_t seqno, std::size_t wid, std::string name, 
                        transfer_state s, float bw,
Loading