Commit 1c1168bf authored by Ramon Nou's avatar Ramon Nou
Browse files

Added QoS (simplified some weird memory/pointer issue)

parent 7fed8519
Loading
Loading
Loading
Loading
Loading
+50 −16
Original line number Diff line number Diff line
@@ -72,6 +72,8 @@
#define TAG_DATASET_OUTPUT                 7
#define TAG_DATASET_EXPECTED_OUTPUT        8
#define TAG_DATASET_EXPECTED_INOUT_DATASET 9
#define TAG_QOS_LIMIT                      10


// clang-format off
SPANK_PLUGIN (admire-cli, 1)
@@ -93,6 +95,7 @@ ADM_dataset_route_t* expected_output_datasets = NULL;
size_t expected_output_datasets_count = 0;
ADM_dataset_route_t* expected_inout_datasets = NULL;
size_t expected_inout_datasets_count = 0;
int64_t limit = 0;

/* server-related options */
typedef struct {
@@ -241,6 +244,9 @@ struct spank_option spank_opts[] = {
                TAG_DATASET_EXPECTED_INOUT_DATASET, /* option tag */
                (spank_opt_cb_f) process_opts       /* callback  */
        },
        {"adm-qos-limit", "qos-limit-bw",
         "Define the qos limit for the tranfer operation.", 1, TAG_QOS_LIMIT,
         (spank_opt_cb_f) process_opts},
        SPANK_OPTIONS_TABLE_END};

int
@@ -364,6 +370,18 @@ process_opts(int tag, const char* optarg, int remote) {
                return -1;
            }
            return 0;
        case TAG_QOS_LIMIT:
            char* endptr;
            errno = 0;

            limit = 0;

            limit = strtol(optarg, &endptr, 0);
            if(errno != 0 || endptr == optarg || *endptr != '\0' ||
               limit <= 0) {
                return -1;
            }
            return 0;

        default:
            return -1;
@@ -634,13 +652,21 @@ scord_register_job(spank_t sp, scord_plugin_config_t cfg,
            sources[i] = scord_reqs->r_inputs->l_routes[i].d_src;
            targets[i] = scord_reqs->r_inputs->l_routes[i].d_dst;
        }
        // Unfortunaly we have to sleep or cargo will not find the instance up.
        // Unfortunaly we have to sleep or cargo will not find the instance
        // up.
        sleep(5);

        if(ADM_transfer_datasets(
        int nlimit = 0;
        if(limit > 0) {

            nlimit = 1;
        }

        slurm_info("%s: prepared limits %ld", plugin_name, limit);
        if(ADM_transfer_datasets_1(
                   scord_server, scord_job, sources, input_datasets_count,
                   targets, input_datasets_count, 0, 0, ADM_MAPPING_ONE_TO_ONE,
                   &transfer, true) != ADM_SUCCESS) {
                   targets, input_datasets_count, limit, nlimit,
                   ADM_MAPPING_ONE_TO_ONE, &transfer, true) != ADM_SUCCESS) {
            slurm_error("%s: adhoc storage transfer failed", plugin_name);
            rc = -1;
            goto end;
@@ -685,9 +711,9 @@ end:
}

/**
 * Called just after plugins are loaded. In remote context, this is just after
 * job step is initialized. This function is called before any plugin option
 * processing.
 * Called just after plugins are loaded. In remote context, this is just
 * after job step is initialized. This function is called before any plugin
 * option processing.
 *
 * ┌-----------------------┐
 * | Command | Context     |
@@ -727,8 +753,8 @@ slurm_spank_init(spank_t sp, int ac, char** av) {

/**
 * Called in local context only after all options have been processed.
 * This is called after the job ID and step IDs are available. This happens in
 * `srun` after the allocation is made, but before tasks are launched.
 * This is called after the job ID and step IDs are available. This happens
 * in `srun` after the allocation is made, but before tasks are launched.
 *
 * ┌-----------------------┐
 * | Command | Context     |
@@ -947,10 +973,16 @@ scord_unregister_job(spank_t sp, scord_plugin_config_t cfg,
        }

      
        if(ADM_transfer_datasets(
        int nlimit = 0;

        if(limit > 0) {
            nlimit = 1;
        }

        if(ADM_transfer_datasets_1(
                   scord_server, scord_job, sources, output_datasets_count,
                   targets, output_datasets_count, 0, 0, ADM_MAPPING_ONE_TO_ONE,
                   &transfer, true) != ADM_SUCCESS) {
                   targets, output_datasets_count, limit, nlimit,
                   ADM_MAPPING_ONE_TO_ONE, &transfer, true) != ADM_SUCCESS) {
            slurm_error("%s: adhoc storage transfer failed", plugin_name);
            rc = -1;
            goto end;
@@ -1016,7 +1048,8 @@ slurm_spank_exit(spank_t sp, int ac, char** av) {
    // spank_context_t sctx = spank_context();


    //    slurm_debug("%s: %s() registering options", plugin_name, __func__);
    //    slurm_debug("%s: %s() registering options", plugin_name,
    //    __func__);

    /* register adm/scord options */
    //    struct spank_option* opt = &spank_opts[0];
@@ -1033,7 +1066,8 @@ slurm_spank_exit(spank_t sp, int ac, char** av) {
    /* Get relative for the node executing id. Job registration is only done
     * by the node with ID 0 */
    spank_context_t sctx = spank_context();
    if(sctx != S_CTX_REMOTE) return 0;
    if(sctx != S_CTX_REMOTE)
        return 0;
    uint32_t nodeid;

    if((rc = spank_get_item(sp, S_JOB_NODEID, &nodeid)) != ESPANK_SUCCESS) {
+65 −6
Original line number Diff line number Diff line
@@ -58,6 +58,16 @@ convert(ADM_qos_limit_t limits[], size_t limits_len) {
    return rv;
}

std::vector<scord::qos::limit>
convert(uint64_t limit, size_t limits_len) {

    std::vector<scord::qos::limit> rv(limits_len);

    rv[0] = scord::qos::limit{scord::qos::subclass::bandwidth, limit};

    return rv;
}

} // namespace


@@ -246,11 +256,9 @@ ADM_transfer_datasets(ADM_server_t server, ADM_job_t job,
                scord::server{server}, scord::job{job},
                scord::transfer{rv.value()});
        if(!rv_wait) {
            if(rv_wait.error().value() == scord::error_code::no_such_entity)
            {
            if(rv_wait.error().value() == scord::error_code::no_such_entity) {
                return ADM_SUCCESS;
            }
            else
            } else
                return rv_wait.error();
        }
        auto status = rv_wait.value().status();
@@ -261,11 +269,62 @@ ADM_transfer_datasets(ADM_server_t server, ADM_job_t job,
            rv_wait = scord::detail::query_transfer(
                    scord::server{server}, scord::job{job},
                    scord::transfer{rv.value()});
            if(!rv_wait) {
                if(rv_wait.error().value() ==
                   scord::error_code::no_such_entity) {
                    return ADM_SUCCESS;
                } else
                    return rv_wait.error();
            }
            status = rv_wait.value().status();
        }
    }

    return ADM_SUCCESS;
}

ADM_return_t
ADM_transfer_datasets_1(ADM_server_t server, ADM_job_t job,
                        ADM_dataset_t sources[], size_t sources_len,
                        ADM_dataset_t targets[], size_t targets_len,
                        uint64_t limit, size_t limits_len,
                        ADM_transfer_mapping_t mapping,
                        ADM_transfer_t* transfer, bool wait = false) {

    const auto rv = scord::detail::transfer_datasets(
            scord::server{server}, scord::job{job},
            ::convert(sources, sources_len), ::convert(targets, targets_len),
            ::convert(limit, limits_len),
            static_cast<scord::transfer::mapping>(mapping));

    if(!rv) {
        return rv.error();
    }

    *transfer = static_cast<ADM_transfer_t>(rv.value());
    if(wait) {
        auto rv_wait = scord::detail::query_transfer(
                scord::server{server}, scord::job{job},
                scord::transfer{rv.value()});
        if(!rv_wait) {
            if(rv_wait.error().value() == scord::error_code::no_such_entity) {
                return ADM_SUCCESS;
            } else
                return rv_wait.error();
        }
                else
        auto status = rv_wait.value().status();

        while(status == scord::transfer_state::type::running or
              status == scord::transfer_state::type::queued) {
            sleep(5);
            rv_wait = scord::detail::query_transfer(
                    scord::server{server}, scord::job{job},
                    scord::transfer{rv.value()});
            if(!rv_wait) {
                if(rv_wait.error().value() ==
                   scord::error_code::no_such_entity) {
                    return ADM_SUCCESS;
                } else
                    return rv_wait.error();
            }
            status = rv_wait.value().status();
+8 −0
Original line number Diff line number Diff line
@@ -251,6 +251,14 @@ ADM_transfer_datasets(ADM_server_t server, ADM_job_t job,
                      ADM_qos_limit_t limits[], size_t limits_len,
                      ADM_transfer_mapping_t mapping, ADM_transfer_t* transfer, bool wait);


ADM_return_t
ADM_transfer_datasets_1(ADM_server_t server, ADM_job_t job,
                      ADM_dataset_t sources[], size_t sources_len,
                      ADM_dataset_t targets[], size_t targets_len,
                      uint64_t limit, size_t limits_len,
                      ADM_transfer_mapping_t mapping, ADM_transfer_t* transfer, bool wait);

/**
 * Sets the obtained bw for the transfer operation
 *