Commit a3de90e3 authored by Alberto Miranda's avatar Alberto Miranda ♨️
Browse files

Merge branch...

Merge branch 'amiranda/10-support-waiting-for-a-transfer-request-to-complete-via-std-future' into 'main'

Resolve "Support waiting for a transfer request to complete"

Closes #10

See merge request !6
parents 044059fb 94ba5f12
Loading
Loading
Loading
Loading
Loading
+12 −3
Original line number Original line Diff line number Diff line
@@ -24,9 +24,10 @@


add_library(cargo SHARED)
add_library(cargo SHARED)


target_sources(cargo PRIVATE cargo.hpp fmt_formatters.hpp libcargo.cpp)
target_sources(cargo PRIVATE cargo.hpp fmt_formatters.hpp cargo/error.hpp
        libcargo.cpp error.cpp)


list(APPEND public_headers "cargo.hpp")
list(APPEND public_headers "cargo.hpp;cargo/error.hpp")
list(APPEND public_headers "fmt_formatters.hpp")
list(APPEND public_headers "fmt_formatters.hpp")


target_include_directories(
target_include_directories(
@@ -36,7 +37,15 @@ target_include_directories(


set_target_properties(cargo PROPERTIES PUBLIC_HEADER "${public_headers}")
set_target_properties(cargo PROPERTIES PUBLIC_HEADER "${public_headers}")


target_link_libraries(cargo PRIVATE logger::logger fmt::fmt thallium)
target_link_libraries(cargo PRIVATE
        logger::logger
        fmt::fmt
        thallium
        MPI::MPI_CXX
        Boost::serialization
        Boost::mpi
        net::rpc_client
)


## Install library + targets ###################################################
## Install library + targets ###################################################


+110 −12
Original line number Original line Diff line number Diff line
@@ -28,11 +28,12 @@


#include <string>
#include <string>
#include <vector>
#include <vector>
#include <chrono>
#include <cargo/error.hpp>


namespace cargo {
namespace cargo {


using transfer_id = std::uint64_t;
using transfer_id = std::uint64_t;
using error_code = std::int32_t;


/**
/**
 * A Cargo server
 * A Cargo server
@@ -85,28 +86,111 @@ private:
};
};




/**
 * The status of a Cargo transfer
 */
enum class transfer_state { pending, running, completed, failed };

class transfer_status;

/**
/**
 * A transfer handler
 * A transfer handler
 */
 */
class transfer {
class transfer {


public:
    friend transfer
    transfer() noexcept = default;
    transfer_datasets(const server& srv, const std::vector<dataset>& sources,
    explicit transfer(transfer_id id) noexcept;
                      const std::vector<dataset>& targets);

    explicit transfer(transfer_id id, server srv) noexcept;


    [[nodiscard]] transfer_id
    [[nodiscard]] transfer_id
    id() const noexcept;
    id() const noexcept;


    template <typename Archive>
public:
    void
    /**
    serialize(Archive& ar) {
     * Get the current status of the associated transfer.
        ar& m_id;
     *
    }
     * @return A `transfer_status` object containing detailed information about
     * the transfer status.
     */
    [[nodiscard]] transfer_status
    status() const;

    /**
     * Wait for the associated transfer to complete.
     *
     * @return A `transfer_status` object containing detailed information about
     * the transfer status.
     */
    [[nodiscard]] transfer_status
    wait() const;

    /**
     * Wait for the associated transfer to complete or for a timeout to occur.
     * @param timeout The maximum amount of time to wait for the transfer to
     * complete.
     * @return A `transfer_status` object containing detailed information about
     * the transfer status.
     */
    [[nodiscard]] transfer_status
    wait_for(const std::chrono::nanoseconds& timeout) const;


private:
private:
    transfer_id m_id;
    transfer_id m_id;
    server m_srv;
};
};


/**
 * Detailed status information for a transfer
 */
class transfer_status {

    friend transfer_status
    transfer::status() const;

    transfer_status(transfer_state status, error_code error) noexcept;

public:
    /**
     * Get the current status of the associated transfer.
     *
     * @return A `transfer_state` enum value representing the current status.
     */
    [[nodiscard]] transfer_state
    state() const noexcept;

    /**
     * Check whether the transfer has completed.
     *
     * @return true if the transfer has completed, false otherwise.
     */
    [[nodiscard]] bool
    done() const noexcept;

    /**
     * Check whether the transfer has failed.
     *
     * @return true if the transfer has failed, false otherwise.
     */
    [[nodiscard]] bool
    failed() const noexcept;

    /**
     * Retrieve the error code associated with a failed transfer.
     *
     * @return An error code describing a transfer failure or
     * `error_code::success` if the transfer succeeded.
     * If the transfer has not yet completed,
     * `error_code::transfer_in_progress` is returned.
     */
    [[nodiscard]] error_code
    error() const;

private:
    transfer_state m_state;
    error_code m_error;
};


/**
/**
 * Request the transfer of a dataset collection.
 * Request the transfer of a dataset collection.
@@ -116,10 +200,24 @@ private:
 * @param targets The output datasets that should be generated.
 * @param targets The output datasets that should be generated.
 * @return A transfer
 * @return A transfer
 */
 */
cargo::transfer
transfer
transfer_datasets(const server& srv, const std::vector<dataset>& sources,
transfer_datasets(const server& srv, const std::vector<dataset>& sources,
                  const std::vector<dataset>& targets);
                  const std::vector<dataset>& targets);


/**
 * Request the transfer of a single dataset.
 * This function is a convenience wrapper around the previous one.
 * It takes a single source and a single target dataset.
 *
 * @param srv The Cargo server that should execute the transfer.
 * @param source The input dataset that should be transferred.
 * @param target The output dataset that should be generated.
 * @return A transfer
 */
transfer
transfer_dataset(const server& srv, const dataset& source,
                 const dataset& target);

} // namespace cargo
} // namespace cargo


#endif // CARGO_HPP
#endif // CARGO_HPP

lib/cargo/error.hpp

0 → 100644
+120 −0
Original line number Original line Diff line number Diff line
/******************************************************************************
 * Copyright 2022-2023, Barcelona Supercomputing Center (BSC), Spain
 *
 * This software was partially supported by the EuroHPC-funded project ADMIRE
 *   (Project ID: 956748, https://www.admire-eurohpc.eu).
 *
 * This file is part of Cargo.
 *
 * Cargo is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * Cargo is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with Cargo.  If not, see <https://www.gnu.org/licenses/>.
 *
 * SPDX-License-Identifier: GPL-3.0-or-later
 *****************************************************************************/

#ifndef CARGO_ERROR_HPP
#define CARGO_ERROR_HPP

#include <cstdint>
#include <string_view>

namespace cargo {

enum class error_category : std::uint32_t {
    generic_error = 0,
    system_error = 1,
    mpi_error = 2,
};

class error_code {

    enum class error_value : std::uint32_t {
        success = 0,
        snafu = 1,
        not_implemented = 2,
        no_such_transfer = 3,
        transfer_in_progress = 4,
    };

public:
    static const error_code success;
    static const error_code snafu;
    static const error_code not_implemented;
    static const error_code no_such_transfer;
    static const error_code transfer_in_progress;

    constexpr error_code() : error_code(error_value::success) {}
    constexpr explicit error_code(error_value v)
        : m_category(error_category::generic_error), m_value(v) {}
    constexpr error_code(error_category c, std::uint32_t v)
        : m_category(c), m_value(static_cast<error_value>(v)) {}

    constexpr explicit operator bool() const {
        return m_value != error_value::success;
    }

    [[nodiscard]] constexpr error_category
    category() const {
        return m_category;
    }

    [[nodiscard]] constexpr std::uint32_t
    value() const {
        return static_cast<uint32_t>(m_value);
    }

    [[nodiscard]] std::string_view
    name() const;

    [[nodiscard]] std::string
    message() const;

    template <typename Archive>
    void
    serialize(Archive&& ar, std::uint32_t version) {
        (void) version;
        ar & m_category;
        ar & m_value;
    }

private:
    error_category m_category;
    error_value m_value;
};

constexpr error_code error_code::success{error_value::success};
constexpr error_code error_code::snafu{error_value::snafu};
constexpr error_code error_code::not_implemented{error_value::not_implemented};
constexpr error_code error_code::no_such_transfer{
        error_value::no_such_transfer};
constexpr error_code error_code::transfer_in_progress{
        error_value::transfer_in_progress};

static constexpr cargo::error_code
make_system_error(std::uint32_t ec) {
    return cargo::error_code{cargo::error_category::system_error, ec};
}

static constexpr cargo::error_code
make_mpi_error(std::uint32_t ec) {
    return cargo::error_code{cargo::error_category::mpi_error, ec};
}

constexpr bool
operator==(const error_code& lhs, const error_code& rhs) {
    return lhs.category() == rhs.category() && lhs.value() == rhs.value();
}

} // namespace cargo

#endif // CARGO_ERROR_HPP

lib/error.cpp

0 → 100644
+320 −0
Original line number Original line Diff line number Diff line
/******************************************************************************
 * Copyright 2022-2023, Barcelona Supercomputing Center (BSC), Spain
 *
 * This software was partially supported by the EuroHPC-funded project ADMIRE
 *   (Project ID: 956748, https://www.admire-eurohpc.eu).
 *
 * This file is part of Cargo.
 *
 * Cargo is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * Cargo is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with Cargo.  If not, see <https://www.gnu.org/licenses/>.
 *
 * SPDX-License-Identifier: GPL-3.0-or-later
 *****************************************************************************/

#include <system_error>
#include <boost/mpi/error_string.hpp>
#include "cargo/error.hpp"

// clang-format off
#define EXPAND(s) case s: return #s
// clang-format on

constexpr std::string_view
errno_name(int ec) {
    switch(ec) {
        EXPAND(EPERM);
        EXPAND(ENOENT);
        EXPAND(ESRCH);
        EXPAND(EINTR);
        EXPAND(EIO);
        EXPAND(ENXIO);
        EXPAND(E2BIG);
        EXPAND(ENOEXEC);
        EXPAND(EBADF);
        EXPAND(ECHILD);
        EXPAND(EAGAIN);
        EXPAND(ENOMEM);
        EXPAND(EACCES);
        EXPAND(EFAULT);
        EXPAND(ENOTBLK);
        EXPAND(EBUSY);
        EXPAND(EEXIST);
        EXPAND(EXDEV);
        EXPAND(ENODEV);
        EXPAND(ENOTDIR);
        EXPAND(EISDIR);
        EXPAND(EINVAL);
        EXPAND(ENFILE);
        EXPAND(EMFILE);
        EXPAND(ENOTTY);
        EXPAND(ETXTBSY);
        EXPAND(EFBIG);
        EXPAND(ENOSPC);
        EXPAND(ESPIPE);
        EXPAND(EROFS);
        EXPAND(EMLINK);
        EXPAND(EPIPE);
        EXPAND(EDOM);
        EXPAND(ERANGE);
        EXPAND(EDEADLK);
        EXPAND(ENAMETOOLONG);
        EXPAND(ENOLCK);
        EXPAND(ENOSYS);
        EXPAND(ENOTEMPTY);
        EXPAND(ELOOP);
        // EXPAND(EWOULDBLOCK);
        EXPAND(ENOMSG);
        EXPAND(EIDRM);
        EXPAND(ECHRNG);
        EXPAND(EL2NSYNC);
        EXPAND(EL3HLT);
        EXPAND(EL3RST);
        EXPAND(ELNRNG);
        EXPAND(EUNATCH);
        EXPAND(ENOCSI);
        EXPAND(EL2HLT);
        EXPAND(EBADE);
        EXPAND(EBADR);
        EXPAND(EXFULL);
        EXPAND(ENOANO);
        EXPAND(EBADRQC);
        EXPAND(EBADSLT);
        // EXPAND(EDEADLOCK);
        EXPAND(EBFONT);
        EXPAND(ENOSTR);
        EXPAND(ENODATA);
        EXPAND(ETIME);
        EXPAND(ENOSR);
        EXPAND(ENONET);
        EXPAND(ENOPKG);
        EXPAND(EREMOTE);
        EXPAND(ENOLINK);
        EXPAND(EADV);
        EXPAND(ESRMNT);
        EXPAND(ECOMM);
        EXPAND(EPROTO);
        EXPAND(EMULTIHOP);
        EXPAND(EDOTDOT);
        EXPAND(EBADMSG);
        EXPAND(EOVERFLOW);
        EXPAND(ENOTUNIQ);
        EXPAND(EBADFD);
        EXPAND(EREMCHG);
        EXPAND(ELIBACC);
        EXPAND(ELIBBAD);
        EXPAND(ELIBSCN);
        EXPAND(ELIBMAX);
        EXPAND(ELIBEXEC);
        EXPAND(EILSEQ);
        EXPAND(ERESTART);
        EXPAND(ESTRPIPE);
        EXPAND(EUSERS);
        EXPAND(ENOTSOCK);
        EXPAND(EDESTADDRREQ);
        EXPAND(EMSGSIZE);
        EXPAND(EPROTOTYPE);
        EXPAND(ENOPROTOOPT);
        EXPAND(EPROTONOSUPPORT);
        EXPAND(ESOCKTNOSUPPORT);
        EXPAND(EOPNOTSUPP);
        EXPAND(EPFNOSUPPORT);
        EXPAND(EAFNOSUPPORT);
        EXPAND(EADDRINUSE);
        EXPAND(EADDRNOTAVAIL);
        EXPAND(ENETDOWN);
        EXPAND(ENETUNREACH);
        EXPAND(ENETRESET);
        EXPAND(ECONNABORTED);
        EXPAND(ECONNRESET);
        EXPAND(ENOBUFS);
        EXPAND(EISCONN);
        EXPAND(ENOTCONN);
        EXPAND(ESHUTDOWN);
        EXPAND(ETOOMANYREFS);
        EXPAND(ETIMEDOUT);
        EXPAND(ECONNREFUSED);
        EXPAND(EHOSTDOWN);
        EXPAND(EHOSTUNREACH);
        EXPAND(EALREADY);
        EXPAND(EINPROGRESS);
        EXPAND(ESTALE);
        EXPAND(EUCLEAN);
        EXPAND(ENOTNAM);
        EXPAND(ENAVAIL);
        EXPAND(EISNAM);
        EXPAND(EREMOTEIO);
        EXPAND(EDQUOT);
        EXPAND(ENOMEDIUM);
        EXPAND(EMEDIUMTYPE);
        EXPAND(ECANCELED);
        EXPAND(ENOKEY);
        EXPAND(EKEYEXPIRED);
        EXPAND(EKEYREVOKED);
        EXPAND(EKEYREJECTED);
        EXPAND(EOWNERDEAD);
        EXPAND(ENOTRECOVERABLE);
        EXPAND(ERFKILL);
        EXPAND(EHWPOISON);
        default:
            return "EUNKNOWN";
    }
}

constexpr std::string_view
mpi_error_name(int ec) {

    switch(ec) {
        EXPAND(MPI_SUCCESS);
        EXPAND(MPI_ERR_BUFFER);
        EXPAND(MPI_ERR_COUNT);
        EXPAND(MPI_ERR_TYPE);
        EXPAND(MPI_ERR_TAG);
        EXPAND(MPI_ERR_COMM);
        EXPAND(MPI_ERR_RANK);
        EXPAND(MPI_ERR_REQUEST);
        EXPAND(MPI_ERR_ROOT);
        EXPAND(MPI_ERR_GROUP);
        EXPAND(MPI_ERR_OP);
        EXPAND(MPI_ERR_TOPOLOGY);
        EXPAND(MPI_ERR_DIMS);
        EXPAND(MPI_ERR_ARG);
        EXPAND(MPI_ERR_UNKNOWN);
        EXPAND(MPI_ERR_TRUNCATE);
        EXPAND(MPI_ERR_OTHER);
        EXPAND(MPI_ERR_INTERN);
        EXPAND(MPI_ERR_IN_STATUS);
        EXPAND(MPI_ERR_PENDING);
        EXPAND(MPI_ERR_ACCESS);
        EXPAND(MPI_ERR_AMODE);
        EXPAND(MPI_ERR_ASSERT);
        EXPAND(MPI_ERR_BAD_FILE);
        EXPAND(MPI_ERR_BASE);
        EXPAND(MPI_ERR_CONVERSION);
        EXPAND(MPI_ERR_DISP);
        EXPAND(MPI_ERR_DUP_DATAREP);
        EXPAND(MPI_ERR_FILE_EXISTS);
        EXPAND(MPI_ERR_FILE_IN_USE);
        EXPAND(MPI_ERR_FILE);
        EXPAND(MPI_ERR_INFO_KEY);
        EXPAND(MPI_ERR_INFO_NOKEY);
        EXPAND(MPI_ERR_INFO_VALUE);
        EXPAND(MPI_ERR_INFO);
        EXPAND(MPI_ERR_IO);
        EXPAND(MPI_ERR_KEYVAL);
        EXPAND(MPI_ERR_LOCKTYPE);
        EXPAND(MPI_ERR_NAME);
        EXPAND(MPI_ERR_NO_MEM);
        EXPAND(MPI_ERR_NOT_SAME);
        EXPAND(MPI_ERR_NO_SPACE);
        EXPAND(MPI_ERR_NO_SUCH_FILE);
        EXPAND(MPI_ERR_PORT);
        EXPAND(MPI_ERR_QUOTA);
        EXPAND(MPI_ERR_READ_ONLY);
        EXPAND(MPI_ERR_RMA_CONFLICT);
        EXPAND(MPI_ERR_RMA_SYNC);
        EXPAND(MPI_ERR_SERVICE);
        EXPAND(MPI_ERR_SIZE);
        EXPAND(MPI_ERR_SPAWN);
        EXPAND(MPI_ERR_UNSUPPORTED_DATAREP);
        EXPAND(MPI_ERR_UNSUPPORTED_OPERATION);
        EXPAND(MPI_ERR_WIN);
        EXPAND(MPI_T_ERR_MEMORY);
        EXPAND(MPI_T_ERR_NOT_INITIALIZED);
        EXPAND(MPI_T_ERR_CANNOT_INIT);
        EXPAND(MPI_T_ERR_INVALID_INDEX);
        EXPAND(MPI_T_ERR_INVALID_ITEM);
        EXPAND(MPI_T_ERR_INVALID_HANDLE);
        EXPAND(MPI_T_ERR_OUT_OF_HANDLES);
        EXPAND(MPI_T_ERR_OUT_OF_SESSIONS);
        EXPAND(MPI_T_ERR_INVALID_SESSION);
        EXPAND(MPI_T_ERR_CVAR_SET_NOT_NOW);
        EXPAND(MPI_T_ERR_CVAR_SET_NEVER);
        EXPAND(MPI_T_ERR_PVAR_NO_STARTSTOP);
        EXPAND(MPI_T_ERR_PVAR_NO_WRITE);
        EXPAND(MPI_T_ERR_PVAR_NO_ATOMIC);
        EXPAND(MPI_ERR_RMA_RANGE);
        EXPAND(MPI_ERR_RMA_ATTACH);
        EXPAND(MPI_ERR_RMA_FLAVOR);
        EXPAND(MPI_ERR_RMA_SHARED);
        EXPAND(MPI_T_ERR_INVALID);
        EXPAND(MPI_T_ERR_INVALID_NAME);
        default:
            return "MPI_ERR_UNKNOWN";
    }
}

namespace cargo {

[[nodiscard]] std::string_view
error_code::name() const {

    switch(m_category) {
        case error_category::generic_error:
            break;
        case error_category::system_error:
            return errno_name(static_cast<int>(m_value));
        case error_category::mpi_error:
            return mpi_error_name(static_cast<int>(m_value));
        default:
            return "CARGO_UNKNOWN_ERROR";
    }

    switch(m_value) {
        case error_value::success:
            return "CARGO_SUCCESS";
        case error_value::snafu:
            return "CARGO_SNAFU";
        case error_value::not_implemented:
            return "CARGO_NOT_IMPLEMENTED";
        case error_value::no_such_transfer:
            return "CARGO_NO_SUCH_TRANSFER";
        case error_value::transfer_in_progress:
            return "CARGO_TRANSFER_IN_PROGRESS";
        default:
            return "CARGO_UNKNOWN_ERROR";
    }
};

[[nodiscard]] std::string
error_code::message() const {

    switch(m_category) {
        case error_category::generic_error:
            switch(m_value) {
                case error_value::success:
                    return "success";
                case error_value::snafu:
                    return "snafu";
                case error_value::not_implemented:
                    return "not implemented";
                case error_value::no_such_transfer:
                    return "no such transfer";
                case error_value::transfer_in_progress:
                    return "transfer in progress";
                default:
                    return "unknown error";
            }
        case error_category::system_error: {
            std::error_code std_ec =
                    std::make_error_code(static_cast<std::errc>(m_value));
            return std_ec.message();
        }
        case error_category::mpi_error:
            return boost::mpi::error_string(static_cast<int>(m_value));
        default:
            return "unknown error category";
    }
}

} // namespace cargo
 No newline at end of file
+46 −1
Original line number Original line Diff line number Diff line
@@ -27,7 +27,9 @@


#include <iomanip>
#include <iomanip>
#include <string_view>
#include <string_view>
#include <optional>
#include <fmt/format.h>
#include <fmt/format.h>
#include "cargo/error.hpp"


namespace cargo {
namespace cargo {


@@ -64,9 +66,52 @@ struct fmt::formatter<cargo::transfer> : formatter<std::string_view> {
    template <typename FormatContext>
    template <typename FormatContext>
    auto
    auto
    format(const cargo::transfer& tx, FormatContext& ctx) const {
    format(const cargo::transfer& tx, FormatContext& ctx) const {
        const auto str = fmt::format("{{id: {}}}", tx.id());
        const auto str = fmt::format("{{tid: {}}}", tx.id());
        return formatter<std::string_view>::format(str, ctx);
        return formatter<std::string_view>::format(str, ctx);
    }
    }
};
};


template <>
struct fmt::formatter<cargo::error_code> : formatter<std::string_view> {
    // parse is inherited from formatter<string_view>.
    template <typename FormatContext>
    auto
    format(const cargo::error_code& ec, FormatContext& ctx) const {
        return formatter<std::string_view>::format(ec.name(), ctx);
    }
};

template <>
struct fmt::formatter<cargo::transfer_state> : formatter<std::string_view> {
    // parse is inherited from formatter<string_view>.
    template <typename FormatContext>
    auto
    format(const cargo::transfer_state& s, FormatContext& ctx) const {
        switch(s) {
            case cargo::transfer_state::pending:
                return formatter<std::string_view>::format("pending", ctx);
            case cargo::transfer_state::running:
                return formatter<std::string_view>::format("running", ctx);
            case cargo::transfer_state::completed:
                return formatter<std::string_view>::format("completed", ctx);
            case cargo::transfer_state::failed:
                return formatter<std::string_view>::format("failed", ctx);
            default:
                return formatter<std::string_view>::format("unknown", ctx);
        }
    }
};

template <typename T>
struct fmt::formatter<std::optional<T>> : formatter<std::string_view> {
    // parse is inherited from formatter<string_view>.
    template <typename FormatContext>
    auto
    format(const std::optional<T>& v, FormatContext& ctx) const {
        return formatter<std::string_view>::format(
                v ? fmt::format("{}", v.value()) : "none", ctx);
    }
};


#endif // CARGO_FMT_FORMATTERS_HPP
#endif // CARGO_FMT_FORMATTERS_HPP
Loading