Commit dc2baf64 authored by Ramon Nou's avatar Ramon Nou
Browse files

fix mtime threshold and chunk oversized writes in mpio_write (MPI count is int, so writes > INT_MAX bytes must be split)

parent 595f0032
Loading
Loading
Loading
Loading
Loading
+1 −1
[diff table header: original line number | original line || diff line number | diff line]
@@ -383,7 +383,7 @@ master_server::transfer_dataset_internal(pending_transfer& pt) {
    std::vector<cargo::dataset> v_d_new;
    std::vector<cargo::dataset> v_d_new;
    std::vector<std::size_t> v_size_new;
    std::vector<std::size_t> v_size_new;
    time_t now = time(0);
    time_t now = time(0);
    // now = now - 5; // Threshold for mtime
    now = now - 1; // Threshold for mtime
    for(auto i = 0u; i < pt.m_sources.size(); ++i) {
    for(auto i = 0u; i < pt.m_sources.size(); ++i) {


        const auto& s = pt.m_sources[i];
        const auto& s = pt.m_sources[i];
+58 −25
[diff table header: original line number | original line || diff line number | diff line]
@@ -51,7 +51,10 @@ mpio_write::operator()() {
        m_input_file = std::make_unique<posix_file::file>(
        m_input_file = std::make_unique<posix_file::file>(
                posix_file::open(m_input_path, O_RDONLY, 0, m_fs_i_type));
                posix_file::open(m_input_path, O_RDONLY, 0, m_fs_i_type));
        remove(m_output_path.c_str());
        remove(m_output_path.c_str());
        LOGGER_INFO("Worker: Opening {} --> handler {} / fileSize {} --- Output {} ", m_input_path, m_input_file->handle(), m_file_size, m_output_path);     
        LOGGER_INFO(
                "Worker: Opening {} --> handler {} / fileSize {} --- Output {} ",
                m_input_path, m_input_file->handle(), m_file_size,
                m_output_path);
        std::size_t file_size = m_file_size;
        std::size_t file_size = m_file_size;


        // compute the number of blocks in the file
        // compute the number of blocks in the file
@@ -118,7 +121,8 @@ mpio_write::progress(int ongoing_index) {
    using posix_file::views::strided;
    using posix_file::views::strided;


    // compute the number of blocks in the file
    // compute the number of blocks in the file
   // LOGGER_INFO("Worker: Progress {}/{} --> handler {} ", ongoing_index, m_input_path, m_input_file->handle());     
    // LOGGER_INFO("Worker: Progress {}/{} --> handler {} ", ongoing_index,
    // m_input_path, m_input_file->handle());
    int index = 0;
    int index = 0;
    if(ongoing_index == 0) {
    if(ongoing_index == 0) {
        m_bytes_per_rank = 0;
        m_bytes_per_rank = 0;
@@ -151,10 +155,12 @@ mpio_write::progress(int ongoing_index) {




            m_bytes_per_rank += n;
            m_bytes_per_rank += n;

            // Do sleep (But be a bit reactive...)
            // Do sleep (But be a bit reactive...)
            auto total_sleep = sleep_value();
            auto total_sleep = sleep_value();
            auto small_sleep = total_sleep / 100;
            auto small_sleep = total_sleep / 100;
            if (small_sleep == std::chrono::milliseconds(0)) small_sleep = std::chrono::milliseconds(1);
            if(small_sleep == std::chrono::milliseconds(0))
                small_sleep = std::chrono::milliseconds(1);
            while(total_sleep > std::chrono::milliseconds(0)) {
            while(total_sleep > std::chrono::milliseconds(0)) {
                std::this_thread::sleep_for(small_sleep);
                std::this_thread::sleep_for(small_sleep);
                total_sleep -= small_sleep;
                total_sleep -= small_sleep;
@@ -180,7 +186,6 @@ mpio_write::progress(int ongoing_index) {
        }
        }
        m_input_file->close();
        m_input_file->close();
        // step 2. write buffer data in parallel to the PFS
        // step 2. write buffer data in parallel to the PFS
        //LOGGER_INFO("START WRITING file {}", m_output_path);


        const auto output_file =
        const auto output_file =
                mpioxx::file::open(m_workers, m_output_path,
                mpioxx::file::open(m_workers, m_output_path,
@@ -218,18 +223,46 @@ mpio_write::progress(int ongoing_index) {
            return -1;
            return -1;
        }
        }


        // step 3. parallel write data from buffers
        // step 3. parallel write data from buffers in chunks if necessary
        if(const auto ec =
        const std::size_t max_bytes_per_call =
                   MPI_File_write_all(output_file, m_buffer.data(),
                INT_MAX; // Maximum bytes for one MPI call
                                      static_cast<int>(m_bytes_per_rank),
        std::size_t bytes_written_total = 0;
        const char* current_buffer_ptr = m_buffer.data();

        while(bytes_written_total < m_bytes_per_rank) {
            std::size_t bytes_to_write_this_call =
                    m_bytes_per_rank - bytes_written_total;
            if(bytes_to_write_this_call > max_bytes_per_call) {
                bytes_to_write_this_call = max_bytes_per_call;
            }

            // Ensure the cast is now safe
            int count_this_call = static_cast<int>(bytes_to_write_this_call);

            LOGGER_DEBUG("MPI_File_write_all chunk: Writing {} bytes",
                         count_this_call);

            if(const auto ec = MPI_File_write_all(
                       output_file, current_buffer_ptr, count_this_call,
                       MPI_BYTE, MPI_STATUS_IGNORE);
                       MPI_BYTE, MPI_STATUS_IGNORE);
               ec != MPI_SUCCESS) {
               ec != MPI_SUCCESS) {
            LOGGER_ERROR("MPI_File_write_all() failed: {}",
                LOGGER_ERROR("MPI_File_write_all() chunk failed: {}",
                             mpi::error_string(ec));
                             mpi::error_string(ec));
                // Consider freeing MPI types before returning/throwing
                MPI_Type_free(&block_type);
                MPI_Type_free(&file_type);
                m_status = make_mpi_error(ec);
                m_status = make_mpi_error(ec);
            return -1;
                return -1; // Or throw an exception
            }
            }


            bytes_written_total += bytes_to_write_this_call;
            current_buffer_ptr += bytes_to_write_this_call;
        }

        // Don't forget to free the MPI types after they are no longer needed
        MPI_Type_free(&block_type);
        MPI_Type_free(&file_type);



    } catch(const mpioxx::io_error& e) {
    } catch(const mpioxx::io_error& e) {
        LOGGER_ERROR("{}() failed: {}", e.where(), e.what());
        LOGGER_ERROR("{}() failed: {}", e.where(), e.what());