Skip to content
Snippets Groups Projects

adhoc fs integration

Merged Ramon Nou requested to merge rnou/adhoc_integration into main
Files
15
+ 104
43
@@ -34,12 +34,14 @@ struct query_config {
std::string controller_address;
std::string stager_address;
std::uint32_t slurm_id{};
std::uint32_t job_id{};
std::uint32_t adhocid{};
std::string nodes;
std::string adhocfs;
std::string inputs;
std::string outputs;
std::string function;
std::uint32_t qos{};
};
@@ -78,6 +80,7 @@ parse_command_line(int argc, char* argv[]) {
->option_text("CARGOADDRESS")
->required();
app.add_option("-j,--slurm_id", cfg.slurm_id, "Slurm ID")->required();
app.add_option("--job_id", cfg.job_id, "Job ID (for subsequent ops)");
app.add_option("-n,--nodes", cfg.nodes, "Nodes");
@@ -90,6 +93,8 @@ parse_command_line(int argc, char* argv[]) {
app.add_option("-o,--outputs", cfg.outputs, "Output dataset");
app.add_option("-q,--qos", cfg.qos, "QoS MB/s");
app.add_option("-f,--function", cfg.function,
"Function {create, stage-in, stage-out, destroy}")
->required();
@@ -113,6 +118,21 @@ parse_address(const std::string& address) {
return std::make_pair(protocol, address);
}
auto
create_adhoc_type_from_name(const std::string& name) {
if(name == "gekkofs") {
return scord::adhoc_storage::type::gekkofs;
} else if(name == "hercules") {
return scord::adhoc_storage::type::hercules;
} else if(name == "expand") {
return scord::adhoc_storage::type::expand;
} else if(name == "dataclay") {
return scord::adhoc_storage::type::dataclay;
} else {
throw std::runtime_error(
fmt::format("Invalid adhoc fs type: {}", name));
}
}
int
main(int argc, char* argv[]) {
@@ -141,24 +161,12 @@ main(int argc, char* argv[]) {
scord::job::resources job_resources(nodes);
// Step 1b : Define adhoc_storage
/* type needs to process the string in cfg.adhocfs */
auto type = scord::adhoc_storage::type::gekkofs;
if(cfg.adhocfs == "gekkofs") {
type = scord::adhoc_storage::type::gekkofs;
} else if(cfg.adhocfs == "hercules") {
type = scord::adhoc_storage::type::hercules;
} else if(cfg.adhocfs == "expand") {
type = scord::adhoc_storage::type::expand;
} else {
throw std::runtime_error(
fmt::format("Invalid adhoc fs type: {}", cfg.adhocfs));
}
auto type = create_adhoc_type_from_name(cfg.adhocfs);
std::string adhoc_name = cfg.adhocfs + std::to_string(cfg.slurm_id);
scord::adhoc_storage::resources resources{nodes};
scord::adhoc_storage::ctx ctx{
cfg.controller_address,
cfg.stager_address,
@@ -167,33 +175,11 @@ main(int argc, char* argv[]) {
100,
false};
scord::adhoc_storage adhoc_storage = register_adhoc_storage(
srv, adhoc_name, type, ctx, resources);
fmt::print("AdhocStorage ID: {}\n", adhoc_storage.id());
auto path = deploy_adhoc_storage(srv, adhoc_storage);
// Step 1c : Define job_requirements
/*
job::requirements::requirements(
std::vector<scord::dataset_route> inputs,
std::vector<scord::dataset_route> outputs,
std::vector<scord::dataset_route> expected_outputs)
: m_inputs(std::move(inputs)), m_outputs(std::move(outputs)),
m_expected_outputs(std::move(expected_outputs)) {}
job::requirements::requirements(
std::vector<scord::dataset_route> inputs,
std::vector<scord::dataset_route> outputs,
std::vector<scord::dataset_route> expected_outputs,
scord::adhoc_storage adhoc_storage)
*/
fmt::print("{},{}\n", path, adhoc_storage.id());
/* Separate inputs into vector of inputs */
@@ -226,20 +212,95 @@ main(int argc, char* argv[]) {
auto job = scord::register_job(srv, job_resources, job_requirements,
cfg.slurm_id);
return job.id();
}
// Now Transfer Datasets
if(cfg.function == "stage-in") {
// As the job is registered, we can now transfer datasets. Get
// inputs from requirements (TODO)
// Step 2 : If function is stage-in, transfer datasets
// convert inputs to split inputs (src, dst)
std::vector<scord::dataset> inputs_src, inputs_dst;
for(auto& route : inputs) {
inputs_src.push_back(route.source());
inputs_dst.push_back(route.destination());
auto v_routes_in = split(cfg.inputs, ';');
for(auto& src_dst : v_routes_in) {
auto route = split(src_dst, ',');
inputs_src.push_back(scord::dataset{route[0]});
inputs_dst.push_back(scord::dataset{route[1]});
}
scord::job job(cfg.job_id, cfg.slurm_id);
std::vector<scord::qos::limit> v_qos;
if(cfg.qos) {
scord::qos::limit limit{scord::qos::subclass::bandwidth,
cfg.qos};
v_qos.push_back(limit);
}
scord::transfer_datasets(srv, job, inputs_src, inputs_dst,
std::vector<scord::qos::limit>{},
scord::transfer::mapping::n_to_n);
auto transfer = scord::transfer_datasets(
srv, job, inputs_src, inputs_dst, v_qos,
scord::transfer::mapping::n_to_n);
return transfer.id();
}
if(cfg.function == "wait") {
// Wait for transfer operation to finish
const scord::transfer transfer = scord::transfer{cfg.slurm_id};
scord::job job(cfg.job_id, cfg.slurm_id);
auto response = scord::query_transfer(srv, job, transfer);
while(response.status() != scord::transfer_state::type::finished) {
std::this_thread::sleep_for(1s);
response = scord::query_transfer(srv, job, transfer);
if(scord::transfer_state::type::failed == response.status()) {
fmt::print("Transfer failed\n");
return EXIT_FAILURE;
}
}
}
if(cfg.function == "stage-out") {
// Step 3 : If function is stage-out, transfer datasets
// convert inputs to split inputs (src, dst)
std::vector<scord::dataset> outputs_src, outputs_dst;
auto v_routes_out = split(cfg.outputs, ';');
for(auto& src_dst : v_routes_out) {
auto route = split(src_dst, ',');
outputs_src.push_back(scord::dataset{route[0]});
outputs_dst.push_back(scord::dataset{route[1]});
}
scord::job job(cfg.job_id, cfg.slurm_id);
std::vector<scord::qos::limit> v_qos;
if(cfg.qos) {
scord::qos::limit limit{scord::qos::subclass::bandwidth,
cfg.qos};
v_qos.push_back(limit);
}
auto transfer = scord::transfer_datasets(
srv, job, outputs_src, outputs_dst, v_qos,
scord::transfer::mapping::n_to_n);
return transfer.id();
}
if(cfg.function == "destroy") {
// Step 4 : If function is destroy, terminate adhoc fs server
// Build a scord::adhoc_storage object with the adhocid
auto type = create_adhoc_type_from_name(cfg.adhocfs);
scord::adhoc_storage::resources resources;
scord::adhoc_storage::ctx ctx;
scord::adhoc_storage adhoc_storage(type, "", cfg.adhocid, ctx,
resources);
terminate_adhoc_storage(srv, adhoc_storage);
}
} catch(const std::exception& ex) {
Loading