From f462bf65d333734856b597133fb8e9803d61217d Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Fri, 24 Nov 2023 07:31:20 +0100 Subject: [PATCH 01/10] Add query_transfer function to scord library Add ADM_transfer_status_create and ADM_transfer_status_destroy functions to types.c --- cli/scord_adhoc.cpp | 147 ++++++++++++++++++++-------- plugins/adhoc_services.d/gekkofs.sh | 26 +++++ src/lib/detail/impl.cpp | 37 +++++++ src/lib/detail/impl.hpp | 4 + src/lib/libscord.cpp | 12 +++ src/lib/scord/scord.hpp | 3 + src/lib/scord/types.hpp | 69 +++++++++++++ src/lib/types.c | 48 +++++++++ src/lib/types.cpp | 95 ++++++++++++++++++ src/lib/types_private.h | 3 + src/scord/rpc_server.cpp | 70 +++++++++++++ src/scord/rpc_server.hpp | 4 + 12 files changed, 475 insertions(+), 43 deletions(-) diff --git a/cli/scord_adhoc.cpp b/cli/scord_adhoc.cpp index 8e05de3a..ebefc670 100644 --- a/cli/scord_adhoc.cpp +++ b/cli/scord_adhoc.cpp @@ -34,12 +34,14 @@ struct query_config { std::string controller_address; std::string stager_address; std::uint32_t slurm_id{}; + std::uint32_t job_id{}; std::uint32_t adhocid{}; std::string nodes; std::string adhocfs; std::string inputs; std::string outputs; std::string function; + std::uint32_t qos{}; }; @@ -78,6 +80,7 @@ parse_command_line(int argc, char* argv[]) { ->option_text("CARGOADDRESS") ->required(); app.add_option("-j,--slurm_id", cfg.slurm_id, "Slurm ID")->required(); + app.add_option("--job_id", cfg.job_id, "Job ID (for subsequent ops)"); app.add_option("-n,--nodes", cfg.nodes, "Nodes"); @@ -90,6 +93,8 @@ parse_command_line(int argc, char* argv[]) { app.add_option("-o,--outputs", cfg.outputs, "Output dataset"); + app.add_option("-q,--qos", cfg.qos, "QoS MB/s"); + app.add_option("-f,--function", cfg.function, "Function {create, stage-in, stage-out, destroy}") ->required(); @@ -113,6 +118,21 @@ parse_address(const std::string& address) { return std::make_pair(protocol, address); } +auto +create_adhoc_type_from_name(const std::string& name) { + if(name == "gekkofs") { + return scord::adhoc_storage::type::gekkofs; + } else if(name == "hercules") { + return scord::adhoc_storage::type::hercules; + } else if(name == "expand") { + return scord::adhoc_storage::type::expand; + } else if(name == "dataclay") { + return scord::adhoc_storage::type::dataclay; + } else { + throw std::runtime_error( + fmt::format("Invalid adhoc fs type: {}", name)); + } +} int main(int argc, char* argv[]) { @@ -141,24 +161,12 @@ main(int argc, char* argv[]) { scord::job::resources job_resources(nodes); // Step 1b : Define adhoc_storage - /* type needs to process the sctring in cfg.adhocfs */ - auto type = scord::adhoc_storage::type::gekkofs; - if(cfg.adhocfs == "gekkofs") { - type = scord::adhoc_storage::type::gekkofs; - } else if(cfg.adhocfs == "hercules") { - type = scord::adhoc_storage::type::hercules; - } else if(cfg.adhocfs == "expand") { - type = scord::adhoc_storage::type::expand; - } else { - throw std::runtime_error( - fmt::format("Invalid adhoc fs type: {}", cfg.adhocfs)); - } + auto type = create_adhoc_type_from_name(cfg.adhocfs); std::string adhoc_name = cfg.adhocfs + std::to_string(cfg.slurm_id); scord::adhoc_storage::resources resources{nodes}; - scord::adhoc_storage::ctx ctx{ cfg.controller_address, cfg.stager_address, @@ -167,33 +175,11 @@ main(int argc, char* argv[]) { 100, false}; - scord::adhoc_storage adhoc_storage = register_adhoc_storage( srv, adhoc_name, type, ctx, resources); - - fmt::print("AdhocStorage ID: {}\n", adhoc_storage.id()); - auto path = deploy_adhoc_storage(srv, adhoc_storage); - - - // Step 1c : Define job_requirements - /* - - job::requirements::requirements( - std::vector inputs, - std::vector outputs, - std::vector expected_outputs) - : m_inputs(std::move(inputs)), m_outputs(std::move(outputs)), - m_expected_outputs(std::move(expected_outputs)) {} - - job::requirements::requirements( - std::vector inputs, - std::vector outputs, - std::vector expected_outputs, - scord::adhoc_storage adhoc_storage) - */ - + fmt::print("{}\n", path); /* Separate inputs into vector of inputs */ @@ -226,20 +212,95 @@ main(int argc, char* argv[]) { auto job = scord::register_job(srv, job_resources, job_requirements, cfg.slurm_id); + return job.id(); + } - // Now Tranfer Datasets + if(cfg.function == "stage-in") { + // As the job is registered, we can now transfer datasets. Get + // inputs from requirements (TODO) + // Step 2 : If function is stage-in, transfer datasets // convert inputs to split inputs (src, dst) std::vector inputs_src, inputs_dst; - for(auto& route : inputs) { - inputs_src.push_back(route.source()); - inputs_dst.push_back(route.destination()); + auto v_routes_in = split(cfg.inputs, ';'); + + for(auto& src_dst : v_routes_in) { + auto route = split(src_dst, ','); + + inputs_src.push_back(scord::dataset{route[0]}); + inputs_dst.push_back(scord::dataset{route[1]}); + } + + scord::job job(cfg.job_id, cfg.slurm_id); + std::vector v_qos; + if(cfg.qos) { + scord::qos::limit limit{scord::qos::subclass::bandwidth, + cfg.qos}; + v_qos.push_back(limit); } - scord::transfer_datasets(srv, job, inputs_src, inputs_dst, - std::vector{}, - scord::transfer::mapping::n_to_n); + auto transfer = scord::transfer_datasets( + srv, job, inputs_src, inputs_dst, v_qos, + scord::transfer::mapping::n_to_n); + return transfer.id(); + } + + if(cfg.function == "wait") { + // Wait for transfer operation to finish + const scord::transfer transfer = scord::transfer{cfg.slurm_id}; + scord::job job(cfg.job_id, cfg.slurm_id); + + auto response = scord::query_transfer(srv, job, transfer); + + while(response.status() != scord::transfer_state::type::finished) { + std::this_thread::sleep_for(1s); + response = scord::query_transfer(srv, job, transfer); + if(scord::transfer_state::type::failed == response.status()) { + fmt::print("Transfer failed\n"); + return EXIT_FAILURE; + } + } + } + + if(cfg.function == "stage-out") { + // Step 3 : If function is stage-out, transfer datasets + // convert inputs to split inputs (src, dst) + std::vector outputs_src, outputs_dst; + + auto v_routes_out = split(cfg.outputs, ';'); + + for(auto& src_dst : v_routes_out) { + auto route = split(src_dst, ','); + + outputs_src.push_back(scord::dataset{route[0]}); + outputs_dst.push_back(scord::dataset{route[1]}); + } + + scord::job job(cfg.job_id, cfg.slurm_id); + std::vector v_qos; + if(cfg.qos) { + scord::qos::limit limit{scord::qos::subclass::bandwidth, + cfg.qos}; + v_qos.push_back(limit); + } + + auto transfer = scord::transfer_datasets( + srv, job, outputs_src, outputs_dst, v_qos, + scord::transfer::mapping::n_to_n); + return transfer.id(); + } + + if(cfg.function == "destroy") { + // Step 4 : If function is destroy, terminate adhoc fs server + // Build a scord::adhoc_storage object with the adhocid + auto type = create_adhoc_type_from_name(cfg.adhocfs); + scord::adhoc_storage::resources resources; + scord::adhoc_storage::ctx ctx; + + scord::adhoc_storage adhoc_storage(type, "", cfg.adhocid, ctx, + resources); + terminate_adhoc_storage(srv, adhoc_storage); } } catch(const std::exception& ex) { diff --git a/plugins/adhoc_services.d/gekkofs.sh b/plugins/adhoc_services.d/gekkofs.sh index 6544724e..a48b8aa1 100644 --- a/plugins/adhoc_services.d/gekkofs.sh +++ b/plugins/adhoc_services.d/gekkofs.sh @@ -1,3 +1,29 @@ #!/usr/bin/env bash echo "GEKKOFS Script Called" + +# example of a script that can be called by the adhoc service +# [2023-11-23 09:37:32.583868] [scord-ctl] [2199567] [info] rpc => id: 0 name: "ADM_deploy_adhoc_storage" from: "ofi+tcp;ofi_rxm://127.0.0.1:52000" body: {uuid: "gekkofs-JR4ny5xHMhmlwh6KqThfYt71IaoR9cH5", type: ADM_ADHOC_STORAGE_GEKKOFS, resources: {nodes: [{hostname: "broadwell-001", type: regular}, {hostname: "broadwell-002", type: regular}, {hostname: "broadwell-003", type: regular}, {hostname: "broadwell-004", type: regular}]}} +# option: start --hosts "broadwell-001,broadwell-002,broadwell-003,broadwell-004" --workdir /tmp/gekkofs/gekkofs-JR4ny5xHMhmlwh6KqThfYt71IaoR9cH5 --datadir /tmp/gekkofs/gekkofs-JR4ny5xHMhmlwh6KqThfYt71IaoR9cH5/data --mountdir /tmp/gekkofs/gekkofs-JR4ny5xHMhmlwh6KqThfYt71IaoR9cH5/mnt + +# code to count the number of elements in a comma separated list environment variable (called $nodes) and store in $num_nodes + +if ($1 == "start") then + echo "Starting GEKKOFS" + $nodes = $3 + num_nodes=$(echo $nodes | awk -F, '{print NF-1}') + + $workdir = $5 + $datadir = $7 + $mountdir = $9 + + mkdir -p $5 + srun -N $num_nodes -n $num_nodes --oversubscribe --cpus-per-task=1 --mem-per-task=1 --export=ALL bash -c "gkfs_daemon --rootdir $datadir --mountdir $mountdir" & +else if ($1 == "stop") then + echo "Stopping GEKKOFS" + srun -N $num_nodes -n $num_nodes --oversubscribe --cpus-per-task=1 --mem-per-task=1 --export=ALL bash -c "pkill -9 gkfs_daemon" +else + echo "Unknown command" + exit 1 + + exit 0 diff --git a/src/lib/detail/impl.cpp b/src/lib/detail/impl.cpp index 8d26b4ee..db8b5844 100644 --- a/src/lib/detail/impl.cpp +++ b/src/lib/detail/impl.cpp @@ -531,4 +531,41 @@ transfer_datasets(const server& srv, const job& job, } +tl::expected +query_transfer(const server& srv, const job& job, const transfer& transfer) { + + using response_type = network::response_with_value; + + network::client rpc_client{srv.protocol()}; + + const auto rpc = network::rpc_info::create(RPC_NAME(), srv.address()); + + if(const auto lookup_rv = rpc_client.lookup(srv.address()); + lookup_rv.has_value()) { + const auto& endp = lookup_rv.value(); + + LOGGER_INFO("rpc {:<} body: {{job_id: {}, tx_id: {}}}", rpc, job.id(), transfer.id()); + + if(const auto call_rv = endp.call(rpc.name(), job.id(), transfer.id()); + call_rv.has_value()) { + + const response_type resp{call_rv.value()}; + + LOGGER_EVAL( + resp.error_code(), INFO, ERROR, + "rpc {:>} body: {{retval: {}, tx_state: {}}} [op_id: {}]", + rpc, resp.error_code(), resp.value(), resp.op_id()); + + if(!resp.error_code()) { + return tl::make_unexpected(resp.error_code()); + } + + return resp.value(); + } + } + + LOGGER_ERROR("rpc call failed"); + return tl::make_unexpected(scord::error_code::other); +} + } // namespace scord::detail diff --git a/src/lib/detail/impl.hpp b/src/lib/detail/impl.hpp index c1c5f054..e025e630 100644 --- a/src/lib/detail/impl.hpp +++ b/src/lib/detail/impl.hpp @@ -86,6 +86,10 @@ transfer_datasets(const server& srv, const job& job, const std::vector& limits, transfer::mapping mapping); +tl::expected +query_transfer(const server& srv, const job& job, const transfer& transfer); + + } // namespace scord::detail diff --git a/src/lib/libscord.cpp b/src/lib/libscord.cpp index 19e3d58e..d71a3a6f 100644 --- a/src/lib/libscord.cpp +++ b/src/lib/libscord.cpp @@ -377,6 +377,18 @@ transfer_datasets(const server& srv, const job& job, return rv.value(); } +scord::transfer_state +query_transfer(const server& srv, const job& job, const transfer& transfer) { + + const auto rv = detail::query_transfer(srv, job, transfer); + + if(!rv) { + throw std::runtime_error(fmt::format( + "ADM_query_transfer() error: {}", ADM_strerror(rv.error()))); + } + + return rv.value(); +} ADM_return_t set_dataset_information(const server& srv, ADM_job_t job, ADM_dataset_t target, diff --git a/src/lib/scord/scord.hpp b/src/lib/scord/scord.hpp index 045bdf28..6c35bde4 100644 --- a/src/lib/scord/scord.hpp +++ b/src/lib/scord/scord.hpp @@ -98,6 +98,9 @@ transfer_datasets(const server& srv, const job& job, const std::vector& limits, transfer::mapping mapping); +scord::transfer_state +query_transfer(const server& srv, const job& job, const transfer& transfer); + void transfer_update(const server& srv, uint64_t transfer_id, float obtained_bw); diff --git a/src/lib/scord/types.hpp b/src/lib/scord/types.hpp index 6ec23f13..009240b6 100644 --- a/src/lib/scord/types.hpp +++ b/src/lib/scord/types.hpp @@ -561,6 +561,43 @@ private: std::unique_ptr m_pimpl; }; +struct transfer_state { + +enum class type : std::underlying_type::type { + queued = ADM_TRANSFER_QUEUED, + running = ADM_TRANSFER_RUNNING, + finished = ADM_TRANSFER_FINISHED, + failed = ADM_TRANSFER_FAILED, + cancelled = ADM_TRANSFER_CANCELLED + }; + transfer_state(); + explicit transfer_state(transfer_state::type type); + explicit transfer_state(ADM_transfer_status_t adm_transfer_state); + explicit operator ADM_transfer_status_t() const; + + transfer_state(const transfer_state&) noexcept; + transfer_state(transfer_state&&) noexcept; + transfer_state& + operator=(const transfer_state&) noexcept; + transfer_state& + operator=(transfer_state&&) noexcept; + + ~transfer_state(); + + transfer_state::type + status() const; + + // The implementation for this must be deferred until + // after the declaration of the PIMPL class + template + void + serialize(Archive& ar); + +private: + class impl; + std::unique_ptr m_pimpl; +}; + namespace qos { enum class subclass : std::underlying_type::type { @@ -1240,4 +1277,36 @@ struct fmt::formatter> } }; +template <> +struct fmt::formatter : fmt::formatter { + // parse is inherited from formatter. + template + auto + format(const scord::transfer_state& t, FormatContext& ctx) const -> format_context::iterator { + + using scord::node; + std::string_view name = "unknown"; + + switch(t.status()){ + case scord::transfer_state::type::queued: + name = "queued"; + break; + case scord::transfer_state::type::running: + name = "running"; + break; + case scord::transfer_state::type::finished: + name = "finished"; + break; + case scord::transfer_state::type::failed: + name = "failed"; + break; + case scord::transfer_state::type::cancelled: + name = "cancelled"; + break; + } + + return formatter::format(name, ctx); + } +}; + #endif // SCORD_TYPES_HPP diff --git a/src/lib/types.c b/src/lib/types.c index 1e426909..826df7e5 100644 --- a/src/lib/types.c +++ b/src/lib/types.c @@ -1144,6 +1144,54 @@ ADM_transfer_destroy(ADM_transfer_t tx) { return ret; } +/** + * Initialize a transfer state + * + * @remark This function is not actually part of the public API, but it is + * useful to have for internal purposes + * + * @param [in] type A Transfer status + * @return A valid TRANSFER STATUS or NULL in case of failure. + */ +ADM_transfer_status_t +ADM_transfer_status_create(ADM_transfer_state_t type) { + + struct adm_transfer_status* adm_transfer_state = + (struct adm_transfer_status*) malloc(sizeof(struct adm_transfer_status)); + + if(!adm_transfer_state) { + LOGGER_ERROR("Could not allocate ADM_transfer_t"); + return NULL; + } + + adm_transfer_state->s_state = type; + + return adm_transfer_state; +} + + +/** + * Destroy a ADM_transfer_status_t created by ADM_transfer_status_create(). + * + * @remark This function is not actually part of the public API, but it is + * useful to have for internal purposes + * + * @param[in] tx The ADM_transfer_status_t to destroy. + * @return ADM_SUCCESS or corresponding error code. + */ +ADM_return_t +ADM_transfer_status_destroy(ADM_transfer_status_t tx) { + ADM_return_t ret = ADM_SUCCESS; + + if(!tx) { + LOGGER_ERROR("Invalid ADM_transfer_status_t"); + return ADM_EBADARGS; + } + + free(tx); + return ret; +} + ADM_qos_limit_list_t ADM_qos_limit_list_create(ADM_qos_limit_t limits[], size_t length) { diff --git a/src/lib/types.cpp b/src/lib/types.cpp index a889d336..c07b9ef9 100644 --- a/src/lib/types.cpp +++ b/src/lib/types.cpp @@ -462,6 +462,100 @@ template void transfer::serialize( network::serialization::input_archive&); + +class transfer_state::impl { + +public: + impl() = default; + explicit impl(transfer_state::type type) : m_type(type) {} + + impl(const impl& rhs) = default; + impl(impl&& rhs) = default; + impl& + operator=(const impl& other) noexcept = default; + impl& + operator=(impl&&) noexcept = default; + + transfer_state::type + status() const { + return m_type; + } + + template + void + load(Archive& ar) { + ar(SCORD_SERIALIZATION_NVP(m_type)); + } + + template + void + save(Archive& ar) const { + ar(SCORD_SERIALIZATION_NVP(m_type)); + } + +private: + transfer_state::type m_type; +}; + +transfer_state::transfer_state() = default; + +transfer_state::transfer_state(transfer_state::type type) + : m_pimpl(std::make_unique(type)) {} + +transfer_state::transfer_state(ADM_transfer_status_t adm_transfer_state) + : transfer_state::transfer_state(static_cast(adm_transfer_state->s_state)) {} + +transfer_state::operator ADM_transfer_status_t() const { + return ADM_transfer_status_create((ADM_transfer_state_t)m_pimpl->status()); +} + +transfer_state::transfer_state(transfer_state&&) noexcept = default; + +transfer_state& +transfer_state::operator=(transfer_state&&) noexcept = default; + +transfer_state::transfer_state(const transfer_state& other) noexcept + : m_pimpl(std::make_unique(*other.m_pimpl)) {} + +transfer_state& +transfer_state::operator=(const transfer_state& other) noexcept { + this->m_pimpl = std::make_unique(*other.m_pimpl); + return *this; +} + +transfer_state::~transfer_state() = default; + +transfer_state::type +transfer_state::status() const { + return m_pimpl->status(); +} + +// since the PIMPL class is fully defined at this point, we can now +// define the serialization function +template +inline void +transfer_state::serialize(Archive& ar) { + ar(SCORD_SERIALIZATION_NVP(m_pimpl)); +} + +// we must also explicitly instantiate our template functions for +// serialization in the desired archives +template void +transfer_state::impl::save( + network::serialization::output_archive&) const; + +template void +transfer_state::impl::load( + network::serialization::input_archive&); + +template void +transfer_state::serialize( + network::serialization::output_archive&); + +template void +transfer_state::serialize( + network::serialization::input_archive&); + class dataset::impl { public: impl() = default; @@ -1214,6 +1308,7 @@ entity::data() const { return m_pimpl->data(); } + // since the PIMPL class is fully defined at this point, we can now // define the serialization function template diff --git a/src/lib/types_private.h b/src/lib/types_private.h index 3e0ba099..834c5875 100644 --- a/src/lib/types_private.h +++ b/src/lib/types_private.h @@ -183,6 +183,9 @@ ADM_job_create(uint64_t id, uint64_t slurm_id); ADM_transfer_t ADM_transfer_create(uint64_t id); +ADM_transfer_status_t +ADM_transfer_status_create(ADM_transfer_state_t type); + #ifdef __cplusplus } // extern "C" #endif diff --git a/src/scord/rpc_server.cpp b/src/scord/rpc_server.cpp index 10bb1e41..194f937a 100644 --- a/src/scord/rpc_server.cpp +++ b/src/scord/rpc_server.cpp @@ -97,6 +97,7 @@ rpc_server::rpc_server(std::string name, std::string address, bool daemonize, provider::define(EXPAND(update_pfs_storage)); provider::define(EXPAND(remove_pfs_storage)); provider::define(EXPAND(transfer_datasets)); + provider::define(EXPAND(query_transfer)); #undef EXPAND m_network_engine.push_prefinalize_callback([this]() { @@ -875,6 +876,75 @@ rpc_server::transfer_datasets(const network::request& req, scord::job_id job_id, } +void +rpc_server::query_transfer(const network::request& req, scord::job_id job_id, scord::transfer_id tx_id) { + + using network::get_address; + using network::response_with_value; + using network::rpc_info; + using response_with_status = response_with_value; + const auto rpc = rpc_info::create(RPC_NAME(), get_address(req)); + + LOGGER_INFO("rpc {:>} body: {{job_id: {}, tx_id{}}}", + rpc, job_id, tx_id); + + const auto jm_result = m_job_manager.find(job_id); + + if(!jm_result) { + LOGGER_ERROR("rpc id: {} error_msg: \"Error finding job: {}\"", + rpc.id(), job_id); + const auto resp = response_with_status{rpc.id(), jm_result.error()}; + LOGGER_ERROR("rpc {:<} body: {{retval: {}}}", rpc, resp.error_code()); + req.respond(resp); + return; + } + + const auto& job_metadata_ptr = jm_result.value(); + + if(!job_metadata_ptr->adhoc_storage_metadata()) { + LOGGER_ERROR("rpc id: {} error_msg: \"Job has no adhoc storage\"", + rpc.id(), job_id); + const auto resp = response_with_status{rpc.id(), error_code::no_resources}; + LOGGER_ERROR("rpc {:<} body: {{retval: {}}}", rpc, resp.error_code()); + req.respond(resp); + return; + } + + const auto data_stager_address = + job_metadata_ptr->adhoc_storage_metadata()->data_stager_address(); + + // Transform the `scord::dataset`s into `cargo::dataset`s and contact the + // Cargo service associated with the job's adhoc storage instance to + // execute the transfers. + cargo::server srv{data_stager_address}; + + // Register the transfer into the `tranfer_manager`. + // We embed the generated `cargo::transfer` object into + // scord's `transfer_metadata` so that we can later query the Cargo + // service for the transfer's status. + const auto rv = + m_transfer_manager.find(tx_id) + .or_else([&](auto&& ec) { + LOGGER_ERROR("rpc id: {} error_msg: \"Error finding " + "transfer: {}\"", + rpc.id(), ec); + }) + .and_then([&](auto&& transfer_metadata_ptr) + -> tl::expected { + return scord::transfer_state(static_cast(transfer_metadata_ptr->transfer().status().state())); + }); + + const auto resp = + rv ? response_with_status{rpc.id(), error_code::success, rv.value()} + : response_with_status{rpc.id(), rv.error()}; + + + LOGGER_EVAL(resp.error_code(), INFO, ERROR, + "rpc {:<} body: {{retval: {}, status: {}}}", rpc, + resp.error_code(), resp.value_or_none()); + req.respond(resp); +} + /* Scheduling is done each 0.5 s*/ void rpc_server::scheduler_update() { diff --git a/src/scord/rpc_server.hpp b/src/scord/rpc_server.hpp index e4257081..33f4d01b 100644 --- a/src/scord/rpc_server.hpp +++ b/src/scord/rpc_server.hpp @@ -109,6 +109,10 @@ private: const std::vector& targets, const std::vector& limits, enum scord::transfer::mapping mapping); + + void + query_transfer(const network::request& req, scord::job_id job_id, + scord::transfer_id transfer_id); job_manager m_job_manager; adhoc_storage_manager m_adhoc_manager; -- GitLab From 89b4bd221f00feded314e6d834267c11bb21ee79 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Fri, 24 Nov 2023 07:45:10 +0100 Subject: [PATCH 02/10] Fix issues with GEKKOFS and add new version of Scord --- plugins/adhoc_services.d/gekkofs.sh | 5 ++++- spack/packages/scord/package.py | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/plugins/adhoc_services.d/gekkofs.sh b/plugins/adhoc_services.d/gekkofs.sh index a48b8aa1..e1011f1b 100644 --- a/plugins/adhoc_services.d/gekkofs.sh +++ b/plugins/adhoc_services.d/gekkofs.sh @@ -11,7 +11,10 @@ if ($1 == "start") then echo "Starting GEKKOFS" $nodes = $3 num_nodes=$(echo $nodes | awk -F, '{print NF-1}') - + # If num_nodes is >40, we are on the testing environment + if ($num_nodes > 40) then + exit 0 + end $workdir = $5 $datadir = $7 $mountdir = $9 diff --git a/spack/packages/scord/package.py b/spack/packages/scord/package.py index a8f045e8..09f2964a 100644 --- a/spack/packages/scord/package.py +++ b/spack/packages/scord/package.py @@ -56,7 +56,7 @@ class Scord(CMakePackage): sha256="74c51915315e01d8479701d340331641f42c5f5cfae0c08bdea6c2f0b01da665") version("0.3.3", sha256="a8b5a8d05858bee91b9675ca6c929f4c16b5b2562f4e6a8dba3ce0aacb721f48") - + version("0.3.4", branch="rnou/adhoc-integration") # build variants variant('build_type', default='Release', -- GitLab From 2c7f8b8a5b4eab9d4f3c77fa54c8cd7dad6d8d45 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Fri, 24 Nov 2023 07:48:13 +0100 Subject: [PATCH 03/10] Fix branch name in Scord package.py --- spack/packages/scord/package.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spack/packages/scord/package.py b/spack/packages/scord/package.py index 09f2964a..59c50bfa 100644 --- a/spack/packages/scord/package.py +++ b/spack/packages/scord/package.py @@ -56,7 +56,7 @@ class Scord(CMakePackage): sha256="74c51915315e01d8479701d340331641f42c5f5cfae0c08bdea6c2f0b01da665") version("0.3.3", sha256="a8b5a8d05858bee91b9675ca6c929f4c16b5b2562f4e6a8dba3ce0aacb721f48") - version("0.3.4", branch="rnou/adhoc-integration") + version("0.3.4", branch="rnou/adhoc_integration") # build variants variant('build_type', default='Release', -- GitLab From bd93f8a9420a14038b74f4e821836ae178906d02 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Fri, 24 Nov 2023 11:20:25 +0100 Subject: [PATCH 04/10] Fix GEKKOFS script and join environments --- plugins/adhoc_services.d/gekkofs.sh | 42 +++++++++++++++-------------- src/scord-ctl/command.cpp | 37 ++++++++++++++++++++++++- src/scord-ctl/rpc_server.cpp | 6 +++++ 3 files changed, 64 insertions(+), 21 deletions(-) diff --git a/plugins/adhoc_services.d/gekkofs.sh b/plugins/adhoc_services.d/gekkofs.sh index e1011f1b..b7171075 100644 --- a/plugins/adhoc_services.d/gekkofs.sh +++ b/plugins/adhoc_services.d/gekkofs.sh @@ -1,32 +1,34 @@ -#!/usr/bin/env bash -echo "GEKKOFS Script Called" +#!/usr/bin/bash +echo "GEKKOFS Script Called" $HOSTNAME $SLURM_JOBID -# example of a script that can be called by the adhoc service -# [2023-11-23 09:37:32.583868] [scord-ctl] [2199567] [info] rpc => id: 0 name: "ADM_deploy_adhoc_storage" from: "ofi+tcp;ofi_rxm://127.0.0.1:52000" body: {uuid: "gekkofs-JR4ny5xHMhmlwh6KqThfYt71IaoR9cH5", type: ADM_ADHOC_STORAGE_GEKKOFS, resources: {nodes: [{hostname: "broadwell-001", type: regular}, {hostname: "broadwell-002", type: regular}, {hostname: "broadwell-003", type: regular}, {hostname: "broadwell-004", type: regular}]}} -# option: start --hosts "broadwell-001,broadwell-002,broadwell-003,broadwell-004" --workdir /tmp/gekkofs/gekkofs-JR4ny5xHMhmlwh6KqThfYt71IaoR9cH5 --datadir /tmp/gekkofs/gekkofs-JR4ny5xHMhmlwh6KqThfYt71IaoR9cH5/data --mountdir /tmp/gekkofs/gekkofs-JR4ny5xHMhmlwh6KqThfYt71IaoR9cH5/mnt -# code to count the number of elements in a comma separated list environment variable (called $nodes) and store in $num_nodes - -if ($1 == "start") then +if [ "$1" == "start" ]; then echo "Starting GEKKOFS" - $nodes = $3 + . /beegfs/home/r.nou/spack/share/spack/setup-env.sh + spack load gekkofs + spack load slurm@23.02.6 + nodes=$3 num_nodes=$(echo $nodes | awk -F, '{print NF-1}') - # If num_nodes is >40, we are on the testing environment - if ($num_nodes > 40) then + # If num_nodes is greater than 40, we are on the testing environment + if [ $num_nodes -gt 40 ]; then exit 0 - end - $workdir = $5 - $datadir = $7 - $mountdir = $9 + fi + workdir=$5 + datadir=$7 + mountdir=$9 - mkdir -p $5 - srun -N $num_nodes -n $num_nodes --oversubscribe --cpus-per-task=1 --mem-per-task=1 --export=ALL bash -c "gkfs_daemon --rootdir $datadir --mountdir $mountdir" & -else if ($1 == "stop") then + mkdir -p $workdir + /opt/slurm/bin/srun -N $num_nodes -n $num_nodes --oversubscribe --cpus-per-task=1 --mem-per-cpu=1 --export=ALL bash -c "gkfs_daemon --rootdir $datadir --mountdir $mountdir" & + sleep 2 +elif [ "$1" == "stop" ]; then echo "Stopping GEKKOFS" - srun -N $num_nodes -n $num_nodes --oversubscribe --cpus-per-task=1 --mem-per-task=1 --export=ALL bash -c "pkill -9 gkfs_daemon" + . /beegfs/home/r.nou/spack/share/spack/setup-env.sh + spack load gekkofs + spack load slurm@23.02.6 + /opt/slurm/bin/srun -N $num_nodes -n $num_nodes --oversubscribe --cpus-per-task=1 --mem-per-task=1 pkill -9 gkfs_daemon else echo "Unknown command" exit 1 - +fi exit 0 diff --git a/src/scord-ctl/command.cpp b/src/scord-ctl/command.cpp index c45b5b5c..387778c9 100644 --- a/src/scord-ctl/command.cpp +++ b/src/scord-ctl/command.cpp @@ -195,6 +195,38 @@ command::as_vector() const { return tmp; } +// Function to join two sets of environment variables +char** +joinEnvironments(char** env1, const char** env2) { + // Count the number of variables in each environment + int count1 = 0; + while(env1[count1] != nullptr) { + ++count1; + } + + int count2 = 0; + while(env2[count2] != nullptr) { + ++count2; + } + + // Allocate memory for the combined environment + char** combinedEnv = new char*[count1 + count2 + 1]; + + // Copy the variables from the first environment + for(int i = 0; i < count1; ++i) { + combinedEnv[i] = strdup(env1[i]); + } + + // Copy the variables from the second environment + for(int i = 0; i < count2; ++i) { + combinedEnv[count1 + i] = strdup(env2[i]); + } + + // Null-terminate the combined environment + combinedEnv[count1 + count2] = nullptr; + + return combinedEnv; +} void command::exec() const { @@ -207,8 +239,11 @@ command::exec() const { switch(const auto pid = ::fork()) { case 0: { + + // Join the environments + char** combinedEnv = joinEnvironments(environ, envp.get()); ::execvpe(argv[0], const_cast(argv.get()), - const_cast(envp.get())); + const_cast(combinedEnv)); // We cannot use the default logger in the child process because it // is not fork-safe, and even though we received a copy of the // global logger, it is not valid because the child process does diff --git a/src/scord-ctl/rpc_server.cpp b/src/scord-ctl/rpc_server.cpp index 13e6a938..19fff748 100644 --- a/src/scord-ctl/rpc_server.cpp +++ b/src/scord-ctl/rpc_server.cpp @@ -28,6 +28,7 @@ #include #include "rpc_server.hpp" +extern char** environ; using namespace std::literals; @@ -142,6 +143,9 @@ rpc_server::deploy_adhoc_storage( const auto& adhoc_cfg = it->second; LOGGER_DEBUG("deploy \"{:e}\" (ID: {})", adhoc_type, adhoc_uuid); + for (int i = 0; environ[i] != nullptr; ++i) { + std::cout << environ[i] << std::endl; + } // 1. Create a working directory for the adhoc storage instance adhoc_dir = adhoc_cfg.working_directory() / adhoc_uuid; @@ -172,6 +176,8 @@ rpc_server::deploy_adhoc_storage( const auto cmd = adhoc_cfg.startup_command().eval( adhoc_uuid, *adhoc_dir, hostnames); + // Fill environment + // 4. Execute the startup command try { -- GitLab From feaf302147fb965047dbd617e1e41e5487cf10e2 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Fri, 24 Nov 2023 12:27:51 +0100 Subject: [PATCH 05/10] Fix GEKKOFS start and stop commands --- plugins/adhoc_services.d/gekkofs.sh | 26 +++++++++++--------------- src/scord-ctl/rpc_server.cpp | 5 +---- 2 files changed, 12 insertions(+), 19 deletions(-) diff --git a/plugins/adhoc_services.d/gekkofs.sh b/plugins/adhoc_services.d/gekkofs.sh index b7171075..7da0cceb 100644 --- a/plugins/adhoc_services.d/gekkofs.sh +++ b/plugins/adhoc_services.d/gekkofs.sh @@ -4,11 +4,9 @@ echo "GEKKOFS Script Called" $HOSTNAME $SLURM_JOBID if [ "$1" == "start" ]; then echo "Starting GEKKOFS" - . /beegfs/home/r.nou/spack/share/spack/setup-env.sh - spack load gekkofs - spack load slurm@23.02.6 + nodes=$3 - num_nodes=$(echo $nodes | awk -F, '{print NF-1}') + num_nodes=$(echo $nodes | awk -F, '{print NF}') # If num_nodes is greater than 40, we are on the testing environment if [ $num_nodes -gt 40 ]; then exit 0 @@ -16,19 +14,17 @@ if [ "$1" == "start" ]; then workdir=$5 datadir=$7 mountdir=$9 - - mkdir -p $workdir - /opt/slurm/bin/srun -N $num_nodes -n $num_nodes --oversubscribe --cpus-per-task=1 --mem-per-cpu=1 --export=ALL bash -c "gkfs_daemon --rootdir $datadir --mountdir $mountdir" & - sleep 2 + unset SLURM_CPU_BIND SLURM_CPU_BIND_LIST SLURM_CPU_BIND_TYPE SLURM_CPU_BIND_VERBOSE + + srun -N $num_nodes -n $num_nodes --oversubscribe --cpus-per-task=1 --mem-per-cpu=1 --export=ALL bash -c "gkfs_daemon --rootdir $datadir --mountdir $mountdir" & + sleep 4 elif [ "$1" == "stop" ]; then echo "Stopping GEKKOFS" - . /beegfs/home/r.nou/spack/share/spack/setup-env.sh - spack load gekkofs - spack load slurm@23.02.6 - /opt/slurm/bin/srun -N $num_nodes -n $num_nodes --oversubscribe --cpus-per-task=1 --mem-per-task=1 pkill -9 gkfs_daemon -else - echo "Unknown command" - exit 1 + srun -N $num_nodes -n $num_nodes --oversubscribe --cpus-per-task=1 --mem-per-task=1 pkill -9 gkfs_daemon +elif [ "$1" == "expand" ]; then + echo "Expand command" +elif [ "$1" == "shrink" ]; then + echo "shrink command" fi exit 0 diff --git a/src/scord-ctl/rpc_server.cpp b/src/scord-ctl/rpc_server.cpp index 19fff748..c3dad566 100644 --- a/src/scord-ctl/rpc_server.cpp +++ b/src/scord-ctl/rpc_server.cpp @@ -143,10 +143,7 @@ rpc_server::deploy_adhoc_storage( const auto& adhoc_cfg = it->second; LOGGER_DEBUG("deploy \"{:e}\" (ID: {})", adhoc_type, adhoc_uuid); - for (int i = 0; environ[i] != nullptr; ++i) { - std::cout << environ[i] << std::endl; - } - + // 1. Create a working directory for the adhoc storage instance adhoc_dir = adhoc_cfg.working_directory() / adhoc_uuid; -- GitLab From 89742884ff1b43fb8cd7b61d2943a68220d72dc5 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Fri, 24 Nov 2023 19:05:15 +0100 Subject: [PATCH 06/10] Fix dependencies and add missing command in gekkofs.sh --- plugins/adhoc_services.d/gekkofs.sh | 2 +- spack/packages/scord/package.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/plugins/adhoc_services.d/gekkofs.sh b/plugins/adhoc_services.d/gekkofs.sh index 7da0cceb..76c4cf81 100644 --- a/plugins/adhoc_services.d/gekkofs.sh +++ b/plugins/adhoc_services.d/gekkofs.sh @@ -15,7 +15,7 @@ if [ "$1" == "start" ]; then datadir=$7 mountdir=$9 unset SLURM_CPU_BIND SLURM_CPU_BIND_LIST SLURM_CPU_BIND_TYPE SLURM_CPU_BIND_VERBOSE - + srun -N $num_nodes -n $num_nodes --oversubscribe --cpus-per-task=1 --mem-per-cpu=1 --export=ALL bash -c "mkdir -p $mountdir; mkdir -p $datadir" srun -N $num_nodes -n $num_nodes --oversubscribe --cpus-per-task=1 --mem-per-cpu=1 --export=ALL bash -c "gkfs_daemon --rootdir $datadir --mountdir $mountdir" & sleep 4 elif [ "$1" == "stop" ]; then diff --git a/spack/packages/scord/package.py b/spack/packages/scord/package.py index 59c50bfa..0f40b000 100644 --- a/spack/packages/scord/package.py +++ b/spack/packages/scord/package.py @@ -92,6 +92,7 @@ class Scord(CMakePackage): depends_on("boost@1.71 +program_options", when='@0.2.0:') depends_on("redis-plus-plus@1.3.3:", when='@0.2.0:') depends_on("cargo@0.3.3:", when='@0.3.1:') + depends_on("cargo@0.3.4:", when='@0.3.4:') depends_on("slurm", when='@0.3.1:') -- GitLab From f680cd41448dfaf3d117bd6a301ab3bf41d13efb Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Fri, 24 Nov 2023 19:30:16 +0100 Subject: [PATCH 07/10] Update sleep duration in rpc_server::scheduler_update() function --- cli/scord_adhoc.cpp | 2 +- src/scord/rpc_server.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cli/scord_adhoc.cpp b/cli/scord_adhoc.cpp index ebefc670..5a12c826 100644 --- a/cli/scord_adhoc.cpp +++ b/cli/scord_adhoc.cpp @@ -179,7 +179,7 @@ main(int argc, char* argv[]) { srv, adhoc_name, type, ctx, resources); auto path = deploy_adhoc_storage(srv, adhoc_storage); - fmt::print("{}\n", path); + fmt::print("{},{}\n", path, adhoc_storage.id()); /* Separate inputs into vector of inputs */ diff --git a/src/scord/rpc_server.cpp b/src/scord/rpc_server.cpp index 194f937a..d706b334 100644 --- a/src/scord/rpc_server.cpp +++ b/src/scord/rpc_server.cpp @@ -951,7 +951,7 @@ rpc_server::scheduler_update() { std::vector> return_set; const auto threshold = 0.1f; while(!m_shutting_down) { - thallium::thread::self().sleep(m_network_engine, 500); + thallium::thread::self().sleep(m_network_engine, 1000); m_transfer_manager.lock(); const auto transfer = m_transfer_manager.transfer(); std::vector v_ids; -- GitLab From 5ddcf94c15478820b7fa33b8db04e8c1f5eb310d Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Fri, 24 Nov 2023 19:40:40 +0100 Subject: [PATCH 08/10] Fix GEKKOFS stop command for testing environment --- plugins/adhoc_services.d/gekkofs.sh | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/plugins/adhoc_services.d/gekkofs.sh b/plugins/adhoc_services.d/gekkofs.sh index 76c4cf81..35de9cef 100644 --- a/plugins/adhoc_services.d/gekkofs.sh +++ b/plugins/adhoc_services.d/gekkofs.sh @@ -20,6 +20,13 @@ if [ "$1" == "start" ]; then sleep 4 elif [ "$1" == "stop" ]; then echo "Stopping GEKKOFS" + + nodes=$3 + num_nodes=$(echo $nodes | awk -F, '{print NF}') + # If num_nodes is greater than 40, we are on the testing environment + if [ $num_nodes -gt 40 ]; then + exit 0 + fi srun -N $num_nodes -n $num_nodes --oversubscribe --cpus-per-task=1 --mem-per-task=1 pkill -9 gkfs_daemon elif [ "$1" == "expand" ]; then echo "Expand command" -- GitLab From 01395f8f924a5b9fb38e59224ff68f15d56280fd Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Tue, 28 Nov 2023 12:33:49 +0100 Subject: [PATCH 09/10] Update CPU configuration for gkfs_daemon --- plugins/adhoc_services.d/gekkofs.sh | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/plugins/adhoc_services.d/gekkofs.sh b/plugins/adhoc_services.d/gekkofs.sh index 35de9cef..be65b8f2 100644 --- a/plugins/adhoc_services.d/gekkofs.sh +++ b/plugins/adhoc_services.d/gekkofs.sh @@ -16,7 +16,7 @@ if [ "$1" == "start" ]; then mountdir=$9 unset SLURM_CPU_BIND SLURM_CPU_BIND_LIST SLURM_CPU_BIND_TYPE SLURM_CPU_BIND_VERBOSE srun -N $num_nodes -n $num_nodes --oversubscribe --cpus-per-task=1 --mem-per-cpu=1 --export=ALL bash -c "mkdir -p $mountdir; mkdir -p $datadir" - srun -N $num_nodes -n $num_nodes --oversubscribe --cpus-per-task=1 --mem-per-cpu=1 --export=ALL bash -c "gkfs_daemon --rootdir $datadir --mountdir $mountdir" & + srun -N $num_nodes -n $num_nodes --oversubscribe --cpus-per-task=4 --mem-per-cpu=1 --export=ALL bash -c "gkfs_daemon --rootdir $datadir --mountdir $mountdir" & sleep 4 elif [ "$1" == "stop" ]; then echo "Stopping GEKKOFS" @@ -27,7 +27,8 @@ elif [ "$1" == "stop" ]; then if [ $num_nodes -gt 40 ]; then exit 0 fi - srun -N $num_nodes -n $num_nodes --oversubscribe --cpus-per-task=1 --mem-per-task=1 pkill -9 gkfs_daemon + unset SLURM_CPU_BIND SLURM_CPU_BIND_LIST SLURM_CPU_BIND_TYPE SLURM_CPU_BIND_VERBOSE + srun -N $num_nodes -n $num_nodes --oversubscribe --cpus-per-task=1 --mem-per-cpu=1 pkill -9 gkfs_daemon elif [ "$1" == "expand" ]; then echo "Expand command" elif [ "$1" == "shrink" ]; then -- GitLab From 2ee85b08df317f9153336d2eeb4ae56d046fd4b9 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Thu, 11 Jan 2024 12:40:32 +0100 Subject: [PATCH 10/10] Updated CMakeList Version --- CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index f46b1939..4cd6304d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -30,7 +30,7 @@ cmake_minimum_required(VERSION 3.19) project( scord - VERSION 0.3.3 + VERSION 0.3.4 LANGUAGES C CXX ) -- GitLab