diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 78707baecbb6d72d0893180247fad841b9831b68..3460edaaff3bab01fd1603a4dbdb802e6609d7b6 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -1,6 +1,6 @@ # Compilation of scord and execution of tests -image: bscstorage/scord:0.3.0 +image: bscstorage/scord:0.4.0-wip stages: - build diff --git a/CMakeLists.txt b/CMakeLists.txt index 0381582f7b609395896e9c0a9481d4f605bf5945..e6ac925995f340028a17600d8414b635b1ff8909 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -157,6 +157,12 @@ set(SCORD_CTL_BIND_PORT ) message(STATUS "[${PROJECT_NAME}] server bind port: ${SCORD_CTL_BIND_PORT}") +set(CARGO_PORT + "62000" + CACHE STRING + "Define the port through which we should communicate with Cargo" + ) + option(SCORD_BUILD_EXAMPLES "Build examples (disabled by default)" OFF) option(SCORD_BUILD_TESTS "Build tests (disabled by default)" OFF) @@ -299,6 +305,10 @@ mark_variables_as_advanced(REGEX "^(FETCHCONTENT|fmt|FMT|spdlog|SPDLOG)_.*$") message(STATUS "[${PROJECT_NAME}] Checking for Redis Plus Plus") find_package(RedisPlusPlus 1.3.3 REQUIRED) +### Cargo: required for transferring datasets between storage tiers +message(STATUS "[${PROJECT_NAME}] Checking for Cargo") +find_package(Cargo 0.2.0 REQUIRED) + # ############################################################################## # Process subdirectories @@ -309,6 +319,7 @@ add_compile_options("-Wall" "-Wextra" "-Werror" "$<$:-O3>") add_compile_definitions("$<$:SCORD_DEBUG_BUILD>") add_compile_definitions("$<$:LOGGER_ENABLE_DEBUG>") +add_subdirectory(cli) add_subdirectory(etc) add_subdirectory(src) add_subdirectory(plugins) diff --git a/README.md b/README.md index fbd1cee4e8ed917aec61849291d6eb3aafb5fb85..fe4269c72b04d87560b9d6c0b2605d0c82783ad1 100644 --- a/README.md +++ b/README.md @@ -295,12 +295,12 @@ Which should produce output similar to the following: [2021-11-19 10:30:30.066151] [scord] [131119] [info] [2021-11-19 10:30:30.066161] [scord] [131119] [info] [[ Start up
successful, awaiting requests... ]] ``` -Now we can use one of the example programs to send a `ping` RPC to Scord: -```bash +Now we can use the `scord_ping` CLI program packaged with the service to +send a `ping` RPC to Scord: -cd $HOME/scord/build/examples -./ADM_ping ofi+tcp://192.168.0.111:52000 +```bash +scord_ping -s ofi+tcp://192.168.0.111:52000 ``` And the server logs should update with an entry similar to the following one: diff --git a/cli/CMakeLists.txt b/cli/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..39ea9270ccc82f621fce58e247e17b7e0afccba1 --- /dev/null +++ b/cli/CMakeLists.txt @@ -0,0 +1,58 @@ +################################################################################ +# Copyright 2021-2023, Barcelona Supercomputing Center (BSC), Spain # +# # +# This software was partially supported by the EuroHPC-funded project ADMIRE # +# (Project ID: 956748, https://www.admire-eurohpc.eu). # +# # +# This file is part of scord. # +# # +# scord is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# scord is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with scord. If not, see <https://www.gnu.org/licenses/>.
# Command line tools shipped with the service:
#   scord_ping:  ping a remote scord server
#   scord_query: query a remote scord server
# Both are built from a single source file named after the tool, link against
# the same libraries and are installed into the binary directory.
foreach(_cli_tool scord_ping scord_query)
  add_executable(${_cli_tool})

  target_sources(${_cli_tool} PRIVATE ${_cli_tool}.cpp)

  target_link_libraries(${_cli_tool} PUBLIC fmt::fmt CLI11::CLI11 libscord)

  install(TARGETS ${_cli_tool} RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR})
endforeach()

# Record where scord_query ends up once installed so that other directories
# can retrieve the path through the __INSTALLED_PATH target property.
set_target_properties(
  scord_query PROPERTIES __INSTALLED_PATH
                         ${CMAKE_INSTALL_FULL_BINDIR}/scord_query
)
+ * + * SPDX-License-Identifier: GPL-3.0-or-later + *****************************************************************************/ + +#include +#include +#include +#include + +struct ping_config { + std::string progname; + std::string server_address; +}; + +ping_config +parse_command_line(int argc, char* argv[]) { + + ping_config cfg; + + cfg.progname = std::filesystem::path{argv[0]}.filename().string(); + + CLI::App app{"Scord ping client", cfg.progname}; + + app.add_option("-s,--server", cfg.server_address, "Server address") + ->option_text("ADDRESS") + ->required(); + + try { + app.parse(argc, argv); + return cfg; + } catch(const CLI::ParseError& ex) { + std::exit(app.exit(ex)); + } +} + +auto +parse_address(const std::string& address) { + const auto pos = address.find("://"); + if(pos == std::string::npos) { + throw std::runtime_error(fmt::format("Invalid address: {}", address)); + } + + const auto protocol = address.substr(0, pos); + return std::make_pair(protocol, address); +} + + +int +main(int argc, char* argv[]) { + + using namespace std::chrono_literals; + + ping_config cfg = parse_command_line(argc, argv); + + try { + const auto [protocol, address] = parse_address(cfg.server_address); + ping(scord::server{protocol, address}); + fmt::print("Ping succeeded!\n"); + } catch(const std::exception& ex) { + fmt::print(stderr, "Ping failed: {}\n", ex.what()); + return EXIT_FAILURE; + } +} diff --git a/cli/scord_query.cpp b/cli/scord_query.cpp new file mode 100644 index 0000000000000000000000000000000000000000..d0b9b44f64e1ebe95462f89c2d10096900777d9e --- /dev/null +++ b/cli/scord_query.cpp @@ -0,0 +1,95 @@ +/****************************************************************************** + * Copyright 2021-2023, Barcelona Supercomputing Center (BSC), Spain + * + * This software was partially supported by the EuroHPC-funded project ADMIRE + * (Project ID: 956748, https://www.admire-eurohpc.eu). + * + * This file is part of scord. 
+ * + * scord is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * scord is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with scord. If not, see . + * + * SPDX-License-Identifier: GPL-3.0-or-later + *****************************************************************************/ + +#include +#include +#include +#include + +struct query_config { + std::string progname; + std::string server_address; + std::uint32_t job_id{}; +}; + +query_config +parse_command_line(int argc, char* argv[]) { + + query_config cfg; + + cfg.progname = std::filesystem::path{argv[0]}.filename().string(); + + CLI::App app{"Scord metainfo client", cfg.progname}; + + app.add_option("-s,--server", cfg.server_address, "Server address") + ->option_text("ADDRESS") + ->required(); + app.add_option("job_id", cfg.job_id, "Job ID")->required(); + + try { + app.parse(argc, argv); + return cfg; + } catch(const CLI::ParseError& ex) { + std::exit(app.exit(ex)); + } +} + +auto +parse_address(const std::string& address) { + const auto pos = address.find("://"); + if(pos == std::string::npos) { + throw std::runtime_error(fmt::format("Invalid address: {}", address)); + } + + const auto protocol = address.substr(0, pos); + return std::make_pair(protocol, address); +} + + +int +main(int argc, char* argv[]) { + + using namespace std::chrono_literals; + + query_config cfg = parse_command_line(argc, argv); + + try { + const auto [protocol, address] = parse_address(cfg.server_address); + + scord::server srv{protocol, address}; + + scord::job_info info = + 
scord::query(scord::server{protocol, address}, cfg.job_id); + + fmt::print(stdout, + "Job metadata:\n" + " adhoc_controller: {}\n" + " io_procs: {}\n", + info.adhoc_controller_address(), info.io_procs()); + + } catch(const std::exception& ex) { + fmt::print(stderr, "Error: {}\n", ex.what()); + return EXIT_FAILURE; + } +} diff --git a/cmake/scord-utils.cmake b/cmake/scord-utils.cmake index 1b28ba918e33984a3dba5116675df4fde5c94492..d243ca2f1f018b5d5a89d8f02d8ed32153224d65 100644 --- a/cmake/scord-utils.cmake +++ b/cmake/scord-utils.cmake @@ -142,3 +142,11 @@ function(mark_variables_as_advanced) mark_as_advanced(${_var}) endforeach() endfunction() + +# Define a custom target property so that we can store and retrieve the +# installed path of a target. +define_property( + TARGET PROPERTY __INSTALLED_PATH + BRIEF_DOCS "Full path to a target's installed location" + FULL_DOCS "Full path to a target's installed location" +) diff --git a/docker/0.4.0-wip/Dockerfile b/docker/0.4.0-wip/Dockerfile index fea5b4d9280194305553a00f0dea04bb270ae468..15d5918322a91e3979c6c6d76d97d165ec04f041 100644 --- a/docker/0.4.0-wip/Dockerfile +++ b/docker/0.4.0-wip/Dockerfile @@ -1,4 +1,4 @@ -FROM debian:bullseye-slim +FROM debian:bookworm-slim LABEL Description="Debian-based environment suitable to build scord" @@ -32,10 +32,11 @@ RUN apt-get update && \ libyaml-dev libcurl4-openssl-dev procps \ # genopts dependencies python3-venv \ - # redis-plus-plus dependencies \ - libhiredis-dev \ # Slurm plugin dependencies \ libslurm-dev \ + # Cargo dependencies \ + libboost-dev \ + libboost-mpi-dev \ # tests dependencies \ python3-pip && \ ### install cmake 3.23.1 ################################################### @@ -55,11 +56,14 @@ RUN apt-get update && \ git clone https://github.com/mercury-hpc/mercury --recurse-submodules && \ git clone https://github.com/mochi-hpc/mochi-margo --recurse-submodules && \ # cd mochi-margo && git reset --hard v0.9.9 && cd .. 
&& \ + git clone https://github.com/redis/hiredis.git --recurse-submodules && \ + cd hiredis && git checkout v1.2.0 && cd .. && \ git clone https://github.com/sewenew/redis-plus-plus --recurse-submodules && \ git clone https://github.com/francielizanon/agios --recurse-submodules && \ cd agios && git checkout development && cd .. && \ git clone https://github.com/USCiLab/cereal --recurse-submodules && \ git clone https://github.com/mochi-hpc/mochi-thallium --recurse-submodules && \ + git clone https://storage.bsc.es/gitlab/hpc/cargo.git && \ cd mochi-thallium && \ export LD_LIBRARY_PATH=${DEPS_INSTALL_PATH}/lib:${DEPS_INSTALL_PATH}/lib64 && \ export PKG_CONFIG_PATH=${DEPS_INSTALL_PATH}/lib/pkgconfig:${DEPS_INSTALL_PATH}/lib64/pkgconfig && \ @@ -121,6 +125,15 @@ RUN apt-get update && \ make -j install && \ cd .. && rm -rf build && cd && \ \ + ### hiredis + cd deps/hiredis && \ + mkdir build && cd build && \ + cmake -DCMAKE_INSTALL_PREFIX=${DEPS_INSTALL_PATH} \ + -DCMAKE_BUILD_TYPE:STRING=Release \ + .. && \ + make install -j && \ + cd .. && rm -rf build && cd && \ + \ ### redis-plus-plus cd deps/redis-plus-plus && \ mkdir build && cd build && \ @@ -161,8 +174,17 @@ RUN apt-get update && \ make -j install && \ cd .. && rm -rf build && cd && \ \ + ### Cargo \ + cd deps/cargo && \ + mkdir build && cd build && \ + cmake -DCMAKE_BUILD_TYPE=Release \ + -DCMAKE_INSTALL_PREFIX=${DEPS_INSTALL_PATH} \ + .. && \ + make -j install && \ + cd .. 
&& rm -rf build && cd && \ + \ ### python packages for testing scripts\ - pip install lark loguru && \ + pip install lark loguru --break-system-packages && \ \ ### Cleanup # Clean apt cache to reduce image layer size diff --git a/examples/cxx/CMakeLists.txt b/examples/cxx/CMakeLists.txt index fa7a8e969301b242cbd78f866de3c047e14a0f93..cadabb2ed61d26ed7b8e24246125601d4aa9b915 100644 --- a/examples/cxx/CMakeLists.txt +++ b/examples/cxx/CMakeLists.txt @@ -121,8 +121,3 @@ if(SCORD_BUILD_TESTS) ) endforeach() endif() - -# Install ADM_ping_cxx as a convenience tool -install(TARGETS ADM_ping_cxx - RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} - ) diff --git a/plugins/slurm/CMakeLists.txt b/plugins/slurm/CMakeLists.txt index 88b5b6c8017bb44453e36b36fc4d96e71fae8b4b..1395701754c4e5d9cdeef722e65fd9a3a036f446 100644 --- a/plugins/slurm/CMakeLists.txt +++ b/plugins/slurm/CMakeLists.txt @@ -27,7 +27,8 @@ find_package(Slurm REQUIRED) add_library(slurm-plugin SHARED) -get_target_property(SCORD_CTL_BIN scord-ctl SCORD_CTL_BINARY) +get_target_property(SCORDCTL_PROGRAM scord-ctl __INSTALLED_PATH) +get_target_property(SCORD_QUERY_PROGRAM scord_query __INSTALLED_PATH) configure_file(defaults.h.in defaults.h @ONLY) @@ -59,6 +60,14 @@ install( PUBLIC_HEADER DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}/${PROJECT_NAME} ) +find_program(MPIEXEC_EXECUTABLE + NAMES mpiexec mpiexec.hydra mpiexec.mpd mpirun + PATH_SUFFIXES bin sbin + DOC "MPI launcher executable") + +set(SCORD_SERVICE_ADDRESS + ${SCORD_TRANSPORT_PROTOCOL}://${SCORD_BIND_ADDRESS}:${SCORD_BIND_PORT}) + configure_file(scord_common.sh.in scord_common.sh @ONLY) configure_file(scord_prolog.sh.in scord_prolog.sh @ONLY) configure_file(scord_epilog.sh.in scord_epilog.sh @ONLY) @@ -66,12 +75,9 @@ configure_file(scord_epilog.sh.in scord_epilog.sh @ONLY) get_filename_component(INSTALL_DESTINATION ${CMAKE_CURRENT_SOURCE_DIR} NAME) install( - FILES ${CMAKE_CURRENT_BINARY_DIR}/scord_common.sh - ${CMAKE_CURRENT_BINARY_DIR}/scord_prolog.sh - 
# Get the list of hostnames associated with a hostlist as a CSV string
# Usage: get_nodelist_as_csv OUTPUT_VARIABLE HOSTLIST
#   OUTPUT_VARIABLE  name of the caller's variable that receives the string
#   HOSTLIST         hostlist expression, expanded through get_nodelist
# Returns 1 if an argument is missing or if get_nodelist fails.
# Example:
#   declare hn_csv
#   get_nodelist_as_csv hn_csv tux[1,3-4],snoo[1-2]
#   echo "${hn_csv}" # tux1,tux3,tux4,snoo1,snoo2
function get_nodelist_as_csv {

    if [[ -z "$1" ]]; then
        echo "No output variable specified"
        return 1
    fi

    if [[ -z "$2" ]]; then
        echo "No hostlist specified"
        return 1
    fi

    # The nameref and the scratch array use function-specific names: a local
    # that has the same name as the caller's output variable would shadow it
    # (and a nameref cannot refer to a variable with its own name), so short
    # generic names would silently break for some callers.
    local -n __gnac_out=$1
    local -a __gnac_hosts=()
    if ! get_nodelist __gnac_hosts "$2"; then
        return 1
    fi

    # emit every hostname followed by a comma, then drop the trailing comma
    # shellcheck disable=SC2034
    printf -v __gnac_out "%s," "${__gnac_hosts[@]}"
    __gnac_out="${__gnac_out%,}"
}
+ exit 1 +fi + +if [[ -z $SLURM_JOB_ID || -z $SLURM_JOB_UID ]]; then + echo "Missing required environment variables" >&2 + exit 1 +fi + +# shellcheck disable=SC2016 +USER_HOME=$(run_as "$SLURM_JOB_USER" echo '$HOME') +CONFIG_DIRECTORY="${XDG_CONFIG_HOME:-$USER_HOME/.config}/cargo" + +CARGO_ID=$(echo "cargo_$SLURM_JOB_ID.$SLURM_JOB_UID" | sha256sum | awk '{ print $1 }') +CARGO_CONFIG_FILE=$CONFIG_DIRECTORY/$CARGO_ID.cfg +CARGO_SERVICE_NAME=$(systemd-escape --template cargo@.service "$CARGO_ID") + +echo "Shutting down Cargo data stager for job $SLURM_JOB_ID (user: $SLURM_JOB_USER)" + +if ! run_as "$SLURM_JOB_USER" systemctl --user stop "$CARGO_SERVICE_NAME"; then + exit 1 +fi + +if [[ -e "$CARGO_CONFIG_FILE" ]]; then + rm "$CARGO_CONFIG_FILE" +fi diff --git a/plugins/slurm/scord_prolog.sh.in b/plugins/slurm/scord_prolog.sh.in index fb4a57eed843f6dd1e6d639132628c3c4e69b353..8389cf148e882a3a5835369e83437225414d5d82 100755 --- a/plugins/slurm/scord_prolog.sh.in +++ b/plugins/slurm/scord_prolog.sh.in @@ -25,7 +25,6 @@ # SPDX-License-Identifier: GPL-3.0-or-later # ################################################################################ - # This is a prolog script for SLURM that starts the SCORD adhoc controller # for the job. It is meant to be used with the SCORD SLURM plugin. # The script is executed as the user that submitted the job. The script @@ -59,10 +58,12 @@ fi HOSTNAME=$(hostname -s) declare -a hostnames get_nodelist hostnames "$SLURM_NODELIST" +declare hostnames_csv +get_nodelist_as_csv hostnames_csv "$SLURM_NODELIST" # create a temporary directory for the job and redirect both stdout and stderr # to a log file within it -WORKDIR="$PROLOG_TMPDIR/$SLURM_JOB_USER/$SLURM_JOBID" +WORKDIR="$PROLOG_TMPDIR/$SLURM_JOB_USER/$SLURM_JOB_ID" if [ ! 
-d "$WORKDIR" ]; then run_as "$SLURM_JOB_USER" mkdir -p "$WORKDIR" fi @@ -72,8 +73,8 @@ if ((${#hostnames[@]} == 0)); then exit 0 fi -# only run on the first node of the allocation (scord-ctl will always be -# started on the first node of the allocation) +# only run on the first node of the allocation (both scord-ctl and Cargo +# are always started on the first node of the allocation) if [[ "$HOSTNAME" != "${hostnames[0]}" ]]; then exit 0 fi @@ -90,7 +91,7 @@ if ((${#addrs[@]} == 0)); then exit 1 fi -ADDRESS=$(echo "${addrs[@]}" | awk '{ print $1; exit }') +ADDRESS=$(echo "${addrs[0]}" | awk '{ print $1; exit }') # now that we have a specific working directory, move the previous log file # into $WORKDIR so that we have all messages in one place (since the file is @@ -98,7 +99,11 @@ ADDRESS=$(echo "${addrs[@]}" | awk '{ print $1; exit }') # messages are written) mv "$PROLOG_TMPDIR/scord_prolog.$SLURM_JOB_ID.log" "$WORKDIR/scord_prolog.log" -# start the adhoc controller in the background and store its PID in a file +################################################################################ +# Start the scord-ctl adhoc controller. +# +# It is launched in the background and its PID is stored in a +# file echo "Starting adhoc controller for job $SLURM_JOB_ID (user: $SLURM_JOB_USER)" run_as "$SLURM_JOB_USER" \ "$SCORDCTL_PROGRAM" \ @@ -120,4 +125,84 @@ PID=$(<"$WORKDIR/scord-ctl.pid") echo "Adhoc controller started successfully (PID: $PID)" +################################################################################ +# Start the Cargo data stager. + +# N.B.: Since Slurm doesn't allow programs in the prolog to survive beyond +# their parent script, we start the data stager as a systemd (user-level) +# service. Care must, thus, be taken to ensure that the service is stopped +# when the job finishes.
echo "Starting Cargo data stager for job $SLURM_JOB_ID (user: $SLURM_JOB_USER)"

# Both variables are needed below to derive the ID of the service instance
if [[ -z $SLURM_JOB_ID || -z $SLURM_JOB_UID ]]; then
    echo "Missing required environment variables" >&2
    exit 1
fi

# Step 1: Find (or create) the user's directory where configurations can be
# stored (note that $HOME is not set when this prolog script is being executed).
# shellcheck disable=SC2016
USER_HOME=$(run_as "$SLURM_JOB_USER" echo '$HOME')
USER_CONFIG_DIRECTORY="${XDG_CONFIG_HOME:-$USER_HOME/.config}"
CARGO_CONFIG_DIRECTORY="$USER_CONFIG_DIRECTORY/cargo"
SYSTEMD_USER_DIRECTORY="$USER_CONFIG_DIRECTORY/systemd/user"

[[ ! -d "$USER_CONFIG_DIRECTORY" ]] && run_as "$SLURM_JOB_USER" mkdir -p "$USER_CONFIG_DIRECTORY"
[[ ! -d "$CARGO_CONFIG_DIRECTORY" ]] && run_as "$SLURM_JOB_USER" mkdir -p "$CARGO_CONFIG_DIRECTORY"
[[ ! -d "$SYSTEMD_USER_DIRECTORY" ]] && run_as "$SLURM_JOB_USER" mkdir -p "$SYSTEMD_USER_DIRECTORY"

# Step 2: Copy the service file provided by Cargo to the user's configuration
# directory so that systemd can find it.
CARGO_SERVICE_FILE="@CARGO_DATA_INSTALL_DIR@/cargo@.service"

if [[ ! -f "$CARGO_SERVICE_FILE" ]]; then
    echo "Cargo service file not found: $CARGO_SERVICE_FILE"
    echo "Please check your Cargo installation"
    exit 1
fi

if ! run_as "$SLURM_JOB_USER" cp "$CARGO_SERVICE_FILE" "$SYSTEMD_USER_DIRECTORY"; then
    exit 1
fi

# Step 3: Create a configuration file for the Cargo user-level service
# required by this job.
# Each Cargo user-level service must be configured for its job and identified
# by a unique ID. We use the job ID and user ID to generate a unique ID
# for this service instance. Since systemd doesn't allow to easily parameterize
# a service file, we use a template service file (`cargo@.service`) and
# generate a specific configuration file for each Cargo service.
CARGO_ID=$(echo "cargo_$SLURM_JOB_ID.$SLURM_JOB_UID" | sha256sum | awk '{ print $1 }')
CARGO_CONFIG_FILE=$CARGO_CONFIG_DIRECTORY/$CARGO_ID.cfg
CARGO_MASTER_ADDRESS="$SCORDCTL_PROTO://$ADDRESS:$CARGO_PORT"
CARGO_INSTANCE_NAME=$(systemd-escape --template cargo@.service "$CARGO_ID")

# Size the Cargo deployment with the number of I/O processes that scord
# reports for this job. The number of allocated nodes is the fallback when
# the query fails or does not produce a usable value.
# The query runs on its own (not at the head of a pipeline) so that the status
# being tested is the one returned by the query program itself.
# NOTE(review): a reported value of 0 is treated as "not specified" and also
# triggers the fallback -- confirm that this matches the intended semantics.
CARGO_NUM_NODES=${#hostnames[@]}

if QUERY_OUTPUT=$("@SCORD_QUERY_PROGRAM@" -s "@SCORD_SERVICE_ADDRESS@" "$SLURM_JOB_ID"); then
    IO_PROCS=$(awk '/io_procs/ { print $2; exit }' <<<"$QUERY_OUTPUT")

    if [[ "$IO_PROCS" =~ ^[1-9][0-9]*$ ]]; then
        CARGO_NUM_NODES=$IO_PROCS
    else
        echo "Unexpected number of I/O processes ('$IO_PROCS') for job $SLURM_JOB_ID, using $CARGO_NUM_NODES"
    fi
else
    echo "Failed to determine the number of I/O processes for job $SLURM_JOB_ID, using $CARGO_NUM_NODES"
fi

cat <<EOT >>"$CARGO_CONFIG_FILE"
CARGO_ID=$CARGO_ID
CARGO_HOSTS=$hostnames_csv
CARGO_NUM_NODES=$CARGO_NUM_NODES
CARGO_ADDRESS=$CARGO_MASTER_ADDRESS
EOT

# the file was not created through run_as, hand it over to the job's user
chown "$SLURM_JOB_USER":"$SLURM_JOB_GROUP" "$CARGO_CONFIG_FILE"

if ! run_as "$SLURM_JOB_USER" systemctl --user start "$CARGO_INSTANCE_NAME"; then
    exit 1
fi

# give the service a moment before checking whether it is still active
sleep 1s

if ! run_as "$SLURM_JOB_USER" systemctl --user is-active --quiet "$CARGO_INSTANCE_NAME"; then
    echo "Cargo data stager failed to start"
    exit 1
fi

echo "Cargo data stager started successfully"

exit 0
+ExecStop=@CMAKE_INSTALL_FULL_DATADIR@/@PROJECT_NAME@/slurm/cargoctl stop -s ${CARGO_ADDRESS} +Restart=no +PrivateTmp=true +NoNewPrivileges=true diff --git a/src/common/net/endpoint.hpp b/src/common/net/endpoint.hpp index 1c8db2aae471d76f57335a3fdf959d3f9a1b0bd2..7166280f38f8df972599b7d871ccf8f34e57c295 100644 --- a/src/common/net/endpoint.hpp +++ b/src/common/net/endpoint.hpp @@ -53,6 +53,25 @@ public: } } + template + inline std::optional> + timed_call(const std::string& rpc_name, + const std::chrono::duration& timeout, + Args&&... args) const { + + using namespace std::chrono_literals; + + try { + const auto rpc = m_engine.define(rpc_name); + return std::make_optional( + rpc.on(m_endpoint) + .timed(timeout, std::forward(args)...)); + } catch(const std::exception& ex) { + LOGGER_ERROR("endpoint::timed_call() failed: {}", ex.what()); + return std::nullopt; + } + } + auto endp() const { return m_endpoint; diff --git a/src/common/net/request.hpp b/src/common/net/request.hpp index a6570e76158ffa7a3724d056e2adc2c602da379a..e03d630b489c65034544e5234fb6a1bda02f66e3 100644 --- a/src/common/net/request.hpp +++ b/src/common/net/request.hpp @@ -97,6 +97,11 @@ public: return m_value.value_or(std::move(default_value)); } + constexpr auto + value_or_none() const noexcept { + return m_value; + } + constexpr auto has_value() const noexcept { return m_value.has_value(); diff --git a/src/lib/detail/impl.cpp b/src/lib/detail/impl.cpp index f664d4ab4580ea31f104444650339bfe6885de6e..3ace0a2d16734bf6fa5aa5afe707f9ae894454a5 100644 --- a/src/lib/detail/impl.cpp +++ b/src/lib/detail/impl.cpp @@ -33,6 +33,8 @@ using namespace std::literals; +constexpr auto default_ping_timeout = 4s; + namespace api { struct remote_procedure { @@ -62,7 +64,9 @@ ping(const server& srv) { LOGGER_INFO("rpc {:<} body: {{}}", rpc); - if(const auto call_rv = endp.call(rpc.name()); call_rv.has_value()) { + if(const auto call_rv = + endp.timed_call(rpc.name(), default_ping_timeout); + call_rv.has_value()) { const 
network::generic_response resp{call_rv.value()}; @@ -78,6 +82,43 @@ ping(const server& srv) { return scord::error_code::other; } +tl::expected +query(const server& srv, slurm_job_id job_id) { + + using response_type = network::response_with_value; + + network::client rpc_client{srv.protocol()}; + + const auto rpc = network::rpc_info::create(RPC_NAME(), srv.address()); + + if(const auto& lookup_rv = rpc_client.lookup(srv.address()); + lookup_rv.has_value()) { + const auto& endp = lookup_rv.value(); + + LOGGER_INFO("rpc {:<} body: {{slurm_job_id: {}}}", rpc, job_id); + + if(const auto& call_rv = endp.call(rpc.name(), job_id); + call_rv.has_value()) { + + const response_type resp{call_rv.value()}; + + LOGGER_EVAL( + resp.error_code(), INFO, ERROR, + "rpc {:>} body: {{retval: {}, job_info: {}}} [op_id: {}]", + rpc, resp.error_code(), resp.value(), resp.op_id()); + + if(!resp.error_code()) { + return tl::make_unexpected(resp.error_code()); + } + + return resp.value(); + } + } + + LOGGER_ERROR("rpc call failed"); + return tl::make_unexpected(scord::error_code::other); +} + tl::expected register_job(const server& srv, const job::resources& job_resources, const job::requirements& job_requirements, diff --git a/src/lib/detail/impl.hpp b/src/lib/detail/impl.hpp index e102d9580a00cf740be8abb64ee9740f6bda8b55..31529bcea64791e04c85b76fea48bc1ddddcdfb2 100644 --- a/src/lib/detail/impl.hpp +++ b/src/lib/detail/impl.hpp @@ -34,6 +34,9 @@ namespace scord::detail { scord::error_code ping(const server& srv); +tl::expected +query(const server& srv, slurm_job_id job_id); + tl::expected register_job(const server& srv, const job::resources& job_resources, const job::requirements& job_requirements, diff --git a/src/lib/errors.c b/src/lib/errors.c index 9b2baa76ea7663ab0fbde6decea27302ca8b58d1..3653591aeeac9ecec919e84df370ae96a00103ab 100644 --- a/src/lib/errors.c +++ b/src/lib/errors.c @@ -37,6 +37,7 @@ const char* const adm_errlist[ADM_ERR_MAX + 1] = { "Cannot create adhoc storage 
/**
 * Destroy an ADM_job_t created by ADM_job_create().
 *
 * @remark This function is not actually part of the public API, but it is
 * useful to have for internal purposes
 *
 * @param[in] job The ADM_job_t to destroy.
 * @return ADM_SUCCESS or corresponding error code.
 */
ADM_return_t
ADM_job_destroy(ADM_job_t job);
+ */ +class job_info { + +public: + job_info() = default; + explicit job_info(std::string adhoc_controller_address, + std::uint32_t procs_for_io) + : m_adhoc_address(std::move(adhoc_controller_address)), + m_procs_for_io(procs_for_io) {} + + constexpr std::string const& + adhoc_controller_address() const { + return m_adhoc_address; + } + + /** + * @brief Get the number of processes that should be used for I/O. + * @return The number of processes that should be used for I/O. + */ + constexpr std::uint32_t + io_procs() const { + return m_procs_for_io; + } + +private: + friend class cereal::access; + template + void + serialize(Archive& ar) { + ar & m_adhoc_address; + ar & m_procs_for_io; + } + + std::string m_adhoc_address; + std::uint32_t m_procs_for_io; +}; + struct transfer { enum class mapping : std::underlying_type::type { @@ -632,6 +673,17 @@ struct fmt::formatter : formatter { } }; +template <> +struct fmt::formatter : formatter { + // parse is inherited from formatter. + template + auto + format(const scord::job_info& ji, FormatContext& ctx) const { + return format_to(ctx.out(), "{{adhoc_controller: {}, io_procs: {}}}", + ji.adhoc_controller_address(), ji.io_procs()); + } +}; + template <> struct fmt::formatter : formatter { // parse is inherited from formatter. @@ -879,6 +931,17 @@ struct fmt::formatter : formatter { } }; +template <> +struct fmt::formatter : formatter { + + // parse is inherited from formatter. + template + auto + format(const std::nullopt_t& /*t*/, FormatContext& ctx) const { + return formatter::format("none", ctx); + } +}; + template struct fmt::formatter> : formatter { diff --git a/src/lib/types.c b/src/lib/types.c index 9694549cf2130e758cdb282881889d5a232b1561..ab439d33911b70bbb97f6b4d0fa82ecaf83d4882 100644 --- a/src/lib/types.c +++ b/src/lib/types.c @@ -919,15 +919,6 @@ ADM_job_create(uint64_t id, uint64_t slurm_id) { return adm_job; } -/** - * Destroy a ADM_job_t created by ADM_job_create(). 
- * - * @remark This function is not actually part of the public API, but it is - * useful to have for internal purposes - * - * @param[in] reqs The ADM_job_t to destroy. - * @return ADM_SUCCESS or corresponding error code. - */ ADM_return_t ADM_job_destroy(ADM_job_t job) { ADM_return_t ret = ADM_SUCCESS; diff --git a/src/lib/types.cpp b/src/lib/types.cpp index b2bc97ffe7cabc447fe3c780261cbd80216d958d..cb6782e8a7ed5ee8baa523737e57ea6facb92250 100644 --- a/src/lib/types.cpp +++ b/src/lib/types.cpp @@ -583,7 +583,7 @@ adhoc_storage::ctx::operator ADM_adhoc_context_t() const { m_should_flush); } -std::string +std::string const& adhoc_storage::ctx::controller_address() const { return m_controller_address; } @@ -640,7 +640,7 @@ public: return m_id; } - adhoc_storage::ctx + adhoc_storage::ctx const& context() const { return m_ctx; } @@ -740,7 +740,7 @@ adhoc_storage::id() const { return m_pimpl->id(); } -adhoc_storage::ctx +adhoc_storage::ctx const& adhoc_storage::context() const { return m_pimpl->context(); } diff --git a/src/scord-ctl/CMakeLists.txt b/src/scord-ctl/CMakeLists.txt index 7e97d3588142b741ffa138d4ba860a4005a887b4..be23cd5d1c11097daf4acdea18deb4dead6cd7c0 100644 --- a/src/scord-ctl/CMakeLists.txt +++ b/src/scord-ctl/CMakeLists.txt @@ -44,16 +44,9 @@ target_link_libraries( libscord_cxx_types fmt::fmt CLI11::CLI11 ryml::ryml ) -set(SCORD_CTL_BIN "${CMAKE_INSTALL_FULL_BINDIR}/scord-ctl") - -define_property( - TARGET PROPERTY SCORD_CTL_BINARY - BRIEF_DOCS "Path to scord-ctl binary" - FULL_DOCS "Path to scord-ctl binary" -) - set_target_properties( - scord-ctl PROPERTIES SCORD_CTL_BINARY ${CMAKE_INSTALL_FULL_BINDIR}/scord-ctl + scord-ctl PROPERTIES __INSTALLED_PATH + ${CMAKE_INSTALL_FULL_BINDIR}/scord-ctl ) install(TARGETS scord-ctl DESTINATION ${CMAKE_INSTALL_BINDIR}) diff --git a/src/scord/adhoc_storage_manager.hpp b/src/scord/adhoc_storage_manager.hpp index e0bccc0797761273df57d674e6fb7d6a9eb66bfa..d22de87ae300fff3ee6bc63c1f0a6c4b8325df49 100644 --- 
a/src/scord/adhoc_storage_manager.hpp +++ b/src/scord/adhoc_storage_manager.hpp @@ -113,8 +113,8 @@ struct adhoc_storage_manager { if(const auto it = m_adhoc_storages.find(id); it != m_adhoc_storages.end()) { - const auto current_adhoc_info = it->second; - current_adhoc_info->update(std::move(new_resources)); + const auto adhoc_metadata_ptr = it->second; + adhoc_metadata_ptr->update(std::move(new_resources)); return scord::error_code::success; } @@ -156,12 +156,14 @@ struct adhoc_storage_manager { } scord::error_code - add_client_info(std::uint64_t adhoc_id, - std::shared_ptr job_info) { + add_client_info( + std::uint64_t adhoc_id, + std::shared_ptr job_metadata_ptr) { if(auto am_result = find(adhoc_id); am_result.has_value()) { - const auto adhoc_storage_info = am_result.value(); - return adhoc_storage_info->add_client_info(std::move(job_info)); + const auto adhoc_metadata_ptr = am_result.value(); + return adhoc_metadata_ptr->add_client_info( + std::move(job_metadata_ptr)); } return scord::error_code::no_such_entity; @@ -170,8 +172,8 @@ struct adhoc_storage_manager { scord::error_code remove_client_info(std::uint64_t adhoc_id) { if(auto am_result = find(adhoc_id); am_result.has_value()) { - const auto adhoc_storage_info = *am_result; - adhoc_storage_info->remove_client_info(); + const auto adhoc_metadata_ptr = *am_result; + adhoc_metadata_ptr->remove_client_info(); return scord::error_code::success; } diff --git a/src/scord/internal_types.cpp b/src/scord/internal_types.cpp index eb82d0c09d5e075e02d25818bd7127362a1753fa..993cefb2a8a84df963f01bfd705c5b505c314d8d 100644 --- a/src/scord/internal_types.cpp +++ b/src/scord/internal_types.cpp @@ -29,10 +29,13 @@ namespace scord::internal { -job_metadata::job_metadata(scord::job job, scord::job::resources resources, - scord::job::requirements requirements) +job_metadata::job_metadata( + scord::job job, scord::job::resources resources, + scord::job::requirements requirements, + std::shared_ptr adhoc_metadata_ptr) : 
m_job(std::move(job)), m_resources(std::move(resources)), - m_requirements(std::move(requirements)) {} + m_requirements(std::move(requirements)), + m_adhoc_metadata_ptr(std::move(adhoc_metadata_ptr)) {} scord::job job_metadata::job() const { @@ -44,6 +47,14 @@ job_metadata::resources() const { return m_resources; } +std::uint32_t +job_metadata::io_procs() const { + if(m_resources) { + return m_resources->nodes().size(); + } + return 0; +} + void job_metadata::update(scord::job::resources resources) { m_resources = std::move(resources); @@ -63,6 +74,11 @@ adhoc_storage_metadata::uuid() const { return m_uuid; } +std::string const& +adhoc_storage_metadata::controller_address() const { + return m_adhoc_storage.context().controller_address(); +} + void adhoc_storage_metadata::update(scord::adhoc_storage::resources new_resources) { m_adhoc_storage.update(std::move(new_resources)); @@ -70,9 +86,9 @@ adhoc_storage_metadata::update(scord::adhoc_storage::resources new_resources) { scord::error_code adhoc_storage_metadata::add_client_info( - std::shared_ptr job_info) { + std::shared_ptr job_metadata_ptr) { - scord::abt::unique_lock lock(m_info_mutex); + scord::abt::unique_lock lock(m_mutex); if(m_client_info) { LOGGER_ERROR("adhoc storage {} already has a client", @@ -80,20 +96,20 @@ adhoc_storage_metadata::add_client_info( return error_code::adhoc_in_use; } - m_client_info = std::move(job_info); + m_client_info = std::move(job_metadata_ptr); return error_code::success; } void adhoc_storage_metadata::remove_client_info() { - scord::abt::unique_lock lock(m_info_mutex); + scord::abt::unique_lock lock(m_mutex); m_client_info.reset(); } std::shared_ptr adhoc_storage_metadata::client_info() const { - scord::abt::shared_lock lock(m_info_mutex); + scord::abt::shared_lock lock(m_mutex); return m_client_info; } diff --git a/src/scord/internal_types.hpp b/src/scord/internal_types.hpp index 08fb6c0de42e17132c093bc1a6593f74ba176fa8..e22f862143587161b3c51aeae78264508a689d78 100644 --- 
a/src/scord/internal_types.hpp +++ b/src/scord/internal_types.hpp @@ -35,7 +35,9 @@ namespace scord::internal { struct job_metadata { job_metadata(scord::job job, scord::job::resources resources, - scord::job::requirements requirements); + scord::job::requirements requirements, + std::shared_ptr + adhoc_metadata_ptr); scord::job job() const; @@ -43,17 +45,26 @@ struct job_metadata { std::optional resources() const; + std::uint32_t + io_procs() const; + std::optional requirements() const { return m_requirements; } + std::shared_ptr + adhoc_storage_metadata() const { + return m_adhoc_metadata_ptr; + } + void update(scord::job::resources resources); scord::job m_job; std::optional m_resources; std::optional m_requirements; + std::shared_ptr m_adhoc_metadata_ptr; }; struct adhoc_storage_metadata { @@ -67,11 +78,15 @@ struct adhoc_storage_metadata { std::string const& uuid() const; + std::string const& + controller_address() const; + void update(scord::adhoc_storage::resources new_resources); scord::error_code - add_client_info(std::shared_ptr job_info); + add_client_info( + std::shared_ptr job_metadata_ptr); void remove_client_info(); @@ -82,7 +97,7 @@ struct adhoc_storage_metadata { std::string m_uuid; scord::adhoc_storage m_adhoc_storage; std::shared_ptr m_client_info; - mutable scord::abt::shared_mutex m_info_mutex; + mutable scord::abt::shared_mutex m_mutex; }; struct pfs_storage_metadata { diff --git a/src/scord/job_manager.hpp b/src/scord/job_manager.hpp index ce11595f599c18ff3ded8c02efd12289b36f6524..cf4486d597c0d431d140f5bd0a5193341e9fb2fb 100644 --- a/src/scord/job_manager.hpp +++ b/src/scord/job_manager.hpp @@ -41,7 +41,9 @@ struct job_manager { tl::expected, scord::error_code> create(scord::slurm_job_id slurm_id, scord::job::resources job_resources, - scord::job::requirements job_requirements) { + scord::job::requirements job_requirements, + std::shared_ptr + adhoc_metadata_ptr) { static std::atomic_uint64_t current_id; scord::job_id id = current_id++; @@ 
-53,7 +55,10 @@ struct job_manager { id, std::make_shared( scord::job{id, slurm_id}, std::move(job_resources), - std::move(job_requirements))); + std::move(job_requirements), + std::move(adhoc_metadata_ptr))); + + m_slurm_to_scord.emplace(slurm_id, id); if(!inserted) { LOGGER_ERROR("{}: Emplace failed", __FUNCTION__); @@ -96,6 +101,22 @@ struct job_manager { return tl::make_unexpected(scord::error_code::no_such_entity); } + tl::expected, + scord::error_code> + find_by_slurm_id(scord::slurm_job_id slurm_id) { + + abt::shared_lock lock(m_jobs_mutex); + + if(auto it = m_slurm_to_scord.find(slurm_id); + it != m_slurm_to_scord.end()) { + return find(it->second); + } + + LOGGER_ERROR("Slurm job '{}' was not registered or was already deleted", + slurm_id); + return tl::make_unexpected(scord::error_code::no_such_entity); + } + tl::expected, scord::error_code> remove(scord::job_id id) { @@ -104,6 +125,7 @@ struct job_manager { if(const auto it = m_jobs.find(id); it != m_jobs.end()) { auto nh = m_jobs.extract(it); + m_slurm_to_scord.erase(nh.mapped()->job().slurm_id()); return nh.mapped(); } @@ -117,6 +139,7 @@ private: std::unordered_map> m_jobs; + std::unordered_map m_slurm_to_scord; }; } // namespace scord diff --git a/src/scord/pfs_storage_manager.hpp b/src/scord/pfs_storage_manager.hpp index d03e89571fb8155858d7d4c59b87512937cb2848..c1a84a73c6467213867862cb393d937b5dc263dc 100644 --- a/src/scord/pfs_storage_manager.hpp +++ b/src/scord/pfs_storage_manager.hpp @@ -73,8 +73,8 @@ struct pfs_storage_manager { if(const auto it = m_pfs_storages.find(id); it != m_pfs_storages.end()) { - const auto current_pfs_info = it->second; - current_pfs_info->update(std::move(new_ctx)); + const auto pfs_metadata_ptr = it->second; + pfs_metadata_ptr->update(std::move(new_ctx)); return scord::error_code::success; } diff --git a/src/scord/rpc_server.cpp b/src/scord/rpc_server.cpp index 4e14eb0b7ca6c52de673ad4f2575fe88a2dceab6..737b93197f24763dcb0c1f765d114830a49e6049 100644 --- 
a/src/scord/rpc_server.cpp +++ b/src/scord/rpc_server.cpp @@ -29,6 +29,15 @@ #include #include "rpc_server.hpp" +template +constexpr std::optional +value_or_none(tl::expected&& e) { + if(e.has_value()) { + return e.value(); + } + return std::nullopt; +} + using namespace std::literals; namespace scord { @@ -42,6 +51,7 @@ rpc_server::rpc_server(std::string name, std::string address, bool daemonize, #define EXPAND(rpc_name) "ADM_" #rpc_name##s, &rpc_server::rpc_name provider::define(EXPAND(ping)); + provider::define(EXPAND(query)); provider::define(EXPAND(register_job)); provider::define(EXPAND(update_job)); provider::define(EXPAND(remove_job)); @@ -79,6 +89,45 @@ rpc_server::ping(const network::request& req) { req.respond(resp); } +void +rpc_server::query(const network::request& req, slurm_job_id job_id) { + + using network::get_address; + using network::rpc_info; + using response_type = network::response_with_value; + + const auto rpc = rpc_info::create(RPC_NAME(), get_address(req)); + + LOGGER_INFO("rpc {:>} body: {{slurm_job_id: {}}}", rpc, job_id); + + const auto rv = + m_job_manager.find_by_slurm_id(job_id) + .or_else([&](auto&& ec) { + LOGGER_ERROR("Error retrieving job metadata: {}", ec); + }) + .and_then([&](auto&& job_metadata_ptr) + -> tl::expected { + if(!job_metadata_ptr->resources()) { + return tl::make_unexpected( + error_code::no_resources); + } + return job_info{ + job_metadata_ptr->adhoc_storage_metadata() + ->controller_address(), + job_metadata_ptr->io_procs()}; + }); + + const response_type resp = + rv ? 
response_type{rpc.id(), error_code::success, rv.value()} + : response_type{rpc.id(), rv.error()}; + + LOGGER_EVAL(resp.error_code(), INFO, ERROR, + "rpc {:<} body: {{retval: {}, job_info: {}}}", rpc, + resp.error_code(), resp.value_or_none()); + + req.respond(resp); +} + void rpc_server::register_job(const network::request& req, const scord::job::resources& job_resources, @@ -98,24 +147,41 @@ rpc_server::register_job(const network::request& req, scord::error_code ec; std::optional job_id; - if(const auto jm_result = - m_job_manager.create(slurm_id, job_resources, job_requirements); + std::shared_ptr adhoc_metadata_ptr; + + // If the job requires an adhoc storage instance, find the appropriate + // adhoc_storage metadata so that we can associate it with the job_metadata + // we are about to create + if(job_requirements.adhoc_storage()) { + const auto adhoc_id = job_requirements.adhoc_storage()->id(); + if(const auto am_result = m_adhoc_manager.find(adhoc_id); + am_result.has_value()) { + adhoc_metadata_ptr = am_result.value(); + } else { + LOGGER_ERROR( + "rpc id: {} error_msg: \"Error finding adhoc_storage: {}\"", + rpc.id(), am_result.error()); + ec = am_result.error(); + } + + if(!ec) { + goto respond; + } + } + + if(const auto jm_result = m_job_manager.create( + slurm_id, job_resources, job_requirements, adhoc_metadata_ptr); jm_result.has_value()) { - const auto& job_metadata = jm_result.value(); + const auto& job_metadata_ptr = jm_result.value(); // if the job requires an adhoc storage instance, inform the appropriate // adhoc_storage instance (if registered) - if(job_requirements.adhoc_storage()) { - const auto adhoc_id = job_requirements.adhoc_storage()->id(); - ec = m_adhoc_manager.add_client_info(adhoc_id, job_metadata); - - if(!ec) { - goto respond; - } + if(adhoc_metadata_ptr) { + adhoc_metadata_ptr->add_client_info(job_metadata_ptr); } - job_id = job_metadata->job().id(); + job_id = job_metadata_ptr->job().id(); } else { LOGGER_ERROR("rpc id: {} 
error_msg: \"Error creating job: {}\"", rpc.id(), jm_result.error()); @@ -174,10 +240,10 @@ rpc_server::remove_job(const network::request& req, scord::job_id job_id) { if(jm_result) { // if the job was using an adhoc storage instance, inform the // appropriate adhoc_storage that the job is no longer its client - const auto& job_metadata = jm_result.value(); + const auto& job_metadata_ptr = jm_result.value(); if(const auto adhoc_storage = - job_metadata->requirements()->adhoc_storage(); + job_metadata_ptr->requirements()->adhoc_storage(); adhoc_storage.has_value()) { ec = m_adhoc_manager.remove_client_info(adhoc_storage->id()); } @@ -217,8 +283,8 @@ rpc_server::register_adhoc_storage( if(const auto am_result = m_adhoc_manager.create(type, name, ctx, resources); am_result.has_value()) { - const auto& adhoc_storage_info = am_result.value(); - adhoc_id = adhoc_storage_info->adhoc_storage().id(); + const auto& adhoc_metadata_ptr = am_result.value(); + adhoc_id = adhoc_metadata_ptr->adhoc_storage().id(); } else { LOGGER_ERROR("rpc id: {} error_msg: \"Error creating adhoc_storage: " "{}\"", @@ -310,10 +376,10 @@ rpc_server::deploy_adhoc_storage(const network::request& req, * information about the instance to deploy. 
* @return */ - const auto deploy_helper = [&](const auto& adhoc_metadata) + const auto deploy_helper = [&](const auto& adhoc_metadata_ptr) -> tl::expected { - assert(adhoc_metadata); - const auto adhoc_storage = adhoc_metadata->adhoc_storage(); + assert(adhoc_metadata_ptr); + const auto adhoc_storage = adhoc_metadata_ptr->adhoc_storage(); const auto endp = lookup(adhoc_storage.context().controller_address()); if(!endp) { @@ -325,12 +391,12 @@ rpc_server::deploy_adhoc_storage(const network::request& req, rpc.add_child(adhoc_storage.context().controller_address()); LOGGER_INFO("rpc {:<} body: {{uuid: {}, type: {}, resources: {}}}", - child_rpc, std::quoted(adhoc_metadata->uuid()), + child_rpc, std::quoted(adhoc_metadata_ptr->uuid()), adhoc_storage.type(), adhoc_storage.get_resources()); - if(const auto call_rv = endp->call(rpc.name(), adhoc_metadata->uuid(), - adhoc_storage.type(), - adhoc_storage.get_resources()); + if(const auto call_rv = endp->call( + rpc.name(), adhoc_metadata_ptr->uuid(), adhoc_storage.type(), + adhoc_storage.get_resources()); call_rv.has_value()) { const response_type resp{call_rv.value()}; @@ -390,9 +456,9 @@ rpc_server::terminate_adhoc_storage(const network::request& req, * @return */ const auto terminate_helper = - [&](const auto& adhoc_metadata) -> error_code { - assert(adhoc_metadata); - const auto adhoc_storage = adhoc_metadata->adhoc_storage(); + [&](const auto& adhoc_metadata_ptr) -> error_code { + assert(adhoc_metadata_ptr); + const auto adhoc_storage = adhoc_metadata_ptr->adhoc_storage(); const auto endp = lookup(adhoc_storage.context().controller_address()); if(!endp) { @@ -404,10 +470,12 @@ rpc_server::terminate_adhoc_storage(const network::request& req, rpc.add_child(adhoc_storage.context().controller_address()); LOGGER_INFO("rpc {:<} body: {{uuid: {}, type: {}}}", child_rpc, - std::quoted(adhoc_metadata->uuid()), adhoc_storage.type()); + std::quoted(adhoc_metadata_ptr->uuid()), + adhoc_storage.type()); - if(const auto call_rv = 
endp->call(rpc.name(), adhoc_metadata->uuid(), - adhoc_storage.type()); + if(const auto call_rv = + endp->call(rpc.name(), adhoc_metadata_ptr->uuid(), + adhoc_storage.type()); call_rv.has_value()) { const response_type resp{call_rv.value()}; @@ -457,8 +525,8 @@ rpc_server::register_pfs_storage(const network::request& req, if(const auto pm_result = m_pfs_manager.create(type, name, ctx); pm_result.has_value()) { - const auto& adhoc_storage_info = pm_result.value(); - pfs_id = adhoc_storage_info->pfs_storage().id(); + const auto& adhoc_metadata_ptr = pm_result.value(); + pfs_id = adhoc_metadata_ptr->pfs_storage().id(); } else { LOGGER_ERROR("rpc id: {} error_msg: \"Error creating pfs_storage: {}\"", rpc.id(), pm_result.error()); diff --git a/src/scord/rpc_server.hpp b/src/scord/rpc_server.hpp index 6fe2331acd565059a032b89732d923eb1ac8749d..494c85fe164f77b40042639662fd2fda67065630 100644 --- a/src/scord/rpc_server.hpp +++ b/src/scord/rpc_server.hpp @@ -45,6 +45,9 @@ private: void ping(const network::request& req); + void + query(const network::request& req, scord::job_id job_id); + void register_job(const network::request& req, const scord::job::resources& job_resources, @@ -71,11 +74,11 @@ private: remove_adhoc_storage(const network::request& req, std::uint64_t adhoc_id); void - deploy_adhoc_storage(const network::request& adhoc_metadata, + deploy_adhoc_storage(const network::request& adhoc_metadata_ptr, std::uint64_t adhoc_id); void - terminate_adhoc_storage(const network::request& adhoc_metadata, + terminate_adhoc_storage(const network::request& adhoc_metadata_ptr, std::uint64_t adhoc_id); void