Commit 7944ed3e authored by Ramon Nou's avatar Ramon Nou
Browse files

Merge branch 'rnou/cargo_ioscheduler2' into 'main'

scord i/o Scheduling for cargo

This completes another part of #143.

Creates an Argobots thread to send actions to the different cargo/transfer jobs.
When a cargo scheduler is processed, obtained_bw is reset.

It is built on top of MR !104.

Another solution could be to process a single transfer-update and piggyback the action onto the cargo thread.

See merge request !105
parents b5b4680e 6d5b0780
Loading
Loading
Loading
Loading
Loading
+2 −2
Original line number Diff line number Diff line
@@ -84,8 +84,8 @@ if(SCORD_BUILD_TESTS)
  set_tests_properties(start_cargo
    PROPERTIES FIXTURES_SETUP cargo)

  add_test(stop_cargo
    ${CMAKE_SOURCE_DIR}/scripts/runner.sh stop TERM cargo.pid)

 add_test(stop_cargo ${CARGO_BIN_INSTALL_DIR}/cargo_shutdown --server ${DATA_STAGER_ADDRESS_STRING})

  set_tests_properties(stop_cargo
    PROPERTIES FIXTURES_CLEANUP cargo)
+17 −2
Original line number Diff line number Diff line
@@ -75,15 +75,30 @@ main(int argc, char* argv[]) {
                server, name, scord::adhoc_storage::type::gekkofs,
                adhoc_storage_ctx, adhoc_resources);


        std::vector <scord::dataset> ins;
        std::vector <scord::dataset> outs;

        scord::dataset in1;
        scord::dataset out1;

        in1 = scord::dataset("lustre:/tmp/input-dataset-1");
        out1 = scord::dataset("gekkofs:/tmp/input-dataset-cp");
 
        ins.push_back (in1);
        outs.push_back (out1);
        
        scord::job::requirements reqs(inputs, outputs, expected_outputs,
                                      adhoc_storage);

        const auto job = scord::register_job(
                server, scord::job::resources{job_nodes}, reqs, 0);

        const auto transfer = scord::transfer_datasets(
                server, job, sources, targets, qos_limits, mapping);
                server, job, ins, outs, qos_limits, mapping);

        // scord::transfer_update(server, transfer.id(), 10.0f);
        
        scord::transfer_update(server, transfer.id(), 10.0f);
        fmt::print(stdout, "ADM_transfer_update() remote procedure completed "
                           "successfully\n");
        exit(EXIT_SUCCESS);
+2 −2
Original line number Diff line number Diff line
@@ -30,7 +30,7 @@ list(APPEND cxx_examples_with_controller
  ADM_deploy_adhoc_storage ADM_terminate_adhoc_storage
  # transfers
  ADM_transfer_datasets ADM_get_transfer_priority ADM_set_transfer_priority
  ADM_cancel_transfer ADM_get_pending_transfers ADM_transfer_update
  ADM_cancel_transfer ADM_get_pending_transfers
  # qos
  ADM_set_qos_constraints ADM_get_qos_constraints
  # data operations
@@ -77,7 +77,7 @@ if(SCORD_BUILD_TESTS)
      ${SCORD_CTL_ADDRESS_STRING}
      ${DATA_STAGER_ADDRESS_STRING})
    set_tests_properties(run_${TEST_NAME}
      PROPERTIES FIXTURES_REQUIRED "scord_daemon;scord_ctl"
      PROPERTIES FIXTURES_REQUIRED "scord_daemon;scord_ctl;cargo"
      ENVIRONMENT "${TEST_ENV}")

    add_test(validate_${TEST_NAME}
+5 −4
Original line number Diff line number Diff line
@@ -83,18 +83,19 @@ class Scord(CMakePackage):
    # specific dependencies
    # v0.2.0+
    depends_on("argobots@1.1", when='@0.2.0:')
    depends_on("mochi-margo@0.9.8", when='@0.2.0:')
    depends_on("mochi-thallium@0.10.1", when='@0.2.0:')
    depends_on("mochi-margo@0.9.8:", when='@0.2.0:')
    depends_on("mochi-thallium@0.10.1:", when='@0.2.0:')
    depends_on("boost@1.71 +program_options", when='@0.2.0:')
    depends_on("redis-plus-plus@1.3.3:", when='@0.2.0:')
    depends_on("cargo@0.3.2:", when='@0.3.1:')

    with when("@0.2.0: +ofi"):
        depends_on("libfabric@1.14.0 fabrics=sockets,tcp,rxm")
        depends_on("mercury@2.1.0 +ofi")
        depends_on("mercury@2.1.0: +ofi")

    with when("@0.2.0: +ucx"):
        depends_on("ucx@1.12.0")
        depends_on("mercury@2.1.0 +ucx")
        depends_on("mercury@2.1.0: +ucx")

    def cmake_args(self):
        """Setup scord CMake arguments"""
+1 −0
Original line number Diff line number Diff line
@@ -391,6 +391,7 @@ server::teardown_and_exit() {

/**
 * Shut the server down: raise the shutting-down flag, then finalize the
 * network engine.
 */
void
server::shutdown() {
    // Set the flag before finalizing the engine.
    // NOTE(review): presumably other code (e.g. a background scheduler
    // thread) polls this flag to stop its work — confirm against the readers
    // of m_shutting_down, which are not visible here.
    m_shutting_down = true;
    m_network_engine.finalize();
}

Loading