Verified Commit d8b2177c authored by Alberto Miranda's avatar Alberto Miranda ♨️
Browse files

Add op_id field to reply RPCs

parent f8c52b43
Loading
Loading
Loading
Loading
+4 −0
Original line number Diff line number Diff line
@@ -259,6 +259,7 @@ MERCURY_GEN_PROC(

MERCURY_GEN_PROC(
    ADM_register_job_out_t,
        ((hg_uint64_t) (op_id))
        ((int32_t) (retval))
        ((ADM_job_t) (job))
);
@@ -272,6 +273,7 @@ MERCURY_GEN_PROC(

MERCURY_GEN_PROC(
    ADM_update_job_out_t,
        ((hg_uint64_t) (op_id))
        ((int32_t) (retval))
);

@@ -283,6 +285,7 @@ MERCURY_GEN_PROC(

MERCURY_GEN_PROC(
    ADM_remove_job_out_t,
        ((hg_uint64_t) (op_id))
        ((int32_t) (retval))
);

@@ -421,6 +424,7 @@ MERCURY_GEN_PROC(

MERCURY_GEN_PROC(
    ADM_transfer_datasets_out_t,
        ((hg_uint64_t) (op_id))
        ((hg_int32_t) (retval))
        ((ADM_transfer_t) (tx)))

+47 −40
Original line number Diff line number Diff line
@@ -179,15 +179,17 @@ ping(const server& srv) {

    auto endp = rpc_client.lookup(srv.address());

    LOGGER_INFO("rpc id: {} name: {} from: {} => body: {{}}", rpc_id,
                std::quoted("ADM_"s + __FUNCTION__),
    LOGGER_INFO("rpc id: {} name: {} from: {} => "
                "body: {{}}",
                rpc_id, std::quoted("ADM_"s + __FUNCTION__),
                std::quoted(rpc_client.self_address()));

    const auto rpc = endp.call("ADM_ping");

    LOGGER_INFO("rpc id: {} name: {} from: {} <= body: {{retval: {}}}", rpc_id,
                std::quoted("ADM_"s + __FUNCTION__), std::quoted(rpc.origin()),
                ADM_SUCCESS);
    LOGGER_INFO("rpc id: {} name: {} from: {} <= "
                "body: {{retval: {}}} [op_id: {}]",
                rpc_id, std::quoted("ADM_"s + __FUNCTION__),
                std::quoted(rpc.origin()), ADM_SUCCESS, "n/a");
    return ADM_SUCCESS;
}

@@ -199,8 +201,8 @@ register_job(const admire::server& srv, const admire::job_requirements& reqs) {
    const auto rpc_id = ::api::remote_procedure::new_id();
    auto endp = rpc_client.lookup(srv.address());

    LOGGER_INFO("rpc id: {} name: {} from: {} => body: {{job_requirements: "
                "{}}}",
    LOGGER_INFO("rpc id: {} name: {} from: {} => "
                "body: {{job_requirements: {}}}",
                rpc_id, std::quoted("ADM_"s + __FUNCTION__),
                std::quoted(rpc_client.self_address()), reqs);

@@ -212,20 +214,19 @@ register_job(const admire::server& srv, const admire::job_requirements& reqs) {
    const auto rpc = endp.call("ADM_register_job", &in, &out);

    if(out.retval < 0) {
        LOGGER_ERROR("rpc id: {} name: {} from: {} <= body: {}", rpc_id,
                     std::quoted("ADM_"s + __FUNCTION__),
                     std::quoted(rpc.origin()), out.retval);
        LOGGER_ERROR("rpc id: {} name: {} from: {} <= "
                     "body: {} [op_id: {}]",
                     rpc_id, std::quoted("ADM_"s + __FUNCTION__),
                     std::quoted(rpc.origin()), out.retval, out.op_id);
        return tl::make_unexpected(static_cast<admire::error_code>(out.retval));
    }

    const admire::job job = api::convert(out.job);

    LOGGER_INFO("rpc id: {} name: {} from: {} <= body: {{retval: {}, job: "
                "{}}}",
    LOGGER_INFO("rpc id: {} name: {} from: {} <= "
                "body: {{retval: {}, job: {}}} [op_id: {}]",
                rpc_id, std::quoted("ADM_"s + __FUNCTION__),
                std::quoted(rpc.origin()), ADM_SUCCESS, job.id()

    );
                std::quoted(rpc.origin()), ADM_SUCCESS, job.id(), out.op_id);

    return job;
}
@@ -238,9 +239,8 @@ update_job(const server& srv, const job& job, const job_requirements& reqs) {
    const auto rpc_id = ::api::remote_procedure::new_id();
    auto endp = rpc_client.lookup(srv.address());

    LOGGER_INFO("rpc id: {} name: {} from: {} => body: {{job: {}, "
                "job_requirements: "
                "{}}}",
    LOGGER_INFO("rpc id: {} name: {} from: {} => "
                "body: {{job: {}, job_requirements: {}}}",
                rpc_id, std::quoted("ADM_"s + __FUNCTION__),
                std::quoted(rpc_client.self_address()), job, reqs);

@@ -254,15 +254,17 @@ update_job(const server& srv, const job& job, const job_requirements& reqs) {

    if(out.retval < 0) {
        const auto retval = static_cast<admire::error_code>(out.retval);
        LOGGER_ERROR("rpc id: {} name: {} from: {} <= body: {{retval: {}}}",
        LOGGER_ERROR("rpc id: {} name: {} from: {} <= "
                     "body: {{retval: {}}} [op_id: {}]",
                     rpc_id, std::quoted("ADM_"s + __FUNCTION__),
                     std::quoted(rpc.origin()), retval);
                     std::quoted(rpc.origin()), retval, out.op_id);
        return retval;
    }

    LOGGER_INFO("rpc id: {} name: {} from: {} <= body: {{retval: {}}}", rpc_id,
                std::quoted("ADM_"s + __FUNCTION__), std::quoted(rpc.origin()),
                ADM_SUCCESS);
    LOGGER_INFO("rpc id: {} name: {} from: {} <= "
                "body: {{retval: {}}} [op_id: {}]",
                rpc_id, std::quoted("ADM_"s + __FUNCTION__),
                std::quoted(rpc.origin()), ADM_SUCCESS, out.op_id);
    return ADM_SUCCESS;
}

@@ -274,8 +276,9 @@ remove_job(const server& srv, const job& job) {
    const auto rpc_id = ::api::remote_procedure::new_id();
    auto endp = rpc_client.lookup(srv.address());

    LOGGER_INFO("rpc id: {} name: {} from: {} => body: {{job: {}}}", rpc_id,
                std::quoted("ADM_"s + __FUNCTION__),
    LOGGER_INFO("rpc id: {} name: {} from: {} => "
                "body: {{job: {}}}",
                rpc_id, std::quoted("ADM_"s + __FUNCTION__),
                std::quoted(rpc_client.self_address()), job);

    const auto rpc_job = api::convert(job);
@@ -287,15 +290,17 @@ remove_job(const server& srv, const job& job) {

    if(out.retval < 0) {
        const auto retval = static_cast<admire::error_code>(out.retval);
        LOGGER_ERROR("rpc id: {} name: {} from: {} <= body: {{retval: {}}}",
        LOGGER_ERROR("rpc id: {} name: {} from: {} <= "
                     "body: {{retval: {}}} [op_id: {}]",
                     rpc_id, std::quoted("ADM_"s + __FUNCTION__),
                     std::quoted(rpc.origin()), retval);
                     std::quoted(rpc.origin()), retval, out.op_id);
        return retval;
    }

    LOGGER_INFO("rpc id: {} name: {} from: {} <= body: {{retval: {}}}", rpc_id,
                std::quoted("ADM_"s + __FUNCTION__), std::quoted(rpc.origin()),
                ADM_SUCCESS);
    LOGGER_INFO("rpc id: {} name: {} from: {} <= "
                "body: {{retval: {}}} [op_id: {}]",
                rpc_id, std::quoted("ADM_"s + __FUNCTION__),
                std::quoted(rpc.origin()), ADM_SUCCESS, out.op_id);
    return ADM_SUCCESS;
}

@@ -311,8 +316,9 @@ transfer_datasets(const server& srv, const job& job,
    const auto rpc_id = ::api::remote_procedure::new_id();
    auto endp = rpc_client.lookup(srv.address());

    LOGGER_INFO("rpc id: {} name: {} from: {} => body: {{job: {}, sources: {}, "
                "targets: {}, limits: {}, mapping: {}}}",
    LOGGER_INFO(
            "rpc id: {} name: {} from: {} => "
            "body: {{job: {}, sources: {}, targets: {}, limits: {}, mapping: {}}}",
            rpc_id, std::quoted("ADM_"s + __FUNCTION__),
            std::quoted(rpc_client.self_address()), job, sources, targets,
            limits, mapping);
@@ -331,18 +337,19 @@ transfer_datasets(const server& srv, const job& job,
            endp.call("ADM_transfer_datasets", &in, &out);

    if(out.retval < 0) {
        LOGGER_ERROR("rpc id: {} name: {} from: {} <= body: {{retval: {}}}",
        LOGGER_ERROR("rpc id: {} name: {} from: {} <= "
                     "body: {{retval: {}}} [op_id: {}]",
                     rpc_id, std::quoted("ADM_"s + __FUNCTION__),
                     std::quoted(rpc.origin()), out.retval);
                     std::quoted(rpc.origin()), out.retval, out.op_id);
        return tl::make_unexpected(static_cast<admire::error_code>(out.retval));
    }

    const admire::transfer tx = api::convert(out.tx);

    LOGGER_INFO("rpc id: {} name: {} from: {} <= body: {{retval: {}, transfer: "
                "{}}}",
    LOGGER_INFO("rpc id: {} name: {} from: {} <= "
                "body: {{retval: {}, transfer: {}}} [op_id: {}]",
                rpc_id, std::quoted("ADM_"s + __FUNCTION__),
                std::quoted(rpc.origin()), ADM_SUCCESS, tx);
                std::quoted(rpc.origin()), ADM_SUCCESS, tx, out.op_id);
    return tx;
}

+26 −15
Original line number Diff line number Diff line
@@ -48,11 +48,13 @@ ADM_ping(hg_handle_t h) {

    const auto id = remote_procedure::new_id();

    LOGGER_INFO("rpc id: {} name: {}, from: {} => body: {{}}", id,
                std::quoted(__FUNCTION__), std::quoted(get_address(h)));
    LOGGER_INFO("rpc id: {} name: {}, from: {} => "
                "body: {{}}",
                id, std::quoted(__FUNCTION__), std::quoted(get_address(h)));

    LOGGER_INFO("rpc id: {} name: {}, to: {} <= body: {{retval: {}}}", id,
                std::quoted(__FUNCTION__), std::quoted(get_address(h)),
    LOGGER_INFO("rpc id: {} name: {}, to: {} <= "
                "body: {{retval: {}}}",
                id, std::quoted(__FUNCTION__), std::quoted(get_address(h)),
                ADM_SUCCESS);

    ret = margo_destroy(h);
@@ -79,8 +81,8 @@ ADM_register_job(hg_handle_t h) {
    const admire::job_requirements reqs(&in.reqs);

    const auto id = remote_procedure::new_id();
    LOGGER_INFO("rpc id: {} name: {} from: {} => body: {{job_requirements: "
                "{}}}",
    LOGGER_INFO("rpc id: {} name: {} from: {} => "
                "body: {{job_requirements: {}}}",
                id, std::quoted(__FUNCTION__), std::quoted(get_address(h)),
                reqs);

@@ -88,10 +90,12 @@ ADM_register_job(hg_handle_t h) {

    admire::error_code rv = ADM_SUCCESS;

    out.op_id = id;
    out.retval = rv;
    out.job = admire::api::convert(job).release();

    LOGGER_INFO("rpc id: {} name: {} to: {} <= body: {{retval: {}, job: {}}}",
    LOGGER_INFO("rpc id: {} name: {} to: {} <= "
                "body: {{retval: {}, job: {}}}",
                id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), rv,
                job);

@@ -127,14 +131,17 @@ ADM_update_job(hg_handle_t h) {
    const admire::job_requirements reqs(&in.reqs);

    const auto id = remote_procedure::new_id();
    LOGGER_INFO("RPC ID {} ({}) => body: {{job: {}, job_requirements: {}}}", id,
                std::quoted(__FUNCTION__), job, reqs);
    LOGGER_INFO("RPC ID {} ({}) => "
                "body: {{job: {}, job_requirements: {}}}",
                id, std::quoted(__FUNCTION__), job, reqs);

    admire::error_code rv = ADM_SUCCESS;
    out.op_id = id;
    out.retval = rv;

    LOGGER_INFO("RPC ID {} ({}) <= body: {{retval: {}}}", id,
                std::quoted(__FUNCTION__), rv);
    LOGGER_INFO("RPC ID {} ({}) <= "
                "body: {{retval: {}}}",
                id, std::quoted(__FUNCTION__), rv);

    ret = margo_respond(h, &out);
    assert(ret == HG_SUCCESS);
@@ -167,14 +174,17 @@ ADM_remove_job(hg_handle_t h) {
    const admire::job job(in.job);

    const auto id = remote_procedure::new_id();
    LOGGER_INFO("RPC ID {} ({}) => body: {{job: {}}}", id,
                std::quoted(__FUNCTION__), job);
    LOGGER_INFO("RPC ID {} ({}) => "
                "body: {{job: {}}}",
                id, std::quoted(__FUNCTION__), job);

    admire::error_code rv = ADM_SUCCESS;
    out.op_id = id;
    out.retval = rv;

    LOGGER_INFO("RPC ID {} ({}) <= body: {{retval: {}}}", id,
                std::quoted(__FUNCTION__), rv);
    LOGGER_INFO("RPC ID {} ({}) <= "
                "body: {{retval: {}}}",
                id, std::quoted(__FUNCTION__), rv);

    ret = margo_respond(h, &out);
    assert(ret == HG_SUCCESS);
@@ -1038,6 +1048,7 @@ ADM_transfer_datasets(hg_handle_t h) {

    const auto transfer = admire::transfer{42};

    out.op_id = id;
    out.retval = rv;
    out.tx = admire::api::convert(transfer).release();