Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • eu/admire/io-scheduler
1 result
Show changes
Commits on Source (2)
...@@ -112,14 +112,13 @@ config: ...@@ -112,14 +112,13 @@ config:
command: @CMAKE_INSTALL_FULL_DATADIR@/@PROJECT_NAME@/adhoc_services.d/expand.sh command: @CMAKE_INSTALL_FULL_DATADIR@/@PROJECT_NAME@/adhoc_services.d/expand.sh
start start
--hosts {ADHOC_NODES} --hosts {ADHOC_NODES}
--workdir {ADHOC_DIRECTORY}
--datadir {ADHOC_DIRECTORY}/data --datadir {ADHOC_DIRECTORY}/data
--mountdir {ADHOC_DIRECTORY}/mnt --shareddir {ADHOC_DIRECTORY}/mnt
shutdown: shutdown:
environment: environment:
command: @CMAKE_INSTALL_FULL_DATADIR@/@PROJECT_NAME@/adhoc_services.d/expand.sh command: @CMAKE_INSTALL_FULL_DATADIR@/@PROJECT_NAME@/adhoc_services.d/expand.sh
stop stop
--workdir {ADHOC_DIRECTORY} --shareddir {ADHOC_DIRECTORY}
expand: expand:
environment: environment:
command: @CMAKE_INSTALL_FULL_DATADIR@/@PROJECT_NAME@/adhoc_services.d/expand.sh command: @CMAKE_INSTALL_FULL_DATADIR@/@PROJECT_NAME@/adhoc_services.d/expand.sh
......
...@@ -76,7 +76,7 @@ get_opts() { ...@@ -76,7 +76,7 @@ get_opts() {
## default values ## default values
ACTION="" ACTION=""
HOSTLIST=$(hostname -I) HOSTLIST=$(hostname -i)
DATADIR="/tmp/expand/data" DATADIR="/tmp/expand/data"
SHAREDDIR="" SHAREDDIR=""
DEPLOYMENTFILE="" DEPLOYMENTFILE=""
...@@ -94,7 +94,8 @@ if [ -z "$SHAREDDIR" ]; then ...@@ -94,7 +94,8 @@ if [ -z "$SHAREDDIR" ]; then
usage_short usage_short
exit 1 exit 1
fi fi
mkdir -p ${DATADIR}
mkdir -p ${SHAREDDIR}
HOSTFILE=${SHAREDDIR}/hostfile.txt HOSTFILE=${SHAREDDIR}/hostfile.txt
HOSTFILE_REBUILD=${SHAREDDIR}/hostfile_rebuild.txt HOSTFILE_REBUILD=${SHAREDDIR}/hostfile_rebuild.txt
HOSTFILE_START=${SHAREDDIR}/hostfile_start.txt HOSTFILE_START=${SHAREDDIR}/hostfile_start.txt
......
...@@ -44,10 +44,10 @@ ...@@ -44,10 +44,10 @@
#include <sys/stat.h> #include <sys/stat.h>
#include <fcntl.h> #include <fcntl.h>
#if SLURM_VERSION_NUMBER > SLURM_VERSION_NUM(23,0,0) #if SLURM_VERSION_NUMBER > SLURM_VERSION_NUM(23, 0, 0)
#define POINTER * #define POINTER *
#else #else
#define POINTER #define POINTER
#endif #endif
/** /**
* Slurm SPANK plugin to handle the ADMIRE adhoc storage CLI. Options are * Slurm SPANK plugin to handle the ADMIRE adhoc storage CLI. Options are
...@@ -72,6 +72,8 @@ ...@@ -72,6 +72,8 @@
#define TAG_DATASET_OUTPUT 7 #define TAG_DATASET_OUTPUT 7
#define TAG_DATASET_EXPECTED_OUTPUT 8 #define TAG_DATASET_EXPECTED_OUTPUT 8
#define TAG_DATASET_EXPECTED_INOUT_DATASET 9 #define TAG_DATASET_EXPECTED_INOUT_DATASET 9
#define TAG_QOS_LIMIT 10
// clang-format off // clang-format off
SPANK_PLUGIN (admire-cli, 1) SPANK_PLUGIN (admire-cli, 1)
...@@ -93,6 +95,7 @@ ADM_dataset_route_t* expected_output_datasets = NULL; ...@@ -93,6 +95,7 @@ ADM_dataset_route_t* expected_output_datasets = NULL;
size_t expected_output_datasets_count = 0; size_t expected_output_datasets_count = 0;
ADM_dataset_route_t* expected_inout_datasets = NULL; ADM_dataset_route_t* expected_inout_datasets = NULL;
size_t expected_inout_datasets_count = 0; size_t expected_inout_datasets_count = 0;
int64_t limit = 0;
/* server-related options */ /* server-related options */
typedef struct { typedef struct {
...@@ -241,6 +244,9 @@ struct spank_option spank_opts[] = { ...@@ -241,6 +244,9 @@ struct spank_option spank_opts[] = {
TAG_DATASET_EXPECTED_INOUT_DATASET, /* option tag */ TAG_DATASET_EXPECTED_INOUT_DATASET, /* option tag */
(spank_opt_cb_f) process_opts /* callback */ (spank_opt_cb_f) process_opts /* callback */
}, },
{"adm-qos-limit", "qos-limit-bw",
"Define the qos limit for the tranfer operation.", 1, TAG_QOS_LIMIT,
(spank_opt_cb_f) process_opts},
SPANK_OPTIONS_TABLE_END}; SPANK_OPTIONS_TABLE_END};
int int
...@@ -364,6 +370,18 @@ process_opts(int tag, const char* optarg, int remote) { ...@@ -364,6 +370,18 @@ process_opts(int tag, const char* optarg, int remote) {
return -1; return -1;
} }
return 0; return 0;
case TAG_QOS_LIMIT:
char* endptr;
errno = 0;
limit = 0;
limit = strtol(optarg, &endptr, 0);
if(errno != 0 || endptr == optarg || *endptr != '\0' ||
limit <= 0) {
return -1;
}
return 0;
default: default:
return -1; return -1;
...@@ -634,13 +652,21 @@ scord_register_job(spank_t sp, scord_plugin_config_t cfg, ...@@ -634,13 +652,21 @@ scord_register_job(spank_t sp, scord_plugin_config_t cfg,
sources[i] = scord_reqs->r_inputs->l_routes[i].d_src; sources[i] = scord_reqs->r_inputs->l_routes[i].d_src;
targets[i] = scord_reqs->r_inputs->l_routes[i].d_dst; targets[i] = scord_reqs->r_inputs->l_routes[i].d_dst;
} }
// Unfortunaly we have to sleep or cargo will not find the instance up. // Unfortunaly we have to sleep or cargo will not find the instance
// up.
sleep(5); sleep(5);
if(ADM_transfer_datasets( int nlimit = 0;
if(limit > 0) {
nlimit = 1;
}
slurm_info("%s: prepared limits %ld", plugin_name, limit);
if(ADM_transfer_datasets_1(
scord_server, scord_job, sources, input_datasets_count, scord_server, scord_job, sources, input_datasets_count,
targets, input_datasets_count, 0, 0, ADM_MAPPING_ONE_TO_ONE, targets, input_datasets_count, limit, nlimit,
&transfer, true) != ADM_SUCCESS) { ADM_MAPPING_ONE_TO_ONE, &transfer, true) != ADM_SUCCESS) {
slurm_error("%s: adhoc storage transfer failed", plugin_name); slurm_error("%s: adhoc storage transfer failed", plugin_name);
rc = -1; rc = -1;
goto end; goto end;
...@@ -685,9 +711,9 @@ end: ...@@ -685,9 +711,9 @@ end:
} }
/** /**
* Called just after plugins are loaded. In remote context, this is just after * Called just after plugins are loaded. In remote context, this is just
* job step is initialized. This function is called before any plugin option * after job step is initialized. This function is called before any plugin
* processing. * option processing.
* *
* ┌-----------------------┐ * ┌-----------------------┐
* | Command | Context | * | Command | Context |
...@@ -727,8 +753,8 @@ slurm_spank_init(spank_t sp, int ac, char** av) { ...@@ -727,8 +753,8 @@ slurm_spank_init(spank_t sp, int ac, char** av) {
/** /**
* Called in local context only after all options have been processed. * Called in local context only after all options have been processed.
* This is called after the job ID and step IDs are available. This happens in * This is called after the job ID and step IDs are available. This happens
* `srun` after the allocation is made, but before tasks are launched. * in `srun` after the allocation is made, but before tasks are launched.
* *
* ┌-----------------------┐ * ┌-----------------------┐
* | Command | Context | * | Command | Context |
...@@ -946,11 +972,17 @@ scord_unregister_job(spank_t sp, scord_plugin_config_t cfg, ...@@ -946,11 +972,17 @@ scord_unregister_job(spank_t sp, scord_plugin_config_t cfg,
targets[i] = scord_reqs->r_outputs->l_routes[i].d_dst; targets[i] = scord_reqs->r_outputs->l_routes[i].d_dst;
} }
int nlimit = 0;
if(ADM_transfer_datasets( if(limit > 0) {
nlimit = 1;
}
if(ADM_transfer_datasets_1(
scord_server, scord_job, sources, output_datasets_count, scord_server, scord_job, sources, output_datasets_count,
targets, output_datasets_count, 0, 0, ADM_MAPPING_ONE_TO_ONE, targets, output_datasets_count, limit, nlimit,
&transfer, true) != ADM_SUCCESS) { ADM_MAPPING_ONE_TO_ONE, &transfer, true) != ADM_SUCCESS) {
slurm_error("%s: adhoc storage transfer failed", plugin_name); slurm_error("%s: adhoc storage transfer failed", plugin_name);
rc = -1; rc = -1;
goto end; goto end;
...@@ -1016,7 +1048,8 @@ slurm_spank_exit(spank_t sp, int ac, char** av) { ...@@ -1016,7 +1048,8 @@ slurm_spank_exit(spank_t sp, int ac, char** av) {
// spank_context_t sctx = spank_context(); // spank_context_t sctx = spank_context();
// slurm_debug("%s: %s() registering options", plugin_name, __func__); // slurm_debug("%s: %s() registering options", plugin_name,
// __func__);
/* register adm/scord options */ /* register adm/scord options */
// struct spank_option* opt = &spank_opts[0]; // struct spank_option* opt = &spank_opts[0];
...@@ -1033,7 +1066,8 @@ slurm_spank_exit(spank_t sp, int ac, char** av) { ...@@ -1033,7 +1066,8 @@ slurm_spank_exit(spank_t sp, int ac, char** av) {
/* Get relative for the node executing id. Job registration is only done /* Get relative for the node executing id. Job registration is only done
* by the node with ID 0 */ * by the node with ID 0 */
spank_context_t sctx = spank_context(); spank_context_t sctx = spank_context();
if(sctx != S_CTX_REMOTE) return 0; if(sctx != S_CTX_REMOTE)
return 0;
uint32_t nodeid; uint32_t nodeid;
if((rc = spank_get_item(sp, S_JOB_NODEID, &nodeid)) != ESPANK_SUCCESS) { if((rc = spank_get_item(sp, S_JOB_NODEID, &nodeid)) != ESPANK_SUCCESS) {
......
...@@ -58,6 +58,16 @@ convert(ADM_qos_limit_t limits[], size_t limits_len) { ...@@ -58,6 +58,16 @@ convert(ADM_qos_limit_t limits[], size_t limits_len) {
return rv; return rv;
} }
std::vector<scord::qos::limit>
convert(uint64_t limit, size_t limits_len) {
std::vector<scord::qos::limit> rv(limits_len);
rv[0] = scord::qos::limit{scord::qos::subclass::bandwidth, limit};
return rv;
}
} // namespace } // namespace
...@@ -246,11 +256,60 @@ ADM_transfer_datasets(ADM_server_t server, ADM_job_t job, ...@@ -246,11 +256,60 @@ ADM_transfer_datasets(ADM_server_t server, ADM_job_t job,
scord::server{server}, scord::job{job}, scord::server{server}, scord::job{job},
scord::transfer{rv.value()}); scord::transfer{rv.value()});
if(!rv_wait) { if(!rv_wait) {
if(rv_wait.error().value() == scord::error_code::no_such_entity) if(rv_wait.error().value() == scord::error_code::no_such_entity) {
{
return ADM_SUCCESS; return ADM_SUCCESS;
} else
return rv_wait.error();
}
auto status = rv_wait.value().status();
while(status == scord::transfer_state::type::running or
status == scord::transfer_state::type::queued) {
sleep(5);
rv_wait = scord::detail::query_transfer(
scord::server{server}, scord::job{job},
scord::transfer{rv.value()});
if(!rv_wait) {
if(rv_wait.error().value() ==
scord::error_code::no_such_entity) {
return ADM_SUCCESS;
} else
return rv_wait.error();
} }
else status = rv_wait.value().status();
}
}
return ADM_SUCCESS;
}
ADM_return_t
ADM_transfer_datasets_1(ADM_server_t server, ADM_job_t job,
ADM_dataset_t sources[], size_t sources_len,
ADM_dataset_t targets[], size_t targets_len,
uint64_t limit, size_t limits_len,
ADM_transfer_mapping_t mapping,
ADM_transfer_t* transfer, bool wait = false) {
const auto rv = scord::detail::transfer_datasets(
scord::server{server}, scord::job{job},
::convert(sources, sources_len), ::convert(targets, targets_len),
::convert(limit, limits_len),
static_cast<scord::transfer::mapping>(mapping));
if(!rv) {
return rv.error();
}
*transfer = static_cast<ADM_transfer_t>(rv.value());
if(wait) {
auto rv_wait = scord::detail::query_transfer(
scord::server{server}, scord::job{job},
scord::transfer{rv.value()});
if(!rv_wait) {
if(rv_wait.error().value() == scord::error_code::no_such_entity) {
return ADM_SUCCESS;
} else
return rv_wait.error(); return rv_wait.error();
} }
auto status = rv_wait.value().status(); auto status = rv_wait.value().status();
...@@ -262,10 +321,10 @@ ADM_transfer_datasets(ADM_server_t server, ADM_job_t job, ...@@ -262,10 +321,10 @@ ADM_transfer_datasets(ADM_server_t server, ADM_job_t job,
scord::server{server}, scord::job{job}, scord::server{server}, scord::job{job},
scord::transfer{rv.value()}); scord::transfer{rv.value()});
if(!rv_wait) { if(!rv_wait) {
if(rv_wait.error().value() == scord::error_code::no_such_entity) { if(rv_wait.error().value() ==
scord::error_code::no_such_entity) {
return ADM_SUCCESS; return ADM_SUCCESS;
} } else
else
return rv_wait.error(); return rv_wait.error();
} }
status = rv_wait.value().status(); status = rv_wait.value().status();
......
...@@ -251,6 +251,14 @@ ADM_transfer_datasets(ADM_server_t server, ADM_job_t job, ...@@ -251,6 +251,14 @@ ADM_transfer_datasets(ADM_server_t server, ADM_job_t job,
ADM_qos_limit_t limits[], size_t limits_len, ADM_qos_limit_t limits[], size_t limits_len,
ADM_transfer_mapping_t mapping, ADM_transfer_t* transfer, bool wait); ADM_transfer_mapping_t mapping, ADM_transfer_t* transfer, bool wait);
/**
 * Initiate a dataset transfer using a single numeric limit instead of an
 * array of ADM_qos_limit_t (compare with ADM_transfer_datasets).
 *
 * @param[in] server The server the request is sent to.
 * @param[in] job The job the transfer belongs to.
 * @param[in] sources Array of source datasets.
 * @param[in] sources_len Number of entries in @p sources.
 * @param[in] targets Array of target datasets.
 * @param[in] targets_len Number of entries in @p targets.
 * @param[in] limit Numeric limit for the transfer.
 *            NOTE(review): the implementation appears to treat this as a
 *            bandwidth limit; units are not visible here -- confirm.
 * @param[in] limits_len Number of limit entries to forward with the request.
 *            NOTE(review): callers seen so far pass 0 (no limit) or 1 --
 *            confirm whether other values are meaningful.
 * @param[in] mapping How sources are mapped onto targets.
 * @param[out] transfer Receives the handle of the created transfer.
 * @param[in] wait When true the call queries the transfer state after
 *            submitting it. NOTE(review): presumably it blocks until the
 *            transfer is no longer queued/running -- confirm against the
 *            implementation.
 * @return ADM_SUCCESS on success, otherwise the error reported by the
 *         underlying request.
 */
ADM_return_t
ADM_transfer_datasets_1(ADM_server_t server, ADM_job_t job,
ADM_dataset_t sources[], size_t sources_len,
ADM_dataset_t targets[], size_t targets_len,
uint64_t limit, size_t limits_len,
ADM_transfer_mapping_t mapping, ADM_transfer_t* transfer, bool wait);
/** /**
* Sets the obtained bw for the transfer operation * Sets the obtained bw for the transfer operation
* *
......