Verified Commit 49b7a3c8 authored by Alberto Miranda's avatar Alberto Miranda ♨️
Browse files

Separate RPC type definitions from RPC handlers

parent 91570eef
Loading
Loading
Loading
Loading
+6 −0
Original line number Diff line number Diff line
@@ -44,3 +44,9 @@ add_library(common::network::engine ALIAS _network_engine)
target_include_directories(_rpc_server INTERFACE
  ${CMAKE_CURRENT_SOURCE_DIR})
add_library(common::network::rpc_server ALIAS _rpc_server)


target_include_directories(_rpc_types INTERFACE
  ${CMAKE_CURRENT_SOURCE_DIR})
add_library(common::network::rpc_types ALIAS _rpc_types)
+2 −0
Original line number Diff line number Diff line
@@ -43,3 +43,5 @@ target_sources(
)

target_link_libraries(_rpc_server PUBLIC common::config _network_engine)

add_subdirectory(proto)
+12 −6
Original line number Diff line number Diff line
@@ -22,10 +22,16 @@
# SPDX-License-Identifier: GPL-3.0-or-later                                    #
################################################################################

add_library(scord_ctl_private_rpcs STATIC)
target_sources(scord_ctl_private_rpcs
  PRIVATE private.cpp private.hpp)
add_library(_rpc_types STATIC)

set_property(TARGET _rpc_types PROPERTY POSITION_INDEPENDENT_CODE ON)

target_include_directories(_rpc_types PUBLIC ${CMAKE_SOURCE_DIR}/src/lib)
target_sources(
  _rpc_types
  PRIVATE rpc_types.h rpc_types.c ${CMAKE_SOURCE_DIR}/src/lib/admire.h
          ${CMAKE_SOURCE_DIR}/src/lib/admire_types.h
)

target_link_libraries(_rpc_types PRIVATE Mercury::Mercury Margo::Margo)
target_include_directories(scord_ctl_private_rpcs INTERFACE ${CMAKE_CURRENT_SOURCE_DIR})
target_link_libraries(scord_ctl_private_rpcs PUBLIC common::logger Margo::Margo)
set_property(TARGET scord_ctl_private_rpcs PROPERTY POSITION_INDEPENDENT_CODE ON)
+1 −7
Original line number Diff line number Diff line
@@ -22,11 +22,5 @@
 * SPDX-License-Identifier: GPL-3.0-or-later
 *****************************************************************************/

#ifndef SCORD_RPCS_PRIVATE_HPP
#define SCORD_RPCS_PRIVATE_HPP
#include "rpc_types.h"
#include <margo.h>

DECLARE_MARGO_RPC_HANDLER(ADM_ping);

#endif // SCORD_RPCS_PRIVATE_HPP
+190 −86
Original line number Diff line number Diff line
/******************************************************************************
 * Copyright 2021, Barcelona Supercomputing Center (BSC), Spain
 * Copyright 2021-2022, Barcelona Supercomputing Center (BSC), Spain
 *
 * This software was partially supported by the EuroHPC-funded project ADMIRE
 *   (Project ID: 956748, https://www.admire-eurohpc.eu).
@@ -22,100 +22,253 @@
 * SPDX-License-Identifier: GPL-3.0-or-later
 *****************************************************************************/

// clang-format off
#ifndef SCORD_RPCS_PUBLIC_HPP
#define SCORD_RPCS_PUBLIC_HPP
#ifndef SCORD_PROTO_TYPES_HPP
#define SCORD_PROTO_TYPES_HPP

#include <margo.h>
#include <mercury.h>
#include <stdlib.h>
#include <mercury_macros.h>
#include <mercury_proc_string.h>
#include <logger/logger.hpp>
#include <admire_types.h>

#ifdef __cplusplus
extern "C" {
#endif // __cplusplus

/**
 * N.B. MERCURY_GEN_STRUCT_PROC requires a `typedef` as its first argument, but
 * admire_types.h also requires types to be defined as `struct T`s. Defining RPC
 * types as `typedef struct T { ... } T;` solves both problems
 */

typedef struct adm_node {
    const char* n_hostname;
} adm_node;

// clang-format off
MERCURY_GEN_STRUCT_PROC(
    adm_node,
        ((hg_const_string_t) (n_hostname))
);
// clang-format on

typedef struct adm_dataset {
    const char* d_id;
} adm_dataset;

// clang-format off
MERCURY_GEN_STRUCT_PROC(
    adm_dataset,
        ((hg_const_string_t) (d_id))
);
// clang-format on

typedef struct adm_job {
    uint64_t j_id;
} adm_job;

// clang-format off
MERCURY_GEN_STRUCT_PROC(
    adm_job,
        ((hg_uint64_t) (j_id))
);
// clang-format on

struct adm_qos_entity {
    ADM_qos_scope_t e_scope;
    union {
        ADM_node_t e_node;
        ADM_job_t e_job;
        ADM_dataset_t e_dataset;
        ADM_transfer_t e_transfer;
    };
};

// TODO: union decoder

struct adm_qos_limit {
    ADM_qos_entity_t l_entity;
    ADM_qos_class_t l_class;
    uint64_t l_value;
};

#if 0
// clang-format off
MERCURY_GEN_STRUCT_PROC(
    adm_qos_limit,
        ((adm_qos_entity) (l_entity))
        ((int)            (l_class))
        ((hg_uint64_t)    (l_value))
);
// clang-format on
#endif

typedef struct adm_transfer {
    // TODO: undefined for now
    int32_t placeholder;
} adm_transfer;

// clang-format off
MERCURY_GEN_STRUCT_PROC(
    adm_transfer,
    ((hg_int32_t) (placeholder))
);
// clang-format on

typedef struct adm_dataset_info {
    // TODO: undefined for now
    int32_t placeholder;
} adm_dataset_info;

// clang-format off
MERCURY_GEN_STRUCT_PROC(
    adm_dataset_info,
    ((hg_int32_t) (placeholder))
);
// clang-format on

// TODO: union decoder
struct adm_storage {
    const char* s_id;
    ADM_storage_type_t s_type;
    union {
        ADM_adhoc_context_t s_adhoc_ctx;
        ADM_pfs_context_t s_pfs_ctx;
    };
};

typedef struct adm_storage_resources {
    // TODO: undefined for now
    int32_t placeholder;
} adm_storage_resources;

// clang-format off
MERCURY_GEN_STRUCT_PROC(
    adm_storage_resources,
        ((hg_int32_t) (placeholder))
);
// clang-format on

typedef struct adm_data_operation {
    // TODO: undefined for now
    int32_t placeholder;
} adm_data_operation;

// clang-format off
MERCURY_GEN_STRUCT_PROC(
    adm_data_operation,
        ((hg_int32_t) (placeholder))
);
// clang-format on

typedef struct adm_adhoc_context {
    /** The adhoc storage system execution mode */
    ADM_adhoc_mode_t c_mode;
    /** The adhoc storage system access type */
    ADM_adhoc_access_t c_access;
    /** The number of nodes for the adhoc storage system */
    uint32_t c_nodes;
    /** The adhoc storage system walltime */
    uint32_t c_walltime;
    /** Whether the adhoc storage system should flush data in the background */
    bool c_should_bg_flush;
} adm_adhoc_context;

// clang-format off
MERCURY_GEN_STRUCT_PROC(
    adm_adhoc_context,
        ((hg_int32_t)  (c_mode))
        ((hg_int32_t)  (c_access))
        ((hg_uint32_t) (c_nodes))
        ((hg_uint32_t) (c_walltime))
        ((hg_bool_t)   (c_should_bg_flush))
)
// clang-format on

typedef struct adm_pfs_context {
    /** The PFS mount point */
    const char* c_mount;
} adm_pfs_context;

// clang-format off
MERCURY_GEN_STRUCT_PROC(
    adm_pfs_context,
        ((hg_const_string_t) (c_mount))
);
// clang-format on

/** The I/O requirements for a job */
typedef struct adm_job_requirements {
    /** An array of input datasets */
    ADM_dataset_t* r_inputs;
    /** The number of datasets in r_inputs */
    size_t r_num_inputs;
    /** A list of output datasets */
    ADM_dataset_t* r_outputs;
    /** The number of datasets in r_outputs */
    size_t r_num_outputs;
    /** An optional definition for a specific adhoc storage instance */
    ADM_adhoc_context_t r_adhoc_ctx;
} adm_job_requirements;

// FIXME: cannot be in a namespace due to Margo limitations
// namespace scord::network::rpc {
// TODO: MERCURY_GEN_STRUCT_PROC

/// ADM_ping
DECLARE_MARGO_RPC_HANDLER(ADM_ping);

/// ADM_register_job
MERCURY_GEN_PROC(ADM_register_job_in_t, ((int32_t) (reqs)))

MERCURY_GEN_PROC(ADM_register_job_out_t, ((int32_t) (ret)))

DECLARE_MARGO_RPC_HANDLER(ADM_register_job);

/// ADM_update_job
MERCURY_GEN_PROC(ADM_update_job_in_t, ((int32_t) (reqs)))

MERCURY_GEN_PROC(ADM_update_job_out_t, ((int32_t) (ret)))

DECLARE_MARGO_RPC_HANDLER(ADM_update_job);

/// ADM_remove_job
MERCURY_GEN_PROC(ADM_remove_job_in_t, ((int32_t) (reqs)))

MERCURY_GEN_PROC(ADM_remove_job_out_t, ((int32_t) (ret)))

DECLARE_MARGO_RPC_HANDLER(ADM_remove_job);

/// ADM_register_adhoc_storage
MERCURY_GEN_PROC(ADM_register_adhoc_storage_in_t, ((int32_t) (reqs)))

MERCURY_GEN_PROC(ADM_register_adhoc_storage_out_t, ((int32_t) (ret)))

DECLARE_MARGO_RPC_HANDLER(ADM_register_adhoc_storage);

/// ADM_update_adhoc_storage
MERCURY_GEN_PROC(ADM_update_adhoc_storage_in_t, ((int32_t) (reqs)))

MERCURY_GEN_PROC(ADM_update_adhoc_storage_out_t, ((int32_t) (ret)))

DECLARE_MARGO_RPC_HANDLER(ADM_update_adhoc_storage);

/// ADM_remove_adhoc_storage
MERCURY_GEN_PROC(ADM_remove_adhoc_storage_in_t, ((int32_t) (reqs)))

MERCURY_GEN_PROC(ADM_remove_adhoc_storage_out_t, ((int32_t) (ret)))

DECLARE_MARGO_RPC_HANDLER(ADM_remove_adhoc_storage);

/// ADM_deploy_adhoc_storage
MERCURY_GEN_PROC(ADM_deploy_adhoc_storage_in_t, ((int32_t) (reqs)))

MERCURY_GEN_PROC(ADM_deploy_adhoc_storage_out_t, ((int32_t) (ret)))

DECLARE_MARGO_RPC_HANDLER(ADM_deploy_adhoc_storage);

/// ADM_register_pfs_storage
MERCURY_GEN_PROC(ADM_register_pfs_storage_in_t, ((int32_t) (reqs)))

MERCURY_GEN_PROC(ADM_register_pfs_storage_out_t, ((int32_t) (ret)))

DECLARE_MARGO_RPC_HANDLER(ADM_register_pfs_storage);

/// ADM_update_pfs_storage
MERCURY_GEN_PROC(ADM_update_pfs_storage_in_t, ((int32_t) (reqs)))

MERCURY_GEN_PROC(ADM_update_pfs_storage_out_t, ((int32_t) (ret)))

DECLARE_MARGO_RPC_HANDLER(ADM_update_pfs_storage);

/// ADM_remove_pfs_storage
MERCURY_GEN_PROC(ADM_remove_pfs_storage_in_t, ((int32_t) (reqs)))

MERCURY_GEN_PROC(ADM_remove_pfs_storage_out_t, ((int32_t) (ret)))

DECLARE_MARGO_RPC_HANDLER(ADM_remove_pfs_storage);

/// ADM_input
MERCURY_GEN_PROC(ADM_input_in_t,
                 ((hg_const_string_t) (origin))((hg_const_string_t) (target)))

MERCURY_GEN_PROC(ADM_input_out_t, ((int32_t) (ret)))

DECLARE_MARGO_RPC_HANDLER(ADM_input);

/// ADM_output

MERCURY_GEN_PROC(ADM_output_in_t,
@@ -123,8 +276,6 @@ MERCURY_GEN_PROC(ADM_output_in_t,

MERCURY_GEN_PROC(ADM_output_out_t, ((int32_t) (ret)))

DECLARE_MARGO_RPC_HANDLER(ADM_output);

/// ADM_inout

MERCURY_GEN_PROC(ADM_inout_in_t,
@@ -132,8 +283,6 @@ MERCURY_GEN_PROC(ADM_inout_in_t,

MERCURY_GEN_PROC(ADM_inout_out_t, ((int32_t) (ret)))

DECLARE_MARGO_RPC_HANDLER(ADM_inout);

/// ADM_adhoc_context

MERCURY_GEN_PROC(ADM_adhoc_context_in_t, ((hg_const_string_t) (context)))
@@ -141,32 +290,24 @@ MERCURY_GEN_PROC(ADM_adhoc_context_in_t, ((hg_const_string_t) (context)))
MERCURY_GEN_PROC(ADM_adhoc_context_out_t,
                 ((int32_t) (ret))((int32_t) (adhoc_context)))

DECLARE_MARGO_RPC_HANDLER(ADM_adhoc_context);

/// ADM_adhoc_context_id

MERCURY_GEN_PROC(ADM_adhoc_context_id_in_t, ((int32_t) (context_id)))

MERCURY_GEN_PROC(ADM_adhoc_context_id_out_t, ((int32_t) (ret)))

DECLARE_MARGO_RPC_HANDLER(ADM_adhoc_context_id);

/// ADM_adhoc_nodes

MERCURY_GEN_PROC(ADM_adhoc_nodes_in_t, ((int32_t) (nodes)))

MERCURY_GEN_PROC(ADM_adhoc_nodes_out_t, ((int32_t) (ret)))

DECLARE_MARGO_RPC_HANDLER(ADM_adhoc_nodes)

/// ADM_adhoc_walltime

MERCURY_GEN_PROC(ADM_adhoc_walltime_in_t, ((int32_t) (walltime)))

MERCURY_GEN_PROC(ADM_adhoc_walltime_out_t, ((int32_t) (ret)))

DECLARE_MARGO_RPC_HANDLER(ADM_adhoc_walltime);


/// ADM_adhoc_access

@@ -174,8 +315,6 @@ MERCURY_GEN_PROC(ADM_adhoc_access_in_t, ((hg_const_string_t) (access)))

MERCURY_GEN_PROC(ADM_adhoc_access_out_t, ((int32_t) (ret)))

DECLARE_MARGO_RPC_HANDLER(ADM_adhoc_access);

/// ADM_adhoc_distribution

MERCURY_GEN_PROC(ADM_adhoc_distribution_in_t,
@@ -183,32 +322,24 @@ MERCURY_GEN_PROC(ADM_adhoc_distribution_in_t,

MERCURY_GEN_PROC(ADM_adhoc_distribution_out_t, ((int32_t) (ret)))

DECLARE_MARGO_RPC_HANDLER(ADM_adhoc_distribution);

/// ADM_adhoc_background_flush

MERCURY_GEN_PROC(ADM_adhoc_background_flush_in_t, ((hg_bool_t) (b_flush)))

MERCURY_GEN_PROC(ADM_adhoc_background_flush_out_t, ((int32_t) (ret)))

DECLARE_MARGO_RPC_HANDLER(ADM_adhoc_background_flush);

/// ADM_in_situ_ops

MERCURY_GEN_PROC(ADM_in_situ_ops_in_t, ((hg_const_string_t) (in_situ)))

MERCURY_GEN_PROC(ADM_in_situ_ops_out_t, ((int32_t) (ret)))

DECLARE_MARGO_RPC_HANDLER(ADM_in_situ_ops);

/// ADM_in_transit_ops

MERCURY_GEN_PROC(ADM_in_transit_ops_in_t, ((hg_const_string_t) (in_transit)))

MERCURY_GEN_PROC(ADM_in_transit_ops_out_t, ((int32_t) (ret)))

DECLARE_MARGO_RPC_HANDLER(ADM_in_transit_ops);


/// ADM_transfer_dataset

@@ -221,8 +352,6 @@ MERCURY_GEN_PROC(
MERCURY_GEN_PROC(ADM_transfer_dataset_out_t,
                 ((int32_t) (ret))((hg_const_string_t) (transfer_handle)))

DECLARE_MARGO_RPC_HANDLER(ADM_transfer_dataset);

/// ADM_set_dataset_information

MERCURY_GEN_PROC(ADM_set_dataset_information_in_t,
@@ -232,8 +361,6 @@ MERCURY_GEN_PROC(ADM_set_dataset_information_in_t,
MERCURY_GEN_PROC(ADM_set_dataset_information_out_t,
                 ((int32_t) (ret))((int32_t) (status)))

DECLARE_MARGO_RPC_HANDLER(ADM_set_dataset_information);

/// ADM_set_io_resources

MERCURY_GEN_PROC(ADM_set_io_resources_in_t,
@@ -243,8 +370,6 @@ MERCURY_GEN_PROC(ADM_set_io_resources_in_t,
MERCURY_GEN_PROC(ADM_set_io_resources_out_t,
                 ((int32_t) (ret))((int32_t) (status)))

DECLARE_MARGO_RPC_HANDLER(ADM_set_io_resources);

/// ADM_get_transfer_priority

MERCURY_GEN_PROC(ADM_get_transfer_priority_in_t, ((int32_t) (transfer_id)))
@@ -252,8 +377,6 @@ MERCURY_GEN_PROC(ADM_get_transfer_priority_in_t, ((int32_t) (transfer_id)))
MERCURY_GEN_PROC(ADM_get_transfer_priority_out_t,
                 ((int32_t) (ret))((int32_t) (priority)))

DECLARE_MARGO_RPC_HANDLER(ADM_get_transfer_priority);

/// ADM_set_transfer_priority

MERCURY_GEN_PROC(ADM_set_transfer_priority_in_t,
@@ -262,8 +385,6 @@ MERCURY_GEN_PROC(ADM_set_transfer_priority_in_t,
MERCURY_GEN_PROC(ADM_set_transfer_priority_out_t,
                 ((int32_t) (ret))((int32_t) (status)))

DECLARE_MARGO_RPC_HANDLER(ADM_set_transfer_priority);

/// ADM_cancel_transfer

MERCURY_GEN_PROC(ADM_cancel_transfer_in_t, ((int32_t) (transfer_id)))
@@ -271,8 +392,6 @@ MERCURY_GEN_PROC(ADM_cancel_transfer_in_t, ((int32_t) (transfer_id)))
MERCURY_GEN_PROC(ADM_cancel_transfer_out_t,
                 ((int32_t) (ret))((int32_t) (status)))

DECLARE_MARGO_RPC_HANDLER(ADM_cancel_transfer);

/// ADM_get_pending_transfers

MERCURY_GEN_PROC(ADM_get_pending_transfers_in_t, ((hg_const_string_t) (value)))
@@ -280,8 +399,6 @@ MERCURY_GEN_PROC(ADM_get_pending_transfers_in_t, ((hg_const_string_t) (value)))
MERCURY_GEN_PROC(ADM_get_pending_transfers_out_t,
                 ((int32_t) (ret))((hg_const_string_t) (pending_transfers)))

DECLARE_MARGO_RPC_HANDLER(ADM_get_pending_transfers);

/// ADM_set_qos_constraints

MERCURY_GEN_PROC(
@@ -292,8 +409,6 @@ MERCURY_GEN_PROC(
MERCURY_GEN_PROC(ADM_set_qos_constraints_out_t,
                 ((int32_t) (ret))((int32_t) (status)))

DECLARE_MARGO_RPC_HANDLER(ADM_set_qos_constraints);

/// ADM_get_qos_constraints

MERCURY_GEN_PROC(ADM_get_qos_constraints_in_t,
@@ -302,8 +417,6 @@ MERCURY_GEN_PROC(ADM_get_qos_constraints_in_t,
MERCURY_GEN_PROC(ADM_get_qos_constraints_out_t,
                 ((int32_t) (ret))((hg_const_string_t) (list)))

DECLARE_MARGO_RPC_HANDLER(ADM_get_qos_constraints);

/// ADM_define_data_operation

MERCURY_GEN_PROC(ADM_define_data_operation_in_t,
@@ -313,8 +426,6 @@ MERCURY_GEN_PROC(ADM_define_data_operation_in_t,
MERCURY_GEN_PROC(ADM_define_data_operation_out_t,
                 ((int32_t) (ret))((int32_t) (status)))

DECLARE_MARGO_RPC_HANDLER(ADM_define_data_operation);

/// ADM_connect_data_operation

MERCURY_GEN_PROC(ADM_connect_data_operation_in_t,
@@ -326,8 +437,6 @@ MERCURY_GEN_PROC(ADM_connect_data_operation_out_t,
                 ((int32_t) (ret))((hg_const_string_t) (data))(
                         (hg_const_string_t) (operation_handle)))

DECLARE_MARGO_RPC_HANDLER(ADM_connect_data_operation);

/// ADM_finalize_data_operation

MERCURY_GEN_PROC(ADM_finalize_data_operation_in_t, ((int32_t) (operation_id)))
@@ -336,8 +445,6 @@ MERCURY_GEN_PROC(ADM_finalize_data_operation_out_t,
                 ((int32_t) (ret))((int32_t) (status)))


DECLARE_MARGO_RPC_HANDLER(ADM_finalize_data_operation);

/// ADM_link_transfer_to_data_operation

MERCURY_GEN_PROC(ADM_link_transfer_to_data_operation_in_t,
@@ -348,8 +455,6 @@ MERCURY_GEN_PROC(ADM_link_transfer_to_data_operation_in_t,
MERCURY_GEN_PROC(ADM_link_transfer_to_data_operation_out_t,
                 ((int32_t) (ret))((hg_const_string_t) (operation_handle)))

DECLARE_MARGO_RPC_HANDLER(ADM_link_transfer_to_data_operation);

/// ADM_get_statistics

MERCURY_GEN_PROC(ADM_get_statistics_in_t,
@@ -358,10 +463,9 @@ MERCURY_GEN_PROC(ADM_get_statistics_in_t,
MERCURY_GEN_PROC(ADM_get_statistics_out_t,
                 ((int32_t) (ret))((hg_const_string_t) (job_statistics)))

DECLARE_MARGO_RPC_HANDLER(ADM_get_statistics);

#ifdef __cplusplus
};     // extern "C"
#endif // __cplusplus

//} // namespace scord::network::rpc

#endif // SCORD_RPCS_PUBLIC_HPP
// clang-format on
#endif // SCORD_PROTO_TYPES_HPP
Loading