Commit 94b47875 authored by Ramon Nou's avatar Ramon Nou
Browse files

Implemented sequential for potential adhoc types

parent d5ddfc88
Loading
Loading
Loading
Loading
Loading
+31 −17
Original line number Diff line number Diff line
@@ -48,20 +48,29 @@ make_message(std::uint64_t tid, std::uint32_t seqno,
             const cargo::dataset& input, const cargo::dataset& output) {

    if(input.supports_parallel_transfer()) {
        return std::make_tuple(static_cast<int>(cargo::tag::pread),
                               cargo::transfer_message{tid, seqno, input.path(),
                                                       output.path(), static_cast<uint32_t>(output.get_type())});
        return std::make_tuple(
                static_cast<int>(cargo::tag::pread),
                cargo::transfer_message{
                        tid, seqno, input.path(),
                        static_cast<uint32_t>(input.get_type()), output.path(),
                        static_cast<uint32_t>(output.get_type())});
    }

    if(output.supports_parallel_transfer()) {
        return std::make_tuple(static_cast<int>(cargo::tag::pwrite),
                               cargo::transfer_message{tid, seqno, input.path(),
                                                       output.path(), static_cast<uint32_t>(input.get_type())});
        return std::make_tuple(
                static_cast<int>(cargo::tag::pwrite),
                cargo::transfer_message{
                        tid, seqno, input.path(),
                        static_cast<uint32_t>(input.get_type()), output.path(),
                        static_cast<uint32_t>(output.get_type())});
    }

    return std::make_tuple(
            static_cast<int>(cargo::tag::sequential),
            cargo::transfer_message{tid, seqno, input.path(), output.path(), static_cast<uint32_t>(input.get_type())});
            cargo::transfer_message{tid, seqno, input.path(),
                                    static_cast<uint32_t>(input.get_type()),
                                    output.path(),
                                    static_cast<uint32_t>(input.get_type())});
}

} // namespace
@@ -71,10 +80,11 @@ using namespace std::literals;
namespace cargo {

master_server::master_server(std::string name, std::string address,
                             bool daemonize, std::filesystem::path rundir, std::uint64_t block_size,
                             bool daemonize, std::filesystem::path rundir,
                             std::uint64_t block_size,
                             std::optional<std::filesystem::path> pidfile)
    : server(std::move(name), std::move(address), daemonize, std::move(rundir), std::move(block_size),
             std::move(pidfile)),
    : server(std::move(name), std::move(address), daemonize, std::move(rundir),
             std::move(block_size), std::move(pidfile)),
      provider(m_network_engine, 0),
      m_mpi_listener_ess(thallium::xstream::create()),
      m_mpi_listener_ult(m_mpi_listener_ess->make_thread(
@@ -303,8 +313,12 @@ master_server::transfer_datasets(const network::request& req,
                    const auto& s = v_s_new[i];
                    const auto& d = v_d_new[i];

                    // Create the directory if it does not exist (only in parallel transfer)
                    if(!std::filesystem::path(d.path()).parent_path().empty() and d.supports_parallel_transfer()) {
                    // Create the directory if it does not exist (only in
                    // parallel transfer)
                    if(!std::filesystem::path(d.path())
                                .parent_path()
                                .empty() and
                       d.supports_parallel_transfer()) {
                        std::filesystem::create_directories(
                                std::filesystem::path(d.path()).parent_path());
                    }
+3 −0
Original line number Diff line number Diff line
@@ -6,6 +6,9 @@
#ifdef HERCULES_PLUGIN
#include "hercules_plugin.hpp"
#endif
#ifdef EXPAND_PLUGIN
#include "expand_plugin.hpp"
#endif

namespace cargo {

+16 −6
Original line number Diff line number Diff line
@@ -53,9 +53,11 @@ public:
    transfer_message() = default;

    transfer_message(std::uint64_t tid, std::uint32_t seqno,
                     std::string input_path, std::string output_path, std::uint32_t type)
                     std::string input_path, std::uint32_t i_type,
                     std::string output_path, std::uint32_t o_type)
        : m_tid(tid), m_seqno(seqno), m_input_path(std::move(input_path)),
          m_output_path(std::move(output_path)), m_type(type) {}
          m_i_type(i_type), m_output_path(std::move(output_path)),
          m_o_type(o_type) {}

    [[nodiscard]] std::uint64_t
    tid() const {
@@ -78,8 +80,14 @@ public:
    }
    /* Enum is converted from cargo::dataset::type to cargo::FSPlugin::type */
    [[nodiscard]] cargo::FSPlugin::type
    type() const {
        return static_cast<cargo::FSPlugin::type>(m_type);
    o_type() const {
        return static_cast<cargo::FSPlugin::type>(m_o_type);
    }

    /* Enum is converted from cargo::dataset::type to cargo::FSPlugin::type */
    [[nodiscard]] cargo::FSPlugin::type
    i_type() const {
        return static_cast<cargo::FSPlugin::type>(m_i_type);
    }

private:
@@ -92,14 +100,16 @@ private:
        ar& m_seqno;
        ar& m_input_path;
        ar& m_output_path;
        ar& m_type;
        ar& m_i_type;
        ar& m_o_type;
    }

    std::uint64_t m_tid{};
    std::uint32_t m_seqno{};
    std::string m_input_path;
    std::uint32_t m_i_type{};
    std::string m_output_path;
    std::uint32_t m_type{};
    std::uint32_t m_o_type{};
};

class status_message {
+8 −5
Original line number Diff line number Diff line
@@ -33,9 +33,11 @@ namespace cargo {
mpio_read::mpio_read(mpi::communicator workers,
                     std::filesystem::path input_path,
                     std::filesystem::path output_path,
                     std::uint64_t block_size, FSPlugin::type fs_type)
                     std::uint64_t block_size, FSPlugin::type fs_i_type,
                     FSPlugin::type fs_o_type)
    : m_workers(std::move(workers)), m_input_path(std::move(input_path)),
      m_output_path(std::move(output_path)), m_kb_size(std::move(block_size)), m_fs_type(fs_type) {}
      m_output_path(std::move(output_path)), m_kb_size(std::move(block_size)),
      m_fs_i_type(fs_i_type), m_fs_o_type(fs_o_type) {}

cargo::error_code
mpio_read::operator()() {
@@ -123,9 +125,10 @@ mpio_read::operator()() {
        }

        // step3. POSIX write data
        // We need to create the directory if it does not exists (using FSPlugin)
        m_output_file = std::make_unique<posix_file::file>(
                posix_file::create(m_output_path, O_WRONLY, S_IRUSR | S_IWUSR, m_fs_type));
        // We need to create the directory if it does not exists (using
        // FSPlugin)
        m_output_file = std::make_unique<posix_file::file>(posix_file::create(
                m_output_path, O_WRONLY, S_IRUSR | S_IWUSR, m_fs_o_type));

        m_output_file->fallocate(0, 0, file_size);

+6 −6
Original line number Diff line number Diff line
@@ -38,7 +38,8 @@ class mpio_read : public operation {

public:
    mpio_read(mpi::communicator workers, std::filesystem::path input_path,
              std::filesystem::path output_path, std::uint64_t block_size, FSPlugin::type fs_type);
              std::filesystem::path output_path, std::uint64_t block_size,
              FSPlugin::type fs_i_type, FSPlugin::type m_fs_o_type);

    cargo::error_code
    operator()() final;
@@ -50,7 +51,6 @@ public:
    progress(int ongoing_index) final;

private:

    mpi::communicator m_workers;
    std::filesystem::path m_input_path;
    std::filesystem::path m_output_path;
@@ -63,8 +63,8 @@ private:
    memory_buffer m_buffer;
    std::vector<buffer_region> m_buffer_regions;
    std::uint64_t m_kb_size;
    FSPlugin::type m_fs_type;
    
    FSPlugin::type m_fs_i_type;
    FSPlugin::type m_fs_o_type;
};

} // namespace cargo
Loading