Commit 8264dbed authored by Ramon Nou's avatar Ramon Nou
Browse files

Merge branch 'rnou/adhoc_integration' into 'main'

adhoc fs integration

Solves #162 
Solves #161

See merge request !124
parents 097a659b 2ee85b08
Loading
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -30,7 +30,7 @@ cmake_minimum_required(VERSION 3.19)

project(
  scord
  VERSION 0.3.3
  VERSION 0.3.4
  LANGUAGES C CXX
)

+104 −43
Original line number Diff line number Diff line
@@ -34,12 +34,14 @@ struct query_config {
    std::string controller_address;
    std::string stager_address;
    std::uint32_t slurm_id{};
    std::uint32_t job_id{};
    std::uint32_t adhocid{};
    std::string nodes;
    std::string adhocfs;
    std::string inputs;
    std::string outputs;
    std::string function;
    std::uint32_t qos{};
};


@@ -78,6 +80,7 @@ parse_command_line(int argc, char* argv[]) {
            ->option_text("CARGOADDRESS")
            ->required();
    app.add_option("-j,--slurm_id", cfg.slurm_id, "Slurm ID")->required();
    app.add_option("--job_id", cfg.job_id, "Job ID (for subsequent ops)");

    app.add_option("-n,--nodes", cfg.nodes, "Nodes");

@@ -90,6 +93,8 @@ parse_command_line(int argc, char* argv[]) {

    app.add_option("-o,--outputs", cfg.outputs, "Output dataset");

    app.add_option("-q,--qos", cfg.qos, "QoS MB/s");

    app.add_option("-f,--function", cfg.function,
                   "Function {create, stage-in, stage-out, destroy}")
            ->required();
@@ -113,6 +118,21 @@ parse_address(const std::string& address) {
    return std::make_pair(protocol, address);
}

auto
create_adhoc_type_from_name(const std::string& name) {
    if(name == "gekkofs") {
        return scord::adhoc_storage::type::gekkofs;
    } else if(name == "hercules") {
        return scord::adhoc_storage::type::hercules;
    } else if(name == "expand") {
        return scord::adhoc_storage::type::expand;
    } else if(name == "dataclay") {
        return scord::adhoc_storage::type::dataclay;
    } else {
        throw std::runtime_error(
                fmt::format("Invalid adhoc fs type: {}", name));
    }
}

int
main(int argc, char* argv[]) {
@@ -141,24 +161,12 @@ main(int argc, char* argv[]) {
            scord::job::resources job_resources(nodes);

            // Step 1b : Define adhoc_storage
            /* type needs to process the string in cfg.adhocfs */
            auto type = scord::adhoc_storage::type::gekkofs;
            if(cfg.adhocfs == "gekkofs") {
                type = scord::adhoc_storage::type::gekkofs;
            } else if(cfg.adhocfs == "hercules") {
                type = scord::adhoc_storage::type::hercules;
            } else if(cfg.adhocfs == "expand") {
                type = scord::adhoc_storage::type::expand;
            } else {
                throw std::runtime_error(
                        fmt::format("Invalid adhoc fs type: {}", cfg.adhocfs));
            }

            auto type = create_adhoc_type_from_name(cfg.adhocfs);

            std::string adhoc_name = cfg.adhocfs + std::to_string(cfg.slurm_id);
            scord::adhoc_storage::resources resources{nodes};


            scord::adhoc_storage::ctx ctx{
                    cfg.controller_address,
                    cfg.stager_address,
@@ -167,33 +175,11 @@ main(int argc, char* argv[]) {
                    100,
                    false};


            scord::adhoc_storage adhoc_storage = register_adhoc_storage(
                    srv, adhoc_name, type, ctx, resources);


            fmt::print("AdhocStorage ID: {}\n", adhoc_storage.id());

            auto path = deploy_adhoc_storage(srv, adhoc_storage);


            // Step 1c : Define job_requirements
            /*

            job::requirements::requirements(
            std::vector<scord::dataset_route> inputs,
            std::vector<scord::dataset_route> outputs,
            std::vector<scord::dataset_route> expected_outputs)
        : m_inputs(std::move(inputs)), m_outputs(std::move(outputs)),
          m_expected_outputs(std::move(expected_outputs)) {}

    job::requirements::requirements(
            std::vector<scord::dataset_route> inputs,
            std::vector<scord::dataset_route> outputs,
            std::vector<scord::dataset_route> expected_outputs,
            scord::adhoc_storage adhoc_storage)
            */

            fmt::print("{},{}\n", path, adhoc_storage.id());

            /* Separate inputs into vector of inputs */

@@ -226,20 +212,95 @@ main(int argc, char* argv[]) {
            auto job = scord::register_job(srv, job_resources, job_requirements,
                                           cfg.slurm_id);

            return job.id();
        }

            // Now Transfer Datasets
        if(cfg.function == "stage-in") {
            // As the job is registered, we can now transfer datasets. Get
            // inputs from requirements (TODO)

            // Step 2 : If function is stage-in, transfer datasets
            // convert inputs to split inputs (src, dst)
            std::vector<scord::dataset> inputs_src, inputs_dst;

            for(auto& route : inputs) {
                inputs_src.push_back(route.source());
                inputs_dst.push_back(route.destination());
            auto v_routes_in = split(cfg.inputs, ';');

            for(auto& src_dst : v_routes_in) {
                auto route = split(src_dst, ',');

                inputs_src.push_back(scord::dataset{route[0]});
                inputs_dst.push_back(scord::dataset{route[1]});
            }

            scord::job job(cfg.job_id, cfg.slurm_id);
            std::vector<scord::qos::limit> v_qos;
            if(cfg.qos) {
                scord::qos::limit limit{scord::qos::subclass::bandwidth,
                                        cfg.qos};
                v_qos.push_back(limit);
            }

            scord::transfer_datasets(srv, job, inputs_src, inputs_dst,
                                     std::vector<scord::qos::limit>{},
            auto transfer = scord::transfer_datasets(
                    srv, job, inputs_src, inputs_dst, v_qos,
                    scord::transfer::mapping::n_to_n);
            return transfer.id();
        }

        if(cfg.function == "wait") {
            // Wait for transfer operation to finish
            const scord::transfer transfer = scord::transfer{cfg.slurm_id};
            scord::job job(cfg.job_id, cfg.slurm_id);

            auto response = scord::query_transfer(srv, job, transfer);

            while(response.status() != scord::transfer_state::type::finished) {
                std::this_thread::sleep_for(1s);
                response = scord::query_transfer(srv, job, transfer);
                if(scord::transfer_state::type::failed == response.status()) {
                    fmt::print("Transfer failed\n");
                    return EXIT_FAILURE;
                }
            }
        }

        if(cfg.function == "stage-out") {
            // Step 3 : If function is stage-out, transfer datasets
            // convert inputs to split inputs (src, dst)
            std::vector<scord::dataset> outputs_src, outputs_dst;

            auto v_routes_out = split(cfg.outputs, ';');

            for(auto& src_dst : v_routes_out) {
                auto route = split(src_dst, ',');

                outputs_src.push_back(scord::dataset{route[0]});
                outputs_dst.push_back(scord::dataset{route[1]});
            }

            scord::job job(cfg.job_id, cfg.slurm_id);
            std::vector<scord::qos::limit> v_qos;
            if(cfg.qos) {
                scord::qos::limit limit{scord::qos::subclass::bandwidth,
                                        cfg.qos};
                v_qos.push_back(limit);
            }

            auto transfer = scord::transfer_datasets(
                    srv, job, outputs_src, outputs_dst, v_qos,
                    scord::transfer::mapping::n_to_n);
            return transfer.id();
        }

        if(cfg.function == "destroy") {
            // Step 4 : If function is destroy, terminate adhoc fs server
            // Build a scord::adhoc_storage object with the adhocid
            auto type = create_adhoc_type_from_name(cfg.adhocfs);
            scord::adhoc_storage::resources resources;
            scord::adhoc_storage::ctx ctx;

            scord::adhoc_storage adhoc_storage(type, "", cfg.adhocid, ctx,
                                               resources);
            terminate_adhoc_storage(srv, adhoc_storage);
        }

    } catch(const std::exception& ex) {
+37 −2
Original line number Diff line number Diff line
#!/usr/bin/env bash
echo "GEKKOFS Script Called"
#!/usr/bin/bash
# GekkoFS lifecycle control script, invoked by the scord adhoc-storage
# machinery. Positional arguments appear to follow the pattern
#   $1=command, $3=node list, $5=workdir, $7=datadir, $9=mountdir
# (flag names presumably occupy the even positions — TODO confirm against
# the caller that builds this command line).
echo "GEKKOFS Script Called" $HOSTNAME $SLURM_JOBID


if [ "$1" == "start" ]; then
    echo "Starting GEKKOFS"

    # $3 is assumed to be a comma-separated node list; NF counts the nodes.
    nodes=$3
    num_nodes=$(echo $nodes | awk -F, '{print NF}')
    # If num_nodes is greater than 40, we are on the testing environment
    if [ $num_nodes -gt 40 ]; then
        exit 0
    fi
    workdir=$5
    datadir=$7
    mountdir=$9
    # Clear inherited CPU-binding so srun below uses its own settings.
    unset SLURM_CPU_BIND SLURM_CPU_BIND_LIST SLURM_CPU_BIND_TYPE SLURM_CPU_BIND_VERBOSE
    # First pass: create the mount/data directories on every node.
    srun -N $num_nodes -n $num_nodes --oversubscribe --cpus-per-task=1 --mem-per-cpu=1 --export=ALL bash -c "mkdir -p $mountdir; mkdir -p $datadir" 
    # Second pass: launch one gkfs_daemon per node, in the background.
    srun -N $num_nodes -n $num_nodes --oversubscribe --cpus-per-task=4 --mem-per-cpu=1 --export=ALL bash -c "gkfs_daemon --rootdir $datadir --mountdir $mountdir" &
    # Give the daemons a moment to come up before the caller proceeds.
    sleep 4
elif [ "$1" == "stop" ]; then
    echo "Stopping GEKKOFS"
    
    nodes=$3
    num_nodes=$(echo $nodes | awk -F, '{print NF}')
    # If num_nodes is greater than 40, we are on the testing environment
    if [ $num_nodes -gt 40 ]; then
        exit 0
    fi
    unset SLURM_CPU_BIND SLURM_CPU_BIND_LIST SLURM_CPU_BIND_TYPE SLURM_CPU_BIND_VERBOSE
    # Kill every daemon across the allocation.
    srun -N $num_nodes -n $num_nodes --oversubscribe --cpus-per-task=1 --mem-per-cpu=1 pkill -9 gkfs_daemon
elif [ "$1" == "expand" ]; then
    # Placeholder: elasticity commands are not implemented yet.
    echo "Expand command"
elif [ "$1" == "shrink" ]; then
    echo "shrink command"
fi

exit 0
+2 −1
Original line number Diff line number Diff line
@@ -56,7 +56,7 @@ class Scord(CMakePackage):
            sha256="74c51915315e01d8479701d340331641f42c5f5cfae0c08bdea6c2f0b01da665")
    version("0.3.3",
            sha256="a8b5a8d05858bee91b9675ca6c929f4c16b5b2562f4e6a8dba3ce0aacb721f48")

    version("0.3.4", branch="rnou/adhoc_integration")
    # build variants
    variant('build_type',
            default='Release',
@@ -92,6 +92,7 @@ class Scord(CMakePackage):
    depends_on("boost@1.71 +program_options", when='@0.2.0:')
    depends_on("redis-plus-plus@1.3.3:", when='@0.2.0:')
    depends_on("cargo@0.3.3:", when='@0.3.1:')
    depends_on("cargo@0.3.4:", when='@0.3.4:')
    depends_on("slurm", when='@0.3.1:')


+37 −0
Original line number Diff line number Diff line
@@ -531,4 +531,41 @@ transfer_datasets(const server& srv, const job& job,
}


/// Query the scord server for the current state of `transfer` within `job`.
/// @param srv server to contact
/// @param job the job that owns the transfer
/// @param transfer the transfer operation to query
/// @return the transfer's state on success, or the server-reported
///         error code (scord::error_code::other on RPC failure)
tl::expected<transfer_state, error_code>
query_transfer(const server& srv, const job& job, const transfer& transfer) {

    using response_type = network::response_with_value<transfer_state>;

    network::client rpc_client{srv.protocol()};

    const auto rpc = network::rpc_info::create(RPC_NAME(), srv.address());

    const auto lookup_rv = rpc_client.lookup(srv.address());

    if(lookup_rv.has_value()) {
        const auto& endpoint = lookup_rv.value();

        LOGGER_INFO("rpc {:<} body: {{job_id: {}, tx_id: {}}}", rpc, job.id(),
                    transfer.id());

        const auto call_rv =
                endpoint.call(rpc.name(), job.id(), transfer.id());

        if(call_rv.has_value()) {

            const response_type resp{call_rv.value()};

            LOGGER_EVAL(
                    resp.error_code(), INFO, ERROR,
                    "rpc {:>} body: {{retval: {}, tx_state: {}}} [op_id: {}]",
                    rpc, resp.error_code(), resp.value(), resp.op_id());

            // NOTE(review): assumes error_code converts to true on success
            // (matching the INFO/ERROR split above) — confirm against
            // scord::error_code's boolean conversion.
            if(!resp.error_code()) {
                return tl::make_unexpected(resp.error_code());
            }

            return resp.value();
        }
    }

    LOGGER_ERROR("rpc call failed");
    return tl::make_unexpected(scord::error_code::other);
}

} // namespace scord::detail
Loading