Verified Commit 959561d9 authored by Marc Vef's avatar Marc Vef
Browse files

Adding --run argument to cargo_ftio to trigger run now. Has no effect when period is set.

parent 5b3d7d3b
Loading
Loading
Loading
Loading
Loading
+2 −2
Original line number Diff line number Diff line
build*
cmake-build-debug
cmake-build-*
.idea
.vscode
 No newline at end of file
+10 −5
Original line number Diff line number Diff line
@@ -35,6 +35,7 @@ struct ftio_config {
    float confidence;
    float probability;
    float period;
    bool run{false};
};

ftio_config
@@ -52,16 +53,18 @@ parse_command_line(int argc, char* argv[]) {

    app.add_option("-c,--conf", cfg.confidence, "confidence")
            ->option_text("float")
            ->required();
            ->default_val("-1.0");

    app.add_option("-p,--probability", cfg.probability, "probability")
            ->option_text("float")
            ->default_str("-1.0");
            ->default_val("-1.0");

    app.add_option("-t,--period", cfg.period, "period")
            ->option_text("float")
            ->required();

            ->default_val("-1.0");
    app.add_flag(
            "--run", cfg.run,
            "Trigger stage operation to run now. Has no effect when period is set > 0");

    try {
        app.parse(argc, argv);
@@ -94,7 +97,9 @@ main(int argc, char* argv[]) {

        if(const auto result = rpc_client.lookup(address); result.has_value()) {
            const auto& endpoint = result.value();
            const auto retval = endpoint.call("ftio_int", cfg.confidence, cfg.probability, cfg.period);
            const auto retval =
                    endpoint.call("ftio_int", cfg.confidence, cfg.probability,
                                  cfg.period, cfg.run);

            if(retval.has_value()) {

+33 −14
Original line number Diff line number Diff line
@@ -179,34 +179,45 @@ master_server::ftio_scheduling_ult() {

    while(!m_shutting_down) {

        if(!m_pending_transfer.m_work or m_period < 0.0f) {
        if(!m_pending_transfer.m_work or !m_ftio_run) {
            std::this_thread::sleep_for(1000ms);
        }
        //        if(!m_pending_transfer.m_work or m_period < 0.0f) {
        //            std::this_thread::sleep_for(1000ms);
        //        }


        // Do something with the confidence and probability

        if(m_ftio_changed) {
            m_ftio_changed = false;
            LOGGER_INFO("Confidence is {}, probability is {} and period is {}",
                        m_confidence, m_probability, m_period);
        }
        //        if(m_ftio_run) {
        //            m_ftio_run = false;
        //            LOGGER_INFO("Confidence is {}, probability is {} and
        //            period is {}",
        //                        m_confidence, m_probability, m_period);
        //        }

        if(!m_pending_transfer.m_work)
            continue;

        if(m_period > 0) {
            LOGGER_INFO("Waiting period : {}", m_period);
        } else {
            LOGGER_INFO("Waiting for run trigger ...");
        }
        // Wait in small periods, just in case we change it, This should be
        // mutexed...
        auto elapsed = m_period;
        while(elapsed > 0) {
            std::this_thread::sleep_for(std::chrono::seconds((int) (1)));
            elapsed -= 1;
            if(m_ftio_changed) {
            // reset elapsed value when new RPC comes in
            if(m_ftio_run) {
                elapsed = m_period;
                m_ftio_changed = false;
                m_ftio_run = false;
            }
        }
        if(!m_ftio_run) {
            continue;
        }

        LOGGER_INFO("Checking if there is work to do in {}",
                    m_pending_transfer.m_sources);
@@ -239,6 +250,12 @@ master_server::ftio_scheduling_ult() {
                fs->unlink(file.path());
            }
        }
        if(m_period > 0) {
            // always run whenever period is set
            m_ftio_run = true;
        } else {
            m_ftio_run = false;
        }
    }

    LOGGER_INFO("Shutting down.");
@@ -636,7 +653,7 @@ master_server::transfer_statuses(const network::request& req,

void
master_server::ftio_int(const network::request& req, float conf, float prob,
                        float period) {
                        float period, bool run) {
    using network::get_address;
    using network::rpc_info;
    using proto::generic_response;
@@ -646,12 +663,14 @@ master_server::ftio_int(const network::request& req, float conf, float prob,
    m_confidence = conf;
    m_probability = prob;
    m_period = period;
    m_ftio_changed = true;
    m_ftio_run = run;
    if(m_period > 0)
        m_ftio_run = true;
    m_ftio = true;

    LOGGER_INFO(
            "rpc {:>} body: {{confidence: {}, probability: {}, period: {}}}",
            rpc, conf, prob, period);
            "rpc {:>} body: {{confidence: {}, probability: {}, period: {}, run: {}}}",
            rpc, conf, prob, period, run);

    const auto resp = generic_response{rpc.id(), error_code::success};

+2 −2
Original line number Diff line number Diff line
@@ -89,7 +89,7 @@ private:

    void
    ftio_int(const network::request& req, float confidence, float probability,
             float period);
             float period, bool run);

private:
    // Dedicated execution stream for the MPI listener ULT
@@ -104,7 +104,7 @@ private:
    float m_confidence = -1.0f;
    float m_probability = -1.0f;
    float m_period = -1.0f;
    bool m_ftio_changed = true;
    bool m_ftio_run = true;
    // FTIO enabled flag, we need to call ftio once.
    bool m_ftio = false;