Verified Commit 44beab2a authored by Alberto Miranda's avatar Alberto Miranda ♨️
Browse files

scord: Rewrite `ADM_deploy_adhoc_storage` RPC

parent be8d7d1d
Loading
Loading
Loading
Loading
+23 −28
Original line number Diff line number Diff line
@@ -393,43 +393,38 @@ register_adhoc_storage(const server& srv, const std::string& name,
admire::error_code
deploy_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage) {

    (void) srv;
    (void) adhoc_storage;

    return admire::error_code::snafu;

#if 0
    scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb};
    scord::network::client rpc_client{srv.protocol()};

    const auto rpc_id = ::api::remote_procedure::new_id();
    auto endp = rpc_client.lookup(srv.address());

    if(const auto& lookup_rv = rpc_client.lookup(srv.address());
       lookup_rv.has_value()) {
        const auto& endp = lookup_rv.value();

        LOGGER_INFO("rpc id: {} name: {} from: {} => "
                    "body: {{adhoc_id: {}}}",
                    rpc_id, std::quoted("ADM_"s + __FUNCTION__),
                    std::quoted(rpc_client.self_address()), adhoc_storage.id());

    ADM_deploy_adhoc_storage_in_t in{adhoc_storage.id()};
    ADM_deploy_adhoc_storage_out_t out;
        if(const auto& call_rv =
                   endp.call("ADM_"s + __FUNCTION__, adhoc_storage.id());
           call_rv.has_value()) {

    const auto rpc = endp.call("ADM_deploy_adhoc_storage", &in, &out);
            const scord::network::generic_response resp{call_rv.value()};

    if(const auto rv = admire::error_code{out.retval}; !rv) {
        LOGGER_ERROR("rpc id: {} name: {} from: {} <= "
            LOGGER_EVAL(resp.error_code(), INFO, ERROR,
                        "rpc id: {} name: {} from: {} <= "
                        "body: {{retval: {}}} [op_id: {}]",
                        rpc_id, std::quoted("ADM_"s + __FUNCTION__),
                     std::quoted(rpc_client.self_address()), rv, out.op_id);
        return rv;
    }
                        std::quoted(endp.address()), resp.error_code(),
                        resp.op_id());

    LOGGER_INFO("rpc id: {} name: {} from: {} <= "
                "body: {{retval: {}}} [op_id: {}]",
                rpc_id, std::quoted("ADM_"s + __FUNCTION__),
                std::quoted(rpc_client.self_address()),
                admire::error_code::success, out.op_id);
            return resp.error_code();
        }
    }

    return admire::error_code::success;
#endif
    LOGGER_ERROR("rpc call failed");
    return admire::error_code::other;
}

tl::expected<transfer, error_code>
+100 −119
Original line number Diff line number Diff line
@@ -229,6 +229,106 @@ remove_adhoc_storage(const request& req, std::uint64_t adhoc_id) {
    req.respond(resp);
}

void
deploy_adhoc_storage(const request& req, std::uint64_t adhoc_id) {

    using scord::network::get_address;

    const auto rpc_name = "ADM_"s + __FUNCTION__;
    const auto rpc_id = remote_procedure::new_id();

    LOGGER_INFO("rpc id: {} name: {} from: {} => "
                "body: {{adhoc_id: {}}}",
                rpc_id, std::quoted(rpc_name), std::quoted(get_address(req)),
                adhoc_id);

    auto ec = admire::error_code::success;
    auto& adhoc_manager = scord::adhoc_storage_manager::instance();

    if(const auto am_result = adhoc_manager.find(adhoc_id);
       am_result.has_value()) {
        const auto& storage_info = am_result.value();
        const auto adhoc_storage = storage_info->adhoc_storage();

        if(adhoc_storage.type() == admire::adhoc_storage::type::gekkofs) {
            const auto adhoc_ctx = adhoc_storage.context();
            /* Number of nodes */
            const std::string nodes =
                    std::to_string(adhoc_ctx.resources().nodes().size());

            /* Walltime */
            const std::string walltime = std::to_string(adhoc_ctx.walltime());

            /* Launch script */
            switch(const auto pid = fork()) {
                case 0: {
                    std::vector<const char*> args;
                    args.push_back("gkfs");
                    // args.push_back("-c");
                    // args.push_back("gkfs.conf");
                    args.push_back("-n");
                    args.push_back(nodes.c_str());
                    // args.push_back("-w");
                    // args.push_back(walltime.c_str());
                    args.push_back("--srun");
                    args.push_back("start");
                    args.push_back(NULL);
                    std::vector<const char*> env;
                    env.push_back(NULL);

                    execvpe("gkfs", const_cast<char* const*>(args.data()),
                            const_cast<char* const*>(env.data()));
                    LOGGER_INFO(
                            "ADM_deploy_adhoc_storage() script didn't execute");
                    exit(EXIT_FAILURE);
                    break;
                }
                case -1: {
                    ec = admire::error_code::other;
                    LOGGER_ERROR("rpc id: {} name: {} to: {} <= "
                                 "body: {{retval: {}}}",
                                 rpc_id, std::quoted(rpc_name),
                                 std::quoted(get_address(req)), ec);
                    break;
                }
                default: {
                    int wstatus = 0;
                    pid_t retwait = waitpid(pid, &wstatus, 0);
                    if(retwait == -1) {
                        LOGGER_ERROR(
                                "rpc id: {} error_msg: \"Error waitpid code: {}\"",
                                rpc_id, retwait);
                        ec = admire::error_code::other;
                    } else {
                        if(WEXITSTATUS(wstatus) != 0) {
                            ec = admire::error_code::other;
                        } else {
                            ec = admire::error_code::success;
                        }
                    }
                    break;
                }
            }
        }

    } else {
        ec = am_result.error();
        LOGGER_ERROR("rpc id: {} name: {} to: {} <= "
                     "body: {{retval: {}}}",
                     rpc_id, std::quoted(rpc_name),
                     std::quoted(get_address(req)), ec);
    }

    const auto resp = generic_response{rpc_id, ec};

    LOGGER_INFO("rpc id: {} name: {} to: {} <= "
                "body: {{retval: {}}}",
                rpc_id, std::quoted(rpc_name), std::quoted(get_address(req)),
                ec);

    req.respond(resp);
}

} // namespace scord::network::handlers


@@ -348,125 +448,6 @@ ADM_remove_job(hg_handle_t h) {

DEFINE_MARGO_RPC_HANDLER(ADM_remove_job);

static void
ADM_deploy_adhoc_storage(hg_handle_t h) {

    using scord::network::utils::get_address;

    [[maybe_unused]] hg_return_t ret;

    ADM_deploy_adhoc_storage_in_t in;
    ADM_deploy_adhoc_storage_out_t out;

    [[maybe_unused]] margo_instance_id mid = margo_hg_handle_get_instance(h);

    ret = margo_get_input(h, &in);
    assert(ret == HG_SUCCESS);


    const auto rpc_id = remote_procedure::new_id();
    LOGGER_INFO("rpc id: {} name: {} from: {} => "
                "body: {{adhoc_id: {}}}",
                rpc_id, std::quoted(__FUNCTION__), std::quoted(get_address(h)),
                in.id);

    auto ec = admire::error_code::success;
    auto& adhoc_manager = scord::adhoc_storage_manager::instance();

    if(const auto am_result = adhoc_manager.find(in.id);
       am_result.has_value()) {
        const auto& storage_info = am_result.value();
        const auto adhoc_storage = storage_info->adhoc_storage();

        if(adhoc_storage.type() == admire::adhoc_storage::type::gekkofs) {
            const auto adhoc_ctx = adhoc_storage.context();
            /* Number of nodes */
            const std::string nodes =
                    std::to_string(adhoc_ctx.resources().nodes().size());

            /* Walltime */
            const std::string walltime = std::to_string(adhoc_ctx.walltime());

            /* Launch script */
            switch(const auto pid = fork()) {
                case 0: {
                    std::vector<const char*> args;
                    args.push_back("gkfs");
                    // args.push_back("-c");
                    // args.push_back("gkfs.conf");
                    args.push_back("-n");
                    args.push_back(nodes.c_str());
                    // args.push_back("-w");
                    // args.push_back(walltime.c_str());
                    args.push_back("--srun");
                    args.push_back("start");
                    args.push_back(NULL);
                    std::vector<const char*> env;
                    env.push_back(NULL);

                    execvpe("gkfs", const_cast<char* const*>(args.data()),
                            const_cast<char* const*>(env.data()));
                    LOGGER_INFO(
                            "ADM_deploy_adhoc_storage() script didn't execute");
                    exit(EXIT_FAILURE);
                    break;
                }
                case -1: {
                    ec = admire::error_code::other;
                    LOGGER_ERROR("rpc id: {} name: {} to: {} <= "
                                 "body: {{retval: {}}}",
                                 rpc_id, std::quoted(__FUNCTION__),
                                 std::quoted(get_address(h)), ec);
                    break;
                }
                default: {
                    int wstatus = 0;
                    pid_t retwait = waitpid(pid, &wstatus, 0);
                    if(retwait == -1) {
                        LOGGER_ERROR(
                                "rpc id: {} error_msg: \"Error waitpid code: {}\"",
                                rpc_id, retwait);
                        ec = admire::error_code::other;
                    } else {
                        if(WEXITSTATUS(wstatus) != 0) {
                            ec = admire::error_code::other;
                        } else {
                            ec = admire::error_code::success;
                        }
                    }
                    break;
                }
            }
        }

    } else {
        ec = am_result.error();
        LOGGER_ERROR("rpc id: {} name: {} to: {} <= "
                     "body: {{retval: {}}}",
                     rpc_id, std::quoted(__FUNCTION__),
                     std::quoted(get_address(h)), ec);
    }

    out.op_id = rpc_id;
    out.retval = ec;

    LOGGER_INFO("rpc id: {} name: {} to: {} <= "
                "body: {{retval: {}}}",
                rpc_id, std::quoted(__FUNCTION__), std::quoted(get_address(h)),
                ec);

    ret = margo_respond(h, &out);
    assert(ret == HG_SUCCESS);

    ret = margo_free_input(h, &in);
    assert(ret == HG_SUCCESS);

    ret = margo_destroy(h);
    assert(ret == HG_SUCCESS);
}

DEFINE_MARGO_RPC_HANDLER(ADM_deploy_adhoc_storage);

static void
ADM_register_pfs_storage(hg_handle_t h) {

+3 −3
Original line number Diff line number Diff line
@@ -43,6 +43,9 @@ update_adhoc_storage(const request& req, std::uint64_t adhoc_id,
void
remove_adhoc_storage(const request& req, std::uint64_t adhoc_id);

void
deploy_adhoc_storage(const request& req, std::uint64_t adhoc_id);

void
register_job(const scord::network::request& req,
             const admire::job::resources& job_resources,
@@ -66,9 +69,6 @@ DECLARE_MARGO_RPC_HANDLER(ADM_update_job);
/// ADM_remove_job
DECLARE_MARGO_RPC_HANDLER(ADM_remove_job);

/// ADM_deploy_adhoc_storage
DECLARE_MARGO_RPC_HANDLER(ADM_deploy_adhoc_storage);

/// ADM_register_pfs_storage
DECLARE_MARGO_RPC_HANDLER(ADM_register_pfs_storage);

+2 −0
Original line number Diff line number Diff line
@@ -188,6 +188,8 @@ main(int argc, char* argv[]) {
                           scord::network::handlers::update_adhoc_storage);
        daemon.set_handler("ADM_remove_adhoc_storage"s,
                           scord::network::handlers::remove_adhoc_storage);
        daemon.set_handler("ADM_deploy_adhoc_storage"s,
                           scord::network::handlers::deploy_adhoc_storage);
        daemon.set_handler("ADM_register_job"s,
                           scord::network::handlers::register_job);