Verified Commit 36126af5 authored by Alberto Miranda's avatar Alberto Miranda ♨️
Browse files

Reimplement ADM_transfer_dataset stubs

parent 6287b09d
Loading
Loading
Loading
Loading
+12 −32
Original line number Diff line number Diff line
@@ -400,41 +400,22 @@ remove_pfs_storage(const server& srv, ADM_storage_t pfs_storage) {
    return ADM_SUCCESS;
}

ADM_return_t
transfer_dataset(const server& srv, ADM_job_t job, ADM_dataset_t** sources,
                 ADM_dataset_t** targets, ADM_qos_limit_t** limits,
                 ADM_transfer_mapping_t mapping, ADM_transfer_t* transfer) {
    (void) srv;
    (void) job;
    (void) sources;
    (void) targets;
    (void) limits;
    (void) mapping;
    (void) transfer;

    scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb};

    auto endp = rpc_client.lookup(srv.address());

    LOGGER_INFO("ADM_transfer_dataset(...)");
admire::transfer
transfer_dataset(const server& srv, const job& job,
                 const std::vector<dataset>& sources,
                 const std::vector<dataset>& targets,
                 const std::vector<qos::limit>& limits,
                 transfer::mapping mapping) {

    ADM_transfer_dataset_in_t in{};
    ADM_transfer_dataset_out_t out;
    const auto rv = detail::transfer_dataset(srv, job, sources, targets, limits,
                                             mapping);

    in.source = "/tmp";
    in.destination = "/tmp";
    in.qos_constraints = "constraints";
    in.distribution = "distribution";

    const auto rpc = endp.call("ADM_transfer_dataset", &in, &out);

    if(out.ret < 0) {
        LOGGER_ERROR("ADM_transfer_dataset() = {}", out.ret);
        return static_cast<ADM_return_t>(out.ret);
    if(!rv) {
        throw std::runtime_error(fmt::format("ADM_transfer_dataset() error: {}",
                                             ADM_strerror(rv.error())));
    }

    LOGGER_INFO("ADM_transfer_dataset() = {}", ADM_SUCCESS);
    return ADM_SUCCESS;
    return rv.value();
}

ADM_return_t
@@ -780,7 +761,6 @@ link_transfer_to_data_operation(const server& srv, ADM_job_t job,
    (void) op;
    (void) should_stream;
    (void) args;
    (void) transfer;

    scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb};

+10 −6
Original line number Diff line number Diff line
@@ -185,13 +185,16 @@ ADM_remove_pfs_storage(ADM_server_t server, ADM_storage_t adhoc_storage);
 *
 * @param[in] server The server to which the request is directed
 * @param[in] job An ADM_JOB identifying the originating job.
 * @param[in] sources A list of DATASETs identifying the source dataset/s
 * @param[in] sources An array of DATASETs identifying the source dataset/s
 * to be transferred.
 * @param[in] targets A list of DATASETs identifying the destination
 * @param[in] sources_len The number of DATASETs stored in sources.
 * @param[in] targets An array of DATASETs identifying the destination
 * dataset/s and its/their desired locations in a storage tier.
 * @param[in] limits A list of QOS_CONSTRAINTS that must be applied to
 * @param[in] targets_len The number of DATASETs stored in targets.
 * @param[in] limits An array of QOS_CONSTRAINTS that must be applied to
 * the transfer. These may not exceed the global ones set at node, application,
 * or resource level.
 * @param[in] limits_len The number of QOS_CONSTRAINTS stored in limits.
 * @param[in] mapping A distribution strategy for the transfers (e.g.
 * ONE_TO_ONE, ONE_TO_MANY, MANY_TO_MANY)
 * @param[out] transfer A ADM_TRANSFER allowing clients to interact
@@ -202,9 +205,10 @@ ADM_remove_pfs_storage(ADM_server_t server, ADM_storage_t adhoc_storage);
 */
ADM_return_t
ADM_transfer_dataset(ADM_server_t server, ADM_job_t job,
                     ADM_dataset_t** sources, ADM_dataset_t** targets,
                     ADM_qos_limit_t** limits, ADM_transfer_mapping_t mapping,
                     ADM_transfer_t* transfer);
                     ADM_dataset_t sources[], size_t sources_len,
                     ADM_dataset_t targets[], size_t targets_len,
                     ADM_qos_limit_t limits[], size_t limits_len,
                     ADM_transfer_mapping_t mapping, ADM_transfer_t* transfer);


/**
+6 −4
Original line number Diff line number Diff line
@@ -81,10 +81,12 @@ update_pfs_storage(const server& srv, ADM_pfs_context_t ctx,
ADM_return_t
remove_pfs_storage(const server& srv, ADM_storage_t pfs_storage);

ADM_return_t
transfer_dataset(const server& srv, ADM_job_t job, ADM_dataset_t** sources,
                 ADM_dataset_t** targets, ADM_qos_limit_t** limits,
                 ADM_transfer_mapping_t mapping, ADM_transfer_t* transfer);
admire::transfer
transfer_dataset(const server& srv, const job& job,
                 const std::vector<dataset>& sources,
                 const std::vector<dataset>& targets,
                 const std::vector<qos::limit>& limits,
                 transfer::mapping mapping);

ADM_return_t
set_dataset_information(const server& srv, ADM_job_t job, ADM_dataset_t target,
+17 −6
Original line number Diff line number Diff line
@@ -140,14 +140,25 @@ ADM_remove_pfs_storage(ADM_server_t server, ADM_storage_t pfs_storage) {

ADM_return_t
ADM_transfer_dataset(ADM_server_t server, ADM_job_t job,
                     ADM_dataset_t** sources, ADM_dataset_t** targets,
                     ADM_qos_limit_t** limits, ADM_transfer_mapping_t mapping,
                     ADM_transfer_t* transfer) {
                     ADM_dataset_t sources[], size_t sources_len,
                     ADM_dataset_t targets[], size_t targets_len,
                     ADM_qos_limit_t limits[], size_t limits_len,
                     ADM_transfer_mapping_t mapping, ADM_transfer_t* transfer) {

    const auto rv = admire::detail::transfer_dataset(
            admire::server{server}, admire::job{job},
            admire::api::convert(sources, sources_len),
            admire::api::convert(targets, targets_len),
            admire::api::convert(limits, limits_len),
            static_cast<admire::transfer::mapping>(mapping));

    const admire::server srv{server};
    if(!rv) {
        return rv.error();
    }

    *transfer = admire::api::convert(*rv).release();

    return admire::transfer_dataset(srv, job, sources, targets, limits, mapping,
                                    transfer);
    return ADM_SUCCESS;
}

ADM_return_t
+42 −0
Original line number Diff line number Diff line
@@ -255,4 +255,46 @@ remove_job(const server& srv, const job& job) {
    return ADM_SUCCESS;
}

tl::expected<transfer, error_code>
transfer_dataset(const server& srv, const job& job,
                 const std::vector<dataset>& sources,
                 const std::vector<dataset>& targets,
                 const std::vector<qos::limit>& limits,
                 transfer::mapping mapping) {

    scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb};

    auto endp = rpc_client.lookup(srv.address());

    LOGGER_INFO("RPC (ADM_{}) => {{job: {{{}}}, sources: {}, targets: {}, "
                "limits: {}, mapping: {}}}",
                __FUNCTION__, job, sources, targets, limits, mapping);

    const auto rpc_job = api::convert(job);
    const auto rpc_sources = api::convert(sources);
    const auto rpc_targets = api::convert(targets);
    const auto rpc_qos_limits = api::convert(limits);

    ADM_transfer_dataset_in_t in{rpc_job.get(), rpc_sources.get(),
                                 rpc_targets.get(), rpc_qos_limits.get(),
                                 static_cast<ADM_transfer_mapping_t>(mapping)};
    ADM_transfer_dataset_out_t out;

    [[maybe_unused]] const auto rpc =
            endp.call("ADM_transfer_dataset", &in, &out);

    if(out.retval < 0) {
        LOGGER_ERROR("RPC (ADM_{}) <= {{retval: {}}}", __FUNCTION__,
                     out.retval);
        return tl::make_unexpected(static_cast<admire::error_code>(out.retval));
    }

    const admire::transfer tx = api::convert(out.tx);

    LOGGER_INFO("RPC (ADM_{}) <= {{retval: {}, transfer: {{{}}}}}",
                __FUNCTION__, ADM_SUCCESS, tx);
    return tx;
}


} // namespace admire::detail
Loading