Loading src/master.cpp +23 −21 Original line number Diff line number Diff line Loading @@ -196,11 +196,11 @@ master_server::ftio_scheduling_ult() { continue; LOGGER_INFO("Waiting period : {}", m_period); // Wait in small periods, just in case we change it, This should be mutexed... // Wait in small periods, just in case we change it, This should be // mutexed... auto elapsed = m_period; while(elapsed > 0) { std::this_thread::sleep_for( std::chrono::seconds((int)(1))); std::this_thread::sleep_for(std::chrono::seconds((int) (1))); elapsed -= 1; if(m_ftio_changed) { elapsed = m_period; Loading Loading @@ -505,7 +505,19 @@ master_server::transfer_datasets(const network::request& req, }) .map([&](auto&& r) { assert(v_s_new.size() == v_d_new.size()); if(m_ftio) { if(sources[0].get_type() == cargo::dataset::type::gekkofs) { // We have only one pendingTransfer for FTIO // that can be updated, the issue is that we // need the tid. m_pending_transfer.m_p = r; m_pending_transfer.m_sources = sources; m_pending_transfer.m_targets = targets; m_pending_transfer.m_work = true; LOGGER_INFO("Stored stage-out information"); } } // For all the transfers for(std::size_t i = 0; i < v_s_new.size(); ++i) { const auto& s = v_s_new[i]; Loading @@ -523,20 +535,10 @@ master_server::transfer_datasets(const network::request& req, // If we are not using ftio start transfer if we are on // stage-out if(m_ftio) { if(!m_ftio) { // If we are on stage-out if(s.get_type() == cargo::dataset::type::gekkofs) { // We have only one pendingTransfer for FTIO // that can be updated, the issue is that we // need the tid. m_pending_transfer.m_p = r; m_pending_transfer.m_sources = sources; m_pending_transfer.m_targets = targets; m_pending_transfer.m_work = true; LOGGER_INFO("Stored stage-out information"); } } else { for(std::size_t rank = 1; rank <= r.nworkers(); ++rank) { const auto [t, m] = make_message(r.tid(), i, s, d); Loading src/posix_file/posix_file/fs_plugin/gekko_plugin.cpp +2 −2 Original line number Diff line number Diff line Loading @@ -28,13 +28,13 @@ gekko_plugin::open(const std::string& path, int flags, unsigned int mode) { // Override the pread function ssize_t gekko_plugin::pread(int fd, void* buf, size_t count, off_t offset) { return gkfs::syscall::gkfs_pread_ws(fd, buf, count, offset); return gkfs::syscall::gkfs_pread(fd, buf, count, offset); } // Override the pwrite function ssize_t gekko_plugin::pwrite(int fd, const void* buf, size_t count, off_t offset) { return gkfs::syscall::gkfs_pwrite_ws(fd, buf, count, offset); return gkfs::syscall::gkfs_pwrite(fd, buf, count, offset); } Loading src/request_manager.cpp +16 −17 Original line number Diff line number Diff line Loading @@ -64,22 +64,21 @@ request_manager::create(std::size_t nfiles, std::size_t nworkers) { * @return error_code */ error_code request_manager::update(std::uint64_t tid, std::size_t nfiles, std::size_t nworkers){ request_manager::update(std::uint64_t tid, std::size_t nfiles, std::size_t nworkers) { abt::unique_lock lock(m_mutex); m_requests[tid] = std::vector<file_status>{nfiles, std::vector<part_status>{nworkers}}; if(const auto it = m_requests.find(tid); it != m_requests.end()) { it->second.resize(nfiles, std::vector<part_status>{nworkers}); return error_code::success; } LOGGER_ERROR("{}: Request {} not found", __FUNCTION__, tid); return error_code::no_such_transfer; } error_code request_manager::update(std::uint64_t tid, std::uint32_t seqno, std::size_t wid, std::string name, transfer_state s, float bw, request_manager::update(std::uint64_t tid, std::uint32_t seqno, std::size_t wid, std::string name, transfer_state s, float bw, std::optional<error_code> ec) { abt::unique_lock lock(m_mutex); Loading Loading
src/master.cpp +23 −21 Original line number Diff line number Diff line Loading @@ -196,11 +196,11 @@ master_server::ftio_scheduling_ult() { continue; LOGGER_INFO("Waiting period : {}", m_period); // Wait in small periods, just in case we change it, This should be mutexed... // Wait in small periods, just in case we change it, This should be // mutexed... auto elapsed = m_period; while(elapsed > 0) { std::this_thread::sleep_for( std::chrono::seconds((int)(1))); std::this_thread::sleep_for(std::chrono::seconds((int) (1))); elapsed -= 1; if(m_ftio_changed) { elapsed = m_period; Loading Loading @@ -505,7 +505,19 @@ master_server::transfer_datasets(const network::request& req, }) .map([&](auto&& r) { assert(v_s_new.size() == v_d_new.size()); if(m_ftio) { if(sources[0].get_type() == cargo::dataset::type::gekkofs) { // We have only one pendingTransfer for FTIO // that can be updated, the issue is that we // need the tid. m_pending_transfer.m_p = r; m_pending_transfer.m_sources = sources; m_pending_transfer.m_targets = targets; m_pending_transfer.m_work = true; LOGGER_INFO("Stored stage-out information"); } } // For all the transfers for(std::size_t i = 0; i < v_s_new.size(); ++i) { const auto& s = v_s_new[i]; Loading @@ -523,20 +535,10 @@ master_server::transfer_datasets(const network::request& req, // If we are not using ftio start transfer if we are on // stage-out if(m_ftio) { if(!m_ftio) { // If we are on stage-out if(s.get_type() == cargo::dataset::type::gekkofs) { // We have only one pendingTransfer for FTIO // that can be updated, the issue is that we // need the tid. m_pending_transfer.m_p = r; m_pending_transfer.m_sources = sources; m_pending_transfer.m_targets = targets; m_pending_transfer.m_work = true; LOGGER_INFO("Stored stage-out information"); } } else { for(std::size_t rank = 1; rank <= r.nworkers(); ++rank) { const auto [t, m] = make_message(r.tid(), i, s, d); Loading
src/posix_file/posix_file/fs_plugin/gekko_plugin.cpp +2 −2 Original line number Diff line number Diff line Loading @@ -28,13 +28,13 @@ gekko_plugin::open(const std::string& path, int flags, unsigned int mode) { // Override the pread function ssize_t gekko_plugin::pread(int fd, void* buf, size_t count, off_t offset) { return gkfs::syscall::gkfs_pread_ws(fd, buf, count, offset); return gkfs::syscall::gkfs_pread(fd, buf, count, offset); } // Override the pwrite function ssize_t gekko_plugin::pwrite(int fd, const void* buf, size_t count, off_t offset) { return gkfs::syscall::gkfs_pwrite_ws(fd, buf, count, offset); return gkfs::syscall::gkfs_pwrite(fd, buf, count, offset); } Loading
src/request_manager.cpp +16 −17 Original line number Diff line number Diff line Loading @@ -64,22 +64,21 @@ request_manager::create(std::size_t nfiles, std::size_t nworkers) { * @return error_code */ error_code request_manager::update(std::uint64_t tid, std::size_t nfiles, std::size_t nworkers){ request_manager::update(std::uint64_t tid, std::size_t nfiles, std::size_t nworkers) { abt::unique_lock lock(m_mutex); m_requests[tid] = std::vector<file_status>{nfiles, std::vector<part_status>{nworkers}}; if(const auto it = m_requests.find(tid); it != m_requests.end()) { it->second.resize(nfiles, std::vector<part_status>{nworkers}); return error_code::success; } LOGGER_ERROR("{}: Request {} not found", __FUNCTION__, tid); return error_code::no_such_transfer; } error_code request_manager::update(std::uint64_t tid, std::uint32_t seqno, std::size_t wid, std::string name, transfer_state s, float bw, request_manager::update(std::uint64_t tid, std::uint32_t seqno, std::size_t wid, std::string name, transfer_state s, float bw, std::optional<error_code> ec) { abt::unique_lock lock(m_mutex); Loading