Commit 1c80c7a9 authored by Ramon Nou's avatar Ramon Nou Committed by Ramon Nou
Browse files

WIP sleep

updated thread

Moving info from master to worker progress

loop update

rebase task

added chrono fmt

Progress loop extracted

Reduce simultaneous transfers

wrong Error code in write

error control

Missing size for write

Added BW, solved sleep bug (intialization)

BW workflow finished

Support for creating directories

Directory Support

Reduce CPU usage with dynamic wait

Only call create directories, if the path is not 0

Updated Spack for 0.4.0
parent bbb15c70
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -30,7 +30,7 @@ cmake_minimum_required(VERSION 3.19)

project(
  cargo
  VERSION 0.2.0
  VERSION 0.4.0
  LANGUAGES C CXX
)

+4 −0
Original line number Diff line number Diff line
@@ -32,6 +32,7 @@
#include <chrono>
#include <cargo/error.hpp>

constexpr const uint64_t TIMES = 100;
namespace cargo {

using transfer_id = std::uint64_t;
@@ -74,6 +75,9 @@ public:
    [[nodiscard]] bool
    supports_parallel_transfer() const noexcept;

    void
    path(std::string path);

    template <typename Archive>
    void
    serialize(Archive& ar) {
+5 −0
Original line number Diff line number Diff line
@@ -72,6 +72,11 @@ dataset::path() const noexcept {
    return m_path;
};

void
dataset::path(std::string path) {
    m_path = std::move(path);
};

bool
dataset::supports_parallel_transfer() const noexcept {
    return m_type == dataset::type::parallel;
+1 −0
Original line number Diff line number Diff line
@@ -36,6 +36,7 @@ class Cargo(CMakePackage):
    version("latest", branch="main")
    version("0.1.0", sha256="981d00adefbc2ea530f57f8428bd7980e4aab2993a86d8ae4274334c8f055bdb", deprecated=True)
    version("0.2.0", sha256="fd7fa31891b3961dcb376556ec5fa028bf512d96a7c688a160f9dade58dae36f")
    version("0.4.0", sha256="7a3de25165a6c6ce9dc356634d89f7052f8d2bef")

    # build variants
    variant('build_type',
+58 −7
Original line number Diff line number Diff line
@@ -108,16 +108,19 @@ void
master_server::mpi_listener_ult() {

    mpi::communicator world;

    uint64_t times = 0;
    while(!m_shutting_down) {

        auto msg = world.iprobe();

        if(!msg) {
            thallium::thread::self().sleep(m_network_engine, 150);
            thallium::thread::self().sleep(m_network_engine, 150*times);
            if (times < TIMES) {
                times++;
            }
            continue;
        }

        times=0;
        switch(static_cast<cargo::tag>(msg->tag())) {
            case tag::status: {
                status_message m;
@@ -233,15 +236,63 @@ master_server::transfer_datasets(const network::request& req,
                assert(sources.size() == targets.size());

                for(auto i = 0u; i < sources.size(); ++i) {

                    const auto& s = sources[i];
                    const auto& d = targets[i];

                    for(std::size_t rank = 1; rank <= r.nworkers(); ++rank) {

                    // We need to expand directories to single files on the s
                    // Then create a new message for each file and append the
                    // file to the d prefix
                    // We will asume that the path is the original relative
                    // i.e. ("xxxx:/xyyy/bbb -> gekko:/cccc/ttt ) then
                    // bbb/xxx -> ttt/xxx
                    const auto& p = s.path();
                    std::vector<std::filesystem::path> files;
                    if(std::filesystem::is_directory(p)) {
                        LOGGER_INFO("Expanding input directory {}", p);
                        for(const auto& f :
                            std::filesystem::recursive_directory_iterator(p)) {
                            if (std::filesystem::is_regular_file(f)) {
                                files.push_back(f.path());
                            }
                        }

                        /*
                        We have all the files expanded. Now create a new
                        cargo::dataset for each file as s and a new
                        cargo::dataset appending the base directory in d to the
                        file name.
                        */
                        for(const auto& f : files) {
                            cargo::dataset s_new(s);
                            cargo::dataset d_new(d);
                            s_new.path(f);
                            // We need to get filename from the original root path (d.path) plus the path from f, removing the initial path p
                            d_new.path(d.path() / std::filesystem::path(f.string().substr(p.size() + 1)));

                            LOGGER_DEBUG("Expanded file {} -> {}", s_new.path(),
                                         d_new.path());
                            for(std::size_t rank = 1; rank <= r.nworkers();
                                ++rank) {
                                const auto [t, m] =
                                        make_message(r.tid(), i, s_new, d_new);
                                LOGGER_INFO("msg <= to: {} body: {}", rank, m);
                                world.send(static_cast<int>(rank), t, m);
                            }
                        }

                    } else {
                        // normal use case, we are specifying files

                        for(std::size_t rank = 1; rank <= r.nworkers();
                            ++rank) {
                            const auto [t, m] = make_message(r.tid(), i, s, d);
                            LOGGER_INFO("msg <= to: {} body: {}", rank, m);
                            world.send(static_cast<int>(rank), t, m);
                        }
                    }
                }

                LOGGER_INFO("rpc {:<} body: {{retval: {}, tid: {}}}", rpc,
                            error_code::success, r.tid());
Loading