Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • eu/admire/io-scheduler
1 result
Show changes
Commits on Source (2)
......@@ -112,14 +112,13 @@ config:
command: @CMAKE_INSTALL_FULL_DATADIR@/@PROJECT_NAME@/adhoc_services.d/expand.sh
start
--hosts {ADHOC_NODES}
--workdir {ADHOC_DIRECTORY}
--datadir {ADHOC_DIRECTORY}/data
--mountdir {ADHOC_DIRECTORY}/mnt
--shareddir {ADHOC_DIRECTORY}/mnt
shutdown:
environment:
command: @CMAKE_INSTALL_FULL_DATADIR@/@PROJECT_NAME@/adhoc_services.d/expand.sh
stop
--workdir {ADHOC_DIRECTORY}
--shareddir {ADHOC_DIRECTORY}
expand:
environment:
command: @CMAKE_INSTALL_FULL_DATADIR@/@PROJECT_NAME@/adhoc_services.d/expand.sh
......
......@@ -76,7 +76,7 @@ get_opts() {
## default values
ACTION=""
HOSTLIST=$(hostname -I)
HOSTLIST=$(hostname -i)
DATADIR="/tmp/expand/data"
SHAREDDIR=""
DEPLOYMENTFILE=""
......@@ -94,7 +94,8 @@ if [ -z "$SHAREDDIR" ]; then
usage_short
exit 1
fi
mkdir -p ${DATADIR}
mkdir -p ${SHAREDDIR}
HOSTFILE=${SHAREDDIR}/hostfile.txt
HOSTFILE_REBUILD=${SHAREDDIR}/hostfile_rebuild.txt
HOSTFILE_START=${SHAREDDIR}/hostfile_start.txt
......
......@@ -44,10 +44,10 @@
#include <sys/stat.h>
#include <fcntl.h>
#if SLURM_VERSION_NUMBER > SLURM_VERSION_NUM(23,0,0)
#if SLURM_VERSION_NUMBER > SLURM_VERSION_NUM(23, 0, 0)
#define POINTER *
#else
#define POINTER
#define POINTER
#endif
/**
* Slurm SPANK plugin to handle the ADMIRE adhoc storage CLI. Options are
......@@ -72,6 +72,8 @@
#define TAG_DATASET_OUTPUT 7
#define TAG_DATASET_EXPECTED_OUTPUT 8
#define TAG_DATASET_EXPECTED_INOUT_DATASET 9
#define TAG_QOS_LIMIT 10
// clang-format off
SPANK_PLUGIN (admire-cli, 1)
......@@ -93,6 +95,7 @@ ADM_dataset_route_t* expected_output_datasets = NULL;
size_t expected_output_datasets_count = 0;
ADM_dataset_route_t* expected_inout_datasets = NULL;
size_t expected_inout_datasets_count = 0;
int64_t limit = 0;
/* server-related options */
typedef struct {
......@@ -241,6 +244,9 @@ struct spank_option spank_opts[] = {
TAG_DATASET_EXPECTED_INOUT_DATASET, /* option tag */
(spank_opt_cb_f) process_opts /* callback */
},
{"adm-qos-limit", "qos-limit-bw",
"Define the qos limit for the tranfer operation.", 1, TAG_QOS_LIMIT,
(spank_opt_cb_f) process_opts},
SPANK_OPTIONS_TABLE_END};
int
......@@ -364,6 +370,18 @@ process_opts(int tag, const char* optarg, int remote) {
return -1;
}
return 0;
case TAG_QOS_LIMIT:
char* endptr;
errno = 0;
limit = 0;
limit = strtol(optarg, &endptr, 0);
if(errno != 0 || endptr == optarg || *endptr != '\0' ||
limit <= 0) {
return -1;
}
return 0;
default:
return -1;
......@@ -634,13 +652,21 @@ scord_register_job(spank_t sp, scord_plugin_config_t cfg,
sources[i] = scord_reqs->r_inputs->l_routes[i].d_src;
targets[i] = scord_reqs->r_inputs->l_routes[i].d_dst;
}
// Unfortunately we have to sleep or cargo will not find the instance up.
// Unfortunately we have to sleep or cargo will not find the instance
// up.
sleep(5);
if(ADM_transfer_datasets(
int nlimit = 0;
if(limit > 0) {
nlimit = 1;
}
slurm_info("%s: prepared limits %ld", plugin_name, limit);
if(ADM_transfer_datasets_1(
scord_server, scord_job, sources, input_datasets_count,
targets, input_datasets_count, 0, 0, ADM_MAPPING_ONE_TO_ONE,
&transfer, true) != ADM_SUCCESS) {
targets, input_datasets_count, limit, nlimit,
ADM_MAPPING_ONE_TO_ONE, &transfer, true) != ADM_SUCCESS) {
slurm_error("%s: adhoc storage transfer failed", plugin_name);
rc = -1;
goto end;
......@@ -685,9 +711,9 @@ end:
}
/**
* Called just after plugins are loaded. In remote context, this is just after
* job step is initialized. This function is called before any plugin option
* processing.
* Called just after plugins are loaded. In remote context, this is just
* after job step is initialized. This function is called before any plugin
* option processing.
*
* ┌-----------------------┐
* | Command | Context |
......@@ -727,8 +753,8 @@ slurm_spank_init(spank_t sp, int ac, char** av) {
/**
* Called in local context only after all options have been processed.
* This is called after the job ID and step IDs are available. This happens in
* `srun` after the allocation is made, but before tasks are launched.
* This is called after the job ID and step IDs are available. This happens
* in `srun` after the allocation is made, but before tasks are launched.
*
* ┌-----------------------┐
* | Command | Context |
......@@ -946,11 +972,17 @@ scord_unregister_job(spank_t sp, scord_plugin_config_t cfg,
targets[i] = scord_reqs->r_outputs->l_routes[i].d_dst;
}
int nlimit = 0;
if(ADM_transfer_datasets(
if(limit > 0) {
nlimit = 1;
}
if(ADM_transfer_datasets_1(
scord_server, scord_job, sources, output_datasets_count,
targets, output_datasets_count, 0, 0, ADM_MAPPING_ONE_TO_ONE,
&transfer, true) != ADM_SUCCESS) {
targets, output_datasets_count, limit, nlimit,
ADM_MAPPING_ONE_TO_ONE, &transfer, true) != ADM_SUCCESS) {
slurm_error("%s: adhoc storage transfer failed", plugin_name);
rc = -1;
goto end;
......@@ -1016,7 +1048,8 @@ slurm_spank_exit(spank_t sp, int ac, char** av) {
// spank_context_t sctx = spank_context();
// slurm_debug("%s: %s() registering options", plugin_name, __func__);
// slurm_debug("%s: %s() registering options", plugin_name,
// __func__);
/* register adm/scord options */
// struct spank_option* opt = &spank_opts[0];
......@@ -1033,7 +1066,8 @@ slurm_spank_exit(spank_t sp, int ac, char** av) {
/* Get relative for the node executing id. Job registration is only done
* by the node with ID 0 */
spank_context_t sctx = spank_context();
if(sctx != S_CTX_REMOTE) return 0;
if(sctx != S_CTX_REMOTE)
return 0;
uint32_t nodeid;
if((rc = spank_get_item(sp, S_JOB_NODEID, &nodeid)) != ESPANK_SUCCESS) {
......
......@@ -58,6 +58,16 @@ convert(ADM_qos_limit_t limits[], size_t limits_len) {
return rv;
}
std::vector<scord::qos::limit>
convert(uint64_t limit, size_t limits_len) {
std::vector<scord::qos::limit> rv(limits_len);
rv[0] = scord::qos::limit{scord::qos::subclass::bandwidth, limit};
return rv;
}
} // namespace
......@@ -246,11 +256,60 @@ ADM_transfer_datasets(ADM_server_t server, ADM_job_t job,
scord::server{server}, scord::job{job},
scord::transfer{rv.value()});
if(!rv_wait) {
if(rv_wait.error().value() == scord::error_code::no_such_entity)
{
if(rv_wait.error().value() == scord::error_code::no_such_entity) {
return ADM_SUCCESS;
} else
return rv_wait.error();
}
auto status = rv_wait.value().status();
while(status == scord::transfer_state::type::running or
status == scord::transfer_state::type::queued) {
sleep(5);
rv_wait = scord::detail::query_transfer(
scord::server{server}, scord::job{job},
scord::transfer{rv.value()});
if(!rv_wait) {
if(rv_wait.error().value() ==
scord::error_code::no_such_entity) {
return ADM_SUCCESS;
} else
return rv_wait.error();
}
else
status = rv_wait.value().status();
}
}
return ADM_SUCCESS;
}
ADM_return_t
ADM_transfer_datasets_1(ADM_server_t server, ADM_job_t job,
ADM_dataset_t sources[], size_t sources_len,
ADM_dataset_t targets[], size_t targets_len,
uint64_t limit, size_t limits_len,
ADM_transfer_mapping_t mapping,
ADM_transfer_t* transfer, bool wait = false) {
const auto rv = scord::detail::transfer_datasets(
scord::server{server}, scord::job{job},
::convert(sources, sources_len), ::convert(targets, targets_len),
::convert(limit, limits_len),
static_cast<scord::transfer::mapping>(mapping));
if(!rv) {
return rv.error();
}
*transfer = static_cast<ADM_transfer_t>(rv.value());
if(wait) {
auto rv_wait = scord::detail::query_transfer(
scord::server{server}, scord::job{job},
scord::transfer{rv.value()});
if(!rv_wait) {
if(rv_wait.error().value() == scord::error_code::no_such_entity) {
return ADM_SUCCESS;
} else
return rv_wait.error();
}
auto status = rv_wait.value().status();
......@@ -262,10 +321,10 @@ ADM_transfer_datasets(ADM_server_t server, ADM_job_t job,
scord::server{server}, scord::job{job},
scord::transfer{rv.value()});
if(!rv_wait) {
if(rv_wait.error().value() == scord::error_code::no_such_entity) {
if(rv_wait.error().value() ==
scord::error_code::no_such_entity) {
return ADM_SUCCESS;
}
else
} else
return rv_wait.error();
}
status = rv_wait.value().status();
......
......@@ -251,6 +251,14 @@ ADM_transfer_datasets(ADM_server_t server, ADM_job_t job,
ADM_qos_limit_t limits[], size_t limits_len,
ADM_transfer_mapping_t mapping, ADM_transfer_t* transfer, bool wait);
/**
 * Variant of ADM_transfer_datasets() that takes a single numeric limit
 * instead of an array of ADM_qos_limit_t.
 *
 * @param[in] server The server the request is sent to.
 * @param[in] job The job the transfer belongs to.
 * @param[in] sources Array of source datasets.
 * @param[in] sources_len Number of entries in @p sources.
 * @param[in] targets Array of target datasets.
 * @param[in] targets_len Number of entries in @p targets.
 * @param[in] limit Numeric limit value for the transfer.
 *            NOTE(review): presumably a bandwidth limit; units are not
 *            visible from this declaration -- confirm.
 * @param[in] limits_len Number of limit entries to build from @p limit.
 *            NOTE(review): callers appear to pass 0 for "no limit" and 1
 *            otherwise -- confirm.
 * @param[in] mapping How sources are mapped onto targets.
 * @param[out] transfer Receives the handle of the created transfer.
 * @param[in] wait Whether to wait on the transfer before returning.
 *            NOTE(review): exact waiting behaviour is assumed to match
 *            ADM_transfer_datasets() -- confirm.
 * @return An ADM_return_t status code.
 */
ADM_return_t
ADM_transfer_datasets_1(ADM_server_t server, ADM_job_t job,
ADM_dataset_t sources[], size_t sources_len,
ADM_dataset_t targets[], size_t targets_len,
uint64_t limit, size_t limits_len,
ADM_transfer_mapping_t mapping, ADM_transfer_t* transfer, bool wait);
/**
* Sets the obtained bw for the transfer operation
*
......