Verified Commit db6ac1bf authored by Alberto Miranda's avatar Alberto Miranda ♨️
Browse files

Reformat RPC logging and add peer address information

parent 75748bfe
Loading
Loading
Loading
Loading
+119 −0
Original line number Diff line number Diff line
@@ -73,6 +73,13 @@ struct margo_context {
// forward declarations
struct endpoint;

namespace utils {

std::string
get_address(hg_handle_t h);

} // namespace utils

struct engine {

    enum class execution_mode : bool {
@@ -123,6 +130,74 @@ struct engine {
    endpoint
    lookup(const std::string& address) const;

    std::string
    self_address() const {

        struct addr_handle {
            addr_handle(margo_instance_id mid, hg_addr_t addr)
                : m_mid(mid), m_addr(addr) {}

            ~addr_handle() {
                if(m_addr) {
                    margo_addr_free(m_mid, m_addr);
                }
            }

            hg_addr_t
            native() const {
                return m_addr;
            }

            margo_instance_id m_mid;
            hg_addr_t m_addr;
        };

        const auto self_addr = addr_handle{
                m_context->m_mid, [mid = m_context->m_mid]() -> hg_addr_t {
                    hg_addr_t tmp;

                    hg_return_t ret = margo_addr_self(mid, &tmp);

                    if(ret != HG_SUCCESS) {
                        LOGGER_WARN(fmt::format(
                                "Error finding out self address: {}",
                                HG_Error_to_string(ret)));
                        return nullptr;
                    }

                    return tmp;
                }()};

        if(!self_addr.native()) {
            return "unknown";
        }

        hg_size_t expected_length;
        hg_return_t ret =
                margo_addr_to_string(m_context->m_mid, nullptr,
                                     &expected_length, self_addr.native());

        if(ret != HG_SUCCESS) {
            LOGGER_WARN(fmt::format("Error finding out self address: {}",
                                    HG_Error_to_string(ret)));
            return "unknown";
        }

        std::vector<char> tmp;
        tmp.reserve(expected_length);

        ret = margo_addr_to_string(m_context->m_mid, tmp.data(),
                                   &expected_length, self_addr.native());

        if(ret != HG_SUCCESS) {
            LOGGER_WARN(fmt::format("Error finding out self address: {}",
                                    HG_Error_to_string(ret)));
            return "unknown";
        }

        return {tmp.data()};
    }

    std::shared_ptr<detail::margo_context> m_context;
};

@@ -149,6 +224,11 @@ public:
        return m_handle;
    }

    std::string
    origin() const {
        return utils::get_address(m_handle);
    }

private:
    hg_handle_t m_handle;
    Output m_output;
@@ -299,6 +379,45 @@ struct rpc_acceptor : engine {
        : engine(format_address(protocol, bind_address, port)) {}
};

namespace utils {

inline std::string
get_address(hg_handle_t h) {

    const hg_info* hgi = margo_get_info(h);

    if(!hgi) {
        LOGGER_WARN("Unable to get information from hg_handle");
        return "unknown";
    }

    margo_instance_id mid = margo_hg_handle_get_instance(h);

    hg_size_t expected_length;
    hg_return_t ret =
            margo_addr_to_string(mid, nullptr, &expected_length, hgi->addr);

    if(ret != HG_SUCCESS) {
        LOGGER_WARN("Error finding out client address: {}",
                    HG_Error_to_string(ret));
        return "unknown";
    }

    std::vector<char> tmp;
    tmp.reserve(expected_length);

    ret = margo_addr_to_string(mid, tmp.data(), &expected_length, hgi->addr);

    if(ret != HG_SUCCESS) {
        LOGGER_WARN("Error finding out client address: {}",
                    HG_Error_to_string(ret));
        return "unknown";
    }

    return {tmp.data()};
}

} // namespace utils

} // namespace scord::network

+58 −32
Original line number Diff line number Diff line
@@ -29,6 +29,8 @@
#include <admire_types.hpp>
#include "impl.hpp"

using namespace std::literals;

void
rpc_registration_cb(scord::network::rpc_client* client) {

@@ -154,7 +156,7 @@ rpc_registration_cb(scord::network::rpc_client* client) {
                 ADM_get_statistics_out_t, NULL, true);
}

namespace admire {
namespace api {

struct remote_procedure {
    static std::uint64_t
@@ -164,7 +166,7 @@ struct remote_procedure {
    }
};

} // namespace admire
} // namespace api

namespace admire::detail {

@@ -173,14 +175,18 @@ ping(const server& srv) {

    scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb};

    const auto rpc_id = admire::remote_procedure::new_id();
    const auto rpc_id = ::api::remote_procedure::new_id();

    auto endp = rpc_client.lookup(srv.address());

    LOGGER_INFO("RPC ID {} (ADM_{}) => {{}}", rpc_id, __FUNCTION__);
    LOGGER_INFO("rpc id: {} name: {} from: {} => body: {{}}", rpc_id,
                std::quoted("ADM_"s + __FUNCTION__),
                std::quoted(rpc_client.self_address()));

    const auto rpc = endp.call("ADM_ping");

    LOGGER_INFO("RPC ID {} (ADM_{}) <= {{retval: {}}}", rpc_id, __FUNCTION__,
    LOGGER_INFO("rpc id: {} name: {} from: {} <= body: {{retval: {}}}", rpc_id,
                std::quoted("ADM_"s + __FUNCTION__), std::quoted(rpc.origin()),
                ADM_SUCCESS);
    return ADM_SUCCESS;
}
@@ -190,11 +196,13 @@ register_job(const admire::server& srv, const admire::job_requirements& reqs) {

    scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb};

    const auto rpc_id = admire::remote_procedure::new_id();
    const auto rpc_id = ::api::remote_procedure::new_id();
    auto endp = rpc_client.lookup(srv.address());

    LOGGER_INFO("RPC ID {} (ADM_{}) => {{job_requirements: {}}}", rpc_id,
                __FUNCTION__, reqs);
    LOGGER_INFO("rpc id: {} name: {} from: {} => body: {{job_requirements: "
                "{}}}",
                rpc_id, std::quoted("ADM_"s + __FUNCTION__),
                std::quoted(rpc_client.self_address()), reqs);

    auto rpc_reqs = api::convert(reqs);

@@ -204,15 +212,20 @@ register_job(const admire::server& srv, const admire::job_requirements& reqs) {
    const auto rpc = endp.call("ADM_register_job", &in, &out);

    if(out.retval < 0) {
        LOGGER_ERROR("RPC ID {} (ADM_{}) <= {}", rpc_id, __FUNCTION__,
                     out.retval);
        LOGGER_ERROR("rpc id: {} name: {} from: {} <= body: {}", rpc_id,
                     std::quoted("ADM_"s + __FUNCTION__),
                     std::quoted(rpc.origin()), out.retval);
        return tl::make_unexpected(static_cast<admire::error_code>(out.retval));
    }

    const admire::job job = api::convert(out.job);

    LOGGER_INFO("RPC ID {} (ADM_{}) <= {{retval: {}, job: {}}}", rpc_id,
                __FUNCTION__, ADM_SUCCESS, job.id());
    LOGGER_INFO("rpc id: {} name: {} from: {} <= body: {{retval: {}, job: "
                "{}}}",
                rpc_id, std::quoted("ADM_"s + __FUNCTION__),
                std::quoted(rpc.origin()), ADM_SUCCESS, job.id()

    );

    return job;
}
@@ -222,11 +235,14 @@ update_job(const server& srv, const job& job, const job_requirements& reqs) {

    scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb};

    const auto rpc_id = admire::remote_procedure::new_id();
    const auto rpc_id = ::api::remote_procedure::new_id();
    auto endp = rpc_client.lookup(srv.address());

    LOGGER_INFO("RPC ID {} (ADM_{}) => {{job: {}, job_requirements: {}}}",
                rpc_id, __FUNCTION__, job, reqs);
    LOGGER_INFO("rpc id: {} name: {} from: {} => body: {{job: {}, "
                "job_requirements: "
                "{}}}",
                rpc_id, std::quoted("ADM_"s + __FUNCTION__),
                std::quoted(rpc_client.self_address()), job, reqs);

    const auto rpc_job = api::convert(job);
    const auto rpc_reqs = api::convert(reqs);
@@ -236,15 +252,16 @@ update_job(const server& srv, const job& job, const job_requirements& reqs) {

    const auto rpc = endp.call("ADM_update_job", &in, &out);


    if(out.retval < 0) {
        const auto retval = static_cast<admire::error_code>(out.retval);
        LOGGER_ERROR("RPC ID {} (ADM_{}) <= {{retval: {}}}", rpc_id,
                     __FUNCTION__, retval);
        LOGGER_ERROR("rpc id: {} name: {} from: {} <= body: {{retval: {}}}",
                     rpc_id, std::quoted("ADM_"s + __FUNCTION__),
                     std::quoted(rpc.origin()), retval);
        return retval;
    }

    LOGGER_INFO("RPC ID {} (ADM_{}) <= {{retval: {}}}", rpc_id, __FUNCTION__,
    LOGGER_INFO("rpc id: {} name: {} from: {} <= body: {{retval: {}}}", rpc_id,
                std::quoted("ADM_"s + __FUNCTION__), std::quoted(rpc.origin()),
                ADM_SUCCESS);
    return ADM_SUCCESS;
}
@@ -254,10 +271,12 @@ remove_job(const server& srv, const job& job) {

    scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb};

    const auto rpc_id = admire::remote_procedure::new_id();
    const auto rpc_id = ::api::remote_procedure::new_id();
    auto endp = rpc_client.lookup(srv.address());

    LOGGER_INFO("RPC ID {} (ADM_{}) => {{job: {}}}", rpc_id, __FUNCTION__, job);
    LOGGER_INFO("rpc id: {} name: {} from: {} => body: {{job: {}}}", rpc_id,
                std::quoted("ADM_"s + __FUNCTION__),
                std::quoted(rpc_client.self_address()), job);

    const auto rpc_job = api::convert(job);

@@ -268,12 +287,14 @@ remove_job(const server& srv, const job& job) {

    if(out.retval < 0) {
        const auto retval = static_cast<admire::error_code>(out.retval);
        LOGGER_ERROR("RPC ID {} (ADM_{}) <= {{retval: {}}}", rpc_id,
                     __FUNCTION__, retval);
        LOGGER_ERROR("rpc id: {} name: {} from: {} <= body: {{retval: {}}}",
                     rpc_id, std::quoted("ADM_"s + __FUNCTION__),
                     std::quoted(rpc.origin()), retval);
        return retval;
    }

    LOGGER_INFO("RPC ID {} (ADM_{}) <= {{retval: {}}}", rpc_id, __FUNCTION__,
    LOGGER_INFO("rpc id: {} name: {} from: {} <= body: {{retval: {}}}", rpc_id,
                std::quoted("ADM_"s + __FUNCTION__), std::quoted(rpc.origin()),
                ADM_SUCCESS);
    return ADM_SUCCESS;
}
@@ -287,12 +308,14 @@ transfer_datasets(const server& srv, const job& job,

    scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb};

    const auto rpc_id = admire::remote_procedure::new_id();
    const auto rpc_id = ::api::remote_procedure::new_id();
    auto endp = rpc_client.lookup(srv.address());

    LOGGER_INFO("RPC ID {} (ADM_{}) => {{job: {}, sources: {}, targets: {}, "
                "limits: {}, mapping: {}}}",
                rpc_id, __FUNCTION__, job, sources, targets, limits, mapping);
    LOGGER_INFO("rpc id: {} name: {} from: {} => body: {{job: {}, sources: {}, "
                "targets: {}, limits: {}, mapping: {}}}",
                rpc_id, std::quoted("ADM_"s + __FUNCTION__),
                std::quoted(rpc_client.self_address()), job, sources, targets,
                limits, mapping);

    const auto rpc_job = api::convert(job);
    const auto rpc_sources = api::convert(sources);
@@ -308,15 +331,18 @@ transfer_datasets(const server& srv, const job& job,
            endp.call("ADM_transfer_datasets", &in, &out);

    if(out.retval < 0) {
        LOGGER_ERROR("RPC ID {} (ADM_{}) <= {{retval: {}}}", rpc_id,
                     __FUNCTION__, out.retval);
        LOGGER_ERROR("rpc id: {} name: {} from: {} <= body: {{retval: {}}}",
                     rpc_id, std::quoted("ADM_"s + __FUNCTION__),
                     std::quoted(rpc.origin()), out.retval);
        return tl::make_unexpected(static_cast<admire::error_code>(out.retval));
    }

    const admire::transfer tx = api::convert(out.tx);

    LOGGER_INFO("RPC ID {} (ADM_{}) <= {{retval: {}, transfer: {}}}", rpc_id,
                __FUNCTION__, ADM_SUCCESS, tx);
    LOGGER_INFO("rpc id: {} name: {} from: {} <= body: {{retval: {}, transfer: "
                "{}}}",
                rpc_id, std::quoted("ADM_"s + __FUNCTION__),
                std::quoted(rpc.origin()), ADM_SUCCESS, tx);
    return tx;
}

+53 −15
Original line number Diff line number Diff line
@@ -23,6 +23,7 @@
 *****************************************************************************/

#include <logger/logger.hpp>
#include <net/engine.hpp>
#include <net/proto/rpc_types.h>
#include <admire.hpp>
#include <api/convert.hpp>
@@ -39,14 +40,19 @@ struct remote_procedure {
static void
ADM_ping(hg_handle_t h) {

    using scord::network::utils::get_address;

    [[maybe_unused]] hg_return_t ret;

    [[maybe_unused]] margo_instance_id mid = margo_hg_handle_get_instance(h);

    const auto id = remote_procedure::new_id();
    LOGGER_INFO("RPC ID {} ({}) => {{}}", id, __FUNCTION__);

    LOGGER_INFO("RPC ID {} ({}) <= {{retval: {}}}", id, __FUNCTION__,
    LOGGER_INFO("rpc id: {} name: {}, from: {} => body: {{}}", id,
                std::quoted(__FUNCTION__), std::quoted(get_address(h)));

    LOGGER_INFO("rpc id: {} name: {}, to: {} <= body: {{retval: {}}}", id,
                std::quoted(__FUNCTION__), std::quoted(get_address(h)),
                ADM_SUCCESS);

    ret = margo_destroy(h);
@@ -58,6 +64,8 @@ DEFINE_MARGO_RPC_HANDLER(ADM_ping);
static void
ADM_register_job(hg_handle_t h) {

    using scord::network::utils::get_address;

    [[maybe_unused]] hg_return_t ret;

    ADM_register_job_in_t in;
@@ -71,7 +79,9 @@ ADM_register_job(hg_handle_t h) {
    const admire::job_requirements reqs(&in.reqs);

    const auto id = remote_procedure::new_id();
    LOGGER_INFO("RPC ID {} ({}) => {{job_requirements: {}}}", id, __FUNCTION__,
    LOGGER_INFO("rpc id: {} name: {} from: {} => body: {{job_requirements: "
                "{}}}",
                id, std::quoted(__FUNCTION__), std::quoted(get_address(h)),
                reqs);

    const auto job = admire::job{42};
@@ -81,8 +91,9 @@ ADM_register_job(hg_handle_t h) {
    out.retval = rv;
    out.job = admire::api::convert(job).release();

    LOGGER_INFO("RPC ID {} ({}) <= {{retval: {}, job: {}}}", id, __FUNCTION__,
                rv, job);
    LOGGER_INFO("rpc id: {} name: {} to: {} <= body: {{retval: {}, job: {}}}",
                id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), rv,
                job);

    ret = margo_respond(h, &out);
    assert(ret == HG_SUCCESS);
@@ -100,6 +111,8 @@ DEFINE_MARGO_RPC_HANDLER(ADM_register_job);
static void
ADM_update_job(hg_handle_t h) {

    using scord::network::utils::get_address;

    [[maybe_unused]] hg_return_t ret;

    ADM_update_job_in_t in;
@@ -114,13 +127,14 @@ ADM_update_job(hg_handle_t h) {
    const admire::job_requirements reqs(&in.reqs);

    const auto id = remote_procedure::new_id();
    LOGGER_INFO("RPC ID {} ({}) => {{job: {}, job_requirements: {}}}", id,
                __FUNCTION__, job, reqs);
    LOGGER_INFO("RPC ID {} ({}) => body: {{job: {}, job_requirements: {}}}", id,
                std::quoted(__FUNCTION__), job, reqs);

    admire::error_code rv = ADM_SUCCESS;
    out.retval = rv;

    LOGGER_INFO("RPC ID {} ({}) <= {{retval: {}}}", id, __FUNCTION__, rv);
    LOGGER_INFO("RPC ID {} ({}) <= body: {{retval: {}}}", id,
                std::quoted(__FUNCTION__), rv);

    ret = margo_respond(h, &out);
    assert(ret == HG_SUCCESS);
@@ -138,6 +152,8 @@ DEFINE_MARGO_RPC_HANDLER(ADM_update_job);
static void
ADM_remove_job(hg_handle_t h) {

    using scord::network::utils::get_address;

    [[maybe_unused]] hg_return_t ret;

    ADM_remove_job_in_t in;
@@ -151,12 +167,14 @@ ADM_remove_job(hg_handle_t h) {
    const admire::job job(in.job);

    const auto id = remote_procedure::new_id();
    LOGGER_INFO("RPC ID {} ({}) => {{job: {}}}", id, __FUNCTION__, job);
    LOGGER_INFO("RPC ID {} ({}) => body: {{job: {}}}", id,
                std::quoted(__FUNCTION__), job);

    admire::error_code rv = ADM_SUCCESS;
    out.retval = rv;

    LOGGER_INFO("RPC ID {} ({}) <= {{retval: {}}}", id, __FUNCTION__, rv);
    LOGGER_INFO("RPC ID {} ({}) <= body: {{retval: {}}}", id,
                std::quoted(__FUNCTION__), rv);

    ret = margo_respond(h, &out);
    assert(ret == HG_SUCCESS);
@@ -173,6 +191,8 @@ DEFINE_MARGO_RPC_HANDLER(ADM_remove_job);
static void
ADM_register_adhoc_storage(hg_handle_t h) {

    using scord::network::utils::get_address;

    [[maybe_unused]] hg_return_t ret;

    ADM_register_adhoc_storage_in_t in;
@@ -204,6 +224,8 @@ DEFINE_MARGO_RPC_HANDLER(ADM_register_adhoc_storage);
static void
ADM_update_adhoc_storage(hg_handle_t h) {

    using scord::network::utils::get_address;

    [[maybe_unused]] hg_return_t ret;

    ADM_update_adhoc_storage_in_t in;
@@ -235,6 +257,8 @@ DEFINE_MARGO_RPC_HANDLER(ADM_update_adhoc_storage);
static void
ADM_remove_adhoc_storage(hg_handle_t h) {

    using scord::network::utils::get_address;

    [[maybe_unused]] hg_return_t ret;

    ADM_remove_adhoc_storage_in_t in;
@@ -266,6 +290,8 @@ DEFINE_MARGO_RPC_HANDLER(ADM_remove_adhoc_storage);
static void
ADM_deploy_adhoc_storage(hg_handle_t h) {

    using scord::network::utils::get_address;

    [[maybe_unused]] hg_return_t ret;

    ADM_deploy_adhoc_storage_in_t in;
@@ -297,6 +323,8 @@ DEFINE_MARGO_RPC_HANDLER(ADM_deploy_adhoc_storage);
static void
ADM_register_pfs_storage(hg_handle_t h) {

    using scord::network::utils::get_address;

    [[maybe_unused]] hg_return_t ret;

    ADM_register_pfs_storage_in_t in;
@@ -328,6 +356,8 @@ DEFINE_MARGO_RPC_HANDLER(ADM_register_pfs_storage);
static void
ADM_update_pfs_storage(hg_handle_t h) {

    using scord::network::utils::get_address;

    [[maybe_unused]] hg_return_t ret;

    ADM_update_pfs_storage_in_t in;
@@ -359,6 +389,8 @@ DEFINE_MARGO_RPC_HANDLER(ADM_update_pfs_storage);
static void
ADM_remove_pfs_storage(hg_handle_t h) {

    using scord::network::utils::get_address;

    [[maybe_unused]] hg_return_t ret;

    ADM_remove_pfs_storage_in_t in;
@@ -974,6 +1006,8 @@ DEFINE_MARGO_RPC_HANDLER(ADM_in_transit_ops)
static void
ADM_transfer_datasets(hg_handle_t h) {

    using scord::network::utils::get_address;

    [[maybe_unused]] hg_return_t ret;

    ADM_transfer_datasets_in_t in;
@@ -994,9 +1028,11 @@ ADM_transfer_datasets(hg_handle_t h) {
    const auto mapping = static_cast<admire::transfer::mapping>(in.mapping);

    const auto id = remote_procedure::new_id();
    LOGGER_INFO("RPC ID {} ({}) => {{job: {}, sources: {}, targets: {}, "
                "limits: {}, mapping: {}}}",
                id, __FUNCTION__, job, sources, targets, limits, mapping);
    LOGGER_INFO(
            "rpc id: {} name: {} from: {} => "
            "body: {{job: {}, sources: {}, targets: {}, limits: {}, mapping: {}}}",
            id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), job,
            sources, targets, limits, mapping);

    admire::error_code rv = ADM_SUCCESS;

@@ -1005,8 +1041,10 @@ ADM_transfer_datasets(hg_handle_t h) {
    out.retval = rv;
    out.tx = admire::api::convert(transfer).release();

    LOGGER_INFO("RPC ID {} ({}) <= {{retval: {}, transfer: {}}}", id,
                __FUNCTION__, rv, transfer);
    LOGGER_INFO("rpc id: {} name: {} to: {} <= "
                "body: {{retval: {}, transfer: {}}}",
                id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), rv,
                transfer);

    ret = margo_respond(h, &out);
    assert(ret == HG_SUCCESS);