Loading src/mpioxx.hpp +1 −2 Original line number Diff line number Diff line Loading @@ -135,7 +135,6 @@ public: MPI_File result; if(const auto ec = MPI_File_open(comm, filepath.c_str(), static_cast<int>(mode), MPI_INFO_NULL, &result); Loading src/worker/mpio_read.cpp +3 −7 Original line number Diff line number Diff line Loading @@ -37,7 +37,8 @@ mpio_read::mpio_read(mpi::communicator workers, FSPlugin::type fs_o_type, std::size_t size, bool single) : m_workers(std::move(workers)), m_input_path(std::move(input_path)), m_output_path(std::move(output_path)), m_kb_size(std::move(block_size)), m_fs_i_type(fs_i_type), m_fs_o_type(fs_o_type), m_file_size(size), m_single(single) {} m_fs_i_type(fs_i_type), m_fs_o_type(fs_o_type), m_file_size(size), m_single(single) {} cargo::error_code mpio_read::operator()() { Loading @@ -48,13 +49,11 @@ mpio_read::operator()() { m_status = error_code::transfer_in_progress; try { std::cout << "Trying to open file " << m_input_path << std::endl; const auto input_file = mpioxx::file::open( m_workers, m_input_path, mpioxx::file_open_mode::rdonly); mpioxx::offset file_size = m_file_size; std::size_t block_size = m_kb_size * 1024u; // create block type MPI_Datatype block_type; MPI_Type_contiguous(static_cast<int>(block_size), MPI_BYTE, Loading @@ -75,7 +74,6 @@ mpio_read::operator()() { workers_size = 1; workers_rank = 0; } // create file type MPI_Datatype file_type; /* Loading Loading @@ -144,7 +142,6 @@ mpio_read::operator()() { m_workers_rank = workers_rank; m_block_size = block_size; } catch(const mpioxx::io_error& e) { LOGGER_ERROR("{}() failed: {}", e.where(), e.what()); m_status = make_mpi_error(e.error_code()); Loading Loading @@ -175,7 +172,6 @@ mpio_read::progress(int ongoing_index) { try { int index = 0; // TODO : FS not defined... m_status = error_code::transfer_in_progress; for(const auto& file_range : all_of(posix_file::file{m_input_path, m_fs_i_type}) | Loading src/worker/worker.cpp +37 −19 Original line number Diff line number Diff line Loading @@ -194,18 +194,35 @@ worker::run() { if(size <= m_block_size * 1024) { // Optimize and process only to one worker // the one that is processing is i%worker == 0 if(((i%workers.size()) == (unsigned int)workers.rank())) { if(((i % workers.size()) == (unsigned int) workers.rank())) { update_state(msg->source(), m.tid(), i, output_path, transfer_state::pending, -1.0f); std::vector<int> ranks_to_exclude; // Exclude all for(auto id = 0; id < workers.size(); id++) { if(id != workers.rank()) { ranks_to_exclude.push_back(id); } } const auto tempworkers = ::make_communicator( workers, workers.group().exclude( ranks_to_exclude.begin(), ranks_to_exclude.end()), 0); m_ops.emplace(std::make_pair( make_pair(input_path, output_path), make_pair(operation::make_operation( t, workers, input_path, output_path, m_block_size, m.i_type(), m.o_type(), size, true), t, tempworkers, input_path, output_path, m_block_size, m.i_type(), m.o_type(), size, true), -1))); // TODO : Issue 1, seqno is not different from each file // TODO : Issue 1, seqno is not different from each // file // -(we use i) const auto op = m_ops[make_pair(input_path, output_path)] Loading @@ -226,7 +243,8 @@ worker::run() { make_pair(operation::make_operation( t, workers, input_path, output_path, m_block_size, m.i_type(), m.o_type(), size, false), m.i_type(), m.o_type(), size, false), -1))); // TODO : Issue 1, seqno is not different from each file // -(we use i) Loading Loading
src/mpioxx.hpp +1 −2 Original line number Diff line number Diff line Loading @@ -135,7 +135,6 @@ public: MPI_File result; if(const auto ec = MPI_File_open(comm, filepath.c_str(), static_cast<int>(mode), MPI_INFO_NULL, &result); Loading
src/worker/mpio_read.cpp +3 −7 Original line number Diff line number Diff line Loading @@ -37,7 +37,8 @@ mpio_read::mpio_read(mpi::communicator workers, FSPlugin::type fs_o_type, std::size_t size, bool single) : m_workers(std::move(workers)), m_input_path(std::move(input_path)), m_output_path(std::move(output_path)), m_kb_size(std::move(block_size)), m_fs_i_type(fs_i_type), m_fs_o_type(fs_o_type), m_file_size(size), m_single(single) {} m_fs_i_type(fs_i_type), m_fs_o_type(fs_o_type), m_file_size(size), m_single(single) {} cargo::error_code mpio_read::operator()() { Loading @@ -48,13 +49,11 @@ mpio_read::operator()() { m_status = error_code::transfer_in_progress; try { std::cout << "Trying to open file " << m_input_path << std::endl; const auto input_file = mpioxx::file::open( m_workers, m_input_path, mpioxx::file_open_mode::rdonly); mpioxx::offset file_size = m_file_size; std::size_t block_size = m_kb_size * 1024u; // create block type MPI_Datatype block_type; MPI_Type_contiguous(static_cast<int>(block_size), MPI_BYTE, Loading @@ -75,7 +74,6 @@ mpio_read::operator()() { workers_size = 1; workers_rank = 0; } // create file type MPI_Datatype file_type; /* Loading Loading @@ -144,7 +142,6 @@ mpio_read::operator()() { m_workers_rank = workers_rank; m_block_size = block_size; } catch(const mpioxx::io_error& e) { LOGGER_ERROR("{}() failed: {}", e.where(), e.what()); m_status = make_mpi_error(e.error_code()); Loading Loading @@ -175,7 +172,6 @@ mpio_read::progress(int ongoing_index) { try { int index = 0; // TODO : FS not defined... m_status = error_code::transfer_in_progress; for(const auto& file_range : all_of(posix_file::file{m_input_path, m_fs_i_type}) | Loading
src/worker/worker.cpp +37 −19 Original line number Diff line number Diff line Loading @@ -194,18 +194,35 @@ worker::run() { if(size <= m_block_size * 1024) { // Optimize and process only to one worker // the one that is processing is i%worker == 0 if(((i%workers.size()) == (unsigned int)workers.rank())) { if(((i % workers.size()) == (unsigned int) workers.rank())) { update_state(msg->source(), m.tid(), i, output_path, transfer_state::pending, -1.0f); std::vector<int> ranks_to_exclude; // Exclude all for(auto id = 0; id < workers.size(); id++) { if(id != workers.rank()) { ranks_to_exclude.push_back(id); } } const auto tempworkers = ::make_communicator( workers, workers.group().exclude( ranks_to_exclude.begin(), ranks_to_exclude.end()), 0); m_ops.emplace(std::make_pair( make_pair(input_path, output_path), make_pair(operation::make_operation( t, workers, input_path, output_path, m_block_size, m.i_type(), m.o_type(), size, true), t, tempworkers, input_path, output_path, m_block_size, m.i_type(), m.o_type(), size, true), -1))); // TODO : Issue 1, seqno is not different from each file // TODO : Issue 1, seqno is not different from each // file // -(we use i) const auto op = m_ops[make_pair(input_path, output_path)] Loading @@ -226,7 +243,8 @@ worker::run() { make_pair(operation::make_operation( t, workers, input_path, output_path, m_block_size, m.i_type(), m.o_type(), size, false), m.i_type(), m.o_type(), size, false), -1))); // TODO : Issue 1, seqno is not different from each file // -(we use i) Loading