Commit f462bf65 authored by Ramon Nou's avatar Ramon Nou
Browse files

Add query_transfer function to scord library

Add ADM_transfer_status_create and ADM_transfer_status_destroy functions
to types.c
parent 097a659b
Loading
Loading
Loading
Loading
Loading
+104 −43
Original line number Diff line number Diff line
@@ -34,12 +34,14 @@ struct query_config {
    std::string controller_address;
    std::string stager_address;
    std::uint32_t slurm_id{};
    std::uint32_t job_id{};
    std::uint32_t adhocid{};
    std::string nodes;
    std::string adhocfs;
    std::string inputs;
    std::string outputs;
    std::string function;
    std::uint32_t qos{};
};


@@ -78,6 +80,7 @@ parse_command_line(int argc, char* argv[]) {
            ->option_text("CARGOADDRESS")
            ->required();
    app.add_option("-j,--slurm_id", cfg.slurm_id, "Slurm ID")->required();
    app.add_option("--job_id", cfg.job_id, "Job ID (for subsequent ops)");

    app.add_option("-n,--nodes", cfg.nodes, "Nodes");

@@ -90,6 +93,8 @@ parse_command_line(int argc, char* argv[]) {

    app.add_option("-o,--outputs", cfg.outputs, "Output dataset");

    app.add_option("-q,--qos", cfg.qos, "QoS MB/s");

    app.add_option("-f,--function", cfg.function,
                   "Function {create, stage-in, stage-out, destroy}")
            ->required();
@@ -113,6 +118,21 @@ parse_address(const std::string& address) {
    return std::make_pair(protocol, address);
}

auto
create_adhoc_type_from_name(const std::string& name) {
    if(name == "gekkofs") {
        return scord::adhoc_storage::type::gekkofs;
    } else if(name == "hercules") {
        return scord::adhoc_storage::type::hercules;
    } else if(name == "expand") {
        return scord::adhoc_storage::type::expand;
    } else if(name == "dataclay") {
        return scord::adhoc_storage::type::dataclay;
    } else {
        throw std::runtime_error(
                fmt::format("Invalid adhoc fs type: {}", name));
    }
}

int
main(int argc, char* argv[]) {
@@ -141,24 +161,12 @@ main(int argc, char* argv[]) {
            scord::job::resources job_resources(nodes);

            // Step 1b : Define adhoc_storage
            /* type needs to process the sctring in cfg.adhocfs */
            auto type = scord::adhoc_storage::type::gekkofs;
            if(cfg.adhocfs == "gekkofs") {
                type = scord::adhoc_storage::type::gekkofs;
            } else if(cfg.adhocfs == "hercules") {
                type = scord::adhoc_storage::type::hercules;
            } else if(cfg.adhocfs == "expand") {
                type = scord::adhoc_storage::type::expand;
            } else {
                throw std::runtime_error(
                        fmt::format("Invalid adhoc fs type: {}", cfg.adhocfs));
            }

            auto type = create_adhoc_type_from_name(cfg.adhocfs);

            std::string adhoc_name = cfg.adhocfs + std::to_string(cfg.slurm_id);
            scord::adhoc_storage::resources resources{nodes};


            scord::adhoc_storage::ctx ctx{
                    cfg.controller_address,
                    cfg.stager_address,
@@ -167,33 +175,11 @@ main(int argc, char* argv[]) {
                    100,
                    false};


            scord::adhoc_storage adhoc_storage = register_adhoc_storage(
                    srv, adhoc_name, type, ctx, resources);


            fmt::print("AdhocStorage ID: {}\n", adhoc_storage.id());

            auto path = deploy_adhoc_storage(srv, adhoc_storage);


            // Step 1c : Define job_requirements
            /*

            job::requirements::requirements(
            std::vector<scord::dataset_route> inputs,
            std::vector<scord::dataset_route> outputs,
            std::vector<scord::dataset_route> expected_outputs)
        : m_inputs(std::move(inputs)), m_outputs(std::move(outputs)),
          m_expected_outputs(std::move(expected_outputs)) {}

    job::requirements::requirements(
            std::vector<scord::dataset_route> inputs,
            std::vector<scord::dataset_route> outputs,
            std::vector<scord::dataset_route> expected_outputs,
            scord::adhoc_storage adhoc_storage)
            */

            fmt::print("{}\n", path);

            /* Separate inputs into vector of inputs */

@@ -226,20 +212,95 @@ main(int argc, char* argv[]) {
            auto job = scord::register_job(srv, job_resources, job_requirements,
                                           cfg.slurm_id);

            return job.id();
        }

            // Now Transfer Datasets
        if(cfg.function == "stage-in") {
            // As the job is registered, we can now transfer datasets. Get
            // inputs from requirements (TODO)

            // Step 2 : If function is stage-in, transfer datasets
            // convert inputs to split inputs (src, dst)
            std::vector<scord::dataset> inputs_src, inputs_dst;

            for(auto& route : inputs) {
                inputs_src.push_back(route.source());
                inputs_dst.push_back(route.destination());
            auto v_routes_in = split(cfg.inputs, ';');

            for(auto& src_dst : v_routes_in) {
                auto route = split(src_dst, ',');

                inputs_src.push_back(scord::dataset{route[0]});
                inputs_dst.push_back(scord::dataset{route[1]});
            }

            scord::job job(cfg.job_id, cfg.slurm_id);
            std::vector<scord::qos::limit> v_qos;
            if(cfg.qos) {
                scord::qos::limit limit{scord::qos::subclass::bandwidth,
                                        cfg.qos};
                v_qos.push_back(limit);
            }

            scord::transfer_datasets(srv, job, inputs_src, inputs_dst,
                                     std::vector<scord::qos::limit>{},
            auto transfer = scord::transfer_datasets(
                    srv, job, inputs_src, inputs_dst, v_qos,
                    scord::transfer::mapping::n_to_n);
            return transfer.id();
        }

        if(cfg.function == "wait") {
            // Wait for transfer operation to finish
            const scord::transfer transfer = scord::transfer{cfg.slurm_id};
            scord::job job(cfg.job_id, cfg.slurm_id);

            auto response = scord::query_transfer(srv, job, transfer);

            while(response.status() != scord::transfer_state::type::finished) {
                std::this_thread::sleep_for(1s);
                response = scord::query_transfer(srv, job, transfer);
                if(scord::transfer_state::type::failed == response.status()) {
                    fmt::print("Transfer failed\n");
                    return EXIT_FAILURE;
                }
            }
        }

        if(cfg.function == "stage-out") {
            // Step 3 : If function is stage-out, transfer datasets
            // convert inputs to split inputs (src, dst)
            std::vector<scord::dataset> outputs_src, outputs_dst;

            auto v_routes_out = split(cfg.outputs, ';');

            for(auto& src_dst : v_routes_out) {
                auto route = split(src_dst, ',');

                outputs_src.push_back(scord::dataset{route[0]});
                outputs_dst.push_back(scord::dataset{route[1]});
            }

            scord::job job(cfg.job_id, cfg.slurm_id);
            std::vector<scord::qos::limit> v_qos;
            if(cfg.qos) {
                scord::qos::limit limit{scord::qos::subclass::bandwidth,
                                        cfg.qos};
                v_qos.push_back(limit);
            }

            auto transfer = scord::transfer_datasets(
                    srv, job, outputs_src, outputs_dst, v_qos,
                    scord::transfer::mapping::n_to_n);
            return transfer.id();
        }

        if(cfg.function == "destroy") {
            // Step 4 : If function is destroy, terminate adhoc fs server
            // Build a scord::adhoc_storage object with the adhocid
            auto type = create_adhoc_type_from_name(cfg.adhocfs);
            scord::adhoc_storage::resources resources;
            scord::adhoc_storage::ctx ctx;

            scord::adhoc_storage adhoc_storage(type, "", cfg.adhocid, ctx,
                                               resources);
            terminate_adhoc_storage(srv, adhoc_storage);
        }

    } catch(const std::exception& ex) {
+26 −0
Original line number Diff line number Diff line
#!/usr/bin/env bash
echo "GEKKOFS Script Called"

# example of a script that can be called by the adhoc service
# [2023-11-23 09:37:32.583868] [scord-ctl] [2199567] [info] rpc => id: 0 name: "ADM_deploy_adhoc_storage" from: "ofi+tcp;ofi_rxm://127.0.0.1:52000" body: {uuid: "gekkofs-JR4ny5xHMhmlwh6KqThfYt71IaoR9cH5", type: ADM_ADHOC_STORAGE_GEKKOFS, resources: {nodes: [{hostname: "broadwell-001", type: regular}, {hostname: "broadwell-002", type: regular}, {hostname: "broadwell-003", type: regular}, {hostname: "broadwell-004", type: regular}]}}
# option: start --hosts "broadwell-001,broadwell-002,broadwell-003,broadwell-004" --workdir /tmp/gekkofs/gekkofs-JR4ny5xHMhmlwh6KqThfYt71IaoR9cH5 --datadir /tmp/gekkofs/gekkofs-JR4ny5xHMhmlwh6KqThfYt71IaoR9cH5/data --mountdir /tmp/gekkofs/gekkofs-JR4ny5xHMhmlwh6KqThfYt71IaoR9cH5/mnt

# Expected invocation:
#   $1 = start|stop   $3 = comma-separated host list
#   $5 = workdir      $7 = datadir      $9 = mountdir
# (odd positions $2/$4/... carry the --hosts/--workdir/... option names)

# Count the elements of a comma-separated list: N elements -> NF fields.
count_nodes() {
    echo "$1" | awk -F, '{print NF}'
}

if [ "$1" = "start" ]; then
    echo "Starting GEKKOFS"
    nodes=$3
    num_nodes=$(count_nodes "$nodes")

    workdir=$5
    datadir=$7
    mountdir=$9

    mkdir -p "$workdir"
    # NOTE(review): --mem-per-task is not a standard srun option (srun has
    # --mem-per-cpu) — confirm against the target Slurm version.
    srun -N "$num_nodes" -n "$num_nodes" --oversubscribe --cpus-per-task=1 --mem-per-task=1 --export=ALL bash -c "gkfs_daemon --rootdir $datadir --mountdir $mountdir" &
elif [ "$1" = "stop" ]; then
    echo "Stopping GEKKOFS"
    # $nodes/$num_nodes are not inherited from the start invocation; parse
    # them again from the arguments of this call.
    nodes=$3
    num_nodes=$(count_nodes "$nodes")
    srun -N "$num_nodes" -n "$num_nodes" --oversubscribe --cpus-per-task=1 --mem-per-task=1 --export=ALL bash -c "pkill -9 gkfs_daemon"
else
    echo "Unknown command"
    exit 1
fi

exit 0
+37 −0
Original line number Diff line number Diff line
@@ -531,4 +531,41 @@ transfer_datasets(const server& srv, const job& job,
}


/// Query the scord server for the current state of `transfer` belonging to
/// `job`. Returns the reported transfer_state on success, the server-side
/// error_code if the RPC completed with an error, or error_code::other if
/// the lookup/call itself failed.
tl::expected<transfer_state, error_code>
query_transfer(const server& srv, const job& job, const transfer& transfer) {

    using response_type = network::response_with_value<transfer_state>;

    network::client rpc_client{srv.protocol()};

    const auto rpc = network::rpc_info::create(RPC_NAME(), srv.address());

    const auto lookup_result = rpc_client.lookup(srv.address());

    if(lookup_result.has_value()) {
        const auto& endpoint = lookup_result.value();

        LOGGER_INFO("rpc {:<} body: {{job_id: {}, tx_id: {}}}", rpc, job.id(), transfer.id());

        const auto call_result =
                endpoint.call(rpc.name(), job.id(), transfer.id());

        if(call_result.has_value()) {

            const response_type response{call_result.value()};

            LOGGER_EVAL(
                    response.error_code(), INFO, ERROR,
                    "rpc {:>} body: {{retval: {}, tx_state: {}}} [op_id: {}]",
                    rpc, response.error_code(), response.value(),
                    response.op_id());

            // error_code converts to true on success; propagate failures
            if(!response.error_code()) {
                return tl::make_unexpected(response.error_code());
            }

            return response.value();
        }
    }

    LOGGER_ERROR("rpc call failed");
    return tl::make_unexpected(scord::error_code::other);
}

} // namespace scord::detail
+4 −0
Original line number Diff line number Diff line
@@ -86,6 +86,10 @@ transfer_datasets(const server& srv, const job& job,
                  const std::vector<qos::limit>& limits,
                  transfer::mapping mapping);

/// Query the scord server for the current state of `transfer` belonging to
/// `job`. Returns the reported transfer_state, or an error_code if the RPC
/// fails or the server reports an error.
tl::expected<transfer_state, error_code>
query_transfer(const server& srv, const job& job, const transfer& transfer);



} // namespace scord::detail

+12 −0
Original line number Diff line number Diff line
@@ -377,6 +377,18 @@ transfer_datasets(const server& srv, const job& job,
    return rv.value();
}

/// Public wrapper around detail::query_transfer(): returns the transfer
/// state on success and throws std::runtime_error (with the ADM error
/// string) on failure.
scord::transfer_state
query_transfer(const server& srv, const job& job, const transfer& transfer) {

    auto result = detail::query_transfer(srv, job, transfer);

    if(result.has_value()) {
        return result.value();
    }

    throw std::runtime_error(fmt::format("ADM_query_transfer() error: {}",
                                         ADM_strerror(result.error())));
}

ADM_return_t
set_dataset_information(const server& srv, ADM_job_t job, ADM_dataset_t target,
Loading