Commits on Source (2)
build*
cmake-build-debug
cmake-build-*
.idea
.vscode
.vscode
\ No newline at end of file
......@@ -35,6 +35,7 @@ struct ftio_config {
float confidence;
float probability;
float period;
bool run{false};
};
ftio_config
......@@ -52,16 +53,18 @@ parse_command_line(int argc, char* argv[]) {
app.add_option("-c,--conf", cfg.confidence, "confidence")
->option_text("float")
->required();
->default_val("-1.0");
app.add_option("-p,--probability", cfg.probability, "probability")
->option_text("float")
->default_str("-1.0");
->default_val("-1.0");
app.add_option("-t,--period", cfg.period, "period")
->option_text("float")
->required();
->default_val("-1.0");
app.add_flag(
"--run", cfg.run,
"Trigger stage operation to run now. Has no effect when period is set > 0");
try {
app.parse(argc, argv);
......@@ -94,7 +97,9 @@ main(int argc, char* argv[]) {
if(const auto result = rpc_client.lookup(address); result.has_value()) {
const auto& endpoint = result.value();
const auto retval = endpoint.call("ftio_int", cfg.confidence, cfg.probability, cfg.period);
const auto retval =
endpoint.call("ftio_int", cfg.confidence, cfg.probability,
cfg.period, cfg.run);
if(retval.has_value()) {
......
......@@ -179,34 +179,45 @@ master_server::ftio_scheduling_ult() {
while(!m_shutting_down) {
if(!m_pending_transfer.m_work or m_period < 0.0f) {
if(!m_pending_transfer.m_work or !m_ftio_run) {
std::this_thread::sleep_for(1000ms);
}
// if(!m_pending_transfer.m_work or m_period < 0.0f) {
// std::this_thread::sleep_for(1000ms);
// }
// Do something with the confidence and probability
if(m_ftio_changed) {
m_ftio_changed = false;
LOGGER_INFO("Confidence is {}, probability is {} and period is {}",
m_confidence, m_probability, m_period);
}
// if(m_ftio_run) {
// m_ftio_run = false;
// LOGGER_INFO("Confidence is {}, probability is {} and
// period is {}",
// m_confidence, m_probability, m_period);
// }
if(!m_pending_transfer.m_work)
continue;
LOGGER_INFO("Waiting period : {}", m_period);
if(m_period > 0) {
LOGGER_INFO("Waiting period : {}", m_period);
} else {
LOGGER_INFO("Waiting for run trigger ...");
}
// Wait in small periods, just in case we change it, This should be
// mutexed...
auto elapsed = m_period;
while(elapsed > 0) {
std::this_thread::sleep_for(std::chrono::seconds((int) (1)));
elapsed -= 1;
if(m_ftio_changed) {
// reset elapsed value when new RPC comes in
if(m_ftio_run) {
elapsed = m_period;
m_ftio_changed = false;
m_ftio_run = false;
}
}
if(!m_ftio_run) {
continue;
}
LOGGER_INFO("Checking if there is work to do in {}",
m_pending_transfer.m_sources);
......@@ -239,6 +250,12 @@ master_server::ftio_scheduling_ult() {
fs->unlink(file.path());
}
}
if(m_period > 0) {
// always run whenever period is set
m_ftio_run = true;
} else {
m_ftio_run = false;
}
}
LOGGER_INFO("Shutting down.");
......@@ -636,7 +653,7 @@ master_server::transfer_statuses(const network::request& req,
void
master_server::ftio_int(const network::request& req, float conf, float prob,
float period) {
float period, bool run) {
using network::get_address;
using network::rpc_info;
using proto::generic_response;
......@@ -646,12 +663,14 @@ master_server::ftio_int(const network::request& req, float conf, float prob,
m_confidence = conf;
m_probability = prob;
m_period = period;
m_ftio_changed = true;
m_ftio_run = run;
if(m_period > 0)
m_ftio_run = true;
m_ftio = true;
LOGGER_INFO(
"rpc {:>} body: {{confidence: {}, probability: {}, period: {}}}",
rpc, conf, prob, period);
"rpc {:>} body: {{confidence: {}, probability: {}, period: {}, run: {}}}",
rpc, conf, prob, period, run);
const auto resp = generic_response{rpc.id(), error_code::success};
......
......@@ -89,7 +89,7 @@ private:
void
ftio_int(const network::request& req, float confidence, float probability,
float period);
float period, bool run);
private:
// Dedicated execution stream for the MPI listener ULT
......@@ -104,7 +104,7 @@ private:
float m_confidence = -1.0f;
float m_probability = -1.0f;
float m_period = -1.0f;
bool m_ftio_changed = true;
bool m_ftio_run = true;
// FTIO enabled flag, we need to call ftio once.
bool m_ftio = false;
......