Loading src/master.cpp +17 −9 Original line number Diff line number Diff line Loading @@ -348,16 +348,19 @@ master_server::transfer_dataset_internal(pending_transfer& pt) { // Read the regular expression pattern from the file auto patternFile = "/lustre/project/nhr-admire/shared/nek_regex4cargo.txt"; std::ifstream file(patternFile); std::string patternStr; std::string patternStr; // default bool filtering = false; if (file.is_open()) { std::getline(file, patternStr); file.close(); filtering = true; } else { LOGGER_ERROR("opening pattern file {}", patternFile); return; //return; } LOGGER_INFO("Using pattern str '{}' for regex from file '{}'", patternStr, patternFile); std::regex pattern(patternStr); std::regex pattern; if (filtering) pattern.assign(patternStr); mpi::communicator world; std::vector<cargo::dataset> v_s_new; Loading Loading @@ -403,10 +406,12 @@ master_server::transfer_dataset_internal(pending_transfer& pt) { // path (d.path) plus the path from f, removing the // initial path p (taking care of the trailing /) //LOGGER_INFO("GKFS file {} checking ...", s_new.path()); if (filtering) { if(!std::regex_match(s_new.path(), pattern)) { LOGGER_INFO("GKFS file {} IGNORED", s_new.path()); continue; } } auto leading = p.size(); if(leading > 0 and p.back() == '/') { leading--; Loading Loading @@ -465,12 +470,14 @@ master_server::transfer_dataset_internal(pending_transfer& pt) { } // Send message to worker (seq number is 0) if(v_s_new.size() != 0){ for(std::size_t rank = 1; rank <= pt.m_p.nworkers(); ++rank) { const auto [t, m] = make_message(pt.m_p.tid(), 0, v_s_new, v_d_new); LOGGER_INFO("msg <= to: {} body: {}", rank, m); world.send(static_cast<int>(rank), t, m); } } } void master_server::transfer_datasets(const network::request& req, Loading Loading @@ -604,6 +611,7 @@ master_server::transfer_datasets(const network::request& req, // some sleep here may help ? too many messages to the workers? // Changed to one message for all the files. seq is 0 if (v_s_new.size() != 0) { for(std::size_t rank = 1; rank <= r.nworkers(); ++rank) { const auto [t, m] = make_message(r.tid(), 0, v_s_new, v_d_new); Loading @@ -611,7 +619,7 @@ master_server::transfer_datasets(const network::request& req, world.send(static_cast<int>(rank), t, m); // Wait 1 ms // std::this_thread::sleep_for(20ms); } } } else { m_ftio_tid = r.tid(); Loading @@ -638,7 +646,7 @@ master_server::transfer_status(const network::request& req, std::uint64_t tid) { mpi::communicator world; const auto rpc = rpc_info::create(RPC_NAME(), get_address(req)); LOGGER_INFO("rpc {:>} body: {{tid: {}}}", rpc, tid); LOGGER_DEBUG("rpc {:>} body: {{tid: {}}}", rpc, tid); m_request_manager.lookup(tid) .or_else([&](auto&& ec) { Loading Loading
src/master.cpp +17 −9 Original line number Diff line number Diff line Loading @@ -348,16 +348,19 @@ master_server::transfer_dataset_internal(pending_transfer& pt) { // Read the regular expression pattern from the file auto patternFile = "/lustre/project/nhr-admire/shared/nek_regex4cargo.txt"; std::ifstream file(patternFile); std::string patternStr; std::string patternStr; // default bool filtering = false; if (file.is_open()) { std::getline(file, patternStr); file.close(); filtering = true; } else { LOGGER_ERROR("opening pattern file {}", patternFile); return; //return; } LOGGER_INFO("Using pattern str '{}' for regex from file '{}'", patternStr, patternFile); std::regex pattern(patternStr); std::regex pattern; if (filtering) pattern.assign(patternStr); mpi::communicator world; std::vector<cargo::dataset> v_s_new; Loading Loading @@ -403,10 +406,12 @@ master_server::transfer_dataset_internal(pending_transfer& pt) { // path (d.path) plus the path from f, removing the // initial path p (taking care of the trailing /) //LOGGER_INFO("GKFS file {} checking ...", s_new.path()); if (filtering) { if(!std::regex_match(s_new.path(), pattern)) { LOGGER_INFO("GKFS file {} IGNORED", s_new.path()); continue; } } auto leading = p.size(); if(leading > 0 and p.back() == '/') { leading--; Loading Loading @@ -465,12 +470,14 @@ master_server::transfer_dataset_internal(pending_transfer& pt) { } // Send message to worker (seq number is 0) if(v_s_new.size() != 0){ for(std::size_t rank = 1; rank <= pt.m_p.nworkers(); ++rank) { const auto [t, m] = make_message(pt.m_p.tid(), 0, v_s_new, v_d_new); LOGGER_INFO("msg <= to: {} body: {}", rank, m); world.send(static_cast<int>(rank), t, m); } } } void master_server::transfer_datasets(const network::request& req, Loading Loading @@ -604,6 +611,7 @@ master_server::transfer_datasets(const network::request& req, // some sleep here may help ? too many messages to the workers? // Changed to one message for all the files. seq is 0 if (v_s_new.size() != 0) { for(std::size_t rank = 1; rank <= r.nworkers(); ++rank) { const auto [t, m] = make_message(r.tid(), 0, v_s_new, v_d_new); Loading @@ -611,7 +619,7 @@ master_server::transfer_datasets(const network::request& req, world.send(static_cast<int>(rank), t, m); // Wait 1 ms // std::this_thread::sleep_for(20ms); } } } else { m_ftio_tid = r.tid(); Loading @@ -638,7 +646,7 @@ master_server::transfer_status(const network::request& req, std::uint64_t tid) { mpi::communicator world; const auto rpc = rpc_info::create(RPC_NAME(), get_address(req)); LOGGER_INFO("rpc {:>} body: {{tid: {}}}", rpc, tid); LOGGER_DEBUG("rpc {:>} body: {{tid: {}}}", rpc, tid); m_request_manager.lookup(tid) .or_else([&](auto&& ec) { Loading