Commit 05bf4edf authored by Ramon Nou's avatar Ramon Nou
Browse files

Merge branch 'rnou/directory_support' into 'main'

Directory support

This MR adds two features:

- Creation of directories on the output. As cargo normally runs in the prolog, the user has no way to create directories on the ad hoc filesystem (as it is ephemeral). This MR creates the needed directories.

- If the input contains a directory, we get all the files recursively. Before this MR, only files could be used as input. The input directory is used as a prefix: each file's path relative to that directory is recreated inside the output.

- Prepares for 0.3.0 release 

 The Merge request goes over !16

See merge request !18
parents b8003af7 360f4727
Loading
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -30,7 +30,7 @@ cmake_minimum_required(VERSION 3.19)

project(
  cargo
  VERSION 0.2.0
  VERSION 0.3.0
  LANGUAGES C CXX
)

+3 −0
Original line number Diff line number Diff line
@@ -74,6 +74,9 @@ public:
    /**
     * Report whether this dataset's type is `dataset::type::parallel`.
     *
     * @return true if the dataset type is parallel, false otherwise.
     */
    [[nodiscard]] bool
    supports_parallel_transfer() const noexcept;

    /**
     * Replace the path stored in this dataset.
     *
     * @param path The new path; it is moved into the dataset.
     */
    void
    path(std::string path);

    template <typename Archive>
    void
    serialize(Archive& ar) {
+5 −0
Original line number Diff line number Diff line
@@ -72,6 +72,11 @@ dataset::path() const noexcept {
    return m_path;
};

/**
 * Replace the path associated with this dataset.
 *
 * @param path The new path. It is taken by value and moved into the
 *             member, so callers passing an rvalue avoid an extra copy.
 */
void
dataset::path(std::string path) {
    m_path = std::move(path);
}

bool
dataset::supports_parallel_transfer() const noexcept {
    return m_type == dataset::type::parallel;
+1 −0
Original line number Diff line number Diff line
@@ -36,6 +36,7 @@ class Cargo(CMakePackage):
    version("latest", branch="main")
    version("0.1.0", sha256="981d00adefbc2ea530f57f8428bd7980e4aab2993a86d8ae4274334c8f055bdb", deprecated=True)
    version("0.2.0", sha256="fd7fa31891b3961dcb376556ec5fa028bf512d96a7c688a160f9dade58dae36f")
    version("0.3.0", branch="rnou/directory_support")

    # build variants
    variant('build_type',
+71 −7
Original line number Diff line number Diff line
@@ -108,7 +108,6 @@ void
master_server::mpi_listener_ult() {

    mpi::communicator world;

    while(!m_shutting_down) {

        auto msg = world.iprobe();
@@ -223,18 +222,84 @@ master_server::transfer_datasets(const network::request& req,
    LOGGER_INFO("rpc {:>} body: {{sources: {}, targets: {}}}", rpc, sources,
                targets);

    m_request_manager.create(sources.size(), world.size() - 1)

    // As we accept directories, they must be expanded beforehand, and the
    // sources and targets updated accordingly.

    std::vector<cargo::dataset> v_s_new;
    std::vector<cargo::dataset> v_d_new;

    for(auto i = 0u; i < sources.size(); ++i) {

        const auto& s = sources[i];
        const auto& d = targets[i];


        // We need to expand directories in the source (s) into single
        // files, then create a new message for each file and append
        // the file to the destination (d) prefix.
        // We will assume that the path is relative to the original
        // directory, i.e. ("xxxx:/xyyy/bbb -> gekko:/cccc/ttt") then
        // bbb/xxx -> ttt/xxx
        const auto& p = s.path();
        std::vector<std::filesystem::path> files;
        if(std::filesystem::is_directory(p)) {
            LOGGER_INFO("Expanding input directory {}", p);
            for(const auto& f :
                std::filesystem::recursive_directory_iterator(p)) {
                if(std::filesystem::is_regular_file(f)) {
                    files.push_back(f.path());
                }
            }

            /*
            We have all the files expanded. Now create a new
            cargo::dataset for each file as s and a new
            cargo::dataset appending the base directory in d to the
            file name.
            */
            for(const auto& f : files) {
                cargo::dataset s_new(s);
                cargo::dataset d_new(d);
                s_new.path(f);
                // We need to get filename from the original root
                // path (d.path) plus the path from f, removing the
                // initial path p
                d_new.path(d.path() / std::filesystem::path(
                                              f.string().substr(p.size() + 1)));

                LOGGER_DEBUG("Expanded file {} -> {}", s_new.path(),
                             d_new.path());
                v_s_new.push_back(s_new);
                v_d_new.push_back(d_new);
            }

        } else {
            v_s_new.push_back(s);
            v_d_new.push_back(d);
        }
    }

    m_request_manager.create(v_s_new.size(), world.size() - 1)
            .or_else([&](auto&& ec) {
                LOGGER_ERROR("Failed to create request: {}", ec);
                LOGGER_INFO("rpc {:<} body: {{retval: {}}}", rpc, ec);
                req.respond(generic_response{rpc.id(), ec});
            })
            .map([&](auto&& r) {
                assert(sources.size() == targets.size());
                assert(v_s_new.size() == v_d_new.size());

                for(auto i = 0u; i < sources.size(); ++i) {
                    const auto& s = sources[i];
                    const auto& d = targets[i];

                // For all the files
                for(std::size_t i = 0; i < v_s_new.size(); ++i) {
                    const auto& s = v_s_new[i];
                    const auto& d = v_d_new[i];

                    // Create the directory if it does not exist
                    if(!std::filesystem::path(d.path()).parent_path().empty()) {
                        std::filesystem::create_directories(
                                std::filesystem::path(d.path()).parent_path());
                    }

                    for(std::size_t rank = 1; rank <= r.nworkers(); ++rank) {
                        const auto [t, m] = make_message(r.tid(), i, s, d);
@@ -242,7 +307,6 @@ master_server::transfer_datasets(const network::request& req,
                        world.send(static_cast<int>(rank), t, m);
                    }
                }

                LOGGER_INFO("rpc {:<} body: {{retval: {}, tid: {}}}", rpc,
                            error_code::success, r.tid());
                req.respond(response_with_id{rpc.id(), error_code::success,
Loading