Verified Commit 3d784c3a authored by Alberto Miranda ♨️
Browse files

server: Initial support for parallel writes

parent d80ec9c8
Loading
Loading
Loading
Loading
Loading
+106 −4
Original line number Diff line number Diff line
@@ -180,11 +180,113 @@ mpio_write(const mpi::communicator& workers,
           const std::filesystem::path& input_path,
           const std::filesystem::path& output_path) {

    (void) workers;
    (void) input_path;
    (void) output_path;
    using posix_file::views::all_of;
    using posix_file::views::as_blocks;
    using posix_file::views::strided;

    LOGGER_CRITICAL("{}: to be implemented", __FUNCTION__);
    const auto workers_size = workers.size();
    const auto workers_rank = workers.rank();
    std::size_t block_size = 512u;
    std::size_t file_size = std::filesystem::file_size(input_path);

    // compute the number of blocks in the file
    int total_blocks = static_cast<int>(file_size / block_size);

    if(file_size % block_size != 0) {
        ++total_blocks;
    }

    // find how many blocks this rank is responsible for
    std::size_t blocks_per_rank = total_blocks / workers_size;

    if(int64_t n = total_blocks % workers_size; n != 0 && workers_rank < n) {
        ++blocks_per_rank;
    }

    // step 1. acquire buffers
    memory_buffer buffer;
    buffer.resize(blocks_per_rank * block_size);

    std::vector<buffer_region> buffer_regions;
    buffer_regions.reserve(blocks_per_rank);

    for(std::size_t i = 0; i < blocks_per_rank; ++i) {
        buffer_regions.emplace_back(buffer.data() + i * block_size, block_size);
    }

    const auto rv = posix_file::open(input_path, O_RDONLY);

    if(!rv) {
        LOGGER_ERROR("posix_file::open({}) failed: {} ", input_path,
                     rv.error().message());
        // TODO  : gracefully fail
    }

    int index = 0;
    std::size_t bytes_per_rank = 0;

    for(const auto& input_file = rv.value();
            const auto& file_range : all_of(input_file) | as_blocks(block_size) |
                                     strided(workers_size, workers_rank)) {

        assert(buffer_regions[index].size() >= file_range.size());
        const auto ret = input_file.pread(
                buffer_regions[index], file_range.offset(), file_range.size());

        if(!ret) {
            LOGGER_ERROR("pread() failed: {}", ret.error().message());
        }

        LOGGER_DEBUG("Buffer contents: [\"{}\" ... \"{}\"]",
                     fmt::join(buffer_regions[index].begin(),
                               buffer_regions[index].begin() + 10, ""),
                     fmt::join(buffer_regions[index].end() - 10,
                               buffer_regions[index].end(), ""));

        bytes_per_rank += ret.value();
        ++index;
    }

    // step 2. write buffer data in parallel to the PFS
    const auto output_file = mpioxx::file::open(
            workers, output_path,
            mpioxx::file_open_mode::create | mpioxx::file_open_mode::wronly);

    // create block type
    MPI_Datatype block_type;
    MPI_Type_contiguous(static_cast<int>(block_size), MPI_BYTE, &block_type);
    MPI_Type_commit(&block_type);

    // create file type
    MPI_Datatype file_type;

    /*
     * count: number of blocks in the type
     * blocklen: number of `oldtype` elements in each block
     * stride: number of `oldtype` elements between start of each block
     */
    MPI_Type_vector(/* count: */ total_blocks, /* blocklength: */ 1,
            /* stride: */ workers_size, /* oldtype: */ block_type,
                                 &file_type);
    MPI_Type_commit(&file_type);

    if(const auto ec = MPI_File_set_view(output_file,
                /* disp: */ workers_rank * block_size,
                /* elementary_type: */ block_type,
                                         file_type, "native", MPI_INFO_NULL);
            ec != MPI_SUCCESS) {
        LOGGER_ERROR("MPI_File_set_view() failed: {}", mpi::error_string(ec));
        MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
    }

    // step 3. parallel write data from buffers
    if(const auto ec = MPI_File_write_all(output_file, buffer.data(),
                                          static_cast<int>(bytes_per_rank),
                                          MPI_BYTE, MPI_STATUS_IGNORE);
            ec != MPI_SUCCESS) {
        LOGGER_ERROR("MPI_File_write_all() failed: {}", mpi::error_string(ec));
        MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
    }
}

void