diff --git a/CMakeLists.txt b/CMakeLists.txt
index 4cd6304d01e2ee32c4da35e2585bc663a4abba4c..fa400f92e202cdc4d467abb84b6f8cbcd488f4c4 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -30,7 +30,7 @@ cmake_minimum_required(VERSION 3.19)
 
 project(
   scord
-  VERSION 0.3.4
+  VERSION 0.3.6
   LANGUAGES C CXX
 )
 
@@ -312,7 +312,7 @@ find_package(RedisPlusPlus 1.3.3 REQUIRED)
 ### Cargo: required for transferring datasets between storage tiers
 message(STATUS "[${PROJECT_NAME}] Checking for Cargo")
-find_package(Cargo 0.3.1 REQUIRED)
+find_package(Cargo 0.3.6 REQUIRED)
 
 message(STATUS "[${PROJECT_NAME}] Checking for Hiredis")
 find_package(hiredis REQUIRED)
diff --git a/COPYRIGHT_NOTICE b/COPYRIGHT_NOTICE
index a3f50b7b97179c88f3237bc2f5360c6647cde940..502aa0a73325be83ff306a9a04e658c00615a6e3 100644
--- a/COPYRIGHT_NOTICE
+++ b/COPYRIGHT_NOTICE
@@ -1,5 +1,5 @@
 /******************************************************************************
- * Copyright 2021-2022, Barcelona Supercomputing Center (BSC), Spain
+ * Copyright 2021-2024, Barcelona Supercomputing Center (BSC), Spain
  *
  * This software was partially supported by the EuroHPC-funded project ADMIRE
  * (Project ID: 956748, https://www.admire-eurohpc.eu).
diff --git a/cli/scord_query.cpp b/cli/scord_query.cpp
index d0b9b44f64e1ebe95462f89c2d10096900777d9e..b706579d6312df09c4ff4aba2e3e94e6bd63774b 100644
--- a/cli/scord_query.cpp
+++ b/cli/scord_query.cpp
@@ -31,6 +31,7 @@ struct query_config {
     std::string progname;
     std::string server_address;
     std::uint32_t job_id{};
+    bool verbose{};
 };
 
 query_config
@@ -47,6 +48,7 @@ parse_command_line(int argc, char* argv[]) {
             ->required();
 
     app.add_option("job_id", cfg.job_id, "Job ID")->required();
+    app.add_flag("-v,--verbose", cfg.verbose, "Enable verbose output");
 
     try {
         app.parse(argc, argv);
         return cfg;
@@ -81,13 +83,14 @@ main(int argc, char* argv[]) {
 
         scord::job_info info =
                 scord::query(scord::server{protocol, address}, cfg.job_id);
-
+        if(cfg.verbose)
             fmt::print(stdout,
                        "Job metadata:\n"
                        "  adhoc_controller: {}\n"
+                       "  adhoc_uuid: {}\n"
                        "  io_procs: {}\n",
-                       info.adhoc_controller_address(), info.io_procs());
-
+                       info.adhoc_controller_address(), info.uuid(),
+                       info.io_procs());
+        else
+            fmt::print(stdout, "{}\n", info.uuid());
     } catch(const std::exception& ex) {
         fmt::print(stderr, "Error: {}\n", ex.what());
         return EXIT_FAILURE;
diff --git a/examples/c/ADM_cancel_transfer.c b/examples/c/ADM_cancel_transfer.c
index 76fff6aa861168298082a063e9c8961625c51f9a..60c07e00c273c0e3b5610618f939cbbc706a4d88 100644
--- a/examples/c/ADM_cancel_transfer.c
+++ b/examples/c/ADM_cancel_transfer.c
@@ -117,7 +117,7 @@ main(int argc, char* argv[]) {
     ADM_transfer_t tx;
     ret = ADM_transfer_datasets(server, job, sources, sources_len, targets,
-                                targets_len, limits, limits_len, mapping, &tx);
+                                targets_len, limits, limits_len, mapping, &tx,
+                                false);
 
     if(ret != ADM_SUCCESS) {
         fprintf(stderr,
diff --git a/examples/c/ADM_get_transfer_priority.c b/examples/c/ADM_get_transfer_priority.c
index 0442740eb548e0eade2e21d5138656cb5d6d52b7..bdaded3dc0f51417597f5f4c244cdbc77ac5274b 100644
--- a/examples/c/ADM_get_transfer_priority.c
+++ b/examples/c/ADM_get_transfer_priority.c
@@ -117,7 +117,7 @@ main(int argc, char* argv[]) {
     ADM_transfer_t tx;
     ret = ADM_transfer_datasets(server, job, sources, sources_len, targets,
-                                targets_len, limits, limits_len, mapping, &tx);
+                                targets_len, limits, limits_len, mapping, &tx,
+                                false);
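+    /* the trailing `false` asks for a non-blocking transfer: the call
+     * returns as soon as the transfer has been submitted instead of
+     * polling scord until it completes */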
 
     if(ret != ADM_SUCCESS) {
         fprintf(stderr,
diff --git a/examples/c/ADM_link_transfer_to_data_operation.c b/examples/c/ADM_link_transfer_to_data_operation.c
index cbadc2a6877ce2cfc1f65fcb54791357ac9b50a2..c784fbfd493b634c1a5b5e55acb6b7d70fc0b458 100644
--- a/examples/c/ADM_link_transfer_to_data_operation.c
+++ b/examples/c/ADM_link_transfer_to_data_operation.c
@@ -121,7 +121,7 @@ main(int argc, char* argv[]) {
     ADM_transfer_t tx;
     ret = ADM_transfer_datasets(server, job, sources, sources_len, targets,
-                                targets_len, limits, limits_len, mapping, &tx);
+                                targets_len, limits, limits_len, mapping, &tx,
+                                false);
 
     if(ret != ADM_SUCCESS) {
diff --git a/examples/c/ADM_set_transfer_priority.c b/examples/c/ADM_set_transfer_priority.c
index 3d4bb8dc67e9e3bcd5e17adc89ccf19168b152d7..5164a9ba041592b8716628ddd25b3a30d9f87f47 100644
--- a/examples/c/ADM_set_transfer_priority.c
+++ b/examples/c/ADM_set_transfer_priority.c
@@ -117,7 +117,7 @@ main(int argc, char* argv[]) {
     ADM_transfer_t tx;
     ret = ADM_transfer_datasets(server, job, sources, sources_len, targets,
-                                targets_len, limits, limits_len, mapping, &tx);
+                                targets_len, limits, limits_len, mapping, &tx,
+                                false);
 
     if(ret != ADM_SUCCESS) {
         fprintf(stderr,
diff --git a/examples/c/ADM_transfer_datasets.c b/examples/c/ADM_transfer_datasets.c
index e1bc1d8004a68e00d059aa01641eef60d308336b..4086574ee5333188649e3c06ea39d06bf8bc02dd 100644
--- a/examples/c/ADM_transfer_datasets.c
+++ b/examples/c/ADM_transfer_datasets.c
@@ -117,7 +117,7 @@ main(int argc, char* argv[]) {
     ADM_transfer_t tx;
     ret = ADM_transfer_datasets(server, job, sources, NSOURCES, targets,
-                                NTARGETS, limits, NLIMITS, mapping, &tx);
+                                NTARGETS, limits, NLIMITS, mapping, &tx,
+                                false);
 
     if(ret != ADM_SUCCESS) {
         fprintf(stderr,
diff --git a/plugins/adhoc_services.d/gekkofs.sh b/plugins/adhoc_services.d/gekkofs.sh
index be65b8f2ae32e4cc8e975bcf82851f5e307e4df2..e6a4631f26bcaf2e56b8f7d06f52b545a8af8646 100644
--- a/plugins/adhoc_services.d/gekkofs.sh
+++ b/plugins/adhoc_services.d/gekkofs.sh
@@ -1,34 +1,42 @@
 #!/usr/bin/bash
 echo "GEKKOFS Script Called" $HOSTNAME $SLURM_JOBID
-
+# If GKFS_DAEMON is not defined then define it here
+if [ -z "$GKFS_DAEMON" ]; then
+    GKFS_DAEMON=/home/rnou/iodeps/bin/gkfs_daemon
+fi
+# If LIBGKFS_HOSTS_FILE is not defined then define it here
+if [ -z "$LIBGKFS_HOSTS_FILE" ]; then
+    LIBGKFS_HOSTS_FILE=/tmp/gekkofs/gkfs_hosts.txt
+fi
 if [ "$1" == "start" ]; then
     echo "Starting GEKKOFS"
     nodes=$3
     num_nodes=$(echo $nodes | awk -F, '{print NF}')
-    # If num_nodes is greater than 40, we are on the testing environment
-    if [ $num_nodes -gt 40 ]; then
+    # If num_nodes is 50, we are on the testing environment
+    if [ $num_nodes -eq 50 ]; then
         exit 0
     fi
     workdir=$5
     datadir=$7
     mountdir=$9
     unset SLURM_CPU_BIND SLURM_CPU_BIND_LIST SLURM_CPU_BIND_TYPE SLURM_CPU_BIND_VERBOSE
-    srun -N $num_nodes -n $num_nodes --oversubscribe --cpus-per-task=1 --mem-per-cpu=1 --export=ALL bash -c "mkdir -p $mountdir; mkdir -p $datadir"
-    srun -N $num_nodes -n $num_nodes --oversubscribe --cpus-per-task=4 --mem-per-cpu=1 --export=ALL bash -c "gkfs_daemon --rootdir $datadir --mountdir $mountdir" &
+
+    srun -N $num_nodes -n $num_nodes --oversubscribe --overlap --cpus-per-task=1 --mem-per-cpu=1 --export=ALL /usr/bin/bash -c "mkdir -p $mountdir; mkdir -p $datadir"
+    srun -N $num_nodes -n $num_nodes --oversubscribe --overlap --cpus-per-task=1 --mem-per-cpu=1 --export=ALL /usr/bin/bash -c "$GKFS_DAEMON --rootdir $datadir --mountdir $mountdir -H $LIBGKFS_HOSTS_FILE" &
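+    # each daemon registers itself in $LIBGKFS_HOSTS_FILE (-H), which clients
+    # later locate through the LIBGKFS_HOSTS_FILE environment variable; the
+    # sleep below gives the daemons time to come up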
     sleep 4
 elif [ "$1" == "stop" ]; then
     echo "Stopping GEKKOFS"
     nodes=$3
     num_nodes=$(echo $nodes | awk -F, '{print NF}')
-    # If num_nodes is greater than 40, we are on the testing environment
-    if [ $num_nodes -gt 40 ]; then
+    # If num_nodes is 50, we are on the testing environment
+    if [ $num_nodes -eq 50 ]; then
         exit 0
     fi
     unset SLURM_CPU_BIND SLURM_CPU_BIND_LIST SLURM_CPU_BIND_TYPE SLURM_CPU_BIND_VERBOSE
-    srun -N $num_nodes -n $num_nodes --oversubscribe --cpus-per-task=1 --mem-per-cpu=1 pkill -9 gkfs_daemon
+    srun -N $num_nodes -n $num_nodes --overlap --oversubscribe --cpus-per-task=1 --mem-per-cpu=1 --export=ALL /usr/bin/bash -c "pkill -9 gkfs_daemon"
 elif [ "$1" == "expand" ]; then
     echo "Expand command"
 elif [ "$1" == "shrink" ]; then
diff --git a/plugins/slurm/scord_epilog.sh.in b/plugins/slurm/scord_epilog.sh.in
index afb047de5affcbc5e4072c41b643c7049dc1b0f6..516d7a8bf61f754e153ab505f49301b954115a3c 100755
--- a/plugins/slurm/scord_epilog.sh.in
+++ b/plugins/slurm/scord_epilog.sh.in
@@ -108,8 +108,8 @@ CARGO_CONFIG_FILE=$CONFIG_DIRECTORY/$CARGO_ID.cfg
 CARGO_SERVICE_NAME=$(systemd-escape --template cargo@.service "$CARGO_ID")
 
 echo "Shutting down Cargo data stager for job $SLURM_JOB_ID (user: $SLURM_JOB_USER)"
-
-if ! run_as "$SLURM_JOB_USER" systemctl --user stop "$CARGO_SERVICE_NAME"; then
+CUID=$(id -u $SLURM_JOB_USER)
+if ! run_as "$SLURM_JOB_USER" DBUS_SESSION_BUS_ADDRESS=unix:path=/run/user/$CUID/bus systemctl --user stop "$CARGO_SERVICE_NAME"; then
     exit 1
 fi
diff --git a/plugins/slurm/scord_prolog.sh.in b/plugins/slurm/scord_prolog.sh.in
index d67e0ececfeddba99a7dd5ffea83f34e8e957ba8..6c187671cb807452a3e5839e7d5e3143219fa258 100755
--- a/plugins/slurm/scord_prolog.sh.in
+++ b/plugins/slurm/scord_prolog.sh.in
@@ -176,11 +176,21 @@ CARGO_ID=$(echo "cargo_$SLURM_JOB_ID.$SLURM_JOB_UID" | sha256sum | awk '{ print
 CARGO_CONFIG_FILE=$CARGO_CONFIG_DIRECTORY/$CARGO_ID.cfg
 CARGO_MASTER_ADDRESS="$SCORDCTL_PROTO://$ADDRESS:$CARGO_PORT"
 CARGO_INSTANCE_NAME=$(systemd-escape --template cargo@.service "$CARGO_ID")
+# This always fails, as the job is not yet registered with scord at this point
+#if ! CARGO_NUM_NODES=$(@SCORD_QUERY_PROGRAM@ -s @SCORD_SERVICE_ADDRESS@ "$SLURM_JOB_ID" | grep io_procs | awk '{ print $2 }'); then
+#    echo "Failed to determine the number of I/O processes for job $SLURM_JOB_ID"
+#else
+CARGO_NUM_NODES=${#hostnames[@]}
+#fi
+# If LIBGKFS_HOSTS_FILE is not defined then define it here
+if [ -z "$LIBGKFS_HOSTS_FILE" ]; then
+    LIBGKFS_HOSTS_FILE=/tmp/gekkofs/gkfs_hosts.txt
+fi
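+# NOTE: keep this default in sync with plugins/adhoc_services.d/gekkofs.sh,
+# which falls back to the same path when the variable is unset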
 
-if ! CARGO_NUM_NODES=$(@SCORD_QUERY_PROGRAM@ -s @SCORD_SERVICE_ADDRESS@ "$SLURM_JOB_ID" | grep io_procs | awk '{ print $2 }'); then
-    echo "Failed to determine the number of I/O processes for job $SLURM_JOB_ID"
-else
-    CARGO_NUM_NODES=${#hostnames[@]}
+# if CARGO_NUM_NODES is below 2, use 2; the processes will simply be colocated
+if [ $CARGO_NUM_NODES -lt 2 ]; then
+    CARGO_NUM_NODES=2
 fi
 
 cat <<EOT >>"$CARGO_CONFIG_FILE"
@@ -188,6 +198,7 @@ CARGO_ID=$CARGO_ID
 CARGO_HOSTS=$hostnames_csv
 CARGO_NUM_NODES=$CARGO_NUM_NODES
 CARGO_ADDRESS=$CARGO_MASTER_ADDRESS
+LIBGKFS_HOSTS_FILE=$LIBGKFS_HOSTS_FILE
 EOT
 
 CUID=$(id -u $SLURM_JOB_USER)
 chown "$SLURM_JOB_USER":"$SLURM_JOB_GROUP" "$CARGO_CONFIG_FILE"
diff --git a/plugins/slurm/slurmadmcli.c b/plugins/slurm/slurmadmcli.c
index 61d10078627ca702b97ff695265cdfc11a9dbe80..7a73dcecadf9b179e59f3bb0469cfaf4e591ffe6 100644
--- a/plugins/slurm/slurmadmcli.c
+++ b/plugins/slurm/slurmadmcli.c
@@ -33,9 +33,22 @@
 
 #include <slurm/spank.h>
 #include <scord/scord.h>
+#include <stdbool.h>
+#include <slurm/slurm_version.h>
 
 #include "defaults.h"
 #include "utils.h"
 
+#include <dirent.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <string.h>
+
+#if SLURM_VERSION_NUMBER > SLURM_VERSION_NUM(23,0,0)
+#define POINTER *
+#else
+#define POINTER
+#endif
 /**
  * Slurm SPANK plugin to handle the ADMIRE adhoc storage CLI. Options are
  * forwarded to scord on srun, salloc and sbatch. See the struct spank_option
@@ -434,23 +447,33 @@ process_config(int ac, char** av, scord_plugin_config_t* cfg) {
 
     return 0;
 }
 
-
+ADM_server_t scord_server = NULL;
+ADM_node_t* nodes = NULL;
+ADM_job_resources_t job_resources = NULL;
+ADM_adhoc_resources_t adhoc_resources = NULL;
+ADM_adhoc_context_t adhoc_ctx = NULL;
+ADM_adhoc_storage_t adhoc_storage = NULL;
+ADM_job_requirements_t scord_reqs = NULL;
+ADM_job_t scord_job = NULL;
+ADM_transfer_t transfer = NULL;
+char* adhoc_path = NULL;
+
 static int
-scord_register_job(scord_plugin_config_t cfg, scord_nodelist_t nodelist,
-                   uint32_t jobid) {
+scord_register_job(spank_t sp, scord_plugin_config_t cfg,
+                   scord_nodelist_t nodelist, uint32_t jobid) {
 
     int rc = 0;
     int nnodes = 0;
 
-    ADM_server_t scord_server = NULL;
-    ADM_node_t* nodes = NULL;
-    ADM_job_resources_t job_resources = NULL;
-    ADM_adhoc_resources_t adhoc_resources = NULL;
-    ADM_adhoc_context_t adhoc_ctx = NULL;
-    ADM_adhoc_storage_t adhoc_storage = NULL;
-    ADM_job_requirements_t scord_reqs = NULL;
-    ADM_job_t scord_job = NULL;
-    char* adhoc_path = NULL;
+    /* ADM_server_t scord_server = NULL;
+    ADM_node_t* nodes = NULL;
+    ADM_job_resources_t job_resources = NULL;
+    ADM_adhoc_resources_t adhoc_resources = NULL;
+    ADM_adhoc_context_t adhoc_ctx = NULL;
+    ADM_adhoc_storage_t adhoc_storage = NULL;
+    ADM_job_requirements_t scord_reqs = NULL;
+    ADM_job_t scord_job = NULL;
+    ADM_transfer_t transfer = NULL;
+    char* adhoc_path = NULL; */
 
     /* First determine the node on which to launch scord-ctl (typically the
      * first node of the allocation) */
@@ -578,39 +601,86 @@ scord_register_job(scord_plugin_config_t cfg, scord_nodelist_t nodelist,
         goto end;
     }
 
-end:
-    if(adhoc_path) {
-        free(adhoc_path);
-    }
-
-    if(scord_job) {
-        ADM_job_destroy(scord_job);
-    }
-
-    if(scord_reqs) {
-        ADM_job_requirements_destroy(scord_reqs);
+    // define the environment variables for the job
+    switch(adhoc_type) {
+        case ADM_ADHOC_STORAGE_GEKKOFS:
+            spank_setenv(sp, "ADHOC_TYPE", "gekkofs", 1);
+
+            spank_setenv(sp, "LIBGKFS_HOSTS_FILE",
+                         "/tmp/gekkofs/gkfs_hosts.txt", 1);
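+            /* NOTE: this hardcoded path must stay in sync with the defaults
+             * used by gekkofs.sh and scord_prolog.sh.in */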
+            break;
+        case ADM_ADHOC_STORAGE_EXPAND:
+            spank_setenv(sp, "ADHOC_TYPE", "expand", 1);
+            break;
+        case ADM_ADHOC_STORAGE_DATACLAY:
+            spank_setenv(sp, "ADHOC_TYPE", "dataclay", 1);
+            break;
+        case ADM_ADHOC_STORAGE_HERCULES:
+            spank_setenv(sp, "ADHOC_TYPE", "hercules", 1);
+            break;
     }
-
-    if(adhoc_storage) {
-        ADM_adhoc_storage_destroy(adhoc_storage);
-    }
-
-    if(adhoc_ctx) {
-        ADM_adhoc_context_destroy(adhoc_ctx);
-    }
-
-    if(adhoc_resources) {
-        ADM_adhoc_resources_destroy(adhoc_resources);
-    }
-
-    if(job_resources) {
-        ADM_job_resources_destroy(job_resources);
+    spank_setenv(sp, "ADHOC_PATH", adhoc_path, 1);
+
+    if(input_datasets_count > 0) {
+        // divide input_datasets into sources and targets
+        ADM_dataset_t* sources =
+                malloc((input_datasets_count) * sizeof(ADM_dataset_t));
+        ADM_dataset_t* targets =
+                malloc((input_datasets_count) * sizeof(ADM_dataset_t));
+
+        for(unsigned int i = 0; i < input_datasets_count; i++) {
+            // ADM_dataset_route_list_t r_inputs;
+            sources[i] = scord_reqs->r_inputs->l_routes[i].d_src;
+            targets[i] = scord_reqs->r_inputs->l_routes[i].d_dst;
+        }
+        // Unfortunately we have to sleep, or Cargo will not find the
+        // instance up.
+        sleep(5);
+
+        if(ADM_transfer_datasets(
+                   scord_server, scord_job, sources, input_datasets_count,
+                   targets, input_datasets_count, 0, 0, ADM_MAPPING_ONE_TO_ONE,
+                   &transfer, true) != ADM_SUCCESS) {
+            slurm_error("%s: adhoc storage transfer failed", plugin_name);
+            rc = -1;
+            goto end;
+        }
     }
 
-    if(scord_server) {
-        ADM_server_destroy(scord_server);
-    }
+end:
+    /* if(adhoc_path) {
+        free(adhoc_path);
+    }
+
+    if(scord_job) {
+        ADM_job_destroy(scord_job);
+    }
+
+    if(scord_reqs) {
+        ADM_job_requirements_destroy(scord_reqs);
+    }
+
+    if(adhoc_storage) {
+        ADM_adhoc_storage_destroy(adhoc_storage);
+    }
+
+    if(adhoc_ctx) {
+        ADM_adhoc_context_destroy(adhoc_ctx);
+    }
+
+    if(adhoc_resources) {
+        ADM_adhoc_resources_destroy(adhoc_resources);
+    }
+
+    if(job_resources) {
+        ADM_job_resources_destroy(job_resources);
+    }
+
+    if(scord_server) {
+        ADM_server_destroy(scord_server);
+    }
+    */
 
     return rc;
 }
@@ -740,7 +810,7 @@ slurm_spank_user_init(spank_t sp, int ac, char** av) {
 
     slurm_debug("%s: %s: job id: %d", plugin_name, __func__, jobid);
 
     /* list of job nodes */
-    hostlist_t hostlist = get_slurm_hostlist(sp);
+    hostlist_t POINTER hostlist = get_slurm_hostlist(sp);
 
     if(!hostlist) {
         slurm_error("%s: failed to retrieve hostlist", plugin_name);
         return -1;
@@ -760,7 +830,7 @@ slurm_spank_user_init(spank_t sp, int ac, char** av) {
         goto cleanup;
     }
 
-    if((ec = scord_register_job(cfg, nodelist, jobid)) != 0) {
+    if((ec = scord_register_job(sp, cfg, nodelist, jobid)) != 0) {
         slurm_error("%s: failed to register job with scord", plugin_name);
         ec = -1;
         goto cleanup;
@@ -776,3 +846,259 @@ cleanup:
 
     return ec;
 }
+
+
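+/* recursively remove the contents of `path`: regular files are unlinked
+ * directly, directories are emptied first and then removed; used at job
+ * teardown to clean up the ad-hoc storage directory */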
+void
+remove_dir_content(const char* path) {
+    struct dirent* de;
+    char fname[300];
+    DIR* dr = opendir(path);
+    if(dr == NULL) {
+        return;
+    }
+    while((de = readdir(dr)) != NULL) {
+        int ret = -1;
+        struct stat statbuf;
+        snprintf(fname, sizeof(fname), "%s/%s", path, de->d_name);
+        if(!strcmp(de->d_name, ".") || !strcmp(de->d_name, ".."))
+            continue;
+        if(!stat(fname, &statbuf)) {
+            if(S_ISDIR(statbuf.st_mode)) {
+                ret = unlinkat(dirfd(dr), fname, AT_REMOVEDIR);
+                if(ret != 0) {
+                    remove_dir_content(fname);
+                    ret = unlinkat(dirfd(dr), fname, AT_REMOVEDIR);
+                }
+            } else {
+                unlink(fname);
+            }
+        }
+    }
+    closedir(dr);
+}
+
+
+static int
+scord_unregister_job(spank_t sp, scord_plugin_config_t cfg,
+                     scord_nodelist_t nodelist, uint32_t jobid) {
+    (void) sp;
+    (void) jobid;
+    int rc = 0;
+
+    /* First determine the node on which to launch scord-ctl (typically the
+     * first node of the allocation) */
+    ADM_node_t ctl_node = scord_nodelist_get_node(nodelist, 0);
+    cfg.scordctl_info.addr = margo_address_create(
+            cfg.scordctl_info.proto, ADM_node_get_hostname(ctl_node),
+            cfg.scordctl_info.port);
+
+    if(!cfg.scordctl_info.addr) {
+        slurm_error("%s: failed to compute the address of the scordctl server",
+                    plugin_name);
+        return -1;
+    }
+
+    /* The Cargo master will also typically reside on the first node of the
+     * allocation */
+    cfg.cargo_info.addr = margo_address_create(cfg.cargo_info.proto,
+                                               ADM_node_get_hostname(ctl_node),
+                                               cfg.cargo_info.port);
+
+    slurm_debug("%s: %s: scord_info:", plugin_name, __func__);
+    slurm_debug("%s: %s:   addr: \"%s\",", plugin_name, __func__,
+                cfg.scord_info.addr);
+    slurm_debug("%s: %s:   proto: \"%s\",", plugin_name, __func__,
+                cfg.scord_info.proto);
+    slurm_debug("%s: %s:   port: %d,", plugin_name, __func__,
+                cfg.scord_info.port);
+
+    slurm_debug("%s: %s: scordctl_info:", plugin_name, __func__);
+    slurm_debug("%s: %s:   addr: \"%s\",", plugin_name, __func__,
+                cfg.scordctl_info.addr);
+    slurm_debug("%s: %s:   proto: \"%s\",", plugin_name, __func__,
+                cfg.scordctl_info.proto);
+    slurm_debug("%s: %s:   port: %d,", plugin_name, __func__,
+                cfg.scordctl_info.port);
+
+    slurm_debug("%s: %s: cargo_info:", plugin_name, __func__);
+    slurm_debug("%s: %s:   addr: \"%s\",", plugin_name, __func__,
+                cfg.cargo_info.addr);
+    slurm_debug("%s: %s:   proto: \"%s\",", plugin_name, __func__,
+                cfg.cargo_info.proto);
+    slurm_debug("%s: %s:   port: %d,", plugin_name, __func__,
+                cfg.cargo_info.port);
+
+    // Step 1: stage-out
+
+    // divide output_datasets into sources and targets
+    if(output_datasets_count > 0) {
+        ADM_dataset_t* sources =
+                malloc((output_datasets_count) * sizeof(ADM_dataset_t));
+        ADM_dataset_t* targets =
+                malloc((output_datasets_count) * sizeof(ADM_dataset_t));
+
+        for(unsigned int i = 0; i < output_datasets_count; i++) {
+            // ADM_dataset_route_list_t r_outputs;
+            sources[i] = scord_reqs->r_outputs->l_routes[i].d_src;
+            targets[i] = scord_reqs->r_outputs->l_routes[i].d_dst;
+        }
+
+        if(ADM_transfer_datasets(scord_server, scord_job, sources,
+                                 output_datasets_count, targets,
+                                 output_datasets_count, 0, 0,
+                                 ADM_MAPPING_ONE_TO_ONE, &transfer,
+                                 true) != ADM_SUCCESS) {
+            slurm_error("%s: adhoc storage transfer failed", plugin_name);
+            rc = -1;
+            goto end;
+        }
+    }
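+    // with wait=true the call above blocks, polling scord until the
+    // stage-out completes, so the ad-hoc storage is not torn down while
+    // Cargo is still copying data out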
+
+    // remove_adhoc_storage
+    ADM_terminate_adhoc_storage(scord_server, adhoc_storage);
+    // ADM_remove_adhoc_storage(scord_server, adhoc_storage);
+    // remove all the files (TODO: this should be done on all the nodes)
+    remove_dir_content(adhoc_path);
+    rmdir(adhoc_path);
+    // remove job
+    ADM_remove_job(scord_server, scord_job);
+
+end:
+    if(adhoc_path) {
+        free(adhoc_path);
+    }
+
+    if(scord_job) {
+        ADM_job_destroy(scord_job);
+    }
+
+    if(scord_reqs) {
+        ADM_job_requirements_destroy(scord_reqs);
+    }
+
+    if(adhoc_storage) {
+        ADM_adhoc_storage_destroy(adhoc_storage);
+    }
+
+    if(adhoc_ctx) {
+        ADM_adhoc_context_destroy(adhoc_ctx);
+    }
+
+    if(adhoc_resources) {
+        ADM_adhoc_resources_destroy(adhoc_resources);
+    }
+
+    if(job_resources) {
+        ADM_job_resources_destroy(job_resources);
+    }
+
+    if(scord_server) {
+        ADM_server_destroy(scord_server);
+    }
+
+    return rc;
+}
+
+
+int
+slurm_spank_exit(spank_t sp, int ac, char** av) {
+
+    (void) sp;
+    (void) ac;
+    (void) av;
+
+    spank_err_t rc = ESPANK_SUCCESS;
+
+    // spank_context_t sctx = spank_context();
+
+    // slurm_debug("%s: %s() registering options", plugin_name, __func__);
+
+    /* register adm/scord options */
+    // struct spank_option* opt = &spank_opts[0];
+    // while(opt->name) {
+    //     rc = spank_option_register(sp, opt++);
+    // }
+
+    /* No ADMIRE options were passed to the job, nothing to do here */
+    if(!scord_flag) {
+        return 0;
+    }
+
+    /* Get the relative id of the executing node. Unregistration is only done
+     * by the node with ID 0 */
+    spank_context_t sctx = spank_context();
+    if(sctx != S_CTX_REMOTE)
+        return 0;
+
+    uint32_t nodeid;
+
+    if((rc = spank_get_item(sp, S_JOB_NODEID, &nodeid)) != ESPANK_SUCCESS) {
+        slurm_error("%s: failed to get node id: %s", plugin_name,
+                    spank_strerror(rc));
+        return -1;
+    }
+
+    slurm_info("%s: %s: node id: %d", plugin_name, __func__, nodeid);
+
+    if(nodeid != 0) {
+        return 0;
+    }
+
+    scord_plugin_config_t cfg = default_cfg;
+
+    if(process_config(ac, av, &cfg) != 0) {
+        return -1;
+    }
+
+    /* get job id */
+    uint32_t jobid;
+
+    if((rc = spank_get_item(sp, S_JOB_ID, &jobid)) != ESPANK_SUCCESS) {
+        slurm_info("%s: failed to get jobid: %s", plugin_name,
+                   spank_strerror(rc));
+        return -1;
+    }
+
+    slurm_info("%s: %s: job id: %d", plugin_name, __func__, jobid);
+
+    /* list of job nodes */
+    hostlist_t POINTER hostlist = get_slurm_hostlist(sp);
+
+    if(!hostlist) {
+        slurm_info("%s: failed to retrieve hostlist", plugin_name);
+        return -1;
+    }
+
+    char buf[256];
+    slurm_hostlist_ranged_string(hostlist, sizeof(buf), buf);
+    slurm_info("%s: %s: hostlist: %s", plugin_name, __func__, buf);
+
+    scord_nodelist_t nodelist = scord_nodelist_create(hostlist);
+
+    int ec = 0;
+
+    if(!nodelist) {
+        slurm_info("%s: failed to create nodelist", plugin_name);
+        ec = -1;
+        goto cleanup;
+    }
+
+    // We get here, do stage-out and clean up
+    ec = scord_unregister_job(sp, cfg, nodelist, jobid);
+
+cleanup:
+    if(cfg.scordctl_info.addr) {
+        free((void*) cfg.scordctl_info.addr);
+    }
+
+    scord_nodelist_destroy(nodelist);
+    slurm_hostlist_destroy(hostlist);
+
+    return ec;
+}
\ No newline at end of file
diff --git a/plugins/slurm/systemd/cargo@.service.in b/plugins/slurm/systemd/cargo@.service.in
index b0bcb46b997224be8b8131e3d615b4fb1dbc166d..0270b9572339ebdcba0d0db764efde28ef397b36 100644
--- a/plugins/slurm/systemd/cargo@.service.in
+++ b/plugins/slurm/systemd/cargo@.service.in
@@ -7,5 +7,5 @@ EnvironmentFile=%h/.config/cargo/%I.cfg
 ExecStart=@CMAKE_INSTALL_FULL_DATADIR@/@PROJECT_NAME@/slurm/cargoctl start -s ${CARGO_ADDRESS} -H ${CARGO_HOSTS} -n ${CARGO_NUM_NODES}
 ExecStop=@CMAKE_INSTALL_FULL_DATADIR@/@PROJECT_NAME@/slurm/cargoctl stop -s ${CARGO_ADDRESS}
 Restart=no
-PrivateTmp=true
+PrivateTmp=false
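+# PrivateTmp is disabled so the Cargo service shares /tmp with the job; the
+# GekkoFS hosts file defaults to /tmp/gekkofs/gkfs_hosts.txt and must be
+# visible to the stager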
 NoNewPrivileges=true
diff --git a/plugins/slurm/utils.c b/plugins/slurm/utils.c
index 2107498c256b3f1c27fddec1c2921799de3abfe5..0e428f0cfd70c9749b4de5001e87761ae6ed69f9 100644
--- a/plugins/slurm/utils.c
+++ b/plugins/slurm/utils.c
@@ -32,7 +32,7 @@
 
 extern const char plugin_name[];
 
-hostlist_t
+hostlist_t POINTER
 get_slurm_hostlist(spank_t sp) {
 
     /* get list of nodes. /!\ at this point env SLURM_NODELIST is
@@ -72,7 +72,7 @@ get_slurm_hostlist(spank_t sp) {
 
     slurm_debug("%s: SLURM_NODELIST=%s", plugin_name, nodelist);
 
-    hostlist_t hl = NULL;
+    hostlist_t POINTER hl = NULL;
 
     hl = slurm_hostlist_create(nodelist);
 
     if(!hl) {
@@ -84,7 +84,7 @@ get_slurm_hostlist(spank_t sp) {
 }
 
 scord_nodelist_t
-scord_nodelist_create(hostlist_t hostlist) {
+scord_nodelist_create(hostlist_t POINTER hostlist) {
 
     ADM_node_t* nodes = NULL;
     char* host = NULL;
diff --git a/plugins/slurm/utils.h b/plugins/slurm/utils.h
index 46ed86864cd50e4eca43e0678c9cd573b20464ca..022683c4e06c64e8d61307d754f6faeb645e2517 100644
--- a/plugins/slurm/utils.h
+++ b/plugins/slurm/utils.h
@@ -29,8 +29,12 @@
 
 #include <slurm/slurm.h>
 #include <slurm/spank.h>
-
-hostlist_t
+#if SLURM_VERSION_NUMBER > SLURM_VERSION_NUM(23,0,0)
+#define POINTER *
+#else
+#define POINTER
+#endif
+hostlist_t POINTER
 get_slurm_hostlist(spank_t sp);
 
 typedef struct scord_nodelist {
@@ -39,7 +43,7 @@ typedef struct scord_nodelist {
 }* scord_nodelist_t;
 
 scord_nodelist_t
-scord_nodelist_create(hostlist_t hostlist);
+scord_nodelist_create(hostlist_t POINTER hostlist);
 
 int
 scord_nodelist_get_nodecount(scord_nodelist_t nodelist);
diff --git a/spack/packages/scord/package.py b/spack/packages/scord/package.py
index 279967eb2fc7d15e387dddc3384c28144595a94a..e82c2a315005321f34ca1ac505415709260e0035 100644
--- a/spack/packages/scord/package.py
+++ b/spack/packages/scord/package.py
@@ -35,7 +35,7 @@ class Scord(CMakePackage):
 
     homepage = "https://storage.bsc.es/gitlab/eu/admire/io-scheduler"
     url = ("https://storage.bsc.es/gitlab/eu/admire/io-scheduler/-/archive/"
-           "v0.3.4/io-scheduler-v0.3.4.tar.gz")
+           "v0.3.6/io-scheduler-v0.3.6.tar.gz")
     git = "https://storage.bsc.es/gitlab/eu/admire/io-scheduler.git"
 
     maintainers("alberto-miranda")
@@ -57,6 +57,7 @@ class Scord(CMakePackage):
     version("0.3.3",
             sha256="a8b5a8d05858bee91b9675ca6c929f4c16b5b2562f4e6a8dba3ce0aacb721f48")
     version("0.3.4",
             sha256="e5e6a46d174db266e1caa2689cd17d88a7dc0623429c5efba20a374383f54a12")
+    version("0.3.6")
 
     # build variants
     variant('build_type', default='Release',
diff --git a/src/lib/c_wrapper.cpp b/src/lib/c_wrapper.cpp
index 1b9f797ae3586f46ea313a8af49bf576c3b99c1d..29cd475b2f0948bafe1ce584e659453ae2b7f681 100644
--- a/src/lib/c_wrapper.cpp
+++ b/src/lib/c_wrapper.cpp
@@ -227,8 +227,8 @@ ADM_transfer_datasets(ADM_server_t server, ADM_job_t job,
                       ADM_dataset_t sources[], size_t sources_len,
                       ADM_dataset_t targets[], size_t targets_len,
                       ADM_qos_limit_t limits[], size_t limits_len,
-                      ADM_transfer_mapping_t mapping,
-                      ADM_transfer_t* transfer) {
+                      ADM_transfer_mapping_t mapping, ADM_transfer_t* transfer,
+                      bool wait = false) {
 
     const auto rv = scord::detail::transfer_datasets(
             scord::server{server}, scord::job{job},
@@ -241,6 +241,36 @@ ADM_transfer_datasets(ADM_server_t server, ADM_job_t job,
     }
 
     *transfer = static_cast<ADM_transfer_t>(rv.value());
+
+    if(wait) {
+        auto rv_wait = scord::detail::query_transfer(
+                scord::server{server}, scord::job{job},
+                scord::transfer{rv.value()});
+        if(!rv_wait) {
+            if(rv_wait.error().value() == scord::error_code::no_such_entity) {
+                return ADM_SUCCESS;
+            } else
+                return rv_wait.error();
+        }
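+        // poll scord every 5 seconds until the transfer leaves the
+        // queued/running states; a no_such_entity error means the transfer
+        // record is already gone server-side and is treated as success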
+        auto status = rv_wait.value().status();
+
+        while(status == scord::transfer_state::type::running or
+              status == scord::transfer_state::type::queued) {
+            sleep(5);
+            rv_wait = scord::detail::query_transfer(
+                    scord::server{server}, scord::job{job},
+                    scord::transfer{rv.value()});
+            if(!rv_wait) {
+                if(rv_wait.error().value() ==
+                   scord::error_code::no_such_entity) {
+                    return ADM_SUCCESS;
+                } else
+                    return rv_wait.error();
+            }
+            status = rv_wait.value().status();
+        }
+    }
 
     return ADM_SUCCESS;
 }
diff --git a/src/lib/scord/scord.h b/src/lib/scord/scord.h
index d3c75164701b2a148945580707a885362e053579..b45effc84f48593befa611bb62a4b03849f5dd7d 100644
--- a/src/lib/scord/scord.h
+++ b/src/lib/scord/scord.h
@@ -249,7 +249,7 @@ ADM_transfer_datasets(ADM_server_t server, ADM_job_t job,
                       ADM_dataset_t sources[], size_t sources_len,
                       ADM_dataset_t targets[], size_t targets_len,
                       ADM_qos_limit_t limits[], size_t limits_len,
-                      ADM_transfer_mapping_t mapping, ADM_transfer_t* transfer);
+                      ADM_transfer_mapping_t mapping, ADM_transfer_t* transfer,
+                      bool wait);
 
 /**
  * Sets the obtained bw for the transfer operation
diff --git a/src/lib/scord/types.hpp b/src/lib/scord/types.hpp
index 009240b6e7ab78dac4d5f1a6b7c7f5f30367e6a8..7a666848681dd9555f23439d46f731962921824e 100644
--- a/src/lib/scord/types.hpp
+++ b/src/lib/scord/types.hpp
@@ -493,9 +493,9 @@ class job_info {
 public:
     job_info() = default;
-    explicit job_info(std::string adhoc_controller_address,
+    explicit job_info(std::string adhoc_controller_address, std::string uuid,
                       std::uint32_t procs_for_io)
-        : m_adhoc_address(std::move(adhoc_controller_address)),
+        : m_adhoc_address(std::move(adhoc_controller_address)),
+          m_uuid(std::move(uuid)),
           m_procs_for_io(procs_for_io) {}
 
     constexpr std::string const&
@@ -512,16 +512,23 @@ public:
         return m_procs_for_io;
     }
 
+    constexpr std::string const&
+    uuid() const {
+        return m_uuid;
+    }
+
 private:
     friend class cereal::access;
 
     template <typename Archive>
     void
     serialize(Archive& ar) {
         ar & m_adhoc_address;
+        ar & m_uuid;
         ar & m_procs_for_io;
     }
 
     std::string m_adhoc_address;
+    std::string m_uuid;
     std::uint32_t m_procs_for_io;
 };
diff --git a/src/scord/rpc_server.cpp b/src/scord/rpc_server.cpp
index da3027a350facdd877b58c0cb92c7b78c69462ee..7f91a0af95886faaf547d43b2220c64616832f56 100644
--- a/src/scord/rpc_server.cpp
+++ b/src/scord/rpc_server.cpp
@@ -52,7 +52,7 @@ dataset_process(std::string id) {
         type = cargo::dataset::type::parallel;
     } else if(id.find("gekkofs:") != std::string::npos) {
         id = id.substr(strlen("gekkofs:"));
-        type = cargo::dataset::type::posix;
+        type = cargo::dataset::type::gekkofs;
     } else if(id.find("hercules:") != std::string::npos) {
         id = id.substr(strlen("hercules:"));
         type = cargo::dataset::type::hercules;
@@ -62,8 +62,11 @@ dataset_process(std::string id) {
     } else if(id.find("dataclay:") != std::string::npos) {
         id = id.substr(strlen("dataclay:"));
         type = cargo::dataset::type::dataclay;
-    } else
+    } else if(id.find("posix:") != std::string::npos) {
+        id = id.substr(strlen("posix:"));
         type = cargo::dataset::type::posix;
+    } else
+        type = cargo::dataset::type::none;
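+        // ids without a recognized "<scheme>:" prefix are now tagged as
+        // type::none instead of silently being treated as POSIX datasets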
     return cargo::dataset{id, type};
 }
@@ -164,9 +167,13 @@ rpc_server::query(const network::request& req, slurm_job_id job_id) {
                             return tl::make_unexpected(
                                     error_code::no_resources);
                         }
+
                         return job_info{
                                 job_metadata_ptr->adhoc_storage_metadata()
                                         ->controller_address(),
+                                job_metadata_ptr->adhoc_storage_metadata()->uuid(),
                                 job_metadata_ptr->io_procs()};
                     });
@@ -846,7 +853,7 @@ rpc_server::transfer_datasets(const network::request& req,
                               scord::job_id job_id,
     std::transform(targets.cbegin(), targets.cend(),
                    std::back_inserter(outputs), [](const auto& tgt) {
                        return ::dataset_process(tgt.id());
                    });
-
+
     const auto cargo_tx = cargo::transfer_datasets(srv, inputs, outputs);
 
     // Register the transfer into the `transfer_manager`.