Commit 360f4727 authored by Ramon Nou's avatar Ramon Nou
Browse files

Solved MPIopen bug

parent 1c80c7a9
Loading
Loading
Loading
Loading
Loading
+1 −1
Original line number Original line Diff line number Diff line
@@ -30,7 +30,7 @@ cmake_minimum_required(VERSION 3.19)


project(
project(
  cargo
  cargo
  VERSION 0.4.0
  VERSION 0.3.0
  LANGUAGES C CXX
  LANGUAGES C CXX
)
)


+0 −1
Original line number Original line Diff line number Diff line
@@ -32,7 +32,6 @@
#include <chrono>
#include <chrono>
#include <cargo/error.hpp>
#include <cargo/error.hpp>


constexpr const uint64_t TIMES = 100;
namespace cargo {
namespace cargo {


using transfer_id = std::uint64_t;
using transfer_id = std::uint64_t;
+1 −1
Original line number Original line Diff line number Diff line
@@ -36,7 +36,7 @@ class Cargo(CMakePackage):
    version("latest", branch="main")
    version("latest", branch="main")
    version("0.1.0", sha256="981d00adefbc2ea530f57f8428bd7980e4aab2993a86d8ae4274334c8f055bdb", deprecated=True)
    version("0.1.0", sha256="981d00adefbc2ea530f57f8428bd7980e4aab2993a86d8ae4274334c8f055bdb", deprecated=True)
    version("0.2.0", sha256="fd7fa31891b3961dcb376556ec5fa028bf512d96a7c688a160f9dade58dae36f")
    version("0.2.0", sha256="fd7fa31891b3961dcb376556ec5fa028bf512d96a7c688a160f9dade58dae36f")
    version("0.4.0", sha256="7a3de25165a6c6ce9dc356634d89f7052f8d2bef")
    version("0.3.0", branch="rnou/directory_support")


    # build variants
    # build variants
    variant('build_type',
    variant('build_type',
+79 −66
Original line number Original line Diff line number Diff line
@@ -108,19 +108,15 @@ void
master_server::mpi_listener_ult() {
master_server::mpi_listener_ult() {


    mpi::communicator world;
    mpi::communicator world;
    uint64_t times = 0;
    while(!m_shutting_down) {
    while(!m_shutting_down) {


        auto msg = world.iprobe();
        auto msg = world.iprobe();


        if(!msg) {
        if(!msg) {
            thallium::thread::self().sleep(m_network_engine, 150*times);
            thallium::thread::self().sleep(m_network_engine, 150);
            if (times < TIMES) {
                times++;
            }
            continue;
            continue;
        }
        }
        times=0;

        switch(static_cast<cargo::tag>(msg->tag())) {
        switch(static_cast<cargo::tag>(msg->tag())) {
            case tag::status: {
            case tag::status: {
                status_message m;
                status_message m;
@@ -226,14 +222,12 @@ master_server::transfer_datasets(const network::request& req,
    LOGGER_INFO("rpc {:>} body: {{sources: {}, targets: {}}}", rpc, sources,
    LOGGER_INFO("rpc {:>} body: {{sources: {}, targets: {}}}", rpc, sources,
                targets);
                targets);


    m_request_manager.create(sources.size(), world.size() - 1)

            .or_else([&](auto&& ec) {
    // As we accept directories expanding directories should be done before and
                LOGGER_ERROR("Failed to create request: {}", ec);
    // update sources and targets.
                LOGGER_INFO("rpc {:<} body: {{retval: {}}}", rpc, ec);

                req.respond(generic_response{rpc.id(), ec});
    std::vector<cargo::dataset> v_s_new;
            })
    std::vector<cargo::dataset> v_d_new;
            .map([&](auto&& r) {
                assert(sources.size() == targets.size());


    for(auto i = 0u; i < sources.size(); ++i) {
    for(auto i = 0u; i < sources.size(); ++i) {


@@ -268,32 +262,51 @@ master_server::transfer_datasets(const network::request& req,
                cargo::dataset s_new(s);
                cargo::dataset s_new(s);
                cargo::dataset d_new(d);
                cargo::dataset d_new(d);
                s_new.path(f);
                s_new.path(f);
                            // We need to get filename from the original root path (d.path) plus the path from f, removing the initial path p
                // We need to get filename from the original root
                            d_new.path(d.path() / std::filesystem::path(f.string().substr(p.size() + 1)));
                // path (d.path) plus the path from f, removing the
                // initial path p
                d_new.path(d.path() / std::filesystem::path(
                                              f.string().substr(p.size() + 1)));


                LOGGER_DEBUG("Expanded file {} -> {}", s_new.path(),
                LOGGER_DEBUG("Expanded file {} -> {}", s_new.path(),
                             d_new.path());
                             d_new.path());
                            for(std::size_t rank = 1; rank <= r.nworkers();
                v_s_new.push_back(s_new);
                                ++rank) {
                v_d_new.push_back(d_new);
                                const auto [t, m] =
                                        make_message(r.tid(), i, s_new, d_new);
                                LOGGER_INFO("msg <= to: {} body: {}", rank, m);
                                world.send(static_cast<int>(rank), t, m);
                            }
            }
            }


        } else {
        } else {
                        // normal use case, we are specifying files
            v_s_new.push_back(s);
            v_d_new.push_back(d);
        }
    }

    m_request_manager.create(v_s_new.size(), world.size() - 1)
            .or_else([&](auto&& ec) {
                LOGGER_ERROR("Failed to create request: {}", ec);
                LOGGER_INFO("rpc {:<} body: {{retval: {}}}", rpc, ec);
                req.respond(generic_response{rpc.id(), ec});
            })
            .map([&](auto&& r) {
                assert(v_s_new.size() == v_d_new.size());



                        for(std::size_t rank = 1; rank <= r.nworkers();
                // For all the files
                            ++rank) {
                for(std::size_t i = 0; i < v_s_new.size(); ++i) {
                    const auto& s = v_s_new[i];
                    const auto& d = v_d_new[i];

                    // Create the directory if it does not exist
                    if(!std::filesystem::path(d.path()).parent_path().empty()) {
                        std::filesystem::create_directories(
                                std::filesystem::path(d.path()).parent_path());
                    }

                    for(std::size_t rank = 1; rank <= r.nworkers(); ++rank) {
                        const auto [t, m] = make_message(r.tid(), i, s, d);
                        const auto [t, m] = make_message(r.tid(), i, s, d);
                        LOGGER_INFO("msg <= to: {} body: {}", rank, m);
                        LOGGER_INFO("msg <= to: {} body: {}", rank, m);
                        world.send(static_cast<int>(rank), t, m);
                        world.send(static_cast<int>(rank), t, m);
                    }
                    }
                }
                }
                }

                LOGGER_INFO("rpc {:<} body: {{retval: {}, tid: {}}}", rpc,
                LOGGER_INFO("rpc {:<} body: {{retval: {}, tid: {}}}", rpc,
                            error_code::success, r.tid());
                            error_code::success, r.tid());
                req.respond(response_with_id{rpc.id(), error_code::success,
                req.respond(response_with_id{rpc.id(), error_code::success,
+1 −15
Original line number Original line Diff line number Diff line
@@ -135,20 +135,6 @@ public:


        MPI_File result;
        MPI_File result;


        // At this point we may face the possibility of an unexistent directory
        // The File open semantics will not create the directory and fail.
        // As the operation are done in the prolog, we may not been able to create
        // such directory in the parallel filesystem nor the adhoc fs.

        // We will create the needed directories if we are writing.

        if (mode == file_open_mode::wronly) {
            // Decompose the filepath and create the needed directories.
            const std::filesystem::path dir = filepath.parent_path();
            if (!std::filesystem::exists(dir)) {
                std::filesystem::create_directories(dir);
            }
        }
    
    
        if(const auto ec =
        if(const auto ec =
                   MPI_File_open(comm, filepath.c_str(), static_cast<int>(mode),
                   MPI_File_open(comm, filepath.c_str(), static_cast<int>(mode),
Loading