Commit 8d29b922 authored by Ramon Nou's avatar Ramon Nou
Browse files

Recover bw

parent 6c399b42
Loading
Loading
Loading
Loading
Loading
+2 −0
Original line number Diff line number Diff line
@@ -80,10 +80,12 @@ main(int argc, char* argv[]) {

        const auto job = scord::register_job(
                server, scord::job::resources{job_nodes}, reqs, 0);

        const auto transfer = scord::transfer_datasets(
                server, job, sources, targets, qos_limits, mapping);

        scord::transfer_update(server, transfer.id(), 10.0f);
        
        fmt::print(stdout, "ADM_transfer_update() remote procedure completed "
                           "successfully\n");
        exit(EXIT_SUCCESS);
+1 −1
Original line number Diff line number Diff line
@@ -77,7 +77,7 @@ if(SCORD_BUILD_TESTS)
      ${SCORD_CTL_ADDRESS_STRING}
      ${DATA_STAGER_ADDRESS_STRING})
    set_tests_properties(run_${TEST_NAME}
      PROPERTIES FIXTURES_REQUIRED "scord_daemon;scord_ctl"
      PROPERTIES FIXTURES_REQUIRED "scord_daemon;scord_ctl;cargo"
      ENVIRONMENT "${TEST_ENV}")

    add_test(validate_${TEST_NAME}
+12 −26
Original line number Diff line number Diff line
@@ -756,20 +756,6 @@ rpc_server::transfer_datasets(const network::request& req, scord::job_id job_id,
    LOGGER_EVAL(resp.error_code(), INFO, ERROR,
                "rpc {:<} body: {{retval: {}, tx_id: {}}}", rpc,
                resp.error_code(), resp.value_or_none());

    // TODO: create a transfer in transfer manager
    // We need the contact point, and different qos

    if(const auto transfer_result =
               m_transfer_manager.create(tx_id.value(), stager_address, limits);
       !transfer_result.has_value()) {
        LOGGER_ERROR(
                "rpc id: {} error_msg: \"Error creating transfer_storage: {}\"",
                rpc.id(), transfer_result.error());
        ec = transfer_result.error();
    }


    req.respond(resp);
}

@@ -830,7 +816,7 @@ rpc_server::scheduler_update() {

    for(const auto& tr_unit : transfer) {
        const auto tr_info = tr_unit.second.get();
        auto bw = tr_info->obtained_bw();
        auto bw = tr_info->measured_bandwidth();
        if(bw == -1) {
            continue;
        }
@@ -841,22 +827,22 @@ rpc_server::scheduler_update() {
        auto qos = tr_info->qos().front().value();
        if(bw + bw * threshold > qos) {
            // Send decrease / slow signal to cargo
            LOGGER_DEBUG("Action for unit {} --> Decrease {}", tr_unit.first,
                         tr_info->contact_point());
            std::pair<std::string, int> entity =
                    std::make_pair(tr_info->contact_point(), -1);
            return_set.push_back(entity);
            LOGGER_DEBUG("Action for unit {} --> Decrease", tr_unit.first
                        );
            
           tr_info->update(20);
          //  std::pair<std::string, int> entity =
            //        std::make_pair(tr_info->contact_point(), -1);
            //return_set.push_back(entity);

        } else if(bw - bw * threshold < qos) {
            // Send increase / speed up signal to cargo
            LOGGER_DEBUG("Action for unit {} --> Increase {}", tr_unit.first,
                         tr_info->contact_point());
            std::pair<std::string, int> entity =
            LOGGER_DEBUG("Action for unit {} --> Increase", tr_unit.first
                         );
            /*std::pair<std::string, int> entity =
                    std::make_pair(tr_info->contact_point(), +1);
            return_set.push_back(entity);
            return_set.push_back(entity);*/
        }
        // Remove from next computations
        tr_info->obtained_bw(-1);
    }
    m_transfer_manager.unlock();
    return return_set;
+2 −5
Original line number Diff line number Diff line
@@ -127,7 +127,7 @@ private:
    job_manager m_job_manager;
    adhoc_storage_manager m_adhoc_manager;
    pfs_storage_manager m_pfs_manager;
    transfer_manager m_transfer_manager;
    transfer_manager<cargo::transfer> m_transfer_manager;


public:
@@ -143,10 +143,7 @@ public:
    std::vector<std::pair<std::string, int>>
    scheduler_update();

    job_manager m_job_manager;
    adhoc_storage_manager m_adhoc_manager;
    pfs_storage_manager m_pfs_manager;
    transfer_manager<cargo::transfer> m_transfer_manager;
  
};

} // namespace scord
+17 −0
Original line number Diff line number Diff line
@@ -116,6 +116,23 @@ struct transfer_manager {
        return tl::make_unexpected(scord::error_code::no_such_entity);
    }

    std::unordered_map<
            scord::transfer_id,
            std::shared_ptr<scord::internal::transfer_metadata<TransferHandle>>>
    transfer() {
        return m_transfer;
    }

    void
    lock() {
        m_transfer_mutex.lock();
    }

    void
    unlock() {
        m_transfer_mutex.unlock();
    }

private:
    mutable abt::shared_mutex m_transfer_mutex;
    std::unordered_map<