Verified Commit d80ec9c8 authored by Alberto Miranda's avatar Alberto Miranda ♨️
Browse files

server: Initial support for parallel reads

parent 27e69c58
Loading
Loading
Loading
Loading
+114 −4
Original line number Diff line number Diff line
@@ -26,6 +26,9 @@
#include <logger/logger.hpp>
#include <boost/mpi.hpp>
#include <boost/mpi/error_string.hpp>
#include <span>
#include <posix_file/file.hpp>
#include <posix_file/views.hpp>
#include "message.hpp"
#include "mpioxx.hpp"

@@ -53,16 +56,123 @@ make_communicator(const mpi::communicator& comm, const mpi::group& group,

} // namespace

using memory_buffer = std::vector<char>;
using buffer_region = std::span<char>;

void
mpio_read(const mpi::communicator& workers,
          const std::filesystem::path& input_path,
          const std::filesystem::path& output_path) {

    (void) workers;
    (void) input_path;
    (void) output_path;
    using posix_file::views::all_of;
    using posix_file::views::as_blocks;
    using posix_file::views::strided;

    LOGGER_CRITICAL("{}: to be implemented", __FUNCTION__);
    const auto input_file = mpioxx::file::open(workers, input_path,
                                               mpioxx::file_open_mode::rdonly);

    mpioxx::offset file_size = input_file.size();
    std::size_t block_size = 512u;

    // create block type
    MPI_Datatype block_type;
    MPI_Type_contiguous(static_cast<int>(block_size), MPI_BYTE, &block_type);
    MPI_Type_commit(&block_type);

    // compute the number of blocks in the file
    int total_blocks = static_cast<int>(file_size / block_size);

    if(file_size % block_size != 0) {
        ++total_blocks;
    }

    const auto workers_size = workers.size();
    const auto workers_rank = workers.rank();

    // create file type
    MPI_Datatype file_type;
    /*
     * count: number of blocks in the type
     * blocklen: number of elements in each block
     * stride: number of elements between start of each block
     */
    MPI_Type_vector(/* count: */ total_blocks, /* blocklength: */ 1,
            /* stride: */ workers_size, /* oldtype: */ block_type,
                                 &file_type);
    MPI_Type_commit(&file_type);

    MPI_Offset disp = workers_rank * block_size;
    MPI_Datatype etype = block_type;
    MPI_Datatype filetype = file_type;

    if(const auto ec = MPI_File_set_view(input_file, disp, etype, filetype,
                                         "native", MPI_INFO_NULL);
            ec != MPI_SUCCESS) {
        LOGGER_ERROR("MPI_File_set_view() failed: {}", mpi::error_string(ec));
        MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
    }

    // find how many blocks this rank is responsible for
    std::size_t blocks_per_rank = total_blocks / workers_size;

    if(int64_t n = total_blocks % workers_size; n != 0 && workers_rank < n) {
        ++blocks_per_rank;
    }

    // step 1. acquire buffers
    memory_buffer buffer;
    buffer.resize(blocks_per_rank * block_size);

    std::vector<buffer_region> buffer_regions;
    buffer_regions.reserve(blocks_per_rank);

    for(std::size_t i = 0; i < blocks_per_rank; ++i) {
        buffer_regions.emplace_back(buffer.data() + i * block_size, block_size);
    }

    MPI_Datatype datatype = block_type;

    // step2. parallel read data into buffers
    if(const auto ec = MPI_File_read_all(input_file, buffer.data(),
                                         static_cast<int>(blocks_per_rank),
                                         datatype, MPI_STATUS_IGNORE);
            ec != MPI_SUCCESS) {
        LOGGER_ERROR("MPI_File_read_all() failed: {}", mpi::error_string(ec));
        MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
    }

    // step3. POSIX write data
    if(const auto rv =
                posix_file::create(output_path, O_WRONLY, S_IRUSR | S_IWUSR);
            !rv) {
        LOGGER_ERROR("posix_file::create({}) failed: {}", output_path,
                     rv.error().message());
    } else {

        const auto& output_file = rv.value();

        if(const auto ret = output_file.fallocate(0, 0, file_size); !rv) {
            LOGGER_ERROR("posix_file::fallocate({}, {}, {}) failed: {}", 0, 0,
                         file_size, ret.error().message());
            // TODO  : gracefully fail
        }

        int index = 0;
        for(const auto& file_range :
                all_of(posix_file::file{input_path}) | as_blocks(block_size) |
                strided(workers_size, workers_rank)) {
            assert(buffer_regions[index].size() >= file_range.size());
            const auto ret =
                    output_file.pwrite(buffer_regions[index],
                                       file_range.offset(), file_range.size());

            if(!ret) {
                LOGGER_ERROR("pwrite() failed: {}", ret.error().message());
            }

            ++index;
        }
    }
}

void