Verified Commit 1c24afc1 authored by Alberto Miranda's avatar Alberto Miranda ♨️
Browse files

RPC clients and servers can now choose which RPCs to register

parent e29108b2
Loading
Loading
Loading
Loading
+119 −47
Original line number Diff line number Diff line
@@ -48,6 +48,105 @@ init_logger() {
    scord::logger::create_global_logger("libadm_iosched", "console color");
}

void
rpc_registration_cb(scord::network::rpc_client* client) {

    REGISTER_RPC(client, "ADM_ping", void, void, ADM_ping, false);
    REGISTER_RPC(client, "ADM_input", ADM_input_in_t, ADM_input_out_t,
                 ADM_input, true);


    REGISTER_RPC(client, "ADM_output", ADM_output_in_t, ADM_output_out_t,
                 ADM_output, true);

    REGISTER_RPC(client, "ADM_inout", ADM_inout_in_t, ADM_inout_out_t,
                 ADM_inout, true);

    REGISTER_RPC(client, "ADM_adhoc_context", ADM_adhoc_context_in_t,
                 ADM_adhoc_context_out_t, ADM_adhoc_context, true);

    REGISTER_RPC(client, "ADM_adhoc_context_id", ADM_adhoc_context_id_in_t,
                 ADM_adhoc_context_id_out_t, ADM_adhoc_context_id, true);

    REGISTER_RPC(client, "ADM_adhoc_nodes", ADM_adhoc_nodes_in_t,
                 ADM_adhoc_nodes_out_t, ADM_adhoc_nodes, true);

    REGISTER_RPC(client, "ADM_adhoc_walltime", ADM_adhoc_walltime_in_t,
                 ADM_adhoc_walltime_out_t, ADM_adhoc_walltime, true);

    REGISTER_RPC(client, "ADM_adhoc_access", ADM_adhoc_access_in_t,
                 ADM_adhoc_access_out_t, ADM_adhoc_access, true);

    REGISTER_RPC(client, "ADM_adhoc_distribution", ADM_adhoc_distribution_in_t,
                 ADM_adhoc_distribution_out_t, ADM_adhoc_distribution, true);

    REGISTER_RPC(client, "ADM_adhoc_background_flush",
                 ADM_adhoc_background_flush_in_t,
                 ADM_adhoc_background_flush_out_t, ADM_adhoc_background_flush,
                 true);

    REGISTER_RPC(client, "ADM_in_situ_ops", ADM_in_situ_ops_in_t,
                 ADM_in_situ_ops_out_t, ADM_in_situ_ops, true);

    REGISTER_RPC(client, "ADM_in_transit_ops", ADM_in_transit_ops_in_t,
                 ADM_in_transit_ops_out_t, ADM_in_transit_ops, true);

    REGISTER_RPC(client, "ADM_transfer_dataset", ADM_transfer_dataset_in_t,
                 ADM_transfer_dataset_out_t, ADM_transfer_dataset, true);

    REGISTER_RPC(client, "ADM_set_dataset_information",
                 ADM_set_dataset_information_in_t,
                 ADM_set_dataset_information_out_t, ADM_set_dataset_information,
                 true);

    REGISTER_RPC(client, "ADM_set_io_resources", ADM_set_io_resources_in_t,
                 ADM_set_io_resources_out_t, ADM_set_io_resources, true);

    REGISTER_RPC(
            client, "ADM_get_transfer_priority", ADM_get_transfer_priority_in_t,
            ADM_get_transfer_priority_out_t, ADM_get_transfer_priority, true);

    REGISTER_RPC(
            client, "ADM_set_transfer_priority", ADM_set_transfer_priority_in_t,
            ADM_set_transfer_priority_out_t, ADM_set_transfer_priority, true);

    REGISTER_RPC(client, "ADM_cancel_transfer", ADM_cancel_transfer_in_t,
                 ADM_cancel_transfer_out_t, ADM_cancel_transfer, true);

    REGISTER_RPC(
            client, "ADM_get_pending_transfers", ADM_get_pending_transfers_in_t,
            ADM_get_pending_transfers_out_t, ADM_get_pending_transfers, true);

    REGISTER_RPC(client, "ADM_set_qos_constraints",
                 ADM_set_qos_constraints_in_t, ADM_set_qos_constraints_out_t,
                 ADM_set_qos_constraints, true);

    REGISTER_RPC(client, "ADM_get_qos_constraints",
                 ADM_get_qos_constraints_in_t, ADM_get_qos_constraints_out_t,
                 ADM_get_qos_constraints, true);

    REGISTER_RPC(
            client, "ADM_define_data_operation", ADM_define_data_operation_in_t,
            ADM_define_data_operation_out_t, ADM_define_data_operation, true);

    REGISTER_RPC(client, "ADM_connect_data_operation",
                 ADM_connect_data_operation_in_t,
                 ADM_connect_data_operation_out_t, ADM_connect_data_operation,
                 true);

    REGISTER_RPC(client, "ADM_finalize_data_operation",
                 ADM_finalize_data_operation_in_t,
                 ADM_finalize_data_operation_out_t, ADM_finalize_data_operation,
                 true);

    REGISTER_RPC(client, "ADM_link_transfer_to_data_operation",
                 ADM_link_transfer_to_data_operation_in_t,
                 ADM_link_transfer_to_data_operation_out_t,
                 ADM_link_transfer_to_data_operation, true);

    REGISTER_RPC(client, "ADM_get_statistics", ADM_get_statistics_in_t,
                 ADM_get_statistics_out_t, ADM_get_statistics, true);
}

} // namespace

@@ -83,8 +182,7 @@ update_job(const server& srv, ADM_job_t job, ADM_job_requirements_t reqs) {
    (void) job;
    (void) reqs;

    scord::network::rpc_client rpc_client{srv.m_protocol};
    rpc_client.register_rpcs();
    scord::network::rpc_client rpc_client{srv.m_protocol, rpc_registration_cb};

    auto endp = rpc_client.lookup(srv.m_address);

@@ -109,8 +207,7 @@ remove_job(const server& srv, ADM_job_t job) {
    (void) srv;
    (void) job;

    scord::network::rpc_client rpc_client{srv.m_protocol};
    rpc_client.register_rpcs();
    scord::network::rpc_client rpc_client{srv.m_protocol, rpc_registration_cb};

    auto endp = rpc_client.lookup(srv.m_address);

@@ -139,8 +236,7 @@ register_adhoc_storage(const server& srv, ADM_job_t job,
    (void) ctx;
    (void) adhoc_handle;

    scord::network::rpc_client rpc_client{srv.m_protocol};
    rpc_client.register_rpcs();
    scord::network::rpc_client rpc_client{srv.m_protocol, rpc_registration_cb};

    auto endp = rpc_client.lookup(srv.m_address);

@@ -168,8 +264,7 @@ update_adhoc_storage(const server& srv, ADM_job_t job, ADM_adhoc_context_t ctx,
    (void) ctx;
    (void) adhoc_handle;

    scord::network::rpc_client rpc_client{srv.m_protocol};
    rpc_client.register_rpcs();
    scord::network::rpc_client rpc_client{srv.m_protocol, rpc_registration_cb};

    auto endp = rpc_client.lookup(srv.m_address);

@@ -196,8 +291,7 @@ remove_adhoc_storage(const server& srv, ADM_job_t job,
    (void) job;
    (void) adhoc_handle;

    scord::network::rpc_client rpc_client{srv.m_protocol};
    rpc_client.register_rpcs();
    scord::network::rpc_client rpc_client{srv.m_protocol, rpc_registration_cb};

    auto endp = rpc_client.lookup(srv.m_address);

@@ -224,8 +318,7 @@ deploy_adhoc_storage(const server& srv, ADM_job_t job,
    (void) job;
    (void) adhoc_handle;

    scord::network::rpc_client rpc_client{srv.m_protocol};
    rpc_client.register_rpcs();
    scord::network::rpc_client rpc_client{srv.m_protocol, rpc_registration_cb};

    auto endp = rpc_client.lookup(srv.m_address);

@@ -258,8 +351,7 @@ transfer_dataset(const server& srv, ADM_job_t job,
    (void) mapping;
    (void) tx_handle;

    scord::network::rpc_client rpc_client{srv.m_protocol};
    rpc_client.register_rpcs();
    scord::network::rpc_client rpc_client{srv.m_protocol, rpc_registration_cb};

    auto endp = rpc_client.lookup(srv.m_address);

@@ -287,8 +379,7 @@ set_dataset_information(const server& srv, ADM_job_t job,
    (void) target;
    (void) info;

    scord::network::rpc_client rpc_client{srv.m_protocol};
    rpc_client.register_rpcs();
    scord::network::rpc_client rpc_client{srv.m_protocol, rpc_registration_cb};

    auto endp = rpc_client.lookup(srv.m_address);

@@ -316,8 +407,7 @@ set_io_resources(const server& srv, ADM_job_t job, ADM_storage_handle_t tier,
    (void) tier;
    (void) resources;

    scord::network::rpc_client rpc_client{srv.m_protocol};
    rpc_client.register_rpcs();
    scord::network::rpc_client rpc_client{srv.m_protocol, rpc_registration_cb};

    auto endp = rpc_client.lookup(srv.m_address);

@@ -346,8 +436,7 @@ get_transfer_priority(const server& srv, ADM_job_t job,
    (void) tx_handle;
    (void) priority;

    scord::network::rpc_client rpc_client{srv.m_protocol};
    rpc_client.register_rpcs();
    scord::network::rpc_client rpc_client{srv.m_protocol, rpc_registration_cb};

    auto endp = rpc_client.lookup(srv.m_address);

@@ -375,8 +464,7 @@ set_transfer_priority(const server& srv, ADM_job_t job,
    (void) tx_handle;
    (void) incr;

    scord::network::rpc_client rpc_client{srv.m_protocol};
    rpc_client.register_rpcs();
    scord::network::rpc_client rpc_client{srv.m_protocol, rpc_registration_cb};

    auto endp = rpc_client.lookup(srv.m_address);

@@ -403,8 +491,7 @@ cancel_transfer(const server& srv, ADM_job_t job,
    (void) job;
    (void) tx_handle;

    scord::network::rpc_client rpc_client{srv.m_protocol};
    rpc_client.register_rpcs();
    scord::network::rpc_client rpc_client{srv.m_protocol, rpc_registration_cb};

    auto endp = rpc_client.lookup(srv.m_address);

@@ -432,8 +519,7 @@ get_pending_transfers(const server& srv, ADM_job_t job,
    (void) job;
    (void) pending_transfers;

    scord::network::rpc_client rpc_client{srv.m_protocol};
    rpc_client.register_rpcs();
    scord::network::rpc_client rpc_client{srv.m_protocol, rpc_registration_cb};

    auto endp = rpc_client.lookup(srv.m_address);

@@ -460,9 +546,7 @@ set_qos_constraints(const server& srv, ADM_job_t job, ADM_limit_t limit) {
    (void) job;
    (void) limit;

    scord::network::rpc_client rpc_client{srv.m_protocol};

    rpc_client.register_rpcs();
    scord::network::rpc_client rpc_client{srv.m_protocol, rpc_registration_cb};

    auto endp = rpc_client.lookup(srv.m_address);

@@ -492,9 +576,7 @@ get_qos_constraints(const server& srv, ADM_job_t job, ADM_qos_scope_t scope,
    (void) entity;
    (void) limits;

    scord::network::rpc_client rpc_client{srv.m_protocol};

    rpc_client.register_rpcs();
    scord::network::rpc_client rpc_client{srv.m_protocol, rpc_registration_cb};

    auto endp = rpc_client.lookup(srv.m_address);

@@ -524,9 +606,7 @@ define_data_operation(const server& srv, ADM_job_t job, const char* path,
    (void) op;
    (void) args;

    scord::network::rpc_client rpc_client{srv.m_protocol};

    rpc_client.register_rpcs();
    scord::network::rpc_client rpc_client{srv.m_protocol, rpc_registration_cb};

    auto endp = rpc_client.lookup(srv.m_address);

@@ -558,9 +638,7 @@ connect_data_operation(const server& srv, ADM_job_t job,
    (void) should_stream;
    (void) args;

    scord::network::rpc_client rpc_client{srv.m_protocol};

    rpc_client.register_rpcs();
    scord::network::rpc_client rpc_client{srv.m_protocol, rpc_registration_cb};

    auto endp = rpc_client.lookup(srv.m_address);

@@ -590,9 +668,7 @@ finalize_data_operation(const server& srv, ADM_job_t job,
    (void) op;
    (void) status;

    scord::network::rpc_client rpc_client{srv.m_protocol};

    rpc_client.register_rpcs();
    scord::network::rpc_client rpc_client{srv.m_protocol, rpc_registration_cb};

    auto endp = rpc_client.lookup(srv.m_address);

@@ -623,9 +699,7 @@ link_transfer_to_data_operation(const server& srv, ADM_job_t job,
    (void) should_stream;
    (void) args;

    scord::network::rpc_client rpc_client{srv.m_protocol};

    rpc_client.register_rpcs();
    scord::network::rpc_client rpc_client{srv.m_protocol, rpc_registration_cb};

    auto endp = rpc_client.lookup(srv.m_address);

@@ -652,9 +726,7 @@ get_statistics(const server& srv, ADM_job_t job, ADM_job_stats_t** stats) {
    (void) job;
    (void) stats;

    scord::network::rpc_client rpc_client{srv.m_protocol};

    rpc_client.register_rpcs();
    scord::network::rpc_client rpc_client{srv.m_protocol, rpc_registration_cb};

    auto endp = rpc_client.lookup(srv.m_address);

+102 −4
Original line number Diff line number Diff line
@@ -26,13 +26,112 @@
#include <engine.hpp>
#include "impl.hpp"

void
rpc_registration_cb(scord::network::rpc_client* client) {

    REGISTER_RPC(client, "ADM_ping", void, void, ADM_ping, false);
    REGISTER_RPC(client, "ADM_input", ADM_input_in_t, ADM_input_out_t,
                 ADM_input, true);


    REGISTER_RPC(client, "ADM_output", ADM_output_in_t, ADM_output_out_t,
                 ADM_output, true);

    REGISTER_RPC(client, "ADM_inout", ADM_inout_in_t, ADM_inout_out_t,
                 ADM_inout, true);

    REGISTER_RPC(client, "ADM_adhoc_context", ADM_adhoc_context_in_t,
                 ADM_adhoc_context_out_t, ADM_adhoc_context, true);

    REGISTER_RPC(client, "ADM_adhoc_context_id", ADM_adhoc_context_id_in_t,
                 ADM_adhoc_context_id_out_t, ADM_adhoc_context_id, true);

    REGISTER_RPC(client, "ADM_adhoc_nodes", ADM_adhoc_nodes_in_t,
                 ADM_adhoc_nodes_out_t, ADM_adhoc_nodes, true);

    REGISTER_RPC(client, "ADM_adhoc_walltime", ADM_adhoc_walltime_in_t,
                 ADM_adhoc_walltime_out_t, ADM_adhoc_walltime, true);

    REGISTER_RPC(client, "ADM_adhoc_access", ADM_adhoc_access_in_t,
                 ADM_adhoc_access_out_t, ADM_adhoc_access, true);

    REGISTER_RPC(client, "ADM_adhoc_distribution", ADM_adhoc_distribution_in_t,
                 ADM_adhoc_distribution_out_t, ADM_adhoc_distribution, true);

    REGISTER_RPC(client, "ADM_adhoc_background_flush",
                 ADM_adhoc_background_flush_in_t,
                 ADM_adhoc_background_flush_out_t, ADM_adhoc_background_flush,
                 true);

    REGISTER_RPC(client, "ADM_in_situ_ops", ADM_in_situ_ops_in_t,
                 ADM_in_situ_ops_out_t, ADM_in_situ_ops, true);

    REGISTER_RPC(client, "ADM_in_transit_ops", ADM_in_transit_ops_in_t,
                 ADM_in_transit_ops_out_t, ADM_in_transit_ops, true);

    REGISTER_RPC(client, "ADM_transfer_dataset", ADM_transfer_dataset_in_t,
                 ADM_transfer_dataset_out_t, ADM_transfer_dataset, true);

    REGISTER_RPC(client, "ADM_set_dataset_information",
                 ADM_set_dataset_information_in_t,
                 ADM_set_dataset_information_out_t, ADM_set_dataset_information,
                 true);

    REGISTER_RPC(client, "ADM_set_io_resources", ADM_set_io_resources_in_t,
                 ADM_set_io_resources_out_t, ADM_set_io_resources, true);

    REGISTER_RPC(
            client, "ADM_get_transfer_priority", ADM_get_transfer_priority_in_t,
            ADM_get_transfer_priority_out_t, ADM_get_transfer_priority, true);

    REGISTER_RPC(
            client, "ADM_set_transfer_priority", ADM_set_transfer_priority_in_t,
            ADM_set_transfer_priority_out_t, ADM_set_transfer_priority, true);

    REGISTER_RPC(client, "ADM_cancel_transfer", ADM_cancel_transfer_in_t,
                 ADM_cancel_transfer_out_t, ADM_cancel_transfer, true);

    REGISTER_RPC(
            client, "ADM_get_pending_transfers", ADM_get_pending_transfers_in_t,
            ADM_get_pending_transfers_out_t, ADM_get_pending_transfers, true);

    REGISTER_RPC(client, "ADM_set_qos_constraints",
                 ADM_set_qos_constraints_in_t, ADM_set_qos_constraints_out_t,
                 ADM_set_qos_constraints, true);

    REGISTER_RPC(client, "ADM_get_qos_constraints",
                 ADM_get_qos_constraints_in_t, ADM_get_qos_constraints_out_t,
                 ADM_get_qos_constraints, true);

    REGISTER_RPC(
            client, "ADM_define_data_operation", ADM_define_data_operation_in_t,
            ADM_define_data_operation_out_t, ADM_define_data_operation, true);

    REGISTER_RPC(client, "ADM_connect_data_operation",
                 ADM_connect_data_operation_in_t,
                 ADM_connect_data_operation_out_t, ADM_connect_data_operation,
                 true);

    REGISTER_RPC(client, "ADM_finalize_data_operation",
                 ADM_finalize_data_operation_in_t,
                 ADM_finalize_data_operation_out_t, ADM_finalize_data_operation,
                 true);

    REGISTER_RPC(client, "ADM_link_transfer_to_data_operation",
                 ADM_link_transfer_to_data_operation_in_t,
                 ADM_link_transfer_to_data_operation_out_t,
                 ADM_link_transfer_to_data_operation, true);

    REGISTER_RPC(client, "ADM_get_statistics", ADM_get_statistics_in_t,
                 ADM_get_statistics_out_t, ADM_get_statistics, true);
}

namespace admire::detail {

admire::error_code
ping(const server& srv) {

    scord::network::rpc_client rpc_client{srv.m_protocol};
    rpc_client.register_rpcs();
    scord::network::rpc_client rpc_client{srv.m_protocol, rpc_registration_cb};

    auto endp = rpc_client.lookup(srv.m_address);

@@ -48,8 +147,7 @@ register_job(const admire::server& srv, ADM_job_requirements_t reqs) {
    (void) srv;
    (void) reqs;

    scord::network::rpc_client rpc_client{srv.m_protocol};
    rpc_client.register_rpcs();
    scord::network::rpc_client rpc_client{srv.m_protocol, rpc_registration_cb};

    auto endp = rpc_client.lookup(srv.m_address);

+19 −177

File changed.

Preview size limit exceeded, changes collapsed.

+9 −1
Original line number Diff line number Diff line
@@ -150,7 +150,15 @@ main(int argc, char* argv[]) {

    try {
        scord::server daemon;
        daemon.configure(cfg);
        const auto rpc_registration_cb = [](auto&& ctx) {
            LOGGER_INFO(" * Registering RPCs handlers...");

            REGISTER_RPC(ctx, "ADM_ping", void, void, ADM_ping, false);

            // TODO: add internal RPCs for communication with scord
        };

        daemon.configure(cfg, rpc_registration_cb);
        return daemon.run();
    } catch(const std::exception& ex) {
        fmt::print(stderr,
+114 −1
Original line number Diff line number Diff line
@@ -150,7 +150,120 @@ main(int argc, char* argv[]) {

    try {
        scord::server daemon;
        daemon.configure(cfg);

        const auto rpc_registration_cb = [](auto&& ctx) {
            LOGGER_INFO(" * Registering RPCs handlers...");

            REGISTER_RPC(ctx, "ADM_ping", void, void, ADM_ping, false);
            REGISTER_RPC(ctx, "ADM_input", ADM_input_in_t, ADM_input_out_t,
                         ADM_input, true);


            REGISTER_RPC(ctx, "ADM_output", ADM_output_in_t, ADM_output_out_t,
                         ADM_output, true);

            REGISTER_RPC(ctx, "ADM_inout", ADM_inout_in_t, ADM_inout_out_t,
                         ADM_inout, true);

            REGISTER_RPC(ctx, "ADM_adhoc_context", ADM_adhoc_context_in_t,
                         ADM_adhoc_context_out_t, ADM_adhoc_context, true);

            REGISTER_RPC(ctx, "ADM_adhoc_context_id", ADM_adhoc_context_id_in_t,
                         ADM_adhoc_context_id_out_t, ADM_adhoc_context_id,
                         true);

            REGISTER_RPC(ctx, "ADM_adhoc_nodes", ADM_adhoc_nodes_in_t,
                         ADM_adhoc_nodes_out_t, ADM_adhoc_nodes, true);

            REGISTER_RPC(ctx, "ADM_adhoc_walltime", ADM_adhoc_walltime_in_t,
                         ADM_adhoc_walltime_out_t, ADM_adhoc_walltime, true);

            REGISTER_RPC(ctx, "ADM_adhoc_access", ADM_adhoc_access_in_t,
                         ADM_adhoc_access_out_t, ADM_adhoc_access, true);

            REGISTER_RPC(
                    ctx, "ADM_adhoc_distribution", ADM_adhoc_distribution_in_t,
                    ADM_adhoc_distribution_out_t, ADM_adhoc_distribution, true);

            REGISTER_RPC(ctx, "ADM_adhoc_background_flush",
                         ADM_adhoc_background_flush_in_t,
                         ADM_adhoc_background_flush_out_t,
                         ADM_adhoc_background_flush, true);

            REGISTER_RPC(ctx, "ADM_in_situ_ops", ADM_in_situ_ops_in_t,
                         ADM_in_situ_ops_out_t, ADM_in_situ_ops, true);

            REGISTER_RPC(ctx, "ADM_in_transit_ops", ADM_in_transit_ops_in_t,
                         ADM_in_transit_ops_out_t, ADM_in_transit_ops, true);

            REGISTER_RPC(ctx, "ADM_transfer_dataset", ADM_transfer_dataset_in_t,
                         ADM_transfer_dataset_out_t, ADM_transfer_dataset,
                         true);

            REGISTER_RPC(ctx, "ADM_set_dataset_information",
                         ADM_set_dataset_information_in_t,
                         ADM_set_dataset_information_out_t,
                         ADM_set_dataset_information, true);

            REGISTER_RPC(ctx, "ADM_set_io_resources", ADM_set_io_resources_in_t,
                         ADM_set_io_resources_out_t, ADM_set_io_resources,
                         true);

            REGISTER_RPC(ctx, "ADM_get_transfer_priority",
                         ADM_get_transfer_priority_in_t,
                         ADM_get_transfer_priority_out_t,
                         ADM_get_transfer_priority, true);

            REGISTER_RPC(ctx, "ADM_set_transfer_priority",
                         ADM_set_transfer_priority_in_t,
                         ADM_set_transfer_priority_out_t,
                         ADM_set_transfer_priority, true);

            REGISTER_RPC(ctx, "ADM_cancel_transfer", ADM_cancel_transfer_in_t,
                         ADM_cancel_transfer_out_t, ADM_cancel_transfer, true);

            REGISTER_RPC(ctx, "ADM_get_pending_transfers",
                         ADM_get_pending_transfers_in_t,
                         ADM_get_pending_transfers_out_t,
                         ADM_get_pending_transfers, true);

            REGISTER_RPC(ctx, "ADM_set_qos_constraints",
                         ADM_set_qos_constraints_in_t,
                         ADM_set_qos_constraints_out_t, ADM_set_qos_constraints,
                         true);

            REGISTER_RPC(ctx, "ADM_get_qos_constraints",
                         ADM_get_qos_constraints_in_t,
                         ADM_get_qos_constraints_out_t, ADM_get_qos_constraints,
                         true);

            REGISTER_RPC(ctx, "ADM_define_data_operation",
                         ADM_define_data_operation_in_t,
                         ADM_define_data_operation_out_t,
                         ADM_define_data_operation, true);

            REGISTER_RPC(ctx, "ADM_connect_data_operation",
                         ADM_connect_data_operation_in_t,
                         ADM_connect_data_operation_out_t,
                         ADM_connect_data_operation, true);

            REGISTER_RPC(ctx, "ADM_finalize_data_operation",
                         ADM_finalize_data_operation_in_t,
                         ADM_finalize_data_operation_out_t,
                         ADM_finalize_data_operation, true);

            REGISTER_RPC(ctx, "ADM_link_transfer_to_data_operation",
                         ADM_link_transfer_to_data_operation_in_t,
                         ADM_link_transfer_to_data_operation_out_t,
                         ADM_link_transfer_to_data_operation, true);

            REGISTER_RPC(ctx, "ADM_get_statistics", ADM_get_statistics_in_t,
                         ADM_get_statistics_out_t, ADM_get_statistics, true);

            // TODO: add internal RPCs for communication with scord-ctl
        };

        daemon.configure(cfg, rpc_registration_cb);
        return daemon.run();
    } catch(const std::exception& ex) {
        fmt::print(stderr,
Loading