Verified Commit 6d8f90e9 authored by Alberto Miranda's avatar Alberto Miranda ♨️
Browse files

Update ADM_update_job RPC implementation

parent 8b558cf8
Loading
Loading
Loading
Loading
+6 −1
Original line number Diff line number Diff line
@@ -27,8 +27,13 @@ main(int argc, char* argv[]) {
        outputs.emplace_back(fmt::format("output-dataset-{}", i));
    }

    auto p = std::make_unique<admire::adhoc_storage>(
            admire::storage::type::gekkofs, "foobar",
            admire::adhoc_storage::execution_mode::separate_new,
            admire::adhoc_storage::access_type::read_write, 42, 100, false);

    admire::job job{42};
    admire::job_requirements reqs{inputs, outputs};
    admire::job_requirements reqs{inputs, outputs, std::move(p)};
    ADM_return_t ret = ADM_SUCCESS;

    try {
+9 −2
Original line number Diff line number Diff line
@@ -244,9 +244,16 @@ MERCURY_GEN_PROC(
);

/// ADM_update_job
MERCURY_GEN_PROC(ADM_update_job_in_t, ((int32_t) (reqs)))
MERCURY_GEN_PROC(
    ADM_update_job_in_t,
        ((ADM_job_t) (job))
        ((adm_job_requirements) (reqs))
);

MERCURY_GEN_PROC(ADM_update_job_out_t, ((int32_t) (ret)))
MERCURY_GEN_PROC(
    ADM_update_job_out_t,
        ((int32_t) (retval))
);

/// ADM_remove_job
MERCURY_GEN_PROC(ADM_remove_job_in_t, ((int32_t) (reqs)))
+1 −22
Original line number Diff line number Diff line
@@ -197,28 +197,7 @@ register_job(const server& srv, const job_requirements& reqs) {

ADM_return_t
update_job(const server& srv, const job& job, const job_requirements& reqs) {
    (void) srv;
    (void) job;
    (void) reqs;

    scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb};

    auto endp = rpc_client.lookup(srv.address());

    LOGGER_INFO("ADM_update_job(...)");

    ADM_update_job_in_t in{};
    ADM_update_job_out_t out;

    endp.call("ADM_update_job", &in, &out);

    if(out.ret < 0) {
        LOGGER_ERROR("ADM_update_job() = {}", out.ret);
        return static_cast<ADM_return_t>(out.ret);
    }

    LOGGER_INFO("ADM_update_job() = {}", ADM_SUCCESS);
    return ADM_SUCCESS;
    return detail::update_job(srv, job, reqs);
}

ADM_return_t
+30 −0
Original line number Diff line number Diff line
@@ -192,4 +192,34 @@ register_job(const admire::server& srv, const admire::job_requirements& reqs) {
    return job;
}

admire::error_code
update_job(const server& srv, const job& job, const job_requirements& reqs) {

    scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb};

    auto endp = rpc_client.lookup(srv.address());

    LOGGER_INFO("RPC ({}): {{job: {{{}}}, job_requirements: {{{}}}}}",
                "ADM_update_job", job, reqs);

    const auto rpc_job = managed_rpc_type<admire::job>{job};
    const auto rpc_reqs = managed_rpc_type<admire::job_requirements>{reqs};

    ADM_update_job_in_t in{rpc_job.get(), *rpc_reqs.get()};
    ADM_update_job_out_t out;

    endp.call("ADM_update_job", &in, &out);


    if(out.retval < 0) {
        const auto retval = static_cast<admire::error_code>(out.retval);
        LOGGER_ERROR("RPC (ADM_{}) <= {{retval: {}}}", __FUNCTION__,
                     retval);
        return retval;
    }

    LOGGER_INFO("RPC (ADM_{}) <= {{retval: {}}}", __FUNCTION__, ADM_SUCCESS);
    return ADM_SUCCESS;
}

} // namespace admire::detail
+3 −0
Original line number Diff line number Diff line
@@ -37,6 +37,9 @@ ping(const server& srv);
tl::expected<admire::job, admire::error_code>
register_job(const server& srv, const job_requirements& reqs);

admire::error_code
update_job(const server& srv, const job& job, const job_requirements& reqs);

} // namespace admire::detail

#endif // SCORD_ADMIRE_IMPL_HPP
Loading