
adhoc fs integration

Merged Ramon Nou requested to merge rnou/adhoc_integration into main
Files: 15 · +104 −43
@@ -34,12 +34,14 @@ struct query_config {
     std::string controller_address;
     std::string stager_address;
     std::uint32_t slurm_id{};
 
+    std::uint32_t job_id{};
     std::uint32_t adhocid{};
     std::string nodes;
     std::string adhocfs;
     std::string inputs;
     std::string outputs;
     std::string function;
 
+    std::uint32_t qos{};
 };
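Note for reviewers: the new job_id field lets later invocations (stage-in, wait, stage-out) rebuild the job handle that an earlier "create" call registered, as used in the last hunk of this MR:

    scord::job job(cfg.job_id, cfg.slurm_id);   // reconstruct the handle without re-registering the job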
@@ -78,6 +80,7 @@ parse_command_line(int argc, char* argv[]) {
             ->option_text("CARGOADDRESS")
             ->required();
     app.add_option("-j,--slurm_id", cfg.slurm_id, "Slurm ID")->required();
 
+    app.add_option("--job_id", cfg.job_id, "Job ID (for subsequent ops)");
     app.add_option("-n,--nodes", cfg.nodes, "Nodes");
@@ -90,6 +93,8 @@ parse_command_line(int argc, char* argv[]) {
     app.add_option("-o,--outputs", cfg.outputs, "Output dataset");
 
+    app.add_option("-q,--qos", cfg.qos, "QoS MB/s");
+
     app.add_option("-f,--function", cfg.function,
                    "Function {create, stage-in, stage-out, destroy}")
             ->required();
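The --qos value is only turned into a scord bandwidth limit when it is non-zero; the default of 0 means no QoS constraint. A minimal sketch of that mapping, mirroring the stage-in/stage-out branches added further down:

    std::vector<scord::qos::limit> v_qos;
    if(cfg.qos) {   // 0 (the default) == no limit requested
        scord::qos::limit limit{scord::qos::subclass::bandwidth, cfg.qos};
        v_qos.push_back(limit);   // handed to scord::transfer_datasets()
    }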
@@ -113,6 +118,21 @@ parse_address(const std::string& address) {
     return std::make_pair(protocol, address);
 }
 
+auto
+create_adhoc_type_from_name(const std::string& name) {
+    if(name == "gekkofs") {
+        return scord::adhoc_storage::type::gekkofs;
+    } else if(name == "hercules") {
+        return scord::adhoc_storage::type::hercules;
+    } else if(name == "expand") {
+        return scord::adhoc_storage::type::expand;
+    } else if(name == "dataclay") {
+        return scord::adhoc_storage::type::dataclay;
+    } else {
+        throw std::runtime_error(
+                fmt::format("Invalid adhoc fs type: {}", name));
+    }
+}
+
 int
 main(int argc, char* argv[]) {
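Compared with the inline chain it replaces (next hunk), the helper also accepts "dataclay" and throws on anything unrecognised. Illustrative usage only, with names taken from this file:

    try {
        auto type = create_adhoc_type_from_name(cfg.adhocfs);   // e.g. "gekkofs" -> scord::adhoc_storage::type::gekkofs
        // ... build the scord::adhoc_storage::ctx and register as before ...
    } catch(const std::runtime_error& ex) {
        // anything other than gekkofs/hercules/expand/dataclay ends up here
        fmt::print("{}\n", ex.what());
    }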
@@ -141,24 +161,12 @@ main(int argc, char* argv[]) {
         scord::job::resources job_resources(nodes);
         // Step 1b : Define adhoc_storage
-        /* type needs to process the sctring in cfg.adhocfs */
-        auto type = scord::adhoc_storage::type::gekkofs;
-        if(cfg.adhocfs == "gekkofs") {
-            type = scord::adhoc_storage::type::gekkofs;
-        } else if(cfg.adhocfs == "hercules") {
-            type = scord::adhoc_storage::type::hercules;
-        } else if(cfg.adhocfs == "expand") {
-            type = scord::adhoc_storage::type::expand;
-        } else {
-            throw std::runtime_error(
-                    fmt::format("Invalid adhoc fs type: {}", cfg.adhocfs));
-        }
 
+        auto type = create_adhoc_type_from_name(cfg.adhocfs);
         std::string adhoc_name = cfg.adhocfs + std::to_string(cfg.slurm_id);
         scord::adhoc_storage::resources resources{nodes};
         scord::adhoc_storage::ctx ctx{
                 cfg.controller_address,
                 cfg.stager_address,
@@ -167,33 +175,11 @@ main(int argc, char* argv[]) {
                 100,
                 false};
         scord::adhoc_storage adhoc_storage = register_adhoc_storage(
                 srv, adhoc_name, type, ctx, resources);
-        fmt::print("AdhocStorage ID: {}\n", adhoc_storage.id());
         auto path = deploy_adhoc_storage(srv, adhoc_storage);
+        fmt::print("{},{}\n", path, adhoc_storage.id());
         // Step 1c : Define job_requirements
-        /*
-        job::requirements::requirements(
-                std::vector<scord::dataset_route> inputs,
-                std::vector<scord::dataset_route> outputs,
-                std::vector<scord::dataset_route> expected_outputs)
-            : m_inputs(std::move(inputs)), m_outputs(std::move(outputs)),
-              m_expected_outputs(std::move(expected_outputs)) {}
-        job::requirements::requirements(
-                std::vector<scord::dataset_route> inputs,
-                std::vector<scord::dataset_route> outputs,
-                std::vector<scord::dataset_route> expected_outputs,
-                scord::adhoc_storage adhoc_storage)
-        */
         /* Separate inputs into vector of inputs */
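With this hunk the create path prints one machine-parsable line, <deployed path>,<adhoc storage id>, instead of the earlier human-readable "AdhocStorage ID:" message, presumably so the caller can capture both values. Illustrative output only (values invented):

    /tmp/gekkofs_1234,42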
@@ -226,20 +212,95 @@ main(int argc, char* argv[]) {
         auto job = scord::register_job(srv, job_resources, job_requirements,
                                        cfg.slurm_id);
 
+        return job.id();
+    }
 
-    // Now Tranfer Datasets
+    if(cfg.function == "stage-in") {
+        // As the job is registered, we can now transfer datasets. Get
+        // inputs from requirements (TODO)
+        // Step 2 : If function is stage-in, transfer datasets
         // convert inputs to split inputs (src, dst)
         std::vector<scord::dataset> inputs_src, inputs_dst;
-        for(auto& route : inputs) {
-            inputs_src.push_back(route.source());
-            inputs_dst.push_back(route.destination());
+        auto v_routes_in = split(cfg.inputs, ';');
+        for(auto& src_dst : v_routes_in) {
+            auto route = split(src_dst, ',');
+            inputs_src.push_back(scord::dataset{route[0]});
+            inputs_dst.push_back(scord::dataset{route[1]});
+        }
+
+        scord::job job(cfg.job_id, cfg.slurm_id);
+        std::vector<scord::qos::limit> v_qos;
+        if(cfg.qos) {
+            scord::qos::limit limit{scord::qos::subclass::bandwidth,
+                                    cfg.qos};
+            v_qos.push_back(limit);
         }
-        scord::transfer_datasets(srv, job, inputs_src, inputs_dst,
-                                 std::vector<scord::qos::limit>{},
+        auto transfer = scord::transfer_datasets(
+                srv, job, inputs_src, inputs_dst, v_qos,
                 scord::transfer::mapping::n_to_n);
+        return transfer.id();
+    }
+
+    if(cfg.function == "wait") {
+        // Wait for transfer operation to finish
+        const scord::transfer transfer = scord::transfer{cfg.slurm_id};
+        scord::job job(cfg.job_id, cfg.slurm_id);
+
+        auto response = scord::query_transfer(srv, job, transfer);
+
+        while(response.status() != scord::transfer_state::type::finished) {
+            std::this_thread::sleep_for(1s);
+            response = scord::query_transfer(srv, job, transfer);
+            if(scord::transfer_state::type::failed == response.status()) {
+                fmt::print("Transfer failed\n");
+                return EXIT_FAILURE;
+            }
+        }
+    }
+
+    if(cfg.function == "stage-out") {
+        // Step 3 : If function is stage-out, transfer datasets
+        // convert inputs to split inputs (src, dst)
+        std::vector<scord::dataset> outputs_src, outputs_dst;
+
+        auto v_routes_out = split(cfg.outputs, ';');
+
+        for(auto& src_dst : v_routes_out) {
+            auto route = split(src_dst, ',');
+            outputs_src.push_back(scord::dataset{route[0]});
+            outputs_dst.push_back(scord::dataset{route[1]});
+        }
+
+        scord::job job(cfg.job_id, cfg.slurm_id);
+        std::vector<scord::qos::limit> v_qos;
+        if(cfg.qos) {
+            scord::qos::limit limit{scord::qos::subclass::bandwidth,
+                                    cfg.qos};
+            v_qos.push_back(limit);
+        }
+
+        auto transfer = scord::transfer_datasets(
+                srv, job, outputs_src, outputs_dst, v_qos,
+                scord::transfer::mapping::n_to_n);
+        return transfer.id();
+    }
+
+    if(cfg.function == "destroy") {
+        // Step 4 : If function is destroy, terminate adhoc fs server
+        // Build a scord::adhoc_storage object with the adhocid
+        auto type = create_adhoc_type_from_name(cfg.adhocfs);
+        scord::adhoc_storage::resources resources;
+        scord::adhoc_storage::ctx ctx;
+
+        scord::adhoc_storage adhoc_storage(type, "", cfg.adhocid, ctx,
+                                           resources);
+        terminate_adhoc_storage(srv, adhoc_storage);
+    }
     }
 } catch(const std::exception& ex) {
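The stage-in and stage-out branches rely on a split() helper that is not part of the hunks shown here (it presumably lives elsewhere in this file or in one of the other 14 changed files). A minimal sketch of the behaviour these call sites assume, splitting "src0,dst0;src1,dst1" first on ';' and then on ',':

    // Sketch only, not the MR's actual implementation.
    #include <sstream>
    #include <string>
    #include <vector>

    std::vector<std::string>
    split(const std::string& s, char delim) {
        std::vector<std::string> parts;
        std::stringstream ss{s};
        std::string item;
        while(std::getline(ss, item, delim)) {
            parts.push_back(item);   // e.g. split("a,b", ',') -> {"a", "b"}
        }
        return parts;
    }

Note that the wait branch uses std::this_thread::sleep_for(1s), which requires <thread>, <chrono> and std::chrono_literals to be in scope elsewhere in the file.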