Commit ca408d23 authored by Ramon Nou's avatar Ramon Nou
Browse files

Merge branch 'amiranda/25-implement-complex-rpc-arguments-for-adm_register_job' into 'main'

Resolve "Implement complex RPC arguments for ADM_register_job(), ADM_update_job() and ADM_remove_job()"

This MR implements the transmission of the complex RPC arguments required for the RPCs `ADM_register_job`, `ADM_update_job`, and `ADM_remove_job`.

It also makes the following deep changes to the source code:

1. The library core no longer uses C types for those RPCs. Also, C++ types 
   for those RPCs follow the [pimpl](https://en.cppreference.com/w/cpp/language/pimpl) 
   idiom to hide implementation details from the public API.
2. Refactors admire types by moving them to a dedicated 'admire_types' CMake target
   This makes it much simpler to share type definitions between the library
   and the daemons. Code structure is affected as follows:
    
   - C++ types are now declared in admire_types.hpp
   - C & C++ types are defined in types.cpp. The reason for them being in
     the same file is that the C type-creation functions use logging
     facilities that are C++ code. Until we offer a C interface for them,
     C-type functions need to be compiled with a C++ compiler.
3. Adds specific types to enable converting between RPC and API types
    
    This commit adds two generic templated types (managed_rpc_type<T> and
    unmanaged_rpc_type<T>) that simplify the lifetime management of any C-types
    created by Margo RPCs. They also offer conversion functions that allow
    converting between C RPC types and C++ API types.
    
    Both types internally create and store pointers to the C-structures required
    (or produced) by the RPC engine, but differ in how they manage these
    structures:
    
    - The `managed_rpc_type<T>` template internally manages the lifetime of the
      C underlying object and ensures that its destruction function will be
      called when the destructor for `managed_rpc_type<T>` is called.
    
    - The `unmanaged_rpc_type<T>` template only creates the C underlying
      object, but does not destroy it. This is useful when conversions
      are needed but the underlying data will be automatically destroyed by Margo.
    
    Specific conversions are implemented as template specializations. For
    example, `template <> struct admire::managed_rpc_type<admire::job>`
    allows converting from an `admire::job` instance to a newly-created `ADM_job_t`
    via its `get()` function. On the other hand,
    `template <> struct admire::managed_rpc_type<ADM_job_t>` allows converting
    from an existing `ADM_job_t` to an `admire::job` instance, also via its
    `get()` function.

Closes #25

See merge request !16
parents 56df2d7e 2478cf00
Loading
Loading
Loading
Loading
Loading
+21 −7
Original line number Diff line number Diff line
#include <stdlib.h>
#include <stdio.h>
#include <assert.h>
#include <admire.h>

#define NINPUTS  10
@@ -8,7 +9,7 @@
int
main(int argc, char* argv[]) {

    if(argc != 3) {
    if(argc != 2) {
        fprintf(stderr, "ERROR: no location provided\n");
        fprintf(stderr, "Usage: ADM_register_job <REMOTE_IP> <JOB_REQS>\n");
        exit(EXIT_FAILURE);
@@ -17,14 +18,13 @@ main(int argc, char* argv[]) {
    int exit_status = EXIT_SUCCESS;
    ADM_server_t server = ADM_server_create("tcp", argv[1]);


    ADM_job_t job;
    ADM_dataset_t inputs[NINPUTS];

    for(int i = 0; i < NINPUTS; ++i) {
        const char* pattern = "input-dataset-%d";
        size_t n = snprintf(NULL, 0, pattern, i);
        char* id = (char*) malloc(n + 1);
        char* id = (char*) alloca(n + 1);
        snprintf(id, n + 1, pattern, i);
        inputs[i] = ADM_dataset_create(id);
    }
@@ -34,13 +34,22 @@ main(int argc, char* argv[]) {
    for(int i = 0; i < NOUTPUTS; ++i) {
        const char* pattern = "output-dataset-%d";
        size_t n = snprintf(NULL, 0, pattern, i);
        char* id = (char*) malloc(n + 1);
        char* id = (char*) alloca(n + 1);
        snprintf(id, n + 1, pattern, i);
        outputs[i] = ADM_dataset_create(id);
    }

    ADM_job_requirements_t reqs = ADM_job_requirements_create(
            inputs, NINPUTS, outputs, NOUTPUTS, NULL);
    ADM_adhoc_context_t ctx = ADM_adhoc_context_create(
            ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 42, 100, false);
    assert(ctx);

    ADM_storage_t st = ADM_storage_create("foobar", ADM_STORAGE_GEKKOFS, ctx);
    assert(st);

    ADM_job_requirements_t reqs =
            ADM_job_requirements_create(inputs, NINPUTS, outputs, NOUTPUTS, st);
    assert(reqs);

    ADM_return_t ret = ADM_register_job(server, reqs, &job);

    if(ret != ADM_SUCCESS) {
@@ -54,7 +63,6 @@ main(int argc, char* argv[]) {
                    "successfully\n");

cleanup:

    for(int i = 0; i < NINPUTS; ++i) {
        ADM_dataset_destroy(inputs[i]);
    }
@@ -63,6 +71,12 @@ cleanup:
        ADM_dataset_destroy(outputs[i]);
    }

    ADM_storage_destroy(st);

    ADM_adhoc_context_destroy(ctx);

    ADM_job_requirements_destroy(reqs);

    ADM_server_destroy(server);
    exit(exit_status);
}
+23 −5
Original line number Diff line number Diff line
#include <fmt/format.h>
#include <admire.hpp>

#define NINPUTS  10
#define NOUTPUTS 5

int
main(int argc, char* argv[]) {

    if(argc != 3) {
        fmt::print(stderr, "ERROR: no location provided\n");
        fmt::print(stderr, "Usage: ADM_register_job <REMOTE_IP> <JOB_REQS>\n");
    if(argc != 2) {
        fmt::print(stderr, "ERROR: no server address provided\n");
        fmt::print(stderr, "Usage: ADM_register_job <SERVER_ADDRESS>\n");
        exit(EXIT_FAILURE);
    }

    admire::server server{"tcp", argv[1]};

    ADM_job_requirements_t reqs{};
    std::vector<admire::dataset> inputs;
    inputs.reserve(NINPUTS);
    for(int i = 0; i < NINPUTS; ++i) {
        inputs.emplace_back(fmt::format("input-dataset-{}", i));
    }

    try {
    std::vector<admire::dataset> outputs;
    outputs.reserve(NOUTPUTS);
    for(int i = 0; i < NOUTPUTS; ++i) {
        outputs.emplace_back(fmt::format("output-dataset-{}", i));
    }

    auto p = std::make_unique<admire::adhoc_storage>(
            admire::storage::type::gekkofs, "foobar",
            admire::adhoc_storage::execution_mode::separate_new,
            admire::adhoc_storage::access_type::read_write, 42, 100, false);

    admire::job_requirements reqs(inputs, outputs, std::move(p));

    try {
        [[maybe_unused]] const auto job = admire::register_job(server, reqs);

        // do something with job
+8 −14
Original line number Diff line number Diff line
@@ -5,30 +5,24 @@
int
main(int argc, char* argv[]) {

    if(argc != 3) {
        fmt::print(stderr, "ERROR: no location provided\n");
        fmt::print(stderr, "Usage: ADM_remove_job <REMOTE_IP> <JOB_REQS>\n");
    if(argc != 2) {
        fmt::print(stderr, "ERROR: no server address provided\n");
        fmt::print(stderr, "Usage: ADM_remove_job <SERVER_ADDRESS>\n");
        exit(EXIT_FAILURE);
    }

    admire::server server{"tcp", argv[1]};

    ADM_job_t job{};
    ADM_return_t ret = ADM_SUCCESS;
    admire::job job{42};

    try {
        ret = admire::remove_job(server, job);
        [[maybe_unused]] const auto ret = admire::remove_job(server, job);
        fmt::print(stdout, "ADM_remove_job() remote procedure completed "
                           "successfully\n");
        exit(EXIT_SUCCESS);
    } catch(const std::exception& e) {
        fmt::print(stderr, "FATAL: ADM_remove_job() failed: {}\n", e.what());
        exit(EXIT_FAILURE);
    }

    if(ret != ADM_SUCCESS) {
        fmt::print(stdout, "ADM_remove_job() remote procedure not completed "
                           "successfully\n");
        exit(EXIT_FAILURE);
    }

    fmt::print(stdout, "ADM_remove_job() remote procedure completed "
                       "successfully\n");
}
+24 −5
Original line number Diff line number Diff line
#include <fmt/format.h>
#include <admire.hpp>

#define NINPUTS  10
#define NOUTPUTS 5

int
main(int argc, char* argv[]) {

    if(argc != 3) {
        fmt::print(stderr, "ERROR: no location provided\n");
        fmt::print(stderr, "Usage: ADM_update_job <REMOTE_IP> <JOB_REQS>\n");
    if(argc != 2) {
        fmt::print(stderr, "ERROR: no server address provided\n");
        fmt::print(stderr, "Usage: ADM_update_job <SERVER_ADDRESS>\n");
        exit(EXIT_FAILURE);
    }

    admire::server server{"tcp", argv[1]};

    ADM_job_t job{};
    ADM_job_requirements_t reqs{};
    std::vector<admire::dataset> inputs;
    inputs.reserve(NINPUTS);
    for(int i = 0; i < NINPUTS; ++i) {
        inputs.emplace_back(fmt::format("input-dataset-{}", i));
    }

    std::vector<admire::dataset> outputs;
    outputs.reserve(NOUTPUTS);
    for(int i = 0; i < NOUTPUTS; ++i) {
        outputs.emplace_back(fmt::format("output-dataset-{}", i));
    }

    auto p = std::make_unique<admire::adhoc_storage>(
            admire::storage::type::gekkofs, "foobar",
            admire::adhoc_storage::execution_mode::separate_new,
            admire::adhoc_storage::access_type::read_write, 42, 100, false);

    admire::job job{42};
    admire::job_requirements reqs{inputs, outputs, std::move(p)};
    ADM_return_t ret = ADM_SUCCESS;

    try {
+6 −1
Original line number Diff line number Diff line
@@ -37,10 +37,15 @@ add_subdirectory(logger)
target_include_directories(_logger INTERFACE ${CMAKE_CURRENT_SOURCE_DIR})
add_library(common::logger ALIAS _logger)

add_subdirectory(network)
add_subdirectory(net)
target_include_directories(_network_engine INTERFACE
  ${CMAKE_CURRENT_SOURCE_DIR})
add_library(common::network::engine ALIAS _network_engine)
target_include_directories(_rpc_server INTERFACE
  ${CMAKE_CURRENT_SOURCE_DIR})
add_library(common::network::rpc_server ALIAS _rpc_server)


target_include_directories(_rpc_types INTERFACE
  ${CMAKE_CURRENT_SOURCE_DIR})
add_library(common::network::rpc_types ALIAS _rpc_types)
Loading