Verified commit 4f2d492f, authored by Alberto Miranda
Browse files

Move `deploy_adhoc_storage` logic to `scord-ctl`

`scord` now just forwards the request to the appropriate `scord-ctl`
instance.
parent bd901bcc
Loading
Loading
Loading
Loading
+77 −6
Original line number Diff line number Diff line
@@ -26,6 +26,10 @@
#include <net/serialization.hpp>
#include "rpc_server.hpp"

// required for ::waitpid()
#include <sys/types.h>
#include <sys/wait.h>

using namespace std::literals;

struct remote_procedure {
@@ -76,24 +80,91 @@ rpc_server::ping(const network::request& req) {
}

void
rpc_server::deploy_adhoc_storage(const network::request& req) {
rpc_server::deploy_adhoc_storage(
        const network::request& req,
        const enum scord::adhoc_storage::type adhoc_type,
        const scord::adhoc_storage::ctx& adhoc_ctx,
        const scord::adhoc_storage::resources& adhoc_resources) {

    using network::generic_response;
    using network::get_address;

    const auto rpc_name = "ADM_"s + __FUNCTION__;
    const auto rpc_id = remote_procedure::new_id();

    LOGGER_INFO("rpc id: {} name: {} from: {} => "
                "body: {{}}",
                "body: {{type: {}, ctx: {}, resources: {}}}",
                rpc_id, std::quoted(__FUNCTION__),
                std::quoted(get_address(req)));
                std::quoted(get_address(req)), adhoc_type, adhoc_ctx,
                adhoc_resources);

    auto ec = scord::error_code::success;

    // Deployment is only implemented for GekkoFS: the `gkfs` launcher is run
    // in a child process and its termination status is mapped onto `ec`.
    // For any other adhoc storage type `ec` is left untouched.
    if(adhoc_type == scord::adhoc_storage::type::gekkofs) {
        /* Number of nodes */
        const std::string nodes =
                std::to_string(adhoc_resources.nodes().size());

        /* Walltime (only referenced by the disabled "-w" option below) */
        const std::string walltime = std::to_string(adhoc_ctx.walltime());

        /* Launch script */
        switch(const auto pid = fork()) {
            case 0: {
                // Child: replace the process image with
                // `gkfs -n <nodes> --srun start`, using an empty environment.
                std::vector<const char*> args;
                args.push_back("gkfs");
                // args.push_back("-c");
                // args.push_back("gkfs.conf");
                args.push_back("-n");
                args.push_back(nodes.c_str());
                // args.push_back("-w");
                // args.push_back(walltime.c_str());
                args.push_back("--srun");
                args.push_back("start");
                args.push_back(nullptr);
                std::vector<const char*> env;
                env.push_back(nullptr);

                execvpe("gkfs", const_cast<char* const*>(args.data()),
                        const_cast<char* const*>(env.data()));

                // execvpe() only returns on failure.
                LOGGER_INFO("ADM_deploy_adhoc_storage() script didn't execute");

                // Leave with _exit() rather than exit(): the child is a copy
                // of the server process and must not run the atexit handlers
                // and static destructors it inherited from the parent.
                ::_exit(EXIT_FAILURE);
                // not reached
            }
            case -1: {
                // fork() failed: no child was created.
                ec = scord::error_code::other;
                LOGGER_ERROR("rpc id: {} name: {} to: {} <= "
                             "body: {{retval: {}}}",
                             rpc_id, std::quoted(rpc_name),
                             std::quoted(get_address(req)), ec);
                break;
            }
            default: {
                // Parent: block until the launcher terminates.
                int wstatus = 0;
                const pid_t retwait = ::waitpid(pid, &wstatus, 0);

                if(retwait == -1) {
                    LOGGER_ERROR(
                            "rpc id: {} error_msg: \"Error waitpid code: {}\"",
                            rpc_id, retwait);
                    ec = scord::error_code::other;
                } else if(!WIFEXITED(wstatus)) {
                    // WEXITSTATUS() is only meaningful when WIFEXITED() is
                    // true. A child killed by a signal would otherwise be
                    // read as exit status 0 and reported as a success.
                    LOGGER_ERROR("rpc id: {} error_msg: \"gkfs terminated "
                                 "abnormally (wait status: {})\"",
                                 rpc_id, wstatus);
                    ec = scord::error_code::other;
                } else if(WEXITSTATUS(wstatus) != 0) {
                    ec = scord::error_code::other;
                } else {
                    ec = scord::error_code::success;
                }
                break;
            }
        }
    }

    const auto resp = generic_response{rpc_id, scord::error_code::success};
    const auto resp = generic_response{rpc_id, ec};

    LOGGER_INFO("rpc id: {} name: {} to: {} <= "
                "body: {{retval: {}}}",
                rpc_id, std::quoted(__FUNCTION__),
                std::quoted(get_address(req)), scord::error_code::success);
                rpc_id, std::quoted(rpc_name), std::quoted(get_address(req)),
                ec);

    req.respond(resp);
}
+6 −1
Original line number Diff line number Diff line
@@ -27,6 +27,7 @@
#define SCORD_CTL_RPC_SERVER_HPP

#include <net/server.hpp>
#include <scord/types.hpp>

namespace scord_ctl {

@@ -42,7 +43,11 @@ private:
    ping(const network::request& req);

    void
    deploy_adhoc_storage(const network::request& req);
    deploy_adhoc_storage(
            const network::request& req,
            enum scord::adhoc_storage::type adhoc_type,
            const scord::adhoc_storage::ctx& adhoc_ctx,
            const scord::adhoc_storage::resources& adhoc_resources);
};

} // namespace scord_ctl
+29 −66
Original line number Diff line number Diff line
@@ -24,13 +24,10 @@

#include <scord/types.hpp>
#include <net/request.hpp>
#include <net/endpoint.hpp>
#include <net/serialization.hpp>
#include "rpc_server.hpp"

// required for ::waitpid()
#include <sys/types.h>
#include <sys/wait.h>

using namespace std::literals;

struct remote_procedure {
@@ -348,72 +345,38 @@ rpc_server::deploy_adhoc_storage(const network::request& req,

    auto ec = scord::error_code::success;

    // contact the adhoc controller and ask it to deploy the adhoc storage
    if(const auto am_result = m_adhoc_manager.find(adhoc_id);
       am_result.has_value()) {
        const auto& storage_info = am_result.value();
        const auto adhoc_storage = storage_info->adhoc_storage();

        if(adhoc_storage.type() == scord::adhoc_storage::type::gekkofs) {
            const auto adhoc_ctx = adhoc_storage.context();
            /* Number of nodes */
            const std::string nodes = std::to_string(
                    adhoc_storage.get_resources().nodes().size());

            /* Walltime */
            const std::string walltime = std::to_string(adhoc_ctx.walltime());

            /* Launch script */
            switch(const auto pid = fork()) {
                case 0: {
                    std::vector<const char*> args;
                    args.push_back("gkfs");
                    // args.push_back("-c");
                    // args.push_back("gkfs.conf");
                    args.push_back("-n");
                    args.push_back(nodes.c_str());
                    // args.push_back("-w");
                    // args.push_back(walltime.c_str());
                    args.push_back("--srun");
                    args.push_back("start");
                    args.push_back(NULL);
                    std::vector<const char*> env;
                    env.push_back(NULL);

                    execvpe("gkfs", const_cast<char* const*>(args.data()),
                            const_cast<char* const*>(env.data()));
                    LOGGER_INFO(
                            "ADM_deploy_adhoc_storage() script didn't execute");
                    exit(EXIT_FAILURE);
                    break;
                }
                case -1: {
                    ec = scord::error_code::other;
                    LOGGER_ERROR("rpc id: {} name: {} to: {} <= "
                                 "body: {{retval: {}}}",
                                 rpc_id, std::quoted(rpc_name),
                                 std::quoted(get_address(req)), ec);
                    break;
                }
                default: {
                    int wstatus = 0;
                    pid_t retwait = ::waitpid(pid, &wstatus, 0);
                    if(retwait == -1) {
                        LOGGER_ERROR(
                                "rpc id: {} error_msg: \"Error waitpid code: {}\"",
                                rpc_id, retwait);
                        ec = scord::error_code::other;
                    } else {
                        if(WEXITSTATUS(wstatus) != 0) {
                            ec = scord::error_code::other;
                        } else {
                            ec = scord::error_code::success;
                        }
                    }
                    break;
                }
        const auto& adhoc_info = am_result.value();
        const auto adhoc_storage = adhoc_info->adhoc_storage();

        if(const auto lookup_rv =
                   lookup(adhoc_storage.context().controller_address());
           lookup_rv.has_value()) {
            const auto& endp = lookup_rv.value();

            LOGGER_INFO("rpc id: {} name: {} from: {} => "
                        "body: {{type: {}, ctx: {}, resources: {}}}",
                        rpc_id, std::quoted("ADM_"s + __FUNCTION__),
                        std::quoted(self_address()), adhoc_storage.type(),
                        adhoc_storage.context(), adhoc_storage.get_resources());

            if(const auto call_rv = endp.call(
                       "ADM_"s + __FUNCTION__, adhoc_storage.type(),
                       adhoc_storage.context(), adhoc_storage.get_resources());
               call_rv.has_value()) {

                const network::generic_response resp{call_rv.value()};
                ec = resp.error_code();

                LOGGER_EVAL(resp.error_code(), INFO, ERROR,
                            "rpc id: {} name: {} from: {} <= "
                            "body: {{retval: {}}} [op_id: {}]",
                            rpc_id, std::quoted("ADM_"s + __FUNCTION__),
                            std::quoted(endp.address()), ec, resp.op_id());
            }
        }

    } else {
        ec = am_result.error();
        LOGGER_ERROR("rpc id: {} name: {} to: {} <= "