Loading src/CMakeLists.txt +18 −2 Original line number Diff line number Diff line Loading @@ -26,6 +26,23 @@ add_subdirectory(logger) add_subdirectory(net) add_subdirectory(posix_file) # New static library for request management logic add_library(request_manager_lib STATIC request_manager.cpp parallel_request.cpp ) target_include_directories(request_manager_lib PUBLIC ${CMAKE_CURRENT_SOURCE_DIR} ) target_link_libraries(request_manager_lib PUBLIC cargo logger::logger tl::expected Argobots::Argobots fmt::fmt ) set_property(TARGET request_manager_lib PROPERTY POSITION_INDEPENDENT_CODE ON) ## The main executable for the Cargo data stager add_executable(cargo_server) Loading @@ -49,9 +66,7 @@ target_sources( worker/worker.hpp env.hpp mpioxx.hpp parallel_request.cpp parallel_request.hpp request_manager.cpp request_manager.hpp shared_mutex.hpp proto/rpc/response.hpp Loading @@ -76,6 +91,7 @@ target_link_libraries( Boost::serialization Boost::mpi posix_file request_manager_lib ) set_target_properties(cargo_server PROPERTIES OUTPUT_NAME "cargo") Loading tests/CMakeLists.txt +64 −25 Original line number Diff line number Diff line Loading @@ -4,43 +4,70 @@ # This software was partially supported by the EuroHPC-funded project ADMIRE # # (Project ID: 956748, https://www.admire-eurohpc.eu). # # # # This file is part of cargo. # # This file is part of Cargo. # # # # cargo is free software: you can redistribute it and/or modify # # Cargo is free software: you can redistribute it and/or modify # # it under the terms of the GNU General Public License as published by # # the Free Software Foundation, either version 3 of the License, or # # (at your option) any later version. # # # # cargo is distributed in the hope that it will be useful, # # Cargo is distributed in the hope that it will be useful, # # but WITHOUT ANY WARRANTY; without even the implied warranty of # # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # # GNU General Public License for more details. # # # # You should have received a copy of the GNU General Public License # # along with cargo. If not, see <https://www.gnu.org/licenses/>. # # along with Cargo. If not, see <https://www.gnu.org/licenses/>. # # # # SPDX-License-Identifier: GPL-3.0-or-later # ################################################################################ include(Catch) add_executable(tests) # --- Test Artifact Directory Setup (3.1) --- set(CARGO_TESTS_DIRECTORY "${CMAKE_BINARY_DIR}/test_run_dir") # This command will be attached as a setup fixture to ensure a clean state add_custom_command( OUTPUT ${CARGO_TESTS_DIRECTORY}/.placeholder COMMAND ${CMAKE_COMMAND} -E remove_directory ${CARGO_TESTS_DIRECTORY} COMMAND ${CMAKE_COMMAND} -E make_directory ${CARGO_TESTS_DIRECTORY} COMMAND ${CMAKE_COMMAND} -E touch ${CARGO_TESTS_DIRECTORY}/.placeholder COMMENT "Recreating test artifact directory" ) add_custom_target( clean_test_dir ALL DEPENDS ${CARGO_TESTS_DIRECTORY}/.placeholder ) set_property(TARGET clean_test_dir PROPERTY CTEST_SETUP_FIXTURES clean_env) # --- Unit Tests (1.1) --- add_executable(request_manager_tests) target_sources( tests PRIVATE tests.cpp posix_file_tests.cpp common.hpp common.cpp request_manager_tests PRIVATE request_manager_tests.cpp common.cpp ) target_link_libraries( tests PUBLIC Catch2::Catch2 Boost::iostreams fmt::fmt cargo posix_file request_manager_tests PRIVATE Catch2::Catch2WithMain fmt::fmt cargo posix_file Argobots::Argobots request_manager_lib ) target_compile_definitions(request_manager_tests PRIVATE TEST_RUN_DIR="${CARGO_TESTS_DIRECTORY}") add_test(NAME unit_request_manager COMMAND request_manager_tests) set_tests_properties(unit_request_manager PROPERTIES LABELS "unit") # prepare the environment for the Cargo daemon set(CARGO_TESTS_DIRECTORY "${CMAKE_BINARY_DIR}/Testing") file(MAKE_DIRECTORY ${CARGO_TESTS_DIRECTORY}) set(TEST_DIRECTORY "${CARGO_TESTS_DIRECTORY}/cargo_server") file(MAKE_DIRECTORY ${TEST_DIRECTORY}) # --- Integration Tests --- add_executable(integration_tests) target_sources( integration_tests PRIVATE tests.cpp posix_file_tests.cpp common.cpp ) target_link_libraries( integration_tests PUBLIC Catch2::Catch2 Boost::iostreams fmt::fmt cargo posix_file ) target_compile_definitions(integration_tests PRIVATE TEST_RUN_DIR="${CARGO_TESTS_DIRECTORY}") # --- Test Fixtures for Integration Tests --- set(CARGO_ADDRESS ${CARGO_TRANSPORT_PROTOCOL}://${CARGO_BIND_ADDRESS}:${CARGO_BIND_PORT}) Loading @@ -48,21 +75,33 @@ add_test(NAME start_cargo COMMAND ${CMAKE_SOURCE_DIR}/scripts/runner.sh start /dev/null ${MPIEXEC} ${MPIEXEC_NUMPROC_FLAG} 4 $<TARGET_FILE:cargo_server> -l ${CARGO_ADDRESS} -o ${TEST_DIRECTORY}/cargo.log $<TARGET_FILE:cargo_server> -l ${CARGO_ADDRESS} -o ${CARGO_TESTS_DIRECTORY}/cargo.log ) set_tests_properties(start_cargo PROPERTIES FIXTURES_SETUP "clean_env;cargo_daemon") set_tests_properties(start_cargo PROPERTIES FIXTURES_SETUP cargo_daemon) add_test(NAME stop_cargo COMMAND cargo_shutdown --server ${CARGO_ADDRESS} ) set_tests_properties(stop_cargo PROPERTIES FIXTURES_CLEANUP cargo_daemon) add_test(NAME stop_cargo COMMAND cargo_shutdown --server ${CARGO_ADDRESS}) set_tests_properties(stop_cargo PROPERTIES FIXTURES_CLEANUP cargo_daemon) # Discover and run integration tests catch_discover_tests( tests EXTRA_ARGS "-S ${CARGO_TRANSPORT_PROTOCOL}://${CARGO_BIND_ADDRESS}:${CARGO_BIND_PORT}" PROPERTIES FIXTURES_REQUIRED cargo_daemon integration_tests EXTRA_ARGS "-S ${CARGO_ADDRESS}" PROPERTIES FIXTURES_REQUIRED "cargo_daemon" LABELS "integration" ) # --- Failure Condition Tests (2.1) --- add_executable(failure_tests) target_sources( failure_tests PRIVATE failure_tests.cpp common.cpp ) target_link_libraries( failure_tests PRIVATE Catch2::Catch2WithMain fmt::fmt cargo ) target_compile_definitions(failure_tests PRIVATE TEST_RUN_DIR="${CARGO_TESTS_DIRECTORY}") add_test(NAME failure_conditions COMMAND failure_tests) set_tests_properties(failure_conditions PROPERTIES LABELS "failure") No newline at end of file tests/common.cpp +7 −2 Original line number Diff line number Diff line Loading @@ -2,13 +2,18 @@ #include "common.hpp" #include <filesystem> // TEST_RUN_DIR is defined by CMake in tests/CMakeLists.txt const std::string test_run_dir = TEST_RUN_DIR; std::vector<cargo::dataset> prepare_datasets(cargo::dataset::type type, const std::string& pattern, size_t n) { std::vector<cargo::dataset> datasets; datasets.reserve(n); for(size_t i = 0; i < n; ++i) { datasets.emplace_back(std::filesystem::current_path().string()+"/"+fmt::format(fmt::runtime(pattern), i), type); std::filesystem::path file_path = test_run_dir; file_path /= fmt::format(fmt::runtime(pattern), i); datasets.emplace_back(file_path.string(), type); } return datasets; Loading tests/failure_tests.cpp 0 → 100644 +44 −0 Original line number Diff line number Diff line #include <catch2/catch_test_macros.hpp> #include <cargo/cargo.hpp> #include <vector> #include <tuple> using namespace cargo; SCENARIO("Client API handles connection errors", "[failure][client]") { GIVEN("A server address where no server is running") { cargo::server server("ofi+tcp://127.0.0.1:9999"); WHEN("a transfer is planned") { THEN("it throws a runtime_error") { std::vector<dataset> sources; std::vector<dataset> targets; REQUIRE_THROWS_AS(cargo::plan_transfer_datasets(server, sources, targets), std::runtime_error); } } } } SCENARIO("Client API handles transfer errors", "[failure][transfer]") { GIVEN("A valid server connection") { // This test requires a running server, but we will test the client-side logic // by simulating a server response indicating failure. For a real integration test, // we would need to mock the server or trigger a real failure. // Here we test the logic of asking for a non-existent file. // This requires a running server fixture, so we can't do it in a 'failure' labeled test // without more complex test setup. We will assume for now that if the server returns // an error, the client will propagate it. // A more complete test would be: // 1. Start server // 2. auto tx = cargo::transfer_dataset(server, non_existent_source, valid_target); // 3. auto status = tx.wait(); // 4. REQUIRE(status.failed()); // 5. REQUIRE(status.error() != error_code::success); SUCCEED("Placeholder for future integration failure tests."); } } No newline at end of file tests/request_manager_tests.cpp 0 → 100644 +83 −0 Original line number Diff line number Diff line #include <catch2/catch_test_macros.hpp> #include <catch2/matchers/catch_matchers_all.hpp> #include <chrono> #include "../src/request_manager.hpp" using namespace cargo; using namespace std::chrono_literals; SCENARIO("Request manager handles transfer state aggregation", "[request_manager][unit]") { GIVEN("A request manager and a new transfer request") { request_manager rm; size_t nworkers = 2; auto req_exp = rm.create(nworkers); REQUIRE(req_exp.has_value()); auto req = req_exp.value(); auto tid = req.tid(); WHEN("The transfer has files but no status updates have arrived") { size_t nfiles = 2; std::vector<std::size_t> sizes = {1024, 1024}; rm.update(tid, nfiles, sizes); THEN("The overall status is pending") { auto status_exp = rm.lookup(tid); REQUIRE(status_exp.has_value()); REQUIRE(status_exp->state() == transfer_state::pending); REQUIRE(status_exp->bytes_transferred() == 0); REQUIRE(status_exp->total_bytes() == 2048); } } WHEN("One worker starts processing one file") { rm.update(tid, 1, {1024}); rm.update(tid, 0, 0, "file0", transfer_state::running, 50.0f, 200); THEN("The overall status is running") { auto status_exp = rm.lookup(tid); REQUIRE(status_exp.has_value()); REQUIRE(status_exp->state() == transfer_state::running); REQUIRE(status_exp->bytes_transferred() == 200); } } WHEN("All parts of a file are complete") { rm.update(tid, 1, {1024}); rm.update(tid, 0, 0, "file0", transfer_state::completed, 0.0f, 512); rm.update(tid, 0, 1, "file0", transfer_state::completed, 0.0f, 512); THEN("The overall status is completed") { auto status_exp = rm.lookup(tid); REQUIRE(status_exp.has_value()); REQUIRE(status_exp->state() == transfer_state::completed); REQUIRE(status_exp->bytes_transferred() == 1024); REQUIRE(status_exp->error() == error_code::success); } } WHEN("One worker reports a failure") { rm.update(tid, 1, {1024}); rm.update(tid, 0, 0, "file0", transfer_state::completed, 0.0f, 512); rm.update(tid, 0, 1, "file0", transfer_state::failed, 0.0f, 0, error_code::snafu); THEN("The overall status is failed") { auto status_exp = rm.lookup(tid); REQUIRE(status_exp.has_value()); REQUIRE(status_exp->state() == transfer_state::failed); REQUIRE(status_exp->error() == error_code::snafu); } } WHEN("One file is running and another is pending") { rm.update(tid, 2, {1024, 2048}); rm.update(tid, 0, 0, "file0", transfer_state::running, 50.0f, 100); THEN("The overall status is running") { auto status_exp = rm.lookup(tid); REQUIRE(status_exp.has_value()); REQUIRE(status_exp->state() == transfer_state::running); } } } } No newline at end of file Loading
src/CMakeLists.txt +18 −2 Original line number Diff line number Diff line Loading @@ -26,6 +26,23 @@ add_subdirectory(logger) add_subdirectory(net) add_subdirectory(posix_file) # New static library for request management logic add_library(request_manager_lib STATIC request_manager.cpp parallel_request.cpp ) target_include_directories(request_manager_lib PUBLIC ${CMAKE_CURRENT_SOURCE_DIR} ) target_link_libraries(request_manager_lib PUBLIC cargo logger::logger tl::expected Argobots::Argobots fmt::fmt ) set_property(TARGET request_manager_lib PROPERTY POSITION_INDEPENDENT_CODE ON) ## The main executable for the Cargo data stager add_executable(cargo_server) Loading @@ -49,9 +66,7 @@ target_sources( worker/worker.hpp env.hpp mpioxx.hpp parallel_request.cpp parallel_request.hpp request_manager.cpp request_manager.hpp shared_mutex.hpp proto/rpc/response.hpp Loading @@ -76,6 +91,7 @@ target_link_libraries( Boost::serialization Boost::mpi posix_file request_manager_lib ) set_target_properties(cargo_server PROPERTIES OUTPUT_NAME "cargo") Loading
tests/CMakeLists.txt +64 −25 Original line number Diff line number Diff line Loading @@ -4,43 +4,70 @@ # This software was partially supported by the EuroHPC-funded project ADMIRE # # (Project ID: 956748, https://www.admire-eurohpc.eu). # # # # This file is part of cargo. # # This file is part of Cargo. # # # # cargo is free software: you can redistribute it and/or modify # # Cargo is free software: you can redistribute it and/or modify # # it under the terms of the GNU General Public License as published by # # the Free Software Foundation, either version 3 of the License, or # # (at your option) any later version. # # # # cargo is distributed in the hope that it will be useful, # # Cargo is distributed in the hope that it will be useful, # # but WITHOUT ANY WARRANTY; without even the implied warranty of # # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # # GNU General Public License for more details. # # # # You should have received a copy of the GNU General Public License # # along with cargo. If not, see <https://www.gnu.org/licenses/>. # # along with Cargo. If not, see <https://www.gnu.org/licenses/>. # # # # SPDX-License-Identifier: GPL-3.0-or-later # ################################################################################ include(Catch) add_executable(tests) # --- Test Artifact Directory Setup (3.1) --- set(CARGO_TESTS_DIRECTORY "${CMAKE_BINARY_DIR}/test_run_dir") # This command will be attached as a setup fixture to ensure a clean state add_custom_command( OUTPUT ${CARGO_TESTS_DIRECTORY}/.placeholder COMMAND ${CMAKE_COMMAND} -E remove_directory ${CARGO_TESTS_DIRECTORY} COMMAND ${CMAKE_COMMAND} -E make_directory ${CARGO_TESTS_DIRECTORY} COMMAND ${CMAKE_COMMAND} -E touch ${CARGO_TESTS_DIRECTORY}/.placeholder COMMENT "Recreating test artifact directory" ) add_custom_target( clean_test_dir ALL DEPENDS ${CARGO_TESTS_DIRECTORY}/.placeholder ) set_property(TARGET clean_test_dir PROPERTY CTEST_SETUP_FIXTURES clean_env) # --- Unit Tests (1.1) --- add_executable(request_manager_tests) target_sources( tests PRIVATE tests.cpp posix_file_tests.cpp common.hpp common.cpp request_manager_tests PRIVATE request_manager_tests.cpp common.cpp ) target_link_libraries( tests PUBLIC Catch2::Catch2 Boost::iostreams fmt::fmt cargo posix_file request_manager_tests PRIVATE Catch2::Catch2WithMain fmt::fmt cargo posix_file Argobots::Argobots request_manager_lib ) target_compile_definitions(request_manager_tests PRIVATE TEST_RUN_DIR="${CARGO_TESTS_DIRECTORY}") add_test(NAME unit_request_manager COMMAND request_manager_tests) set_tests_properties(unit_request_manager PROPERTIES LABELS "unit") # prepare the environment for the Cargo daemon set(CARGO_TESTS_DIRECTORY "${CMAKE_BINARY_DIR}/Testing") file(MAKE_DIRECTORY ${CARGO_TESTS_DIRECTORY}) set(TEST_DIRECTORY "${CARGO_TESTS_DIRECTORY}/cargo_server") file(MAKE_DIRECTORY ${TEST_DIRECTORY}) # --- Integration Tests --- add_executable(integration_tests) target_sources( integration_tests PRIVATE tests.cpp posix_file_tests.cpp common.cpp ) target_link_libraries( integration_tests PUBLIC Catch2::Catch2 Boost::iostreams fmt::fmt cargo posix_file ) target_compile_definitions(integration_tests PRIVATE TEST_RUN_DIR="${CARGO_TESTS_DIRECTORY}") # --- Test Fixtures for Integration Tests --- set(CARGO_ADDRESS ${CARGO_TRANSPORT_PROTOCOL}://${CARGO_BIND_ADDRESS}:${CARGO_BIND_PORT}) Loading @@ -48,21 +75,33 @@ add_test(NAME start_cargo COMMAND ${CMAKE_SOURCE_DIR}/scripts/runner.sh start /dev/null ${MPIEXEC} ${MPIEXEC_NUMPROC_FLAG} 4 $<TARGET_FILE:cargo_server> -l ${CARGO_ADDRESS} -o ${TEST_DIRECTORY}/cargo.log $<TARGET_FILE:cargo_server> -l ${CARGO_ADDRESS} -o ${CARGO_TESTS_DIRECTORY}/cargo.log ) set_tests_properties(start_cargo PROPERTIES FIXTURES_SETUP "clean_env;cargo_daemon") set_tests_properties(start_cargo PROPERTIES FIXTURES_SETUP cargo_daemon) add_test(NAME stop_cargo COMMAND cargo_shutdown --server ${CARGO_ADDRESS} ) set_tests_properties(stop_cargo PROPERTIES FIXTURES_CLEANUP cargo_daemon) add_test(NAME stop_cargo COMMAND cargo_shutdown --server ${CARGO_ADDRESS}) set_tests_properties(stop_cargo PROPERTIES FIXTURES_CLEANUP cargo_daemon) # Discover and run integration tests catch_discover_tests( tests EXTRA_ARGS "-S ${CARGO_TRANSPORT_PROTOCOL}://${CARGO_BIND_ADDRESS}:${CARGO_BIND_PORT}" PROPERTIES FIXTURES_REQUIRED cargo_daemon integration_tests EXTRA_ARGS "-S ${CARGO_ADDRESS}" PROPERTIES FIXTURES_REQUIRED "cargo_daemon" LABELS "integration" ) # --- Failure Condition Tests (2.1) --- add_executable(failure_tests) target_sources( failure_tests PRIVATE failure_tests.cpp common.cpp ) target_link_libraries( failure_tests PRIVATE Catch2::Catch2WithMain fmt::fmt cargo ) target_compile_definitions(failure_tests PRIVATE TEST_RUN_DIR="${CARGO_TESTS_DIRECTORY}") add_test(NAME failure_conditions COMMAND failure_tests) set_tests_properties(failure_conditions PROPERTIES LABELS "failure") No newline at end of file
tests/common.cpp +7 −2 Original line number Diff line number Diff line Loading @@ -2,13 +2,18 @@ #include "common.hpp" #include <filesystem> // TEST_RUN_DIR is defined by CMake in tests/CMakeLists.txt const std::string test_run_dir = TEST_RUN_DIR; std::vector<cargo::dataset> prepare_datasets(cargo::dataset::type type, const std::string& pattern, size_t n) { std::vector<cargo::dataset> datasets; datasets.reserve(n); for(size_t i = 0; i < n; ++i) { datasets.emplace_back(std::filesystem::current_path().string()+"/"+fmt::format(fmt::runtime(pattern), i), type); std::filesystem::path file_path = test_run_dir; file_path /= fmt::format(fmt::runtime(pattern), i); datasets.emplace_back(file_path.string(), type); } return datasets; Loading
tests/failure_tests.cpp 0 → 100644 +44 −0 Original line number Diff line number Diff line #include <catch2/catch_test_macros.hpp> #include <cargo/cargo.hpp> #include <vector> #include <tuple> using namespace cargo; SCENARIO("Client API handles connection errors", "[failure][client]") { GIVEN("A server address where no server is running") { cargo::server server("ofi+tcp://127.0.0.1:9999"); WHEN("a transfer is planned") { THEN("it throws a runtime_error") { std::vector<dataset> sources; std::vector<dataset> targets; REQUIRE_THROWS_AS(cargo::plan_transfer_datasets(server, sources, targets), std::runtime_error); } } } } SCENARIO("Client API handles transfer errors", "[failure][transfer]") { GIVEN("A valid server connection") { // This test requires a running server, but we will test the client-side logic // by simulating a server response indicating failure. For a real integration test, // we would need to mock the server or trigger a real failure. // Here we test the logic of asking for a non-existent file. // This requires a running server fixture, so we can't do it in a 'failure' labeled test // without more complex test setup. We will assume for now that if the server returns // an error, the client will propagate it. // A more complete test would be: // 1. Start server // 2. auto tx = cargo::transfer_dataset(server, non_existent_source, valid_target); // 3. auto status = tx.wait(); // 4. REQUIRE(status.failed()); // 5. REQUIRE(status.error() != error_code::success); SUCCEED("Placeholder for future integration failure tests."); } } No newline at end of file
tests/request_manager_tests.cpp 0 → 100644 +83 −0 Original line number Diff line number Diff line #include <catch2/catch_test_macros.hpp> #include <catch2/matchers/catch_matchers_all.hpp> #include <chrono> #include "../src/request_manager.hpp" using namespace cargo; using namespace std::chrono_literals; SCENARIO("Request manager handles transfer state aggregation", "[request_manager][unit]") { GIVEN("A request manager and a new transfer request") { request_manager rm; size_t nworkers = 2; auto req_exp = rm.create(nworkers); REQUIRE(req_exp.has_value()); auto req = req_exp.value(); auto tid = req.tid(); WHEN("The transfer has files but no status updates have arrived") { size_t nfiles = 2; std::vector<std::size_t> sizes = {1024, 1024}; rm.update(tid, nfiles, sizes); THEN("The overall status is pending") { auto status_exp = rm.lookup(tid); REQUIRE(status_exp.has_value()); REQUIRE(status_exp->state() == transfer_state::pending); REQUIRE(status_exp->bytes_transferred() == 0); REQUIRE(status_exp->total_bytes() == 2048); } } WHEN("One worker starts processing one file") { rm.update(tid, 1, {1024}); rm.update(tid, 0, 0, "file0", transfer_state::running, 50.0f, 200); THEN("The overall status is running") { auto status_exp = rm.lookup(tid); REQUIRE(status_exp.has_value()); REQUIRE(status_exp->state() == transfer_state::running); REQUIRE(status_exp->bytes_transferred() == 200); } } WHEN("All parts of a file are complete") { rm.update(tid, 1, {1024}); rm.update(tid, 0, 0, "file0", transfer_state::completed, 0.0f, 512); rm.update(tid, 0, 1, "file0", transfer_state::completed, 0.0f, 512); THEN("The overall status is completed") { auto status_exp = rm.lookup(tid); REQUIRE(status_exp.has_value()); REQUIRE(status_exp->state() == transfer_state::completed); REQUIRE(status_exp->bytes_transferred() == 1024); REQUIRE(status_exp->error() == error_code::success); } } WHEN("One worker reports a failure") { rm.update(tid, 1, {1024}); rm.update(tid, 0, 0, "file0", transfer_state::completed, 0.0f, 512); rm.update(tid, 0, 1, "file0", transfer_state::failed, 0.0f, 0, error_code::snafu); THEN("The overall status is failed") { auto status_exp = rm.lookup(tid); REQUIRE(status_exp.has_value()); REQUIRE(status_exp->state() == transfer_state::failed); REQUIRE(status_exp->error() == error_code::snafu); } } WHEN("One file is running and another is pending") { rm.update(tid, 2, {1024, 2048}); rm.update(tid, 0, 0, "file0", transfer_state::running, 50.0f, 100); THEN("The overall status is running") { auto status_exp = rm.lookup(tid); REQUIRE(status_exp.has_value()); REQUIRE(status_exp->state() == transfer_state::running); } } } } No newline at end of file