diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 6d47717c64a486920f5343507d043723e9374ce4..7398808a8c24f9ab3ec99c823f9c0bc34f2015a0 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -84,8 +84,8 @@ if(SCORD_BUILD_TESTS) set_tests_properties(start_cargo PROPERTIES FIXTURES_SETUP cargo) - add_test(stop_cargo - ${CMAKE_SOURCE_DIR}/scripts/runner.sh stop TERM cargo.pid) + + add_test(stop_cargo ${CARGO_BIN_INSTALL_DIR}/cargo_shutdown --server ${DATA_STAGER_ADDRESS_STRING}) set_tests_properties(stop_cargo PROPERTIES FIXTURES_CLEANUP cargo) diff --git a/examples/cxx/ADM_transfer_update.cpp b/examples/cxx/ADM_transfer_update.cpp index 060633b1ba345c3f6885ef05ea5e37a60fcb21ed..dc335e87ae9c7c4feb21936e24273090efdddf43 100644 --- a/examples/cxx/ADM_transfer_update.cpp +++ b/examples/cxx/ADM_transfer_update.cpp @@ -75,15 +75,30 @@ main(int argc, char* argv[]) { server, name, scord::adhoc_storage::type::gekkofs, adhoc_storage_ctx, adhoc_resources); + + std::vector<scord::dataset> ins; + std::vector<scord::dataset> outs; + + scord::dataset in1; + scord::dataset out1; + + in1 = scord::dataset("lustre:/tmp/input-dataset-1"); + out1 = scord::dataset("gekkofs:/tmp/input-dataset-cp"); + + ins.push_back (in1); + outs.push_back (out1); + scord::job::requirements reqs(inputs, outputs, expected_outputs, adhoc_storage); const auto job = scord::register_job( server, scord::job::resources{job_nodes}, reqs, 0); + const auto transfer = scord::transfer_datasets( - server, job, sources, targets, qos_limits, mapping); + server, job, ins, outs, qos_limits, mapping); - scord::transfer_update(server, transfer.id(), 10.0f); + // scord::transfer_update(server, transfer.id(), 10.0f); + fmt::print(stdout, "ADM_transfer_update() remote procedure completed " "successfully\n"); exit(EXIT_SUCCESS); diff --git a/examples/cxx/CMakeLists.txt b/examples/cxx/CMakeLists.txt index 67bd704db4d63d0eb59a569bd5809aafb66b3b0f..5d47a59bfe722e5c33de03fca9ab698851b67584 100644 --- a/examples/cxx/CMakeLists.txt +++ 
b/examples/cxx/CMakeLists.txt @@ -30,7 +30,7 @@ list(APPEND cxx_examples_with_controller ADM_deploy_adhoc_storage ADM_terminate_adhoc_storage # transfers ADM_transfer_datasets ADM_get_transfer_priority ADM_set_transfer_priority - ADM_cancel_transfer ADM_get_pending_transfers ADM_transfer_update + ADM_cancel_transfer ADM_get_pending_transfers # qos ADM_set_qos_constraints ADM_get_qos_constraints # data operations @@ -77,7 +77,7 @@ if(SCORD_BUILD_TESTS) ${SCORD_CTL_ADDRESS_STRING} ${DATA_STAGER_ADDRESS_STRING}) set_tests_properties(run_${TEST_NAME} - PROPERTIES FIXTURES_REQUIRED "scord_daemon;scord_ctl" + PROPERTIES FIXTURES_REQUIRED "scord_daemon;scord_ctl;cargo" ENVIRONMENT "${TEST_ENV}") add_test(validate_${TEST_NAME} diff --git a/spack/packages/scord/package.py b/spack/packages/scord/package.py index 53616ca2417f09a2e387dbc030180524b62df4df..334911b47bdef3f62ae74123213887869bb01237 100644 --- a/spack/packages/scord/package.py +++ b/spack/packages/scord/package.py @@ -83,18 +83,19 @@ class Scord(CMakePackage): # specific dependencies # v0.2.0+ depends_on("argobots@1.1", when='@0.2.0:') - depends_on("mochi-margo@0.9.8", when='@0.2.0:') - depends_on("mochi-thallium@0.10.1", when='@0.2.0:') + depends_on("mochi-margo@0.9.8:", when='@0.2.0:') + depends_on("mochi-thallium@0.10.1:", when='@0.2.0:') depends_on("boost@1.71 +program_options", when='@0.2.0:') depends_on("redis-plus-plus@1.3.3:", when='@0.2.0:') + depends_on("cargo@0.3.2:", when='@0.3.1:') with when("@0.2.0: +ofi"): depends_on("libfabric@1.14.0 fabrics=sockets,tcp,rxm") - depends_on("mercury@2.1.0 +ofi") + depends_on("mercury@2.1.0: +ofi") with when("@0.2.0: +ucx"): depends_on("ucx@1.12.0") - depends_on("mercury@2.1.0 +ucx") + depends_on("mercury@2.1.0: +ucx") def cmake_args(self): """Setup scord CMake arguments""" diff --git a/src/common/net/server.cpp b/src/common/net/server.cpp index 464a02ea2abf97a3ad3309343944b94d34d4c592..8e8c96769c4f5f1cd878f6879cb985ed23f96ff4 100644 --- a/src/common/net/server.cpp 
+++ b/src/common/net/server.cpp @@ -391,6 +391,7 @@ server::teardown_and_exit() { void server::shutdown() { + m_shutting_down = true; m_network_engine.finalize(); } diff --git a/src/common/net/server.hpp b/src/common/net/server.hpp index 267ac598d89c3e912b817ddb15a6bdd842c794e2..f405db4817e69583aca07cea9aa7092a99e0e0b8 100644 --- a/src/common/net/server.hpp +++ b/src/common/net/server.hpp @@ -110,6 +110,7 @@ private: protected: thallium::engine m_network_engine; + std::atomic<bool> m_shutting_down; private: scord::utils::signal_listener m_signal_listener; diff --git a/src/lib/c_wrapper.cpp b/src/lib/c_wrapper.cpp index 71f03dc54e591aa4dbcf1706262a2ad6fa6942ab..1b9f797ae3586f46ea313a8af49bf576c3b99c1d 100644 --- a/src/lib/c_wrapper.cpp +++ b/src/lib/c_wrapper.cpp @@ -245,15 +245,6 @@ ADM_transfer_datasets(ADM_server_t server, ADM_job_t job, return ADM_SUCCESS; } -ADM_return_t -ADM_transfer_update(ADM_server_t server, uint64_t transfer_id, - float obtained_bw) { - - return scord::detail::transfer_update(scord::server{server}, transfer_id, - obtained_bw); -} - - ADM_return_t ADM_set_dataset_information(ADM_server_t server, ADM_job_t job, ADM_dataset_t target, ADM_dataset_info_t info) { diff --git a/src/lib/detail/impl.cpp b/src/lib/detail/impl.cpp index cc37fb09fc60a6301e8aad218aae8c532196ecb9..8d26b4ee68d79097374df28717204395af14dbc0 100644 --- a/src/lib/detail/impl.cpp +++ b/src/lib/detail/impl.cpp @@ -531,36 +531,4 @@ transfer_datasets(const server& srv, const job& job, } -scord::error_code -transfer_update(const server& srv, uint64_t transfer_id, float obtained_bw) { - - network::client rpc_client{srv.protocol()}; - - const auto rpc = network::rpc_info::create(RPC_NAME(), srv.address()); - - if(const auto& lookup_rv = rpc_client.lookup(srv.address()); - lookup_rv.has_value()) { - const auto& endp = lookup_rv.value(); - - LOGGER_INFO("rpc {:<} body: {{transfer_id: {}, obtained_bw: {}}}", rpc, - transfer_id, obtained_bw); - - if(const auto& call_rv = - 
endp.call(rpc.name(), transfer_id, obtained_bw); - call_rv.has_value()) { - - const network::generic_response resp{call_rv.value()}; - - LOGGER_EVAL(resp.error_code(), INFO, ERROR, - "rpc {:>} body: {{retval: {}}} [op_id: {}]", rpc, - resp.error_code(), resp.op_id()); - - return resp.error_code(); - } - } - - LOGGER_ERROR("rpc call failed"); - return scord::error_code::other; -} - } // namespace scord::detail diff --git a/src/lib/detail/impl.hpp b/src/lib/detail/impl.hpp index d26575c1800f758e0cfac52d38614b0a47f65034..c1c5f054725c0078962d6a1271252dce6792358c 100644 --- a/src/lib/detail/impl.hpp +++ b/src/lib/detail/impl.hpp @@ -86,9 +86,6 @@ transfer_datasets(const server& srv, const job& job, const std::vector& limits, transfer::mapping mapping); -scord::error_code -transfer_update(const server& srv, uint64_t transfer_id, float obtained_bw); - } // namespace scord::detail diff --git a/src/lib/libscord.cpp b/src/lib/libscord.cpp index 4b341d640d5f2859839a4e98de855c01c419a5de..19e3d58e1344dea2be3928e1bcad1c1428a2fb01 100644 --- a/src/lib/libscord.cpp +++ b/src/lib/libscord.cpp @@ -378,18 +378,6 @@ transfer_datasets(const server& srv, const job& job, } -void -transfer_update(const server& srv, uint64_t transfer_id, float obtained_bw) { - - const auto ec = detail::transfer_update(srv, transfer_id, obtained_bw); - - if(!ec) { - throw std::runtime_error( - fmt::format("ADM_transfer_update() error: {}", ec.message())); - } -} - - ADM_return_t set_dataset_information(const server& srv, ADM_job_t job, ADM_dataset_t target, ADM_dataset_info_t info) { diff --git a/src/scord/rpc_server.cpp b/src/scord/rpc_server.cpp index 8ec74eb6e00a65fdae5df9ce1f9f272bb601d26e..98d6a62c3f2dfd38df6b1ecfc2aab5fa292120b3 100644 --- a/src/scord/rpc_server.cpp +++ b/src/scord/rpc_server.cpp @@ -29,6 +29,7 @@ #include #include #include "rpc_server.hpp" +#include template constexpr std::optional @@ -41,13 +42,45 @@ value_or_none(tl::expected&& e) { using namespace std::literals; +namespace { 
+cargo::dataset +dataset_process(std::string id) { + + cargo::dataset::type type = cargo::dataset::type::posix; + if(id.find("lustre:") != std::string::npos) { + id = id.substr(strlen("lustre:")); + type = cargo::dataset::type::parallel; + } else if(id.find("gekkofs:") != std::string::npos) { + id = id.substr(strlen("gekkofs:")); + type = cargo::dataset::type::posix; + } else if(id.find("hercules:") != std::string::npos) { + id = id.substr(strlen("hercules:")); + type = cargo::dataset::type::hercules; + } else if(id.find("expand:") != std::string::npos) { + id = id.substr(strlen("expand:")); + type = cargo::dataset::type::expand; + } else if(id.find("dataclay:") != std::string::npos) { + id = id.substr(strlen("dataclay:")); + type = cargo::dataset::type::dataclay; + } else + type = cargo::dataset::type::posix; + + return cargo::dataset{id, type}; +} +} // namespace namespace scord { rpc_server::rpc_server(std::string name, std::string address, bool daemonize, std::filesystem::path rundir) : server::server(std::move(name), std::move(address), std::move(daemonize), std::move(rundir)), - provider::provider(m_network_engine, 0) { + provider::provider(m_network_engine, 0), + m_scheduler_ess(thallium::xstream::create()), + m_scheduler_ult( + m_scheduler_ess->make_thread([this]() { scheduler_update(); })) { + + ; + #define EXPAND(rpc_name) "ADM_" #rpc_name##s, &rpc_server::rpc_name @@ -65,11 +98,17 @@ rpc_server::rpc_server(std::string name, std::string address, bool daemonize, provider::define(EXPAND(update_pfs_storage)); provider::define(EXPAND(remove_pfs_storage)); provider::define(EXPAND(transfer_datasets)); - provider::define(EXPAND(transfer_update)); #undef EXPAND + m_network_engine.push_prefinalize_callback([this]() { + m_scheduler_ult->join(); + m_scheduler_ult = thallium::managed{}; + m_scheduler_ess->join(); + m_scheduler_ess = thallium::managed{}; + }); } + #define RPC_NAME() ("ADM_"s + __FUNCTION__) void @@ -372,8 +411,8 @@ rpc_server::update_adhoc_storage( 
name, adhoc_storage.context().controller_address()); LOGGER_INFO("rpc {:<} body: {{uuid: {:?}, type: {}, resources: {}}}", - child_rpc, adhoc_metadata_ptr->uuid(), - adhoc_storage.type(), adhoc_storage.get_resources()); + child_rpc, adhoc_metadata_ptr->uuid(), adhoc_storage.type(), + adhoc_storage.get_resources()); if(const auto call_rv = endp->call( child_rpc.name(), adhoc_metadata_ptr->uuid(), @@ -469,8 +508,8 @@ rpc_server::deploy_adhoc_storage(const network::request& req, rpc.add_child(adhoc_storage.context().controller_address()); LOGGER_INFO("rpc {:<} body: {{uuid: {:?}, type: {}, resources: {}}}", - child_rpc, adhoc_metadata_ptr->uuid(), - adhoc_storage.type(), adhoc_storage.get_resources()); + child_rpc, adhoc_metadata_ptr->uuid(), adhoc_storage.type(), + adhoc_storage.get_resources()); if(const auto call_rv = endp->call( rpc.name(), adhoc_metadata_ptr->uuid(), adhoc_storage.type(), @@ -548,8 +587,7 @@ rpc_server::terminate_adhoc_storage(const network::request& req, rpc.add_child(adhoc_storage.context().controller_address()); LOGGER_INFO("rpc {:<} body: {{uuid: {:?}, type: {}}}", child_rpc, - adhoc_metadata_ptr->uuid(), - adhoc_storage.type()); + adhoc_metadata_ptr->uuid(), adhoc_storage.type()); if(const auto call_rv = endp->call(rpc.name(), adhoc_metadata_ptr->uuid(), @@ -724,11 +762,11 @@ rpc_server::transfer_datasets(const network::request& req, scord::job_id job_id, // TODO: check type of storage tier to enable parallel transfers std::transform(sources.cbegin(), sources.cend(), std::back_inserter(inputs), - [](const auto& src) { return cargo::dataset{src.id()}; }); + [](const auto& src) { return ::dataset_process(src.id()); }); std::transform(targets.cbegin(), targets.cend(), std::back_inserter(outputs), - [](const auto& tgt) { return cargo::dataset{tgt.id()}; }); + [](const auto& tgt) { return ::dataset_process(tgt.id()); }); const auto cargo_tx = cargo::transfer_datasets(srv, inputs, outputs); @@ -755,45 +793,72 @@ rpc_server::transfer_datasets(const 
network::request& req, scord::job_id job_id, LOGGER_EVAL(resp.error_code(), INFO, ERROR, "rpc {:<} body: {{retval: {}, tx_id: {}}}", rpc, resp.error_code(), resp.value_or_none()); - req.respond(resp); } +/* Scheduling is done each 0.5 s*/ void -rpc_server::transfer_update(const network::request& req, uint64_t transfer_id, - float obtained_bw) { - - using network::get_address; - using network::response_with_id; - using network::rpc_info; - - const auto rpc = rpc_info::create(RPC_NAME(), get_address(req)); - - LOGGER_INFO("rpc {:>} body: {{transfer_id: {}, obtained_bw: {}}}", rpc, - transfer_id, obtained_bw); - - scord::error_code ec; - - // TODO: generate a global ID for the transfer and contact Cargo to - // actually request it +rpc_server::scheduler_update() { + std::vector> return_set; + const auto threshold = 0.1f; + while(!m_shutting_down) { + thallium::thread::self().sleep(m_network_engine, 500); + m_transfer_manager.lock(); + const auto transfer = m_transfer_manager.transfer(); + std::vector v_ids; + for(const auto& tr_unit : transfer) { + const auto tr_info = tr_unit.second.get(); + + // Contact for transfer status + const auto status = tr_info->transfer().status(); + + switch(status.state()) { + case cargo::transfer_state::completed: + LOGGER_INFO("Completed"); + v_ids.push_back(tr_unit.first); + continue; + break; + case cargo::transfer_state::failed: + LOGGER_INFO("Failed"); + v_ids.push_back(tr_unit.first); + continue; + break; + case cargo::transfer_state::pending: + continue; + break; + case cargo::transfer_state::running: + break; + } - const auto resp = response_with_id{rpc.id(), ec, transfer_id}; + tr_info->update(status.bw()); + auto bw = tr_info->measured_bandwidth(); + uint64_t qos = 0; + try { + qos = tr_info->qos().front().value(); + } catch(const std::exception& e) { + continue; + } - LOGGER_INFO("rpc {:<} body: {{retval: {}}}", rpc, ec); + if(bw == -1) { + continue; + } - // TODO: create a transfer in transfer manager - // We need the contact 
point, and different qos + if(bw + bw * threshold > qos) { + // Send decrease / slow signal to cargo + tr_info->transfer().bw_control(+1); + } else if(bw - bw * threshold < qos) { + // Send increase / speed up signal to cargo + tr_info->transfer().bw_control(-1); + } + } + m_transfer_manager.unlock(); - ec = m_transfer_manager.update(transfer_id, obtained_bw); - if(ec.no_such_entity) { - LOGGER_ERROR( - "rpc id: {} error_msg: \"Error updating transfer_storage\"", - rpc.id()); + // Remove all failed/done transfers + for(const auto id : v_ids) { + m_transfer_manager.remove(id); + } } - - - req.respond(resp); } diff --git a/src/scord/rpc_server.hpp b/src/scord/rpc_server.hpp index ef6b066cca4511ffc065ad59f256face678a58ec..08788fcc4ad82b231ed926db4096c17f7b531631 100644 --- a/src/scord/rpc_server.hpp +++ b/src/scord/rpc_server.hpp @@ -26,6 +26,7 @@ #define SCORD_RPC_SERVER_HPP #include +#include #include #include #include "job_manager.hpp" @@ -105,14 +106,30 @@ private: const std::vector<scord::qos::limit>& limits, enum scord::transfer::mapping mapping); - void - transfer_update(const network::request& req, uint64_t transfer_id, - float obtained_bw); job_manager m_job_manager; adhoc_storage_manager m_adhoc_manager; pfs_storage_manager m_pfs_manager; transfer_manager m_transfer_manager; + + // Dedicated execution stream for the MPI listener ULT + thallium::managed<thallium::xstream> m_scheduler_ess; + // ULT for the MPI listener + thallium::managed<thallium::thread> m_scheduler_ult; + + +public: + /** + * @brief Generates scheduling information, a set of pairs (contact point, + * and action) + * + * It causes a lock-unlock of the transfer_manager structure. 
+ * Is a thread + * + * @return none + */ + void + scheduler_update(); }; } // namespace scord diff --git a/src/scord/transfer_manager.hpp b/src/scord/transfer_manager.hpp index 6abb74f2deb8e87d6bde6343dfcd45beef8800b4..dfe4fd89b60eb3b5b47bf51c881d4516912c05bb 100644 --- a/src/scord/transfer_manager.hpp +++ b/src/scord/transfer_manager.hpp @@ -116,6 +116,23 @@ struct transfer_manager { return tl::make_unexpected(scord::error_code::no_such_entity); } + std::unordered_map< + scord::transfer_id, + std::shared_ptr>> + transfer() { + return m_transfer; + } + + void + lock() { + m_transfer_mutex.lock(); + } + + void + unlock() { + m_transfer_mutex.unlock(); + } + private: mutable abt::shared_mutex m_transfer_mutex; std::unordered_map<