Commit 859fd00a authored by Alberto Miranda's avatar Alberto Miranda ♨️
Browse files

Merge branch...

Merge branch 'amiranda/93-api-offers-no-way-to-tear-down-an-adhoc-storage-system-after-it-has-been-deployed' into 'main'

Resolve "API offers no way to tear down an adhoc storage system after it has been deployed"

Closes #93

See merge request !87
parents a3dd90c1 787f9961
Loading
Loading
Loading
Loading
Loading
+29 −20
Original line number Diff line number Diff line
#!/usr/bin/env python3
import pprint
from loguru import logger
import re
import sys
from pathlib import Path
@@ -12,6 +12,7 @@ RPC_NAMES = {
    'ADM_register_job', 'ADM_update_job', 'ADM_remove_job',
    'ADM_register_adhoc_storage', 'ADM_update_adhoc_storage',
    'ADM_remove_adhoc_storage', 'ADM_deploy_adhoc_storage',
    'ADM_tear_down_adhoc_storage',
    'ADM_register_pfs_storage', 'ADM_update_pfs_storage',
    'ADM_remove_pfs_storage',
    'ADM_transfer_datasets', 'ADM_get_transfer_priority',
@@ -161,8 +162,9 @@ class RemoteProcedure:
        for extra_keys, rpc in zip([self_extra_keys, other_extra_keys],
                                   [self, other]):
            if len(extra_keys) != 0:
                print("ERROR: Extra fields were found when comparing an rpc to "
                      "its counterpart\n"
                logger.error(
                    "\nExtra fields were found when comparing an rpc "
                    "to its counterpart\n"
                    f"    extra fields: {extra_keys}"
                    f"    line number: {rpc.meta.lineno}"
                    f"    line contents: {rpc.meta.line}", file=sys.stderr)
@@ -170,8 +172,8 @@ class RemoteProcedure:

        for k in self_keys:
            if self._body[k] != other._body[k]:
                print("ERROR: Mismatching values were found when comparing an "
                      "rpc to its counterpart\n"
                logger.error("\nMismatching values were found when "
                             "comparing an rpc to its counterpart\n"
                             f"    value1 (line: {self.meta.lineno}): {k}: "
                             f"{self._body[k]}\n"
                             f"    value2 (line: {other.meta.lineno}): {k}: "
@@ -318,14 +320,16 @@ if __name__ == "__main__":
    for lf, n in zip([client_logfile, server_logfile], ['CLIENT_LOGFILE',
                                                        'SERVER_LOGFILE']):
        if not lf.is_file():
            print(f"ERROR: {n} '{lf}' is not a file", file=sys.stderr)
            logger.error(f"{n} '{lf}' is not a file", file=sys.stderr)
            sys.exit(1)

    rpc_name = sys.argv[3]

    if rpc_name not in RPC_NAMES:
        print(f"ERROR: '{rpc_name}' is not a valid rpc name", file=sys.stderr)
        print(f"  Valid names: {', '.join(sorted(RPC_NAMES))}", file=sys.stderr)
        logger.error(f"'{rpc_name}' is not a valid rpc name",
                     file=sys.stderr)
        logger.error(f"  Valid names: {', '.join(sorted(RPC_NAMES))}",
                     file=sys.stderr)
        sys.exit(1)

    logfiles = [client_logfile, server_logfile]
@@ -345,8 +349,9 @@ if __name__ == "__main__":
                if rpc.is_request:
                    found_rpcs[rpc.id] = rpc
                else:
                    print(f"ERROR: Found RPC reply without corresponding "
                          f"request at line {rpc.meta.lineno}\n"
                    logger.error(f"\nFound server reply for RPC without "
                                 f"a corresponding client request at line"
                                 f" {rpc.meta.lineno}\n"
                                 f"    raw: '{rpc.meta.line}'", file=sys.stderr)
                    sys.exit(1)
            else:
@@ -358,7 +363,11 @@ if __name__ == "__main__":
    ec = 0
    for k in client_ops.keys():

        assert (k in server_ops)
        if k not in server_ops:
            logger.error(
                f"Operation ID '{k}' found in client log but missing "
                f"in server log")
            ec = 1

        if client_ops[k] != server_ops[k]:
            ec = 1
+154 −0
Original line number Diff line number Diff line
/******************************************************************************
 * Copyright 2021-2023, Barcelona Supercomputing Center (BSC), Spain
 *
 * This software was partially supported by the EuroHPC-funded project ADMIRE
 *   (Project ID: 956748, https://www.admire-eurohpc.eu).
 *
 * This file is part of scord.
 *
 * scord is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * scord is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with scord.  If not, see <https://www.gnu.org/licenses/>.
 *
 * SPDX-License-Identifier: GPL-3.0-or-later
 *****************************************************************************/

#include <stdlib.h>
#include <stdio.h>
#include <scord/scord.h>
#include "common.h"

#define NADHOC_NODES 25
#define NINPUTS      10
#define NOUTPUTS     5

/**
 * Example client for ADM_tear_down_adhoc_storage().
 *
 * Registers an adhoc storage with the server found at argv[1], requests its
 * deployment, requests its tear down and finally removes it from the server.
 *
 * The process exits with EXIT_SUCCESS only when the registration, the
 * deployment, the tear down and the removal all report ADM_SUCCESS;
 * any other outcome exits with EXIT_FAILURE.
 *
 * @param argc Must be exactly 2.
 * @param argv argv[1] is the address handed to ADM_server_create().
 */
int
main(int argc, char* argv[]) {

    if(argc != 2) {
        fprintf(stderr, "ERROR: no location provided\n");
        fprintf(stderr,
                "Usage: ADM_tear_down_adhoc_storage <SERVER_ADDRESS>\n");
        exit(EXIT_FAILURE);
    }

    int exit_status = EXIT_FAILURE;
    ADM_return_t ret = ADM_SUCCESS;
    ADM_server_t server = NULL;

    // adhoc information
    const char* adhoc_name = "adhoc_storage_42";

    ADM_node_t* adhoc_nodes = NULL;
    ADM_adhoc_resources_t adhoc_resources = NULL;
    ADM_adhoc_context_t adhoc_ctx = NULL;
    ADM_adhoc_storage_t adhoc_storage = NULL;


    // Let's prepare all the information required by the API calls.
    // ADM_tear_down_adhoc_storage() is issued here on an adhoc storage that
    // has previously been registered and deployed, so let's prepare first the
    // data required to call ADM_register_adhoc_storage():

    // 1. the nodes required by the associated adhoc storage
    adhoc_nodes = prepare_nodes(NADHOC_NODES);

    if(adhoc_nodes == NULL) {
        fprintf(stderr, "Fatal error preparing adhoc nodes\n");
        goto cleanup;
    }

    // 2. the adhoc storage resources
    adhoc_resources = ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES);

    if(adhoc_resources == NULL) {
        fprintf(stderr, "Fatal error preparing adhoc resources\n");
        goto cleanup;
    }

    // 3. the adhoc storage execution context
    adhoc_ctx = ADM_adhoc_context_create(ADM_ADHOC_MODE_SEPARATE_NEW,
                                         ADM_ADHOC_ACCESS_RDWR, 100, false);

    if(adhoc_ctx == NULL) {
        fprintf(stderr, "Fatal error preparing adhoc context\n");
        goto cleanup;
    }


    // All the information required by the ADM_register_adhoc_storage() API is
    // now ready. Let's actually contact the server:

    // 1. Find the server endpoint
    if((server = ADM_server_create("tcp", argv[1])) == NULL) {
        fprintf(stderr, "Fatal error creating server\n");
        goto cleanup;
    }

    // 2. Register the adhoc storage. The return code is stored in `ret` so
    // that the error message below reports the actual failure reason.
    if((ret = ADM_register_adhoc_storage(
                server, adhoc_name, ADM_ADHOC_STORAGE_DATACLAY, adhoc_ctx,
                adhoc_resources, &adhoc_storage)) != ADM_SUCCESS) {
        fprintf(stderr, "ADM_register_adhoc_storage() failed: %s\n",
                ADM_strerror(ret));
        goto cleanup;
    }

    // Now that we have an existing adhoc storage registered into the
    // system, we can request its deployment to the server
    // NOTE(review): if the deployment or the tear down fails we jump straight
    // to cleanup without calling ADM_remove_adhoc_storage() -- confirm whether
    // the registration should be undone on those paths.
    if((ret = ADM_deploy_adhoc_storage(server, adhoc_storage)) != ADM_SUCCESS) {
        fprintf(stderr, "ADM_deploy_adhoc_storage() failed: %s\n",
                ADM_strerror(ret));
        goto cleanup;
    }

    // We can now request the tear down of the adhoc storage
    if((ret = ADM_tear_down_adhoc_storage(server, adhoc_storage)) !=
       ADM_SUCCESS) {
        fprintf(stderr, "ADM_tear_down_adhoc_storage() failed: %s\n",
                ADM_strerror(ret));
        goto cleanup;
    }

    // At this point, the adhoc storage has been torn down...
    exit_status = EXIT_SUCCESS;

    // Once the adhoc storage is no longer required we need to notify the server
    if((ret = ADM_remove_adhoc_storage(server, adhoc_storage)) != ADM_SUCCESS) {
        fprintf(stderr, "ADM_remove_adhoc_storage() failed: %s\n",
                ADM_strerror(ret));
        adhoc_storage = NULL;
        exit_status = EXIT_FAILURE;
        // intentionally fall through...
    }


cleanup:
    // Handles that were never created are still NULL when we get here.
    ADM_server_destroy(server);
    ADM_adhoc_context_destroy(adhoc_ctx);
    ADM_adhoc_resources_destroy(adhoc_resources);
    destroy_nodes(adhoc_nodes, NADHOC_NODES);
    exit(exit_status);
}
+1 −1
Original line number Diff line number Diff line
@@ -29,7 +29,7 @@ list(APPEND examples_c
  ADM_register_job ADM_update_job ADM_remove_job
  # adhoc storage
  ADM_register_adhoc_storage ADM_update_adhoc_storage ADM_remove_adhoc_storage
  ADM_deploy_adhoc_storage
  ADM_deploy_adhoc_storage ADM_tear_down_adhoc_storage
  # pfs storage
  ADM_register_pfs_storage ADM_update_pfs_storage ADM_remove_pfs_storage
  # transfers
+79 −0
Original line number Diff line number Diff line
/******************************************************************************
 * Copyright 2021-2023, Barcelona Supercomputing Center (BSC), Spain
 *
 * This software was partially supported by the EuroHPC-funded project ADMIRE
 *   (Project ID: 956748, https://www.admire-eurohpc.eu).
 *
 * This file is part of scord.
 *
 * scord is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * scord is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with scord.  If not, see <https://www.gnu.org/licenses/>.
 *
 * SPDX-License-Identifier: GPL-3.0-or-later
 *****************************************************************************/

#include <fmt/format.h>
#include <scord/scord.hpp>
#include "common.hpp"

#define NJOB_NODES   50
#define NADHOC_NODES 25
#define NINPUTS      10
#define NOUTPUTS     5

/**
 * Example client for scord::tear_down_adhoc_storage().
 *
 * Registers an adhoc storage with the server found at argv[1], requests its
 * deployment and then requests its tear down. Any exception thrown by one of
 * those calls is reported on stderr and makes the program return
 * EXIT_FAILURE.
 *
 * @param argc Must be exactly 2.
 * @param argv argv[1] is the address used to build the scord::server.
 */
int
main(int argc, char* argv[]) {

    if(argc != 2) {
        fmt::print(stderr, "ERROR: no location provided\n");
        fmt::print(stderr,
                   "Usage: ADM_tear_down_adhoc_storage <SERVER_ADDRESS>\n");
        exit(EXIT_FAILURE);
    }

    scord::server server{"tcp", argv[1]};

    const auto adhoc_nodes = prepare_nodes(NADHOC_NODES);

    const std::string name = "adhoc_storage_42";
    const auto adhoc_storage_ctx = scord::adhoc_storage::ctx{
            scord::adhoc_storage::execution_mode::separate_new,
            scord::adhoc_storage::access_type::read_write, 100, false};
    const auto adhoc_resources = scord::adhoc_storage::resources{adhoc_nodes};

    try {
        const auto adhoc_storage = scord::register_adhoc_storage(
                server, name, scord::adhoc_storage::type::dataclay,
                adhoc_storage_ctx, adhoc_resources);

        fmt::print(stdout,
                   "ADM_register_adhoc_storage() remote procedure completed "
                   "successfully\n");

        scord::deploy_adhoc_storage(server, adhoc_storage);

        scord::tear_down_adhoc_storage(server, adhoc_storage);

    } catch(const std::exception& e) {
        // Any of the three remote procedures above may have thrown, so the
        // message lists all of them.
        fmt::print(stderr,
                   "FATAL: ADM_register_adhoc_storage(), "
                   "ADM_deploy_adhoc_storage() or "
                   "ADM_tear_down_adhoc_storage() failed: {}\n",
                   e.what());
        // Return instead of calling exit() so that the destructors of the
        // locals declared above (e.g. `server`) still run.
        return EXIT_FAILURE;
    }

    fmt::print(stdout,
               "ADM_tear_down_adhoc_storage() remote procedure completed "
               "successfully\n");

    return EXIT_SUCCESS;
}
+1 −1
Original line number Diff line number Diff line
@@ -29,7 +29,7 @@ list(APPEND examples_cxx
  ADM_register_job ADM_update_job ADM_remove_job
  # adhoc storage
  ADM_register_adhoc_storage ADM_update_adhoc_storage ADM_remove_adhoc_storage
  ADM_deploy_adhoc_storage
  ADM_deploy_adhoc_storage ADM_tear_down_adhoc_storage
  # pfs storage
  ADM_register_pfs_storage ADM_update_pfs_storage ADM_remove_pfs_storage
  # transfers
Loading