Commit 2c547961 authored by Alberto Miranda's avatar Alberto Miranda ♨️
Browse files

Merge branch...

Merge branch 'rnou/54-refactor-library-rpc-implementation-of-admire-deploy_adhoc_storage' into 'main'

Resolve "Refactor library RPC implementation of `admire::deploy_adhoc_storage`"

This MR moves the implementation of the RPC construction for
`ADM_deploy_adhoc_storage` to `admire::detail` in `impl.[ch]pp`
similarly to other RPCs already refactored. This includes changing
the C ADM_types to native C++ types.

It also provides a first implementation of the deployment mechanism
for GekkoFS.

Closes #54

See merge request !40
parents ee8d7163 67c2160c
Loading
Loading
Loading
Loading
Loading
+5 −2
Original line number Diff line number Diff line
@@ -25,7 +25,9 @@
#include <stdlib.h>
#include <stdio.h>
#include <admire.h>
#include <assert.h>
#include "common.h"
#include <net/proto/rpc_types.h>

#define NADHOC_NODES 25
#define NINPUTS      10
@@ -96,7 +98,7 @@ main(int argc, char* argv[]) {
    }

    // 2. Register the adhoc storage
    if(ADM_register_adhoc_storage(server, adhoc_name, ADM_STORAGE_GEKKOFS,
    if(ADM_register_adhoc_storage(server, adhoc_name, ADM_STORAGE_DATACLAY,
                                  adhoc_ctx, &adhoc_storage) != ADM_SUCCESS) {
        fprintf(stderr, "ADM_register_adhoc_storage() failed: %s\n",
                ADM_strerror(ret));
@@ -118,7 +120,8 @@ main(int argc, char* argv[]) {
    }

    // We can now request the deployment to the server
    if((ret = ADM_deploy_adhoc_storage(server, adhoc_storage)) != ADM_SUCCESS) {
    if((ret = ADM_deploy_adhoc_storage(server, adhoc_storage)) !=
       ADM_SUCCESS) {
        fprintf(stderr, "ADM_deploy_adhoc_storage() failed: %s\n",
                ADM_strerror(ret));
        goto cleanup;
+26 −10
Original line number Diff line number Diff line
@@ -24,7 +24,12 @@

#include <fmt/format.h>
#include <admire.hpp>
#include "common.hpp"

#define NJOB_NODES   50
#define NADHOC_NODES 25
#define NINPUTS      10
#define NOUTPUTS     5

int
main(int argc, char* argv[]) {
@@ -38,21 +43,32 @@ main(int argc, char* argv[]) {

    admire::server server{"tcp", argv[1]};

    ADM_storage_t adhoc_storage{};
    ADM_return_t ret = ADM_SUCCESS;
    const auto adhoc_nodes = prepare_nodes(NADHOC_NODES);
    const auto inputs = prepare_datasets("input-dataset-{}", NINPUTS);
    const auto outputs = prepare_datasets("output-dataset-{}", NOUTPUTS);

    std::string name = "adhoc_storage_42";
    const auto adhoc_storage_ctx = admire::adhoc_storage::ctx{
            admire::adhoc_storage::execution_mode::separate_new,
            admire::adhoc_storage::access_type::read_write,
            admire::adhoc_storage::resources{adhoc_nodes}, 100, false};

    try {
        ret = admire::deploy_adhoc_storage(server, adhoc_storage);
    } catch(const std::exception& e) {
        fmt::print(stderr, "FATAL: ADM_deploy_adhoc_storage() failed: {}\n",
                   e.what());
        exit(EXIT_FAILURE);
    }
        const auto adhoc_storage = admire::register_adhoc_storage(
                server, name, admire::storage::type::dataclay,
                adhoc_storage_ctx);

    if(ret != ADM_SUCCESS) {
        fmt::print(stdout,
                   "ADM_deploy_adhoc_storage() remote procedure not completed "
                   "ADM_register_adhoc_storage() remote procedure completed "
                   "successfully\n");

        admire::deploy_adhoc_storage(server, adhoc_storage);

    } catch(const std::exception& e) {
        fmt::print(
                stderr,
                "FATAL: ADM_register_adhoc_storage() or ADM_deploy_adhoc_storage() failed: {}\n",
                e.what());
        exit(EXIT_FAILURE);
    }

+1 −1
Original line number Diff line number Diff line
@@ -55,7 +55,7 @@ main(int argc, char* argv[]) {

    try {
        const auto adhoc_storage = admire::register_adhoc_storage(
                server, name, admire::storage::type::gekkofs,
                server, name, admire::storage::type::dataclay,
                adhoc_storage_ctx);

        fmt::print(stdout,
+6 −2
Original line number Diff line number Diff line
@@ -174,6 +174,7 @@ typedef struct adm_pfs_context {
    const char* c_mount;
} adm_pfs_context;


// clang-format off
MERCURY_GEN_STRUCT_PROC(
    adm_pfs_context, // NOLINT
@@ -371,9 +372,12 @@ MERCURY_GEN_PROC(
);

/// ADM_deploy_adhoc_storage
MERCURY_GEN_PROC(ADM_deploy_adhoc_storage_in_t, ((int32_t) (reqs)))
MERCURY_GEN_PROC(ADM_deploy_adhoc_storage_in_t, ((hg_uint64_t) (id)))

MERCURY_GEN_PROC(ADM_deploy_adhoc_storage_out_t, ((int32_t) (ret)))
MERCURY_GEN_PROC(ADM_deploy_adhoc_storage_out_t, 
        ((hg_uint64_t) (op_id))
        ((hg_int32_t) (retval))
);

/// ADM_register_pfs_storage
MERCURY_GEN_PROC(ADM_register_pfs_storage_in_t, ((int32_t) (reqs)))
+8 −22
Original line number Diff line number Diff line
@@ -283,30 +283,16 @@ remove_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage) {
    }
}

ADM_return_t
deploy_adhoc_storage(const server& srv, ADM_storage_t adhoc_storage) {

    (void) srv;
    (void) adhoc_storage;

    scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb};

    auto endp = rpc_client.lookup(srv.address());

    LOGGER_INFO("ADM_deploy_adhoc_storage(...)");

    ADM_deploy_adhoc_storage_in_t in{};
    ADM_deploy_adhoc_storage_out_t out;
void
deploy_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage) {
   
    const auto rpc = endp.call("ADM_deploy_adhoc_storage", &in, &out);
    const auto ec = detail::deploy_adhoc_storage(srv, adhoc_storage);

    if(out.ret < 0) {
        LOGGER_ERROR("ADM_deploy_adhoc_storage() = {}", out.ret);
        return static_cast<ADM_return_t>(out.ret);
    if(!ec) {
        throw std::runtime_error(fmt::format(
                "ADM_deploy_adhoc_storage() error: {}", ec.message()));
    }
  
    LOGGER_INFO("ADM_deploy_adhoc_storage() = {}", ADM_SUCCESS);
    return ADM_SUCCESS;
}

ADM_return_t
Loading