Loading lib/cargo/error.hpp +2 −25 Original line number Diff line number Diff line Loading @@ -69,31 +69,8 @@ public: return static_cast<uint32_t>(m_value); } [[nodiscard]] constexpr std::string_view name() const { switch(m_category) { case error_category::generic_error: break; case error_category::system_error: return "CARGO_SYSTEM_ERROR"; case error_category::mpi_error: return "CARGO_MPI_ERROR"; default: return "CARGO_UNKNOWN_ERROR"; } switch(m_value) { case error_value::success: return "CARGO_SUCCESS"; case error_value::snafu: return "CARGO_SNAFU"; case error_value::not_implemented: return "CARGO_NOT_IMPLEMENTED"; default: return "CARGO_UNKNOWN_ERROR"; } } [[nodiscard]] std::string_view name() const; [[nodiscard]] std::string message() const; Loading lib/error.cpp +255 −1 Original line number Diff line number Diff line Loading @@ -26,8 +26,262 @@ #include <boost/mpi/error_string.hpp> #include "cargo/error.hpp" // clang-format off #define EXPAND(s) case s: return #s // clang-format on constexpr std::string_view errno_name(int ec) { switch(ec) { EXPAND(EPERM); EXPAND(ENOENT); EXPAND(ESRCH); EXPAND(EINTR); EXPAND(EIO); EXPAND(ENXIO); EXPAND(E2BIG); EXPAND(ENOEXEC); EXPAND(EBADF); EXPAND(ECHILD); EXPAND(EAGAIN); EXPAND(ENOMEM); EXPAND(EACCES); EXPAND(EFAULT); EXPAND(ENOTBLK); EXPAND(EBUSY); EXPAND(EEXIST); EXPAND(EXDEV); EXPAND(ENODEV); EXPAND(ENOTDIR); EXPAND(EISDIR); EXPAND(EINVAL); EXPAND(ENFILE); EXPAND(EMFILE); EXPAND(ENOTTY); EXPAND(ETXTBSY); EXPAND(EFBIG); EXPAND(ENOSPC); EXPAND(ESPIPE); EXPAND(EROFS); EXPAND(EMLINK); EXPAND(EPIPE); EXPAND(EDOM); EXPAND(ERANGE); EXPAND(EDEADLK); EXPAND(ENAMETOOLONG); EXPAND(ENOLCK); EXPAND(ENOSYS); EXPAND(ENOTEMPTY); EXPAND(ELOOP); // EXPAND(EWOULDBLOCK); EXPAND(ENOMSG); EXPAND(EIDRM); EXPAND(ECHRNG); EXPAND(EL2NSYNC); EXPAND(EL3HLT); EXPAND(EL3RST); EXPAND(ELNRNG); EXPAND(EUNATCH); EXPAND(ENOCSI); EXPAND(EL2HLT); EXPAND(EBADE); EXPAND(EBADR); EXPAND(EXFULL); EXPAND(ENOANO); EXPAND(EBADRQC); EXPAND(EBADSLT); // EXPAND(EDEADLOCK); EXPAND(EBFONT); EXPAND(ENOSTR); EXPAND(ENODATA); EXPAND(ETIME); EXPAND(ENOSR); EXPAND(ENONET); EXPAND(ENOPKG); EXPAND(EREMOTE); EXPAND(ENOLINK); EXPAND(EADV); EXPAND(ESRMNT); EXPAND(ECOMM); EXPAND(EPROTO); EXPAND(EMULTIHOP); EXPAND(EDOTDOT); EXPAND(EBADMSG); EXPAND(EOVERFLOW); EXPAND(ENOTUNIQ); EXPAND(EBADFD); EXPAND(EREMCHG); EXPAND(ELIBACC); EXPAND(ELIBBAD); EXPAND(ELIBSCN); EXPAND(ELIBMAX); EXPAND(ELIBEXEC); EXPAND(EILSEQ); EXPAND(ERESTART); EXPAND(ESTRPIPE); EXPAND(EUSERS); EXPAND(ENOTSOCK); EXPAND(EDESTADDRREQ); EXPAND(EMSGSIZE); EXPAND(EPROTOTYPE); EXPAND(ENOPROTOOPT); EXPAND(EPROTONOSUPPORT); EXPAND(ESOCKTNOSUPPORT); EXPAND(EOPNOTSUPP); EXPAND(EPFNOSUPPORT); EXPAND(EAFNOSUPPORT); EXPAND(EADDRINUSE); EXPAND(EADDRNOTAVAIL); EXPAND(ENETDOWN); EXPAND(ENETUNREACH); EXPAND(ENETRESET); EXPAND(ECONNABORTED); EXPAND(ECONNRESET); EXPAND(ENOBUFS); EXPAND(EISCONN); EXPAND(ENOTCONN); EXPAND(ESHUTDOWN); EXPAND(ETOOMANYREFS); EXPAND(ETIMEDOUT); EXPAND(ECONNREFUSED); EXPAND(EHOSTDOWN); EXPAND(EHOSTUNREACH); EXPAND(EALREADY); EXPAND(EINPROGRESS); EXPAND(ESTALE); EXPAND(EUCLEAN); EXPAND(ENOTNAM); EXPAND(ENAVAIL); EXPAND(EISNAM); EXPAND(EREMOTEIO); EXPAND(EDQUOT); EXPAND(ENOMEDIUM); EXPAND(EMEDIUMTYPE); EXPAND(ECANCELED); EXPAND(ENOKEY); EXPAND(EKEYEXPIRED); EXPAND(EKEYREVOKED); EXPAND(EKEYREJECTED); EXPAND(EOWNERDEAD); EXPAND(ENOTRECOVERABLE); EXPAND(ERFKILL); EXPAND(EHWPOISON); default: return "EUNKNOWN"; } } constexpr std::string_view mpi_error_name(int ec) { switch(ec) { EXPAND(MPI_SUCCESS); EXPAND(MPI_ERR_BUFFER); EXPAND(MPI_ERR_COUNT); EXPAND(MPI_ERR_TYPE); EXPAND(MPI_ERR_TAG); EXPAND(MPI_ERR_COMM); EXPAND(MPI_ERR_RANK); EXPAND(MPI_ERR_REQUEST); EXPAND(MPI_ERR_ROOT); EXPAND(MPI_ERR_GROUP); EXPAND(MPI_ERR_OP); EXPAND(MPI_ERR_TOPOLOGY); EXPAND(MPI_ERR_DIMS); EXPAND(MPI_ERR_ARG); EXPAND(MPI_ERR_UNKNOWN); EXPAND(MPI_ERR_TRUNCATE); EXPAND(MPI_ERR_OTHER); EXPAND(MPI_ERR_INTERN); EXPAND(MPI_ERR_IN_STATUS); EXPAND(MPI_ERR_PENDING); EXPAND(MPI_ERR_ACCESS); EXPAND(MPI_ERR_AMODE); EXPAND(MPI_ERR_ASSERT); EXPAND(MPI_ERR_BAD_FILE); EXPAND(MPI_ERR_BASE); EXPAND(MPI_ERR_CONVERSION); EXPAND(MPI_ERR_DISP); EXPAND(MPI_ERR_DUP_DATAREP); EXPAND(MPI_ERR_FILE_EXISTS); EXPAND(MPI_ERR_FILE_IN_USE); EXPAND(MPI_ERR_FILE); EXPAND(MPI_ERR_INFO_KEY); EXPAND(MPI_ERR_INFO_NOKEY); EXPAND(MPI_ERR_INFO_VALUE); EXPAND(MPI_ERR_INFO); EXPAND(MPI_ERR_IO); EXPAND(MPI_ERR_KEYVAL); EXPAND(MPI_ERR_LOCKTYPE); EXPAND(MPI_ERR_NAME); EXPAND(MPI_ERR_NO_MEM); EXPAND(MPI_ERR_NOT_SAME); EXPAND(MPI_ERR_NO_SPACE); EXPAND(MPI_ERR_NO_SUCH_FILE); EXPAND(MPI_ERR_PORT); EXPAND(MPI_ERR_QUOTA); EXPAND(MPI_ERR_READ_ONLY); EXPAND(MPI_ERR_RMA_CONFLICT); EXPAND(MPI_ERR_RMA_SYNC); EXPAND(MPI_ERR_SERVICE); EXPAND(MPI_ERR_SIZE); EXPAND(MPI_ERR_SPAWN); EXPAND(MPI_ERR_UNSUPPORTED_DATAREP); EXPAND(MPI_ERR_UNSUPPORTED_OPERATION); EXPAND(MPI_ERR_WIN); EXPAND(MPI_T_ERR_MEMORY); EXPAND(MPI_T_ERR_NOT_INITIALIZED); EXPAND(MPI_T_ERR_CANNOT_INIT); EXPAND(MPI_T_ERR_INVALID_INDEX); EXPAND(MPI_T_ERR_INVALID_ITEM); EXPAND(MPI_T_ERR_INVALID_HANDLE); EXPAND(MPI_T_ERR_OUT_OF_HANDLES); EXPAND(MPI_T_ERR_OUT_OF_SESSIONS); EXPAND(MPI_T_ERR_INVALID_SESSION); EXPAND(MPI_T_ERR_CVAR_SET_NOT_NOW); EXPAND(MPI_T_ERR_CVAR_SET_NEVER); EXPAND(MPI_T_ERR_PVAR_NO_STARTSTOP); EXPAND(MPI_T_ERR_PVAR_NO_WRITE); EXPAND(MPI_T_ERR_PVAR_NO_ATOMIC); EXPAND(MPI_ERR_RMA_RANGE); EXPAND(MPI_ERR_RMA_ATTACH); EXPAND(MPI_ERR_RMA_FLAVOR); EXPAND(MPI_ERR_RMA_SHARED); EXPAND(MPI_T_ERR_INVALID); EXPAND(MPI_T_ERR_INVALID_NAME); default: return "MPI_ERR_UNKNOWN"; } } namespace cargo { [[nodiscard]] std::string_view error_code::name() const { switch(m_category) { case error_category::generic_error: break; case error_category::system_error: return errno_name(static_cast<int>(m_value)); case error_category::mpi_error: return mpi_error_name(static_cast<int>(m_value)); default: return "CARGO_UNKNOWN_ERROR"; } switch(m_value) { case error_value::success: return "CARGO_SUCCESS"; case error_value::snafu: return "CARGO_SNAFU"; case error_value::not_implemented: return "CARGO_NOT_IMPLEMENTED"; default: return "CARGO_UNKNOWN_ERROR"; } }; [[nodiscard]] std::string error_code::message() const { Loading @@ -53,6 +307,6 @@ error_code::message() const { default: return "unknown error category"; } }; } } // namespace cargo No newline at end of file lib/fmt_formatters.hpp +1 −1 Original line number Diff line number Diff line Loading @@ -65,7 +65,7 @@ struct fmt::formatter<cargo::transfer> : formatter<std::string_view> { template <typename FormatContext> auto format(const cargo::transfer& tx, FormatContext& ctx) const { const auto str = fmt::format("{{id: {}}}", tx.id()); const auto str = fmt::format("{{tid: {}}}", tx.id()); return formatter<std::string_view>::format(str, ctx); } }; Loading src/cargo.cpp +17 −4 Original line number Diff line number Diff line Loading @@ -63,8 +63,8 @@ parse_command_line(int argc, char* argv[]) { // force logging messages to file app.add_option("-o,--output", cfg.output_file, "Write any output to FILENAME rather than sending it to the " "console") "Write any output to FILENAME.<pid> rather than sending it " "to the console.") ->option_text("FILENAME"); app.add_option("-l,--listen", cfg.address, Loading Loading @@ -92,6 +92,12 @@ parse_command_line(int argc, char* argv[]) { } } std::filesystem::path get_process_output_file(std::filesystem::path base) { base += fmt::format(".{}", ::getpid()); return base; } } // namespace int Loading @@ -110,12 +116,19 @@ main(int argc, char* argv[]) { if(cfg.output_file) { srv.configure_logger(logger::logger_type::file, *cfg.output_file); get_process_output_file(*cfg.output_file)); } return srv.run(); } else { return cargo::worker{rank}.run(); cargo::worker w{cfg.progname, rank}; if(cfg.output_file) { w.set_output_file(get_process_output_file(*cfg.output_file)); } return w.run(); } } catch(const std::exception& ex) { fmt::print(stderr, Loading src/worker/worker.cpp +25 −10 Original line number Diff line number Diff line Loading @@ -55,7 +55,13 @@ make_communicator(const mpi::communicator& comm, const mpi::group& group, namespace cargo { worker::worker(int rank) : m_rank(rank) {} worker::worker(std::string name, int rank) : m_name(std::move(name)), m_rank(rank) {} void worker::set_output_file(std::filesystem::path output_file) { m_output_file = std::move(output_file); } int worker::run() { Loading @@ -69,12 +75,19 @@ worker::run() { ranks_to_exclude.end()), 0); LOGGER_INIT(fmt::format("worker_{:03}", world.rank()), logger::console_color); const logger::logger_config cfg{ fmt::format("{}:{:03}", m_name, world.rank()), m_output_file ? logger::file : logger::console_color, m_output_file}; logger::create_default_logger(cfg); // Initialization finished LOGGER_INFO("Staging process initialized ({}:{})", world.rank(), workers.rank()); const auto greeting = fmt::format("Starting staging process (pid {})", getpid()); LOGGER_INFO("{:=>{}}", "", greeting.size()); LOGGER_INFO(greeting); LOGGER_INFO("{:=>{}}", "", greeting.size()); bool done = false; Loading @@ -96,16 +109,18 @@ worker::run() { case tag::sequential: { transfer_message m; world.recv(0, msg->tag(), m); LOGGER_CRITICAL("Transfer request received: {}", m); LOGGER_INFO("msg => from: {} body: {}", msg->source(), m); const auto op = operation::make_operation( t, workers, m.input_path(), m.output_path()); cargo::error_code ec = (*op)(); LOGGER_EVAL(ec, INFO, ERROR, "Transfer finished: {}", ec); world.send(msg->source(), static_cast<int>(tag::status), status_message{m.tid(), m.seqno(), ec}); const status_message st{m.tid(), m.seqno(), ec}; LOGGER_INFO("msg <= to: {} body: {{status: {}}}", msg->source(), st); world.send(msg->source(), static_cast<int>(tag::status), st); break; } Loading Loading
lib/cargo/error.hpp +2 −25 Original line number Diff line number Diff line Loading @@ -69,31 +69,8 @@ public: return static_cast<uint32_t>(m_value); } [[nodiscard]] constexpr std::string_view name() const { switch(m_category) { case error_category::generic_error: break; case error_category::system_error: return "CARGO_SYSTEM_ERROR"; case error_category::mpi_error: return "CARGO_MPI_ERROR"; default: return "CARGO_UNKNOWN_ERROR"; } switch(m_value) { case error_value::success: return "CARGO_SUCCESS"; case error_value::snafu: return "CARGO_SNAFU"; case error_value::not_implemented: return "CARGO_NOT_IMPLEMENTED"; default: return "CARGO_UNKNOWN_ERROR"; } } [[nodiscard]] std::string_view name() const; [[nodiscard]] std::string message() const; Loading
lib/error.cpp +255 −1 Original line number Diff line number Diff line Loading @@ -26,8 +26,262 @@ #include <boost/mpi/error_string.hpp> #include "cargo/error.hpp" // clang-format off #define EXPAND(s) case s: return #s // clang-format on constexpr std::string_view errno_name(int ec) { switch(ec) { EXPAND(EPERM); EXPAND(ENOENT); EXPAND(ESRCH); EXPAND(EINTR); EXPAND(EIO); EXPAND(ENXIO); EXPAND(E2BIG); EXPAND(ENOEXEC); EXPAND(EBADF); EXPAND(ECHILD); EXPAND(EAGAIN); EXPAND(ENOMEM); EXPAND(EACCES); EXPAND(EFAULT); EXPAND(ENOTBLK); EXPAND(EBUSY); EXPAND(EEXIST); EXPAND(EXDEV); EXPAND(ENODEV); EXPAND(ENOTDIR); EXPAND(EISDIR); EXPAND(EINVAL); EXPAND(ENFILE); EXPAND(EMFILE); EXPAND(ENOTTY); EXPAND(ETXTBSY); EXPAND(EFBIG); EXPAND(ENOSPC); EXPAND(ESPIPE); EXPAND(EROFS); EXPAND(EMLINK); EXPAND(EPIPE); EXPAND(EDOM); EXPAND(ERANGE); EXPAND(EDEADLK); EXPAND(ENAMETOOLONG); EXPAND(ENOLCK); EXPAND(ENOSYS); EXPAND(ENOTEMPTY); EXPAND(ELOOP); // EXPAND(EWOULDBLOCK); EXPAND(ENOMSG); EXPAND(EIDRM); EXPAND(ECHRNG); EXPAND(EL2NSYNC); EXPAND(EL3HLT); EXPAND(EL3RST); EXPAND(ELNRNG); EXPAND(EUNATCH); EXPAND(ENOCSI); EXPAND(EL2HLT); EXPAND(EBADE); EXPAND(EBADR); EXPAND(EXFULL); EXPAND(ENOANO); EXPAND(EBADRQC); EXPAND(EBADSLT); // EXPAND(EDEADLOCK); EXPAND(EBFONT); EXPAND(ENOSTR); EXPAND(ENODATA); EXPAND(ETIME); EXPAND(ENOSR); EXPAND(ENONET); EXPAND(ENOPKG); EXPAND(EREMOTE); EXPAND(ENOLINK); EXPAND(EADV); EXPAND(ESRMNT); EXPAND(ECOMM); EXPAND(EPROTO); EXPAND(EMULTIHOP); EXPAND(EDOTDOT); EXPAND(EBADMSG); EXPAND(EOVERFLOW); EXPAND(ENOTUNIQ); EXPAND(EBADFD); EXPAND(EREMCHG); EXPAND(ELIBACC); EXPAND(ELIBBAD); EXPAND(ELIBSCN); EXPAND(ELIBMAX); EXPAND(ELIBEXEC); EXPAND(EILSEQ); EXPAND(ERESTART); EXPAND(ESTRPIPE); EXPAND(EUSERS); EXPAND(ENOTSOCK); EXPAND(EDESTADDRREQ); EXPAND(EMSGSIZE); EXPAND(EPROTOTYPE); EXPAND(ENOPROTOOPT); EXPAND(EPROTONOSUPPORT); EXPAND(ESOCKTNOSUPPORT); EXPAND(EOPNOTSUPP); EXPAND(EPFNOSUPPORT); EXPAND(EAFNOSUPPORT); EXPAND(EADDRINUSE); EXPAND(EADDRNOTAVAIL); EXPAND(ENETDOWN); EXPAND(ENETUNREACH); EXPAND(ENETRESET); EXPAND(ECONNABORTED); EXPAND(ECONNRESET); EXPAND(ENOBUFS); EXPAND(EISCONN); EXPAND(ENOTCONN); EXPAND(ESHUTDOWN); EXPAND(ETOOMANYREFS); EXPAND(ETIMEDOUT); EXPAND(ECONNREFUSED); EXPAND(EHOSTDOWN); EXPAND(EHOSTUNREACH); EXPAND(EALREADY); EXPAND(EINPROGRESS); EXPAND(ESTALE); EXPAND(EUCLEAN); EXPAND(ENOTNAM); EXPAND(ENAVAIL); EXPAND(EISNAM); EXPAND(EREMOTEIO); EXPAND(EDQUOT); EXPAND(ENOMEDIUM); EXPAND(EMEDIUMTYPE); EXPAND(ECANCELED); EXPAND(ENOKEY); EXPAND(EKEYEXPIRED); EXPAND(EKEYREVOKED); EXPAND(EKEYREJECTED); EXPAND(EOWNERDEAD); EXPAND(ENOTRECOVERABLE); EXPAND(ERFKILL); EXPAND(EHWPOISON); default: return "EUNKNOWN"; } } constexpr std::string_view mpi_error_name(int ec) { switch(ec) { EXPAND(MPI_SUCCESS); EXPAND(MPI_ERR_BUFFER); EXPAND(MPI_ERR_COUNT); EXPAND(MPI_ERR_TYPE); EXPAND(MPI_ERR_TAG); EXPAND(MPI_ERR_COMM); EXPAND(MPI_ERR_RANK); EXPAND(MPI_ERR_REQUEST); EXPAND(MPI_ERR_ROOT); EXPAND(MPI_ERR_GROUP); EXPAND(MPI_ERR_OP); EXPAND(MPI_ERR_TOPOLOGY); EXPAND(MPI_ERR_DIMS); EXPAND(MPI_ERR_ARG); EXPAND(MPI_ERR_UNKNOWN); EXPAND(MPI_ERR_TRUNCATE); EXPAND(MPI_ERR_OTHER); EXPAND(MPI_ERR_INTERN); EXPAND(MPI_ERR_IN_STATUS); EXPAND(MPI_ERR_PENDING); EXPAND(MPI_ERR_ACCESS); EXPAND(MPI_ERR_AMODE); EXPAND(MPI_ERR_ASSERT); EXPAND(MPI_ERR_BAD_FILE); EXPAND(MPI_ERR_BASE); EXPAND(MPI_ERR_CONVERSION); EXPAND(MPI_ERR_DISP); EXPAND(MPI_ERR_DUP_DATAREP); EXPAND(MPI_ERR_FILE_EXISTS); EXPAND(MPI_ERR_FILE_IN_USE); EXPAND(MPI_ERR_FILE); EXPAND(MPI_ERR_INFO_KEY); EXPAND(MPI_ERR_INFO_NOKEY); EXPAND(MPI_ERR_INFO_VALUE); EXPAND(MPI_ERR_INFO); EXPAND(MPI_ERR_IO); EXPAND(MPI_ERR_KEYVAL); EXPAND(MPI_ERR_LOCKTYPE); EXPAND(MPI_ERR_NAME); EXPAND(MPI_ERR_NO_MEM); EXPAND(MPI_ERR_NOT_SAME); EXPAND(MPI_ERR_NO_SPACE); EXPAND(MPI_ERR_NO_SUCH_FILE); EXPAND(MPI_ERR_PORT); EXPAND(MPI_ERR_QUOTA); EXPAND(MPI_ERR_READ_ONLY); EXPAND(MPI_ERR_RMA_CONFLICT); EXPAND(MPI_ERR_RMA_SYNC); EXPAND(MPI_ERR_SERVICE); EXPAND(MPI_ERR_SIZE); EXPAND(MPI_ERR_SPAWN); EXPAND(MPI_ERR_UNSUPPORTED_DATAREP); EXPAND(MPI_ERR_UNSUPPORTED_OPERATION); EXPAND(MPI_ERR_WIN); EXPAND(MPI_T_ERR_MEMORY); EXPAND(MPI_T_ERR_NOT_INITIALIZED); EXPAND(MPI_T_ERR_CANNOT_INIT); EXPAND(MPI_T_ERR_INVALID_INDEX); EXPAND(MPI_T_ERR_INVALID_ITEM); EXPAND(MPI_T_ERR_INVALID_HANDLE); EXPAND(MPI_T_ERR_OUT_OF_HANDLES); EXPAND(MPI_T_ERR_OUT_OF_SESSIONS); EXPAND(MPI_T_ERR_INVALID_SESSION); EXPAND(MPI_T_ERR_CVAR_SET_NOT_NOW); EXPAND(MPI_T_ERR_CVAR_SET_NEVER); EXPAND(MPI_T_ERR_PVAR_NO_STARTSTOP); EXPAND(MPI_T_ERR_PVAR_NO_WRITE); EXPAND(MPI_T_ERR_PVAR_NO_ATOMIC); EXPAND(MPI_ERR_RMA_RANGE); EXPAND(MPI_ERR_RMA_ATTACH); EXPAND(MPI_ERR_RMA_FLAVOR); EXPAND(MPI_ERR_RMA_SHARED); EXPAND(MPI_T_ERR_INVALID); EXPAND(MPI_T_ERR_INVALID_NAME); default: return "MPI_ERR_UNKNOWN"; } } namespace cargo { [[nodiscard]] std::string_view error_code::name() const { switch(m_category) { case error_category::generic_error: break; case error_category::system_error: return errno_name(static_cast<int>(m_value)); case error_category::mpi_error: return mpi_error_name(static_cast<int>(m_value)); default: return "CARGO_UNKNOWN_ERROR"; } switch(m_value) { case error_value::success: return "CARGO_SUCCESS"; case error_value::snafu: return "CARGO_SNAFU"; case error_value::not_implemented: return "CARGO_NOT_IMPLEMENTED"; default: return "CARGO_UNKNOWN_ERROR"; } }; [[nodiscard]] std::string error_code::message() const { Loading @@ -53,6 +307,6 @@ error_code::message() const { default: return "unknown error category"; } }; } } // namespace cargo No newline at end of file
lib/fmt_formatters.hpp +1 −1 Original line number Diff line number Diff line Loading @@ -65,7 +65,7 @@ struct fmt::formatter<cargo::transfer> : formatter<std::string_view> { template <typename FormatContext> auto format(const cargo::transfer& tx, FormatContext& ctx) const { const auto str = fmt::format("{{id: {}}}", tx.id()); const auto str = fmt::format("{{tid: {}}}", tx.id()); return formatter<std::string_view>::format(str, ctx); } }; Loading
src/cargo.cpp +17 −4 Original line number Diff line number Diff line Loading @@ -63,8 +63,8 @@ parse_command_line(int argc, char* argv[]) { // force logging messages to file app.add_option("-o,--output", cfg.output_file, "Write any output to FILENAME rather than sending it to the " "console") "Write any output to FILENAME.<pid> rather than sending it " "to the console.") ->option_text("FILENAME"); app.add_option("-l,--listen", cfg.address, Loading Loading @@ -92,6 +92,12 @@ parse_command_line(int argc, char* argv[]) { } } std::filesystem::path get_process_output_file(std::filesystem::path base) { base += fmt::format(".{}", ::getpid()); return base; } } // namespace int Loading @@ -110,12 +116,19 @@ main(int argc, char* argv[]) { if(cfg.output_file) { srv.configure_logger(logger::logger_type::file, *cfg.output_file); get_process_output_file(*cfg.output_file)); } return srv.run(); } else { return cargo::worker{rank}.run(); cargo::worker w{cfg.progname, rank}; if(cfg.output_file) { w.set_output_file(get_process_output_file(*cfg.output_file)); } return w.run(); } } catch(const std::exception& ex) { fmt::print(stderr, Loading
src/worker/worker.cpp +25 −10 Original line number Diff line number Diff line Loading @@ -55,7 +55,13 @@ make_communicator(const mpi::communicator& comm, const mpi::group& group, namespace cargo { worker::worker(int rank) : m_rank(rank) {} worker::worker(std::string name, int rank) : m_name(std::move(name)), m_rank(rank) {} void worker::set_output_file(std::filesystem::path output_file) { m_output_file = std::move(output_file); } int worker::run() { Loading @@ -69,12 +75,19 @@ worker::run() { ranks_to_exclude.end()), 0); LOGGER_INIT(fmt::format("worker_{:03}", world.rank()), logger::console_color); const logger::logger_config cfg{ fmt::format("{}:{:03}", m_name, world.rank()), m_output_file ? logger::file : logger::console_color, m_output_file}; logger::create_default_logger(cfg); // Initialization finished LOGGER_INFO("Staging process initialized ({}:{})", world.rank(), workers.rank()); const auto greeting = fmt::format("Starting staging process (pid {})", getpid()); LOGGER_INFO("{:=>{}}", "", greeting.size()); LOGGER_INFO(greeting); LOGGER_INFO("{:=>{}}", "", greeting.size()); bool done = false; Loading @@ -96,16 +109,18 @@ worker::run() { case tag::sequential: { transfer_message m; world.recv(0, msg->tag(), m); LOGGER_CRITICAL("Transfer request received: {}", m); LOGGER_INFO("msg => from: {} body: {}", msg->source(), m); const auto op = operation::make_operation( t, workers, m.input_path(), m.output_path()); cargo::error_code ec = (*op)(); LOGGER_EVAL(ec, INFO, ERROR, "Transfer finished: {}", ec); world.send(msg->source(), static_cast<int>(tag::status), status_message{m.tid(), m.seqno(), ec}); const status_message st{m.tid(), m.seqno(), ec}; LOGGER_INFO("msg <= to: {} body: {{status: {}}}", msg->source(), st); world.send(msg->source(), static_cast<int>(tag::status), st); break; } Loading