Commit d5ada770 authored by Ramon Nou's avatar Ramon Nou
Browse files

Error mitigation (too many inflight messages)

parent 1dbd5331
Loading
Loading
Loading
Loading
Loading
+10 −1
Original line number Diff line number Diff line
@@ -479,6 +479,11 @@ master_server::transfer_datasets(const network::request& req,
        if(buf.st_mode & S_IFDIR) {
            LOGGER_INFO("Expanding input directory {}", p);
            files = fs->readdir(p);
            // As we need to create a new directory, we need to order the files
            // so that directories are created in the correct order

            // Order the files alphabetically
            std::sort(files.begin(), files.end());


            /*
@@ -548,6 +553,7 @@ master_server::transfer_datasets(const network::request& req,
                       d.supports_parallel_transfer()) {
                        std::filesystem::create_directories(
                                std::filesystem::path(d.path()).parent_path());
                        LOGGER_INFO("Created directory {}", d.path());
                    }

                    // If we are not using ftio start transfer if we are on
@@ -555,12 +561,15 @@ master_server::transfer_datasets(const network::request& req,
                    if(!m_ftio) {
                        // If we are on stage-out


                        // A short pause between sends may help here: possibly
                        // too many in-flight messages to the workers.
                        for(std::size_t rank = 1; rank <= r.nworkers();
                            ++rank) {
                            const auto [t, m] = make_message(r.tid(), i, s, d);
                            LOGGER_INFO("msg <= to: {} body: {}", rank, m);
                            world.send(static_cast<int>(rank), t, m);
                            // Wait 20 ms before sending to the next worker
                            std::this_thread::sleep_for(20ms);

                        }
                    } else {
                        m_ftio_tid = r.tid();
+15 −3
Original line number Diff line number Diff line
@@ -34,6 +34,7 @@
#include "fs_plugin/fs_plugin.hpp"
#include "cargo.hpp"
#include <iostream>
#include "../logger/logger.hpp"
extern "C" {
#include <unistd.h>
};
@@ -69,7 +70,8 @@ public:
    file_handle&
    operator=(const file_handle& other) = delete;

    explicit operator bool() const noexcept {
    explicit
    operator bool() const noexcept {
        return valid();
    }

@@ -161,7 +163,8 @@ public:

    explicit file(std::filesystem::path filepath,
                  cargo::FSPlugin::type t) noexcept
        : m_path(std::move(filepath)), m_fs_plugin(cargo::FSPlugin::make_fs(t)) {}
        : m_path(std::move(filepath)),
          m_fs_plugin(cargo::FSPlugin::make_fs(t)) {}

    file(std::filesystem::path filepath, int fd,
         std::shared_ptr<cargo::FSPlugin> fs_plugin) noexcept
@@ -293,6 +296,14 @@ protected:
    std::shared_ptr<cargo::FSPlugin> m_fs_plugin;
};

static void
recursive_mkdir(const std::filesystem::path& path,
                std::shared_ptr<cargo::FSPlugin> fs_plugin) {
    if(path.has_parent_path() and path != "/") {
        recursive_mkdir(path.parent_path(), fs_plugin);
        fs_plugin->mkdir(path.parent_path().c_str(), 0755);
    }
}

static inline file
open(const std::filesystem::path& filepath, int flags, ::mode_t mode,
@@ -304,12 +315,13 @@ open(const std::filesystem::path& filepath, int flags, ::mode_t mode,
    // We don't check if it exists, we just create it if flags is set to O_CREAT

    if(flags & O_CREAT) {
        fs_plugin->mkdir(filepath.parent_path().c_str(), 0755);
        recursive_mkdir(filepath, fs_plugin);
    }

    int fd = fs_plugin->open(filepath.c_str(), flags, mode);

    if(fd == -1) {
        LOGGER_INFO("[PLUGIN] Open error {}", filepath.c_str());
        throw io_error("posix_file::open ", errno);
    }

+1 −1
Original line number Diff line number Diff line
@@ -39,6 +39,7 @@ seq_mixed_operation::operator()() {
        const auto workers_size = m_workers.size();
        const auto workers_rank = m_workers.rank();
        std::size_t block_size = m_kb_size * 1024u;
       
        m_input_file = std::make_unique<posix_file::file>(
                posix_file::open(m_input_path, O_RDONLY, 0, m_fs_i_type));
        std::size_t file_size = m_input_file->size();
@@ -68,7 +69,6 @@ seq_mixed_operation::operator()() {
                                          block_size);
        }


        m_output_file = std::make_unique<posix_file::file>(posix_file::create(
                m_output_path, O_WRONLY, S_IRUSR | S_IWUSR, m_fs_o_type));