Verified Commit 1264ea40 authored by Alberto Miranda's avatar Alberto Miranda ♨️
Browse files

Fix memory leak when handling RPC output values

parent 65b81d22
Loading
Loading
Loading
Loading
+31 −9
Original line number Diff line number Diff line
@@ -126,6 +126,35 @@ struct engine {
    std::shared_ptr<detail::margo_context> m_context;
};

template <typename Output>
class rpc_handle {
public:
    rpc_handle(hg_handle_t handle, Output output)
        : m_handle(handle), m_output(output) {}

    ~rpc_handle() {

        if(m_handle) {

            if(m_output) {
                margo_free_output(m_handle, m_output);
            }

            margo_destroy(m_handle);
        }
    }

    hg_handle_t
    native() {
        return m_handle;
    }

private:
    hg_handle_t m_handle;
    Output m_output;
};


struct endpoint {
private:
    // Endpoints should only be created by calling engine::lookup()
@@ -186,7 +215,7 @@ public:
     *
     **/
    template <typename T1 = void*, typename T2 = void*>
    void
    [[nodiscard]] rpc_handle<T2>
    call(const std::string& id, T1 input = nullptr, T2 output = nullptr) {

        const auto it = m_margo_context->m_rpc_names.find(id);
@@ -218,14 +247,7 @@ public:
            ret = ::margo_get_output(handle, output);
        }


        ret = ::margo_destroy(handle);

        if(ret != HG_SUCCESS) {
            throw std::runtime_error(
                    fmt::format("Error during endpoint::call(): {}",
                                ::HG_Error_to_string(ret)));
        }
        return rpc_handle<T2>{handle, output};
    }

private:
+25 −23
Original line number Diff line number Diff line
@@ -230,7 +230,7 @@ register_adhoc_storage(const server& srv, ADM_job_t job,
    ADM_register_adhoc_storage_in_t in{};
    ADM_register_adhoc_storage_out_t out;

    endp.call("ADM_register_adhoc_storage", &in, &out);
    const auto rpc = endp.call("ADM_register_adhoc_storage", &in, &out);

    if(out.ret < 0) {
        LOGGER_ERROR("ADM_register_adhoc_storage() = {}", out.ret);
@@ -258,7 +258,7 @@ update_adhoc_storage(const server& srv, ADM_job_t job, ADM_adhoc_context_t ctx,
    ADM_update_adhoc_storage_in_t in{};
    ADM_update_adhoc_storage_out_t out;

    endp.call("ADM_update_adhoc_storage", &in, &out);
    const auto rpc = endp.call("ADM_update_adhoc_storage", &in, &out);

    if(out.ret < 0) {
        LOGGER_ERROR("ADM_update_adhoc_storage() = {}", out.ret);
@@ -285,7 +285,7 @@ remove_adhoc_storage(const server& srv, ADM_job_t job,
    ADM_remove_adhoc_storage_in_t in{};
    ADM_remove_adhoc_storage_out_t out;

    endp.call("ADM_remove_adhoc_storage", &in, &out);
    const auto rpc = endp.call("ADM_remove_adhoc_storage", &in, &out);

    if(out.ret < 0) {
        LOGGER_ERROR("ADM_remove_adhoc_storage() = {}", out.ret);
@@ -312,7 +312,7 @@ deploy_adhoc_storage(const server& srv, ADM_job_t job,
    ADM_deploy_adhoc_storage_in_t in{};
    ADM_deploy_adhoc_storage_out_t out;

    endp.call("ADM_deploy_adhoc_storage", &in, &out);
    const auto rpc = endp.call("ADM_deploy_adhoc_storage", &in, &out);

    if(out.ret < 0) {
        LOGGER_ERROR("ADM_deploy_adhoc_storage() = {}", out.ret);
@@ -340,7 +340,7 @@ register_pfs_storage(const server& srv, ADM_job_t job, ADM_pfs_context_t ctx,
    ADM_register_pfs_storage_in_t in{};
    ADM_register_pfs_storage_out_t out;

    endp.call("ADM_register_pfs_storage", &in, &out);
    const auto rpc = endp.call("ADM_register_pfs_storage", &in, &out);

    if(out.ret < 0) {
        LOGGER_ERROR("ADM_register_pfs_storage() = {}", out.ret);
@@ -368,7 +368,7 @@ update_pfs_storage(const server& srv, ADM_job_t job, ADM_pfs_context_t ctx,
    ADM_update_pfs_storage_in_t in{};
    ADM_update_pfs_storage_out_t out;

    endp.call("ADM_update_pfs_storage", &in, &out);
    const auto rpc = endp.call("ADM_update_pfs_storage", &in, &out);

    if(out.ret < 0) {
        LOGGER_ERROR("ADM_update_pfs_storage() = {}", out.ret);
@@ -395,7 +395,7 @@ remove_pfs_storage(const server& srv, ADM_job_t job,
    ADM_remove_pfs_storage_in_t in{};
    ADM_remove_pfs_storage_out_t out;

    endp.call("ADM_remove_pfs_storage", &in, &out);
    const auto rpc = endp.call("ADM_remove_pfs_storage", &in, &out);

    if(out.ret < 0) {
        LOGGER_ERROR("ADM_remove_pfs_storage() = {}", out.ret);
@@ -432,7 +432,7 @@ transfer_dataset(const server& srv, ADM_job_t job, ADM_dataset_t** sources,
    in.qos_constraints = "constraints";
    in.distribution = "distribution";

    endp.call("ADM_transfer_dataset", &in, &out);
    const auto rpc = endp.call("ADM_transfer_dataset", &in, &out);

    if(out.ret < 0) {
        LOGGER_ERROR("ADM_transfer_dataset() = {}", out.ret);
@@ -462,7 +462,7 @@ set_dataset_information(const server& srv, ADM_job_t job, ADM_dataset_t target,

    in.info = "info";

    endp.call("ADM_set_dataset_information", &in, &out);
    const auto rpc = endp.call("ADM_set_dataset_information", &in, &out);

    if(out.ret < 0) {
        LOGGER_ERROR("ADM_set_dataset_information() = {}", out.ret);
@@ -492,7 +492,7 @@ set_io_resources(const server& srv, ADM_job_t job, ADM_storage_t tier,

    in.resources = "resources";

    endp.call("ADM_set_io_resources", &in, &out);
    const auto rpc = endp.call("ADM_set_io_resources", &in, &out);

    if(out.ret < 0) {
        LOGGER_ERROR("ADM_set_io_resources() = {}", out.ret);
@@ -520,7 +520,7 @@ get_transfer_priority(const server& srv, ADM_job_t job, ADM_transfer_t transfer,
    ADM_get_transfer_priority_in_t in{};
    ADM_get_transfer_priority_out_t out;

    endp.call("ADM_get_transfer_priority", &in, &out);
    const auto rpc = endp.call("ADM_get_transfer_priority", &in, &out);

    if(out.ret < 0) {
        LOGGER_ERROR("ADM_get_transfer_priority() = {}", out.ret);
@@ -548,7 +548,7 @@ set_transfer_priority(const server& srv, ADM_job_t job, ADM_transfer_t transfer,
    ADM_set_transfer_priority_in_t in{};
    ADM_set_transfer_priority_out_t out;

    endp.call("ADM_set_transfer_priority", &in, &out);
    const auto rpc = endp.call("ADM_set_transfer_priority", &in, &out);

    if(out.ret < 0) {
        LOGGER_ERROR("ADM_set_transfer_priority() = {}", out.ret);
@@ -575,7 +575,7 @@ cancel_transfer(const server& srv, ADM_job_t job, ADM_transfer_t transfer) {
    ADM_cancel_transfer_in_t in{42};
    ADM_cancel_transfer_out_t out;

    endp.call("ADM_cancel_transfer", &in, &out);
    const auto rpc = endp.call("ADM_cancel_transfer", &in, &out);

    if(out.ret < 0) {
        LOGGER_ERROR("ADM_cancel_transfer() = {}", out.ret);
@@ -603,7 +603,7 @@ get_pending_transfers(const server& srv, ADM_job_t job,
    ADM_get_pending_transfers_in_t in{};
    ADM_get_pending_transfers_out_t out;

    endp.call("ADM_get_pending_transfers", &in, &out);
    const auto rpc = endp.call("ADM_get_pending_transfers", &in, &out);

    if(out.ret < 0) {
        LOGGER_ERROR("ADM_get_pending_transfers() = {}", out.ret);
@@ -636,7 +636,7 @@ set_qos_constraints(const server& srv, ADM_job_t job, ADM_qos_entity_t entity,
    in.qos_class = "class";
    in.class_value = "value";

    endp.call("ADM_set_qos_constraints", &in, &out);
    const auto rpc = endp.call("ADM_set_qos_constraints", &in, &out);

    if(out.ret < 0) {
        LOGGER_ERROR("ADM_set_qos_constraints() = {}", out.ret);
@@ -667,7 +667,7 @@ get_qos_constraints(const server& srv, ADM_job_t job, ADM_qos_entity_t entity,

    in.scope = "dataset";

    endp.call("ADM_get_qos_constraints", &in, &out);
    const auto rpc = endp.call("ADM_get_qos_constraints", &in, &out);

    if(out.ret < 0) {
        LOGGER_ERROR("ADM_get_qos_constraints() = {}", out.ret);
@@ -701,7 +701,7 @@ define_data_operation(const server& srv, ADM_job_t job, const char* path,
    in.operation_id = 1;
    in.arguments = "argument1 argument2";

    endp.call("ADM_define_data_operation", &in, &out);
    const auto rpc = endp.call("ADM_define_data_operation", &in, &out);

    if(out.ret < 0) {
        LOGGER_ERROR("ADM_define_data_operation() = {}", out.ret);
@@ -735,7 +735,7 @@ connect_data_operation(const server& srv, ADM_job_t job, ADM_dataset_t input,
    in.input = "/tmp";
    in.arguments = "argument1 argument2";

    endp.call("ADM_connect_data_operation", &in, &out);
    const auto rpc = endp.call("ADM_connect_data_operation", &in, &out);

    if(out.ret < 0) {
        LOGGER_ERROR("ADM_connect_data_operation() = {}", out.ret);
@@ -765,7 +765,7 @@ finalize_data_operation(const server& srv, ADM_job_t job,
    ADM_finalize_data_operation_in_t in{};
    ADM_finalize_data_operation_out_t out;

    endp.call("ADM_finalize_data_operation", &in, &out);
    const auto rpc = endp.call("ADM_finalize_data_operation", &in, &out);

    if(out.ret < 0) {
        LOGGER_ERROR("ADM_finalize_data_operation() = {}", out.ret);
@@ -778,7 +778,8 @@ finalize_data_operation(const server& srv, ADM_job_t job,

ADM_return_t
link_transfer_to_data_operation(const server& srv, ADM_job_t job,
                                ADM_data_operation_t op, ADM_transfer_t transfer, bool should_stream,
                                ADM_data_operation_t op,
                                ADM_transfer_t transfer, bool should_stream,
                                va_list args) {
    (void) srv;
    (void) job;
@@ -799,6 +800,7 @@ link_transfer_to_data_operation(const server& srv, ADM_job_t job,

    in.arguments = "argument1 argument2";

    const auto rpc =
            endp.call("ADM_link_transfer_to_data_operation", &in, &out);

    if(out.ret < 0) {
@@ -826,7 +828,7 @@ get_statistics(const server& srv, ADM_job_t job, ADM_job_stats_t** stats) {
    ADM_get_statistics_in_t in{};
    ADM_get_statistics_out_t out;

    endp.call("ADM_get_statistics", &in, &out);
    const auto rpc = endp.call("ADM_get_statistics", &in, &out);

    if(out.ret < 0) {
        LOGGER_ERROR("ADM_get_statistics() = {}", out.ret);
+4 −4
Original line number Diff line number Diff line
@@ -164,7 +164,7 @@ ping(const server& srv) {
    auto endp = rpc_client.lookup(srv.address());

    LOGGER_INFO("ADM_ping()");
    endp.call("ADM_ping");
    const auto rpc = endp.call("ADM_ping");

    LOGGER_INFO("ADM_register_job() = {}", ADM_SUCCESS);
    return ADM_SUCCESS;
@@ -185,7 +185,7 @@ register_job(const admire::server& srv, const admire::job_requirements& reqs) {
    ADM_register_job_in_t in{*rpc_reqs.get()};
    ADM_register_job_out_t out;

    endp.call("ADM_register_job", &in, &out);
    const auto rpc = endp.call("ADM_register_job", &in, &out);

    if(out.retval < 0) {
        LOGGER_ERROR("RPC (ADM_{}) <= {}", __FUNCTION__, out.retval);
@@ -216,7 +216,7 @@ update_job(const server& srv, const job& job, const job_requirements& reqs) {
    ADM_update_job_in_t in{rpc_job.get(), *rpc_reqs.get()};
    ADM_update_job_out_t out;

    endp.call("ADM_update_job", &in, &out);
    const auto rpc = endp.call("ADM_update_job", &in, &out);


    if(out.retval < 0) {
@@ -243,7 +243,7 @@ remove_job(const server& srv, const job& job) {
    ADM_remove_job_in_t in{rpc_job.get()};
    ADM_remove_job_out_t out;

    endp.call("ADM_remove_job", &in, &out);
    const auto rpc = endp.call("ADM_remove_job", &in, &out);

    if(out.retval < 0) {
        const auto retval = static_cast<admire::error_code>(out.retval);