diff --git a/CMakeLists.txt b/CMakeLists.txt index 2c66a9552ea0479804118bc4cc3d87ad5bec32af..3da3f2024ee64e8ef63722b147e56abc74acef2c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -30,7 +30,7 @@ cmake_minimum_required(VERSION 3.19) project( cargo - VERSION 0.3.1 + VERSION 0.3.2 LANGUAGES C CXX ) diff --git a/README.md b/README.md index 1fccc2dfea3eac1b1fbb0bd5047759b9c83feee7..6881db1f92fb209c01eab531fd9bdf23da201395 100644 --- a/README.md +++ b/README.md @@ -170,3 +170,24 @@ mpirun -np 4 ${INSTALL_DIR}/bin/cargo -l ofi+tcp://127.0.0.1:62000 cd build RUNNER_SKIP_START=1 ctest -VV --output-on-failure --stop-on-failure -j 8 ``` + + +## Options +Cargo supports the following option: +``` +b --blocksize (default is 512). Transfers will use this blocksize in kbytes. +``` + +## Utilities +There are a few utility command line programs that can be used to interact with Cargo. + +```shell +cli/ccp --server ofi+tcp://127.0.0.1:62000 --input /directory/subdir --output /directorydst/subdirdst --if --of +``` +`--input` and `--output` are required arguments, and can be a directory or a file path. +`--if` and `--of`select the specific transfer method, on V0.3.2 there are only to possibilities: + +`--if mpio` (It will read in parallel from i.e. lustre using MPI, and write using posix calls.) +`--of mpio` (It will read using posix calls, and write using MPI (i.e. to lustre)) + +Other commands are `ping`, `shutdown` and `shaping` (for bw control). \ No newline at end of file diff --git a/cli/shaping.cpp b/cli/shaping.cpp index edefb7cbc00616b64e9d560624cf0b9a0bb396a3..75d937149c157179db322de2b4ceca51f6b13acb 100644 --- a/cli/shaping.cpp +++ b/cli/shaping.cpp @@ -89,7 +89,7 @@ main(int argc, char* argv[]) { if(const auto result = rpc_client.lookup(address); result.has_value()) { const auto& endpoint = result.value(); - const auto retval = endpoint.call("bw_shaping", cfg.tid, cfg.shaping); + const auto retval = endpoint.call("bw_control", cfg.tid, cfg.shaping); if(retval.has_value()) { @@ -100,7 +100,7 @@ main(int argc, char* argv[]) { return EXIT_SUCCESS; } - fmt::print(stderr, "bw_shaping RPC failed\n"); + fmt::print(stderr, "bw_control RPC failed\n"); return EXIT_FAILURE; } else { diff --git a/etc/cargo.conf.in b/etc/cargo.conf.in index 18995224e5ca2527b11fdd5c041ab3e01b11f40c..ddf4622eced953287d0f173f2e501ce69e7cb44e 100644 --- a/etc/cargo.conf.in +++ b/etc/cargo.conf.in @@ -23,4 +23,7 @@ global_settings: [ # number of worker threads to serve I/O requests workers: 4, + + # Block Size (in kb) for I/O requestss + blocksize: 512, ] diff --git a/lib/cargo.hpp b/lib/cargo.hpp index 90d0c9af7312856847838a68eda12e238deb4811..ca15aa75c234f9b9892a6548a3a924bf842295d4 100644 --- a/lib/cargo.hpp +++ b/lib/cargo.hpp @@ -61,12 +61,12 @@ private: class dataset { public: - enum class type { posix, parallel }; + enum class type { posix, parallel, none, gekkofs, hercules, expand, dataclay }; dataset() noexcept = default; explicit dataset(std::string path, - dataset::type type = dataset::type::posix) noexcept; + dataset::type type = dataset::type::none) noexcept; [[nodiscard]] std::string path() const noexcept; @@ -86,7 +86,7 @@ public: private: std::string m_path; - dataset::type m_type = dataset::type::posix; + dataset::type m_type = dataset::type::none; }; @@ -121,6 +121,15 @@ public: [[nodiscard]] transfer_status status() const; + + /** + * @brief updates the bw control of the transfer + * + * @param bw_control + */ + void + bw_control (std::int16_t bw_control) const; + /** * Wait for the associated transfer to complete. * diff --git a/lib/fmt_formatters.hpp b/lib/fmt_formatters.hpp index dbcb9aa03856f697c0aa9b7a4cfbb308244ced3f..b57d5b1247a4e811eab06cc508ab7a64a2dac792 100644 --- a/lib/fmt_formatters.hpp +++ b/lib/fmt_formatters.hpp @@ -26,6 +26,7 @@ #define CARGO_FMT_FORMATTERS_HPP #include +#include #include #include #include diff --git a/lib/libcargo.cpp b/lib/libcargo.cpp index 4cf89d24442f461a31cd02f0aa4c9ab2f61c5e7a..65fc7262e9d2faa9955f83d28d051c54b2cc14ff 100644 --- a/lib/libcargo.cpp +++ b/lib/libcargo.cpp @@ -112,7 +112,7 @@ transfer::status() const { const response_type resp{call_rv.value()}; const auto& [s, bw, ec] = resp.value(); - LOGGER_EVAL(resp.error_code(), INFO, ERROR, + LOGGER_EVAL(resp.error_code(), ERROR, INFO, "rpc {:>} body: {{retval: {}}} [op_id: {}]", rpc, resp.error_code(), resp.op_id()); @@ -128,6 +128,41 @@ transfer::status() const { throw std::runtime_error("rpc lookup failed"); } +void +transfer::bw_control(std::int16_t bw_control) const { + + using proto::generic_response; + + network::client rpc_client{m_srv.protocol()}; + const auto rpc = network::rpc_info::create("bw_control", m_srv.address()); + using response_type = generic_response; + + if(const auto lookup_rv = rpc_client.lookup(m_srv.address()); + lookup_rv.has_value()) { + const auto& endp = lookup_rv.value(); + + LOGGER_INFO("rpc {:<} body: {{tid: {}}}", rpc, m_id); + + if(const auto call_rv = endp.call(rpc.name(), m_id, bw_control); + call_rv.has_value()) { + + const response_type resp{call_rv.value()}; + + LOGGER_EVAL(resp.error_code(), ERROR, INFO, + "rpc {:>} body: {{retval: {}}} [op_id: {}]", rpc, + resp.error_code(), resp.op_id()); + + if(resp.error_code()) { + throw std::runtime_error( + fmt::format("rpc call failed: {}", resp.error_code())); + } + } + return; + } + + throw std::runtime_error("rpc lookup failed"); +} + transfer_status::transfer_status(transfer_state status, float bw, error_code error) noexcept : m_state(status), m_bw(bw), m_error(error) {} @@ -189,7 +224,7 @@ transfer_datasets(const server& srv, const std::vector& sources, const response_with_id resp{call_rv.value()}; - LOGGER_EVAL(resp.error_code(), INFO, ERROR, + LOGGER_EVAL(resp.error_code(), ERROR, INFO, "rpc {:>} body: {{retval: {}}} [op_id: {}]", rpc, resp.error_code(), resp.op_id()); diff --git a/src/cargo.cpp b/src/cargo.cpp index a0f3a0cc7164e603cc5131943a7dd998cafa9c5c..0560aaa334ccf42f331f95ac03141c4ea2dd0835 100644 --- a/src/cargo.cpp +++ b/src/cargo.cpp @@ -49,6 +49,7 @@ struct cargo_config { bool daemonize = false; std::optional output_file; std::string address; + std::uint64_t blocksize; }; cargo_config @@ -75,6 +76,11 @@ parse_command_line(int argc, char* argv[]) { "Check `fi_info` to see the list of available protocols.\n") ->option_text("ADDRESS") ->required(); + + app.add_option("-b,--blocksize", cfg.blocksize, + "Number of bytes to send in each message (in kb). Defaults to 512(kb).\n") + ->option_text("BLOCKSIZE") + ->default_val(512); app.add_flag_function( "-v,--version", @@ -112,7 +118,7 @@ main(int argc, char* argv[]) { try { if(const auto rank = world.rank(); rank == 0) { cargo::master_server srv{cfg.progname, cfg.address, cfg.daemonize, - fs::current_path()}; + fs::current_path(), cfg.blocksize}; if(cfg.output_file) { srv.configure_logger(logger::logger_type::file, @@ -128,6 +134,8 @@ main(int argc, char* argv[]) { w.set_output_file(get_process_output_file(*cfg.output_file)); } + w.set_block_size(cfg.blocksize); + return w.run(); } } catch(const std::exception& ex) { diff --git a/src/master.cpp b/src/master.cpp index 8583b3ed492fce963082f1aa4d5cec0aa13a7821..a75526ebcfaafc6f207e91b4c601e84fc9904633 100644 --- a/src/master.cpp +++ b/src/master.cpp @@ -71,9 +71,9 @@ using namespace std::literals; namespace cargo { master_server::master_server(std::string name, std::string address, - bool daemonize, std::filesystem::path rundir, + bool daemonize, std::filesystem::path rundir, std::uint64_t block_size, std::optional pidfile) - : server(std::move(name), std::move(address), daemonize, std::move(rundir), + : server(std::move(name), std::move(address), daemonize, std::move(rundir), std::move(block_size), std::move(pidfile)), provider(m_network_engine, 0), m_mpi_listener_ess(thallium::xstream::create()), @@ -85,7 +85,7 @@ master_server::master_server(std::string name, std::string address, provider::define(EXPAND(shutdown)); provider::define(EXPAND(transfer_datasets)); provider::define(EXPAND(transfer_status)); - provider::define(EXPAND(bw_shaping)); + provider::define(EXPAND(bw_control)); #undef EXPAND @@ -121,7 +121,7 @@ master_server::mpi_listener_ult() { case tag::status: { status_message m; world.recv(msg->source(), msg->tag(), m); - LOGGER_INFO("msg => from: {} body: {{payload: {}}}", + LOGGER_DEBUG("msg => from: {} body: {{payload: {}}}", msg->source(), m); m_request_manager.update(m.tid(), m.seqno(), msg->source() - 1, @@ -171,7 +171,7 @@ master_server::ping(const network::request& req) { void -master_server::bw_shaping(const network::request& req, std::uint64_t tid, +master_server::bw_control(const network::request& req, std::uint64_t tid, std::int16_t shaping) { using network::get_address; using network::rpc_info; @@ -238,10 +238,13 @@ master_server::transfer_datasets(const network::request& req, // We need to expand directories to single files on the s // Then create a new message for each file and append the // file to the d prefix - // We will asume that the path is the original relative + // We will asume that the path is the original absolute + // The prefix selects the method of transfer + // And if not specified then we will use none // i.e. ("xxxx:/xyyy/bbb -> gekko:/cccc/ttt ) then // bbb/xxx -> ttt/xxx const auto& p = s.path(); + std::vector files; if(std::filesystem::is_directory(p)) { LOGGER_INFO("Expanding input directory {}", p); diff --git a/src/master.hpp b/src/master.hpp index 6243dafa26f39b6547607197c08c0f6e884e2fdf..0adfcb73550b646d3c540df75453eb3e533bf049 100644 --- a/src/master.hpp +++ b/src/master.hpp @@ -35,7 +35,7 @@ class master_server : public network::server, public network::provider { public: master_server(std::string name, std::string address, bool daemonize, - std::filesystem::path rundir, + std::filesystem::path rundir, std::uint64_t block_size, std::optional pidfile = {}); ~master_server(); @@ -61,7 +61,7 @@ private: // Receives a request to increase or decrease BW // -1 faster, 0 , +1 slower void - bw_shaping(const network::request& req, std::uint64_t tid, std::int16_t shaping); + bw_control(const network::request& req, std::uint64_t tid, std::int16_t shaping); private: // Dedicated execution stream for the MPI listener ULT diff --git a/src/net/server.cpp b/src/net/server.cpp index ac9c297fb2dac4df70588b14adba4998989e53f5..2bcade2b7c73ff18ffb1b895df40dd17b0730551 100644 --- a/src/net/server.cpp +++ b/src/net/server.cpp @@ -78,13 +78,13 @@ write_pidfile(const std::filesystem::path& pidfile) { namespace network { server::server(std::string name, std::string address, bool daemonize, - std::filesystem::path rundir, + std::filesystem::path rundir, std::uint64_t block_size, std::optional pidfile) : m_name(std::move(name)), m_address(std::move(address)), m_daemonize(daemonize), m_rundir(std::move(rundir)), m_pidfile(daemonize ? std::make_optional(m_rundir / (m_name + ".pid")) - : std::move(pidfile)), + : std::move(pidfile)), m_kb_size(block_size), m_logger_config(m_name, logger::logger_type::console_color), m_network_engine(m_address, THALLIUM_SERVER_MODE) {} @@ -267,6 +267,7 @@ server::print_configuration() const { LOGGER_INFO(" - pidfile: {}", *m_pidfile); } + LOGGER_INFO(" - block_size: {} kb", m_kb_size); LOGGER_INFO(" - address for remote requests: {}", self_address()); LOGGER_INFO(""); } diff --git a/src/net/server.hpp b/src/net/server.hpp index 9d9855345f9bcbe9ba23754f256bff8af45013d1..1c64d82d571e2a9d333dbe7dbeb01f08f389c971 100644 --- a/src/net/server.hpp +++ b/src/net/server.hpp @@ -46,7 +46,7 @@ class server { public: server(std::string name, std::string address, bool daemonize, - std::filesystem::path rundir, + std::filesystem::path rundir, std::uint64_t block_size, std::optional pidfile = {}); ~server(); @@ -106,7 +106,8 @@ private: bool m_daemonize; std::filesystem::path m_rundir; std::optional m_pidfile; - logger::logger_config m_logger_config; + std::uint64_t m_kb_size; + logger::logger_config m_logger_config; protected: thallium::engine m_network_engine; diff --git a/src/worker/mpio_read.cpp b/src/worker/mpio_read.cpp index e16064d8d73856b57e3f112ff4f843d26026d758..e9f60241bec0068033c223473f091c04d67856ab 100644 --- a/src/worker/mpio_read.cpp +++ b/src/worker/mpio_read.cpp @@ -32,9 +32,9 @@ namespace cargo { mpio_read::mpio_read(mpi::communicator workers, std::filesystem::path input_path, - std::filesystem::path output_path) + std::filesystem::path output_path, std::uint64_t block_size) : m_workers(std::move(workers)), m_input_path(std::move(input_path)), - m_output_path(std::move(output_path)) {} + m_output_path(std::move(output_path)), m_kb_size(std::move(block_size)) {} cargo::error_code mpio_read::operator()() { @@ -48,7 +48,7 @@ mpio_read::operator()() { m_workers, m_input_path, mpioxx::file_open_mode::rdonly); mpioxx::offset file_size = input_file.size(); - std::size_t block_size = 512 * 1024u; + std::size_t block_size = m_kb_size * 1024u; // create block type MPI_Datatype block_type; diff --git a/src/worker/mpio_read.hpp b/src/worker/mpio_read.hpp index d135b05bbbc82c85f6e71af60678adbe3c7c0cc5..ce17c9876e546ad0fe4afe56e22dfee3a6963bc4 100644 --- a/src/worker/mpio_read.hpp +++ b/src/worker/mpio_read.hpp @@ -38,7 +38,7 @@ class mpio_read : public operation { public: mpio_read(mpi::communicator workers, std::filesystem::path input_path, - std::filesystem::path output_path); + std::filesystem::path output_path, std::uint64_t block_size); cargo::error_code operator()() final; @@ -62,6 +62,7 @@ private: std::size_t m_block_size; memory_buffer m_buffer; std::vector m_buffer_regions; + std::uint64_t m_kb_size; }; diff --git a/src/worker/mpio_write.cpp b/src/worker/mpio_write.cpp index c4b51e8e4d361772c0ef137177306743e2908bb4..5a84ca75ff9ce22d04315efa0849c8e0cacfb066 100644 --- a/src/worker/mpio_write.cpp +++ b/src/worker/mpio_write.cpp @@ -40,7 +40,7 @@ mpio_write::operator()() { const auto workers_size = m_workers.size(); const auto workers_rank = m_workers.rank(); - std::size_t block_size = 512 * 1024u; // 512 kb + std::size_t block_size = m_kb_size * 1024u; std::size_t file_size = std::filesystem::file_size(m_input_path); // compute the number of blocks in the file diff --git a/src/worker/mpio_write.hpp b/src/worker/mpio_write.hpp index 8c3d5c1cb054332ed15b26564314b11bc570e606..05755ff55a2576acd2b542f4142c1b77fd8cbf6f 100644 --- a/src/worker/mpio_write.hpp +++ b/src/worker/mpio_write.hpp @@ -38,9 +38,9 @@ class mpio_write : public operation { public: mpio_write(mpi::communicator workers, std::filesystem::path input_path, - std::filesystem::path output_path) + std::filesystem::path output_path, std::uint64_t block_size) : m_workers(std::move(workers)), m_input_path(std::move(input_path)), - m_output_path(std::move(output_path)) {} + m_output_path(std::move(output_path)), m_kb_size(std::move(block_size)) {} cargo::error_code operator()() final; @@ -70,6 +70,7 @@ private: memory_buffer m_buffer; std::vector m_buffer_regions; std::size_t m_bytes_per_rank; + std::uint64_t m_kb_size; }; } // namespace cargo diff --git a/src/worker/ops.cpp b/src/worker/ops.cpp index 288991fc10324dcd3003c7cfd408b3f3e052151f..3e380dd595e809c9bc0771a1f7ed915747de0fa8 100644 --- a/src/worker/ops.cpp +++ b/src/worker/ops.cpp @@ -34,21 +34,21 @@ namespace cargo { std::unique_ptr operation::make_operation(cargo::tag t, mpi::communicator workers, std::filesystem::path input_path, - std::filesystem::path output_path) { + std::filesystem::path output_path, std::uint64_t block_size) { using cargo::tag; switch(t) { case tag::pread: return std::make_unique(std::move(workers), std::move(input_path), - std::move(output_path)); + std::move(output_path), block_size); case tag::pwrite: return std::make_unique(std::move(workers), std::move(input_path), - std::move(output_path)); + std::move(output_path), block_size); case tag::sequential: return std::make_unique(std::move(workers), std::move(input_path), - std::move(output_path)); + std::move(output_path), block_size); default: return {}; } diff --git a/src/worker/ops.hpp b/src/worker/ops.hpp index 637cdc7a45a553890883d89d1cfef9503292d021..b7e0eb8e1647a96e1b9ee2484262663dcb073eef 100644 --- a/src/worker/ops.hpp +++ b/src/worker/ops.hpp @@ -41,7 +41,7 @@ public: static std::unique_ptr make_operation(cargo::tag t, boost::mpi::communicator workers, std::filesystem::path input_path, - std::filesystem::path output_path); + std::filesystem::path output_path, std::uint64_t block_size); virtual ~operation() = default; diff --git a/src/worker/sequential.hpp b/src/worker/sequential.hpp index 259514218141b87f30e3a11fefce81d8c807ca5c..cb5860bd30c87f2405441c62c40926ed173bf61f 100644 --- a/src/worker/sequential.hpp +++ b/src/worker/sequential.hpp @@ -35,9 +35,9 @@ class seq_operation : public operation { public: seq_operation(mpi::communicator comm, std::filesystem::path input_path, - std::filesystem::path output_path) + std::filesystem::path output_path, std::uint64_t block_size) : m_comm(std::move(comm)), m_input_path(std::move(input_path)), - m_output_path(std::move(output_path)) {} + m_output_path(std::move(output_path)), m_kb_size(std::move(block_size)) {} cargo::error_code operator()() final; @@ -52,6 +52,7 @@ private: std::filesystem::path m_input_path; std::filesystem::path m_output_path; cargo::error_code m_status; + std::uint64_t m_kb_size; }; } // namespace cargo diff --git a/src/worker/worker.cpp b/src/worker/worker.cpp index 56e21438f121f43aa53f7ea010912de3642b83ff..00a3b3c1a83746825b98fa70d722ffbeb200539b 100644 --- a/src/worker/worker.cpp +++ b/src/worker/worker.cpp @@ -58,7 +58,7 @@ update_state(int rank, std::uint64_t tid, std::uint32_t seqno, mpi::communicator world; const cargo::status_message m{tid, seqno, st, bw, ec}; - LOGGER_INFO("msg <= to: {} body: {{payload: {}}}", rank, m); + LOGGER_DEBUG("msg <= to: {} body: {{payload: {}}}", rank, m); world.send(rank, static_cast(cargo::tag::status), m); } @@ -74,6 +74,11 @@ worker::set_output_file(std::filesystem::path output_file) { m_output_file = std::move(output_file); } +void +worker::set_block_size(std::uint64_t block_size) { + m_block_size = block_size; +} + int worker::run() { @@ -155,7 +160,7 @@ worker::run() { make_pair(m.input_path(), m.output_path()), make_pair(operation::make_operation(t, workers, m.input_path(), - m.output_path()), + m.output_path(), m_block_size), 0))); const auto op = @@ -185,7 +190,7 @@ worker::run() { const auto op = I->second.first.get(); if(op) { - op->set_bw_shaping(0); + op->set_bw_shaping(m.shaping()); } else { LOGGER_INFO("Operation non existent", msg->source(), m); } diff --git a/src/worker/worker.hpp b/src/worker/worker.hpp index 1890974bd11aae2bd92fe63279df800963e31509..84e877a8ded6291f3ee1be371ca135054850aa91 100644 --- a/src/worker/worker.hpp +++ b/src/worker/worker.hpp @@ -38,6 +38,9 @@ public: void set_output_file(std::filesystem::path output_file); + void + set_block_size(std::uint64_t block_size); + int run(); @@ -46,6 +49,7 @@ private: std::string m_name; int m_rank; std::optional m_output_file; + std::uint64_t m_block_size; };