diff --git a/CMakeLists.txt b/CMakeLists.txt
index f46b19390e4f95789bce763362812f2e191c8f5e..4cd6304d01e2ee32c4da35e2585bc663a4abba4c 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -30,7 +30,7 @@ cmake_minimum_required(VERSION 3.19)
 
 project(
   scord
-  VERSION 0.3.3
+  VERSION 0.3.4
   LANGUAGES C CXX
 )
diff --git a/cli/scord_adhoc.cpp b/cli/scord_adhoc.cpp
index 8e05de3a2cd7b1706e1965952671dd8a76cb1e00..5a12c8263d248f7e87d819d382684dd2643aed72 100644
--- a/cli/scord_adhoc.cpp
+++ b/cli/scord_adhoc.cpp
@@ -34,12 +34,14 @@ struct query_config {
     std::string controller_address;
     std::string stager_address;
     std::uint32_t slurm_id{};
+    std::uint32_t job_id{};
     std::uint32_t adhocid{};
     std::string nodes;
     std::string adhocfs;
     std::string inputs;
     std::string outputs;
     std::string function;
+    std::uint32_t qos{};
 };
 
@@ -78,6 +80,7 @@ parse_command_line(int argc, char* argv[]) {
             ->option_text("CARGOADDRESS")
             ->required();
     app.add_option("-j,--slurm_id", cfg.slurm_id, "Slurm ID")->required();
+    app.add_option("--job_id", cfg.job_id, "Job ID (for subsequent ops)");
 
     app.add_option("-n,--nodes", cfg.nodes, "Nodes");
 
@@ -90,6 +93,8 @@ parse_command_line(int argc, char* argv[]) {
 
     app.add_option("-o,--outputs", cfg.outputs, "Output dataset");
 
+    app.add_option("-q,--qos", cfg.qos, "QoS MB/s");
+
     app.add_option("-f,--function", cfg.function,
-                   "Function {create, stage-in, stage-out, destroy}")
+                   "Function {create, stage-in, stage-out, wait, destroy}")
             ->required();
 
@@ -113,6 +118,21 @@ parse_address(const std::string& address) {
     return std::make_pair(protocol, address);
 }
 
+auto
+create_adhoc_type_from_name(const std::string& name) {
+    if(name == "gekkofs") {
+        return scord::adhoc_storage::type::gekkofs;
+    } else if(name == "hercules") {
+        return scord::adhoc_storage::type::hercules;
+    } else if(name == "expand") {
+        return scord::adhoc_storage::type::expand;
+    } else if(name == "dataclay") {
+        return scord::adhoc_storage::type::dataclay;
+    } else {
+        throw std::runtime_error(
+                fmt::format("Invalid adhoc fs type: {}", name));
+    }
+}
 
 int
 main(int argc, char* argv[]) {
@@ -141,24 +161,12 @@ main(int argc, char* argv[]) {
         scord::job::resources job_resources(nodes);
 
         // Step 1b : Define adhoc_storage
-        /* type needs to process the sctring in cfg.adhocfs */
-        auto type = scord::adhoc_storage::type::gekkofs;
-        if(cfg.adhocfs == "gekkofs") {
-            type = scord::adhoc_storage::type::gekkofs;
-        } else if(cfg.adhocfs == "hercules") {
-            type = scord::adhoc_storage::type::hercules;
-        } else if(cfg.adhocfs == "expand") {
-            type = scord::adhoc_storage::type::expand;
-        } else {
-            throw std::runtime_error(
-                    fmt::format("Invalid adhoc fs type: {}", cfg.adhocfs));
-        }
+        auto type = create_adhoc_type_from_name(cfg.adhocfs);
 
         std::string adhoc_name = cfg.adhocfs + std::to_string(cfg.slurm_id);
 
         scord::adhoc_storage::resources resources{nodes};
-
         scord::adhoc_storage::ctx ctx{
                 cfg.controller_address,
                 cfg.stager_address,
@@ -167,33 +175,11 @@ main(int argc, char* argv[]) {
                 100,
                 false};
-
         scord::adhoc_storage adhoc_storage = register_adhoc_storage(
                 srv, adhoc_name, type, ctx, resources);
-
-        fmt::print("AdhocStorage ID: {}\n", adhoc_storage.id());
 
         auto path = deploy_adhoc_storage(srv, adhoc_storage);
-
-
-        // Step 1c : Define job_requirements
-        /*
-
-        job::requirements::requirements(
-                std::vector<scord::dataset> inputs,
-                std::vector<scord::dataset> outputs,
-                std::vector<scord::dataset> expected_outputs)
-            : m_inputs(std::move(inputs)), m_outputs(std::move(outputs)),
-              m_expected_outputs(std::move(expected_outputs)) {}
-
-        job::requirements::requirements(
-                std::vector<scord::dataset> inputs,
-                std::vector<scord::dataset> outputs,
-                std::vector<scord::dataset> expected_outputs,
-                scord::adhoc_storage adhoc_storage)
-        */
-
+        fmt::print("{},{}\n", path, adhoc_storage.id());
 
         /* Separate inputs into vector of inputs */
@@ -226,20 +212,95 @@ main(int argc, char* argv[]) {
             auto job = scord::register_job(srv, job_resources,
                                            job_requirements, cfg.slurm_id);
 
+            return job.id();
+        }
 
-        // Now Tranfer Datasets
+        if(cfg.function == "stage-in") {
+            // As the job is registered, we can now transfer datasets. Get
+            // inputs from requirements (TODO)
 
+            // Step 2 : If function is stage-in, transfer datasets
             // convert inputs to split inputs (src, dst)
             std::vector<scord::dataset> inputs_src, inputs_dst;
-            for(auto& route : inputs) {
-                inputs_src.push_back(route.source());
-                inputs_dst.push_back(route.destination());
+            auto v_routes_in = split(cfg.inputs, ';');
+
+            for(auto& src_dst : v_routes_in) {
+                auto route = split(src_dst, ',');
+
+                inputs_src.push_back(scord::dataset{route[0]});
+                inputs_dst.push_back(scord::dataset{route[1]});
+            }
+
+            scord::job job(cfg.job_id, cfg.slurm_id);
+            std::vector<scord::qos::limit> v_qos;
+            if(cfg.qos) {
+                scord::qos::limit limit{scord::qos::subclass::bandwidth,
+                                        cfg.qos};
+                v_qos.push_back(limit);
             }
 
-            scord::transfer_datasets(srv, job, inputs_src, inputs_dst,
-                                     std::vector<scord::qos::limit>{},
-                                     scord::transfer::mapping::n_to_n);
+            auto transfer = scord::transfer_datasets(
+                    srv, job, inputs_src, inputs_dst, v_qos,
+                    scord::transfer::mapping::n_to_n);
+            return transfer.id();
+        }
+
+        if(cfg.function == "wait") {
+            // Wait for the transfer operation to finish
+            const scord::transfer transfer = scord::transfer{cfg.slurm_id};
+            scord::job job(cfg.job_id, cfg.slurm_id);
+
+            auto response = scord::query_transfer(srv, job, transfer);
+
+            while(response.status() != scord::transfer_state::type::finished) {
+                std::this_thread::sleep_for(1s);
+                response = scord::query_transfer(srv, job, transfer);
+                if(scord::transfer_state::type::failed == response.status()) {
+                    fmt::print("Transfer failed\n");
+                    return EXIT_FAILURE;
+                }
+            }
+        }
+
+        if(cfg.function == "stage-out") {
+            // Step 3 : If function is stage-out, transfer datasets
+            // convert outputs to split outputs (src, dst)
+            std::vector<scord::dataset> outputs_src, outputs_dst;
+
+            auto v_routes_out = split(cfg.outputs, ';');
+
+            for(auto& src_dst : v_routes_out) {
+                auto route = split(src_dst, ',');
+
+                outputs_src.push_back(scord::dataset{route[0]});
+                outputs_dst.push_back(scord::dataset{route[1]});
+            }
+
+            scord::job job(cfg.job_id, cfg.slurm_id);
+            std::vector<scord::qos::limit> v_qos;
+            if(cfg.qos) {
+                scord::qos::limit limit{scord::qos::subclass::bandwidth,
+                                        cfg.qos};
+                v_qos.push_back(limit);
+            }
+
+            auto transfer = scord::transfer_datasets(
+                    srv, job, outputs_src, outputs_dst, v_qos,
+                    scord::transfer::mapping::n_to_n);
+            return transfer.id();
+        }
+
+        if(cfg.function == "destroy") {
+            // Step 4 : If function is destroy, terminate the adhoc fs server
+            // Build a scord::adhoc_storage object with the adhocid
+            auto type = create_adhoc_type_from_name(cfg.adhocfs);
+            scord::adhoc_storage::resources resources;
+            scord::adhoc_storage::ctx ctx;
+
+            scord::adhoc_storage adhoc_storage(type, "", cfg.adhocid, ctx,
+                                               resources);
+            terminate_adhoc_storage(srv, adhoc_storage);
+        }
 
     } catch(const std::exception& ex) {
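Note: the stage-in/stage-out branches split route lists of the form "src0,dst0;src1,dst1" with a split() helper that already exists elsewhere in scord_adhoc.cpp. For reference, a minimal sketch of what such a helper looks like (hypothetical reimplementation, not the CLI's actual code):

    #include <sstream>
    #include <string>
    #include <vector>

    // Break a string into tokens on a single-character delimiter.
    std::vector<std::string>
    split(const std::string& s, char delimiter) {
        std::vector<std::string> tokens;
        std::string token;
        std::istringstream stream(s);
        while(std::getline(stream, token, delimiter)) {
            tokens.push_back(token);
        }
        return tokens;
    }

With this shape, split("in0,out0;in1,out1", ';') yields the per-route strings, and a second split on ',' yields the source/destination pair pushed into inputs_src/inputs_dst above.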
diff --git a/plugins/adhoc_services.d/gekkofs.sh b/plugins/adhoc_services.d/gekkofs.sh
index 6544724eac3b9c7ac90e22678b9f045bf4269d7c..be65b8f2ae32e4cc8e975bcf82851f5e307e4df2 100644
--- a/plugins/adhoc_services.d/gekkofs.sh
+++ b/plugins/adhoc_services.d/gekkofs.sh
@@ -1,3 +1,38 @@
-#!/usr/bin/env bash
-echo "GEKKOFS Script Called"
+#!/usr/bin/bash
+echo "GEKKOFS Script Called" $HOSTNAME $SLURM_JOBID
+
+if [ "$1" == "start" ]; then
+    echo "Starting GEKKOFS"
+
+    nodes=$3
+    num_nodes=$(echo $nodes | awk -F, '{print NF}')
+    # If num_nodes is greater than 40, we are on the testing environment
+    if [ $num_nodes -gt 40 ]; then
+        exit 0
+    fi
+    workdir=$5
+    datadir=$7
+    mountdir=$9
+    unset SLURM_CPU_BIND SLURM_CPU_BIND_LIST SLURM_CPU_BIND_TYPE SLURM_CPU_BIND_VERBOSE
+    srun -N $num_nodes -n $num_nodes --oversubscribe --cpus-per-task=1 --mem-per-cpu=1 --export=ALL bash -c "mkdir -p $mountdir; mkdir -p $datadir"
+    srun -N $num_nodes -n $num_nodes --oversubscribe --cpus-per-task=4 --mem-per-cpu=1 --export=ALL bash -c "gkfs_daemon --rootdir $datadir --mountdir $mountdir" &
+    sleep 4
+elif [ "$1" == "stop" ]; then
+    echo "Stopping GEKKOFS"
+
+    nodes=$3
+    num_nodes=$(echo $nodes | awk -F, '{print NF}')
+    # If num_nodes is greater than 40, we are on the testing environment
+    if [ $num_nodes -gt 40 ]; then
+        exit 0
+    fi
+    unset SLURM_CPU_BIND SLURM_CPU_BIND_LIST SLURM_CPU_BIND_TYPE SLURM_CPU_BIND_VERBOSE
+    srun -N $num_nodes -n $num_nodes --oversubscribe --cpus-per-task=1 --mem-per-cpu=1 pkill -9 gkfs_daemon
+elif [ "$1" == "expand" ]; then
+    echo "Expand command"
+elif [ "$1" == "shrink" ]; then
+    echo "shrink command"
+fi
+
 exit 0
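Note: the script reads its arguments positionally ($3 for the comma-separated node list, $5, $7 and $9 for the working, data and mount directories), so it assumes scord-ctl invokes it with alternating flag/value pairs after the start/stop verb; the exact flags come from the adhoc storage configuration in scord-ctl. The num_nodes guard treats node lists longer than 40 entries as the testing environment and exits early so no daemons are launched there.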
+ + +if [ "$1" == "start" ]; then + echo "Starting GEKKOFS" + + nodes=$3 + num_nodes=$(echo $nodes | awk -F, '{print NF}') + # If num_nodes is greater than 40, we are on the testing environment + if [ $num_nodes -gt 40 ]; then + exit 0 + fi + workdir=$5 + datadir=$7 + mountdir=$9 + unset SLURM_CPU_BIND SLURM_CPU_BIND_LIST SLURM_CPU_BIND_TYPE SLURM_CPU_BIND_VERBOSE + srun -N $num_nodes -n $num_nodes --oversubscribe --cpus-per-task=1 --mem-per-cpu=1 --export=ALL bash -c "mkdir -p $mountdir; mkdir -p $datadir" + srun -N $num_nodes -n $num_nodes --oversubscribe --cpus-per-task=4 --mem-per-cpu=1 --export=ALL bash -c "gkfs_daemon --rootdir $datadir --mountdir $mountdir" & + sleep 4 +elif [ "$1" == "stop" ]; then + echo "Stopping GEKKOFS" + + nodes=$3 + num_nodes=$(echo $nodes | awk -F, '{print NF}') + # If num_nodes is greater than 40, we are on the testing environment + if [ $num_nodes -gt 40 ]; then + exit 0 + fi + unset SLURM_CPU_BIND SLURM_CPU_BIND_LIST SLURM_CPU_BIND_TYPE SLURM_CPU_BIND_VERBOSE + srun -N $num_nodes -n $num_nodes --oversubscribe --cpus-per-task=1 --mem-per-cpu=1 pkill -9 gkfs_daemon +elif [ "$1" == "expand" ]; then + echo "Expand command" +elif [ "$1" == "shrink" ]; then + echo "shrink command" +fi + exit 0 diff --git a/spack/packages/scord/package.py b/spack/packages/scord/package.py index a8f045e896225f2637e064d16b1bf11f649e6e1b..0f40b0008ac77a3a1cd70a4619baa9cdb201d3b7 100644 --- a/spack/packages/scord/package.py +++ b/spack/packages/scord/package.py @@ -56,7 +56,7 @@ class Scord(CMakePackage): sha256="74c51915315e01d8479701d340331641f42c5f5cfae0c08bdea6c2f0b01da665") version("0.3.3", sha256="a8b5a8d05858bee91b9675ca6c929f4c16b5b2562f4e6a8dba3ce0aacb721f48") - + version("0.3.4", branch="rnou/adhoc_integration") # build variants variant('build_type', default='Release', @@ -92,6 +92,7 @@ class Scord(CMakePackage): depends_on("boost@1.71 +program_options", when='@0.2.0:') depends_on("redis-plus-plus@1.3.3:", when='@0.2.0:') depends_on("cargo@0.3.3:", when='@0.3.1:') + depends_on("cargo@0.3.4:", when='@0.3.4:') depends_on("slurm", when='@0.3.1:') diff --git a/src/lib/detail/impl.cpp b/src/lib/detail/impl.cpp index 8d26b4ee68d79097374df28717204395af14dbc0..db8b58449b2b3e92b627caa18c06a96bcc4abb26 100644 --- a/src/lib/detail/impl.cpp +++ b/src/lib/detail/impl.cpp @@ -531,4 +531,41 @@ transfer_datasets(const server& srv, const job& job, } +tl::expected +query_transfer(const server& srv, const job& job, const transfer& transfer) { + + using response_type = network::response_with_value; + + network::client rpc_client{srv.protocol()}; + + const auto rpc = network::rpc_info::create(RPC_NAME(), srv.address()); + + if(const auto lookup_rv = rpc_client.lookup(srv.address()); + lookup_rv.has_value()) { + const auto& endp = lookup_rv.value(); + + LOGGER_INFO("rpc {:<} body: {{job_id: {}, tx_id: {}}}", rpc, job.id(), transfer.id()); + + if(const auto call_rv = endp.call(rpc.name(), job.id(), transfer.id()); + call_rv.has_value()) { + + const response_type resp{call_rv.value()}; + + LOGGER_EVAL( + resp.error_code(), INFO, ERROR, + "rpc {:>} body: {{retval: {}, tx_state: {}}} [op_id: {}]", + rpc, resp.error_code(), resp.value(), resp.op_id()); + + if(!resp.error_code()) { + return tl::make_unexpected(resp.error_code()); + } + + return resp.value(); + } + } + + LOGGER_ERROR("rpc call failed"); + return tl::make_unexpected(scord::error_code::other); +} + } // namespace scord::detail diff --git a/src/lib/detail/impl.hpp b/src/lib/detail/impl.hpp index 
index c1c5f054725c0078962d6a1271252dce6792358c..e025e6300d08ff0518010b740ceb7c1c6e11d67c 100644
--- a/src/lib/detail/impl.hpp
+++ b/src/lib/detail/impl.hpp
@@ -86,6 +86,10 @@ transfer_datasets(const server& srv, const job& job,
                   const std::vector<qos::limit>& limits,
                   transfer::mapping mapping);
 
+tl::expected<scord::transfer_state, scord::error_code>
+query_transfer(const server& srv, const job& job, const transfer& transfer);
+
+
 } // namespace scord::detail
diff --git a/src/lib/libscord.cpp b/src/lib/libscord.cpp
index 19e3d58e1344dea2be3928e1bcad1c1428a2fb01..d71a3a6fdbbb1006ed6c23c7ef9e583105a74e67 100644
--- a/src/lib/libscord.cpp
+++ b/src/lib/libscord.cpp
@@ -377,6 +377,18 @@ transfer_datasets(const server& srv, const job& job,
     return rv.value();
 }
 
+scord::transfer_state
+query_transfer(const server& srv, const job& job, const transfer& transfer) {
+
+    const auto rv = detail::query_transfer(srv, job, transfer);
+
+    if(!rv) {
+        throw std::runtime_error(fmt::format(
+                "ADM_query_transfer() error: {}", ADM_strerror(rv.error())));
+    }
+
+    return rv.value();
+}
+
 ADM_return_t
 set_dataset_information(const server& srv, ADM_job_t job, ADM_dataset_t target,
diff --git a/src/lib/scord/scord.hpp b/src/lib/scord/scord.hpp
index 045bdf282e616ee71485958243cf75ff6a27ca10..6c35bde45352f0603e1b09707423db9c3d4cdf3c 100644
--- a/src/lib/scord/scord.hpp
+++ b/src/lib/scord/scord.hpp
@@ -98,6 +98,9 @@ transfer_datasets(const server& srv, const job& job,
                   const std::vector<qos::limit>& limits,
                   transfer::mapping mapping);
 
+scord::transfer_state
+query_transfer(const server& srv, const job& job, const transfer& transfer);
+
 void
 transfer_update(const server& srv, uint64_t transfer_id, float obtained_bw);
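Note: scord::query_transfer() is the throwing public wrapper over the detail-layer call, mirroring how transfer_datasets() is exposed. A hypothetical polling loop built on it (addresses and IDs are placeholders; assumes <scord/scord.hpp>, <chrono> and <thread> are included), equivalent to the CLI's "wait" branch:

    scord::server srv{"tcp", "127.0.0.1:52000"}; // placeholder address
    scord::job job{job_id, slurm_id};            // IDs obtained at registration
    const auto tx = scord::transfer{tx_id};

    auto state = scord::query_transfer(srv, job, tx);
    while(state.status() != scord::transfer_state::type::finished &&
          state.status() != scord::transfer_state::type::failed) {
        std::this_thread::sleep_for(std::chrono::seconds{1});
        state = scord::query_transfer(srv, job, tx);
    }

Because errors surface as exceptions here, callers that prefer error codes should use the detail-layer tl::expected variant instead.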
diff --git a/src/lib/scord/types.hpp b/src/lib/scord/types.hpp
index 6ec23f138d3d215c63528a616626162e36e3b2d7..009240b6e7ab78dac4d5f1a6b7c7f5f30367e6a8 100644
--- a/src/lib/scord/types.hpp
+++ b/src/lib/scord/types.hpp
@@ -561,6 +561,43 @@ private:
     std::unique_ptr<impl> m_pimpl;
 };
 
+struct transfer_state {
+
+    enum class type : std::underlying_type<ADM_transfer_state_t>::type {
+        queued = ADM_TRANSFER_QUEUED,
+        running = ADM_TRANSFER_RUNNING,
+        finished = ADM_TRANSFER_FINISHED,
+        failed = ADM_TRANSFER_FAILED,
+        cancelled = ADM_TRANSFER_CANCELLED
+    };
+
+    transfer_state();
+    explicit transfer_state(transfer_state::type type);
+    explicit transfer_state(ADM_transfer_status_t adm_transfer_state);
+    explicit operator ADM_transfer_status_t() const;
+
+    transfer_state(const transfer_state&) noexcept;
+    transfer_state(transfer_state&&) noexcept;
+    transfer_state&
+    operator=(const transfer_state&) noexcept;
+    transfer_state&
+    operator=(transfer_state&&) noexcept;
+
+    ~transfer_state();
+
+    transfer_state::type
+    status() const;
+
+    // The implementation for this must be deferred until
+    // after the declaration of the PIMPL class
+    template <class Archive>
+    void
+    serialize(Archive& ar);
+
+private:
+    class impl;
+    std::unique_ptr<impl> m_pimpl;
+};
+
 namespace qos {
 
 enum class subclass : std::underlying_type<ADM_qos_class_t>::type {
@@ -1240,4 +1277,36 @@
     }
 };
 
+template <>
+struct fmt::formatter<scord::transfer_state>
+    : fmt::formatter<std::string_view> {
+    // parse is inherited from formatter<string_view>.
+    template <typename FormatContext>
+    auto
+    format(const scord::transfer_state& t, FormatContext& ctx) const
+            -> format_context::iterator {
+
+        std::string_view name = "unknown";
+
+        switch(t.status()) {
+            case scord::transfer_state::type::queued:
+                name = "queued";
+                break;
+            case scord::transfer_state::type::running:
+                name = "running";
+                break;
+            case scord::transfer_state::type::finished:
+                name = "finished";
+                break;
+            case scord::transfer_state::type::failed:
+                name = "failed";
+                break;
+            case scord::transfer_state::type::cancelled:
+                name = "cancelled";
+                break;
+        }
+
+        return formatter<std::string_view>::format(name, ctx);
+    }
+};
+
 #endif // SCORD_TYPES_HPP
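Note: with the fmt::formatter specialization above, a transfer_state can be passed straight to the fmt calls and logging macros used throughout the tree (illustrative):

    scord::transfer_state st{scord::transfer_state::type::running};
    fmt::print("state: {}\n", st); // prints "state: running"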
diff --git a/src/lib/types.c b/src/lib/types.c
index 1e4269095aa8075fe69ec924c02d52104ba46bbc..826df7e52f92cc01c6881ba587a55841195d66d8 100644
--- a/src/lib/types.c
+++ b/src/lib/types.c
@@ -1144,6 +1144,54 @@ ADM_transfer_destroy(ADM_transfer_t tx) {
     return ret;
 }
 
+/**
+ * Initialize a transfer state.
+ *
+ * @remark This function is not actually part of the public API, but it is
+ * useful to have for internal purposes
+ *
+ * @param [in] type A transfer state
+ * @return A valid ADM_transfer_status_t or NULL in case of failure.
+ */
+ADM_transfer_status_t
+ADM_transfer_status_create(ADM_transfer_state_t type) {
+
+    struct adm_transfer_status* adm_transfer_state =
+            (struct adm_transfer_status*) malloc(
+                    sizeof(struct adm_transfer_status));
+
+    if(!adm_transfer_state) {
+        LOGGER_ERROR("Could not allocate ADM_transfer_status_t");
+        return NULL;
+    }
+
+    adm_transfer_state->s_state = type;
+
+    return adm_transfer_state;
+}
+
+
+/**
+ * Destroy an ADM_transfer_status_t created by ADM_transfer_status_create().
+ *
+ * @remark This function is not actually part of the public API, but it is
+ * useful to have for internal purposes
+ *
+ * @param[in] tx The ADM_transfer_status_t to destroy.
+ * @return ADM_SUCCESS or corresponding error code.
+ */
+ADM_return_t
+ADM_transfer_status_destroy(ADM_transfer_status_t tx) {
+    ADM_return_t ret = ADM_SUCCESS;
+
+    if(!tx) {
+        LOGGER_ERROR("Invalid ADM_transfer_status_t");
+        return ADM_EBADARGS;
+    }
+
+    free(tx);
+    return ret;
+}
+
 ADM_qos_limit_list_t
 ADM_qos_limit_list_create(ADM_qos_limit_t limits[], size_t length) {
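Note: the two helpers pair up like the other ADM_*_create()/ADM_*_destroy() functions in this file; every status returned by ADM_transfer_status_create() must eventually be released by its owner (illustrative):

    ADM_transfer_status_t st = ADM_transfer_status_create(ADM_TRANSFER_RUNNING);
    if(st != NULL) {
        /* ... hand it to the C++/RPC glue ... */
        ADM_transfer_status_destroy(st);
    }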
diff --git a/src/lib/types.cpp b/src/lib/types.cpp
index a889d336c43ef26477cbae4cd360048b6ebea953..c07b9ef9ce6b01ae9345dbd22dd97d0e91076d6e 100644
--- a/src/lib/types.cpp
+++ b/src/lib/types.cpp
@@ -462,6 +462,100 @@ template void
 transfer::serialize<network::serialization::input_archive>(
         network::serialization::input_archive&);
 
+
+class transfer_state::impl {
+
+public:
+    impl() = default;
+    explicit impl(transfer_state::type type) : m_type(type) {}
+
+    impl(const impl& rhs) = default;
+    impl(impl&& rhs) = default;
+    impl&
+    operator=(const impl& other) noexcept = default;
+    impl&
+    operator=(impl&&) noexcept = default;
+
+    transfer_state::type
+    status() const {
+        return m_type;
+    }
+
+    template <class Archive>
+    void
+    load(Archive& ar) {
+        ar(SCORD_SERIALIZATION_NVP(m_type));
+    }
+
+    template <class Archive>
+    void
+    save(Archive& ar) const {
+        ar(SCORD_SERIALIZATION_NVP(m_type));
+    }
+
+private:
+    transfer_state::type m_type;
+};
+
+transfer_state::transfer_state() = default;
+
+transfer_state::transfer_state(transfer_state::type type)
+    : m_pimpl(std::make_unique<impl>(type)) {}
+
+transfer_state::transfer_state(ADM_transfer_status_t adm_transfer_state)
+    : transfer_state(static_cast<transfer_state::type>(
+              adm_transfer_state->s_state)) {}
+
+transfer_state::operator ADM_transfer_status_t() const {
+    return ADM_transfer_status_create(
+            (ADM_transfer_state_t) m_pimpl->status());
+}
+
+transfer_state::transfer_state(transfer_state&&) noexcept = default;
+
+transfer_state&
+transfer_state::operator=(transfer_state&&) noexcept = default;
+
+transfer_state::transfer_state(const transfer_state& other) noexcept
+    : m_pimpl(std::make_unique<impl>(*other.m_pimpl)) {}
+
+transfer_state&
+transfer_state::operator=(const transfer_state& other) noexcept {
+    this->m_pimpl = std::make_unique<impl>(*other.m_pimpl);
+    return *this;
+}
+
+transfer_state::~transfer_state() = default;
+
+transfer_state::type
+transfer_state::status() const {
+    return m_pimpl->status();
+}
+
+// since the PIMPL class is fully defined at this point, we can now
+// define the serialization function
+template <typename Archive>
+inline void
+transfer_state::serialize(Archive& ar) {
+    ar(SCORD_SERIALIZATION_NVP(m_pimpl));
+}
+
+// we must also explicitly instantiate our template functions for
+// serialization in the desired archives
+template void
+transfer_state::impl::save<network::serialization::output_archive>(
+        network::serialization::output_archive&) const;
+
+template void
+transfer_state::impl::load<network::serialization::input_archive>(
+        network::serialization::input_archive&);
+
+template void
+transfer_state::serialize<network::serialization::output_archive>(
+        network::serialization::output_archive&);
+
+template void
+transfer_state::serialize<network::serialization::input_archive>(
+        network::serialization::input_archive&);
+
 class dataset::impl {
 public:
     impl() = default;
@@ -1214,6 +1308,7 @@ entity::data() const {
     return m_pimpl->data();
 }
 
+
 // since the PIMPL class is fully defined at this point, we can now
 // define the serialization function
 template <typename Archive>
diff --git a/src/lib/types_private.h b/src/lib/types_private.h
index 3e0ba099bf1e3ec0c3448d9e71bb20793e4a6983..834c58756ca0fb78cb70df6a48a9e5859c49fe43 100644
--- a/src/lib/types_private.h
+++ b/src/lib/types_private.h
@@ -183,6 +183,9 @@ ADM_job_create(uint64_t id, uint64_t slurm_id);
 ADM_transfer_t
 ADM_transfer_create(uint64_t id);
 
+ADM_transfer_status_t
+ADM_transfer_status_create(ADM_transfer_state_t type);
+
 #ifdef __cplusplus
 } // extern "C"
 #endif
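Note: transfer_state follows the same PIMPL-plus-explicit-instantiation recipe as the neighbouring types (dataset, transfer, ...): serialize() is only defined once impl is complete, and both archive instantiations are spelled out so the symbols exist in the library. Stripped of scord specifics, the deep-copy part of the idiom reduces to this sketch (illustrative, not scord code):

    #include <memory>

    class widget {
        class impl {
        public:
            int m_value{};
        };
        std::unique_ptr<impl> m_pimpl;

    public:
        widget() : m_pimpl(std::make_unique<impl>()) {}
        // deep copy: clone the pointee rather than sharing it
        widget(const widget& other)
            : m_pimpl(std::make_unique<impl>(*other.m_pimpl)) {}
        widget&
        operator=(const widget& other) {
            m_pimpl = std::make_unique<impl>(*other.m_pimpl);
            return *this;
        }
    };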
diff --git a/src/scord-ctl/command.cpp b/src/scord-ctl/command.cpp
index c45b5b5cd52193584c3d32f9495dfb536f35804a..387778c999c553f71395ac3cf5c0936b687b131c 100644
--- a/src/scord-ctl/command.cpp
+++ b/src/scord-ctl/command.cpp
@@ -195,6 +195,38 @@ command::as_vector() const {
     return tmp;
 }
 
+// Function to join two sets of environment variables into a single
+// NULL-terminated array. The caller adopts ownership of the array and of
+// each strdup()ed entry.
+char**
+joinEnvironments(char** env1, const char** env2) {
+    // Count the number of variables in each environment
+    int count1 = 0;
+    while(env1[count1] != nullptr) {
+        ++count1;
+    }
+
+    int count2 = 0;
+    while(env2[count2] != nullptr) {
+        ++count2;
+    }
+
+    // Allocate memory for the combined environment
+    char** combinedEnv = new char*[count1 + count2 + 1];
+
+    // Copy the variables from the first environment
+    for(int i = 0; i < count1; ++i) {
+        combinedEnv[i] = strdup(env1[i]);
+    }
+
+    // Copy the variables from the second environment
+    for(int i = 0; i < count2; ++i) {
+        combinedEnv[count1 + i] = strdup(env2[i]);
+    }
+
+    // Null-terminate the combined environment
+    combinedEnv[count1 + count2] = nullptr;
+
+    return combinedEnv;
+}
+
 void
 command::exec() const {
@@ -207,8 +239,11 @@ command::exec() const {
 
     switch(const auto pid = ::fork()) {
         case 0: {
+
+            // Join the parent environment with the command's own variables
+            char** combinedEnv = joinEnvironments(environ, envp.get());
             ::execvpe(argv[0], const_cast<char* const*>(argv.get()),
-                      const_cast<char* const*>(envp.get()));
+                      const_cast<char* const*>(combinedEnv));
             // We cannot use the default logger in the child process because it
             // is not fork-safe, and even though we received a copy of the
             // global logger, it is not valid because the child process does
diff --git a/src/scord-ctl/rpc_server.cpp b/src/scord-ctl/rpc_server.cpp
index 13e6a938fa1afc20fb63ee4874f0eba5336f0f6b..c3dad566fdae25a6fa31afb9886de573563772ca 100644
--- a/src/scord-ctl/rpc_server.cpp
+++ b/src/scord-ctl/rpc_server.cpp
@@ -28,6 +28,7 @@
 #include
 #include "rpc_server.hpp"
 
+extern char** environ;
 
 using namespace std::literals;
 
@@ -142,7 +143,7 @@ rpc_server::deploy_adhoc_storage(
         const auto& adhoc_cfg = it->second;
 
         LOGGER_DEBUG("deploy \"{:e}\" (ID: {})", adhoc_type, adhoc_uuid);
-
+
         // 1. Create a working directory for the adhoc storage instance
         adhoc_dir = adhoc_cfg.working_directory() / adhoc_uuid;
 
@@ -172,6 +173,8 @@ rpc_server::deploy_adhoc_storage(
         const auto cmd = adhoc_cfg.startup_command().eval(
                 adhoc_uuid, *adhoc_dir, hostnames);
 
+        // Fill environment
+
         // 4. Execute the startup command
         try {
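Note: joinEnvironments() concatenates without deduplicating. glibc's getenv() returns the first matching entry it finds when scanning the environment array, so a variable present in env1 (the parent environment here) shadows a same-named entry contributed by env2. Illustrative use (the extra variable is an example, not taken from the patch):

    const char* extra[] = {"LIBGKFS_HOSTS_FILE=/tmp/gkfs_hosts.txt", nullptr};
    char** combined = joinEnvironments(environ, extra);
    // combined = { <all of environ>, "LIBGKFS_HOSTS_FILE=...", nullptr }

The duplicated strings and the array itself are never freed, which is harmless in this call site because the child process image is immediately replaced by execvpe().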
diff --git a/src/scord/rpc_server.cpp b/src/scord/rpc_server.cpp
index 10bb1e41de766d6c352a31753c3997a8546e9452..d706b3349f339ec46721505189803b69eaf51315 100644
--- a/src/scord/rpc_server.cpp
+++ b/src/scord/rpc_server.cpp
@@ -97,6 +97,7 @@ rpc_server::rpc_server(std::string name, std::string address, bool daemonize,
     provider::define(EXPAND(update_pfs_storage));
     provider::define(EXPAND(remove_pfs_storage));
     provider::define(EXPAND(transfer_datasets));
+    provider::define(EXPAND(query_transfer));
 
 #undef EXPAND
 
     m_network_engine.push_prefinalize_callback([this]() {
@@ -875,13 +876,82 @@ rpc_server::transfer_datasets(const network::request& req,
 }
 
+void
+rpc_server::query_transfer(const network::request& req, scord::job_id job_id,
+                           scord::transfer_id tx_id) {
+
+    using network::get_address;
+    using network::response_with_value;
+    using network::rpc_info;
+    using response_with_status = response_with_value<scord::transfer_state>;
+
+    const auto rpc = rpc_info::create(RPC_NAME(), get_address(req));
+
+    LOGGER_INFO("rpc {:>} body: {{job_id: {}, tx_id: {}}}", rpc, job_id,
+                tx_id);
+
+    const auto jm_result = m_job_manager.find(job_id);
+
+    if(!jm_result) {
+        LOGGER_ERROR("rpc id: {} error_msg: \"Error finding job: {}\"",
+                     rpc.id(), job_id);
+        const auto resp = response_with_status{rpc.id(), jm_result.error()};
+        LOGGER_ERROR("rpc {:<} body: {{retval: {}}}", rpc, resp.error_code());
+        req.respond(resp);
+        return;
+    }
+
+    const auto& job_metadata_ptr = jm_result.value();
+
+    if(!job_metadata_ptr->adhoc_storage_metadata()) {
+        LOGGER_ERROR("rpc id: {} error_msg: \"Job {} has no adhoc storage\"",
+                     rpc.id(), job_id);
+        const auto resp =
+                response_with_status{rpc.id(), error_code::no_resources};
+        LOGGER_ERROR("rpc {:<} body: {{retval: {}}}", rpc, resp.error_code());
+        req.respond(resp);
+        return;
+    }
+
+    const auto data_stager_address =
+            job_metadata_ptr->adhoc_storage_metadata()->data_stager_address();
+
+    // Contact the Cargo service associated with the job's adhoc storage
+    // instance so that the transfer's state can be queried.
+    cargo::server srv{data_stager_address};
+
+    // Look up the transfer in the `transfer_manager`. The `cargo::transfer`
+    // object embedded in scord's `transfer_metadata` lets us query the Cargo
+    // service for the transfer's current status.
+    const auto rv =
+            m_transfer_manager.find(tx_id)
+                    .or_else([&](auto&& ec) {
+                        LOGGER_ERROR("rpc id: {} error_msg: \"Error finding "
+                                     "transfer: {}\"",
+                                     rpc.id(), ec);
+                    })
+                    .and_then([&](auto&& transfer_metadata_ptr)
+                                      -> tl::expected<scord::transfer_state,
+                                                      scord::error_code> {
+                        return scord::transfer_state(
+                                static_cast<scord::transfer_state::type>(
+                                        transfer_metadata_ptr->transfer()
+                                                .status()
+                                                .state()));
+                    });
+
+    const auto resp =
+            rv ? response_with_status{rpc.id(), error_code::success,
+                                      rv.value()}
+               : response_with_status{rpc.id(), rv.error()};
+
+    LOGGER_EVAL(resp.error_code(), INFO, ERROR,
+                "rpc {:<} body: {{retval: {}, status: {}}}", rpc,
+                resp.error_code(), resp.value_or_none());
+    req.respond(resp);
+}
+
-/* Scheduling is done each 0.5 s*/
+/* Scheduling is done every 1 s */
 void
 rpc_server::scheduler_update() {
     std::vector<std::pair<std::string, float>> return_set;
     const auto threshold = 0.1f;
     while(!m_shutting_down) {
-        thallium::thread::self().sleep(m_network_engine, 500);
+        thallium::thread::self().sleep(m_network_engine, 1000);
         m_transfer_manager.lock();
         const auto transfer = m_transfer_manager.transfer();
         std::vector<uint64_t> v_ids;
diff --git a/src/scord/rpc_server.hpp b/src/scord/rpc_server.hpp
index e4257081fa93f170927ccc5f9a3a6a8418620cff..33f4d01b86245c09da090f2b774cd73365b06ea0 100644
--- a/src/scord/rpc_server.hpp
+++ b/src/scord/rpc_server.hpp
@@ -109,6 +109,10 @@ private:
                       const std::vector<scord::dataset>& targets,
                       const std::vector<scord::qos::limit>& limits,
                       enum scord::transfer::mapping mapping);
+
+    void
+    query_transfer(const network::request& req, scord::job_id job_id,
+                   scord::transfer_id transfer_id);
 
     job_manager m_job_manager;
     adhoc_storage_manager m_adhoc_manager;
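Note: the and_then() lambda converts Cargo's transfer state into scord::transfer_state with a static_cast, which assumes both enumerations use the same underlying numeric values. An explicit mapping would remove that coupling; a sketch (the cargo enumerator names are assumptions, check cargo's public header):

    // Hypothetical explicit mapping between cargo and scord transfer states.
    scord::transfer_state::type
    to_scord_state(cargo::transfer_state s) {
        switch(s) {
            case cargo::transfer_state::pending:
                return scord::transfer_state::type::queued;
            case cargo::transfer_state::running:
                return scord::transfer_state::type::running;
            case cargo::transfer_state::completed:
                return scord::transfer_state::type::finished;
            case cargo::transfer_state::failed:
                return scord::transfer_state::type::failed;
        }
        return scord::transfer_state::type::failed; // unreachable
    }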