Commits on Source (2)
build* build*
cmake-build-debug cmake-build-*
.idea .idea
.vscode .vscode
\ No newline at end of file
...@@ -35,6 +35,7 @@ struct ftio_config { ...@@ -35,6 +35,7 @@ struct ftio_config {
float confidence; float confidence;
float probability; float probability;
float period; float period;
bool run{false};
}; };
ftio_config ftio_config
...@@ -52,16 +53,18 @@ parse_command_line(int argc, char* argv[]) { ...@@ -52,16 +53,18 @@ parse_command_line(int argc, char* argv[]) {
app.add_option("-c,--conf", cfg.confidence, "confidence") app.add_option("-c,--conf", cfg.confidence, "confidence")
->option_text("float") ->option_text("float")
->required(); ->default_val("-1.0");
app.add_option("-p,--probability", cfg.probability, "probability") app.add_option("-p,--probability", cfg.probability, "probability")
->option_text("float") ->option_text("float")
->default_str("-1.0"); ->default_val("-1.0");
app.add_option("-t,--period", cfg.period, "period") app.add_option("-t,--period", cfg.period, "period")
->option_text("float") ->option_text("float")
->required(); ->default_val("-1.0");
app.add_flag(
"--run", cfg.run,
"Trigger stage operation to run now. Has no effect when period is set > 0");
try { try {
app.parse(argc, argv); app.parse(argc, argv);
...@@ -94,7 +97,9 @@ main(int argc, char* argv[]) { ...@@ -94,7 +97,9 @@ main(int argc, char* argv[]) {
if(const auto result = rpc_client.lookup(address); result.has_value()) { if(const auto result = rpc_client.lookup(address); result.has_value()) {
const auto& endpoint = result.value(); const auto& endpoint = result.value();
const auto retval = endpoint.call("ftio_int", cfg.confidence, cfg.probability, cfg.period); const auto retval =
endpoint.call("ftio_int", cfg.confidence, cfg.probability,
cfg.period, cfg.run);
if(retval.has_value()) { if(retval.has_value()) {
......
...@@ -179,34 +179,45 @@ master_server::ftio_scheduling_ult() { ...@@ -179,34 +179,45 @@ master_server::ftio_scheduling_ult() {
while(!m_shutting_down) { while(!m_shutting_down) {
if(!m_pending_transfer.m_work or m_period < 0.0f) { if(!m_pending_transfer.m_work or !m_ftio_run) {
std::this_thread::sleep_for(1000ms); std::this_thread::sleep_for(1000ms);
} }
// if(!m_pending_transfer.m_work or m_period < 0.0f) {
// std::this_thread::sleep_for(1000ms);
// }
// Do something with the confidence and probability // Do something with the confidence and probability
if(m_ftio_changed) { // if(m_ftio_run) {
m_ftio_changed = false; // m_ftio_run = false;
LOGGER_INFO("Confidence is {}, probability is {} and period is {}", // LOGGER_INFO("Confidence is {}, probability is {} and
m_confidence, m_probability, m_period); // period is {}",
} // m_confidence, m_probability, m_period);
// }
if(!m_pending_transfer.m_work) if(!m_pending_transfer.m_work)
continue; continue;
if(m_period > 0) {
LOGGER_INFO("Waiting period : {}", m_period); LOGGER_INFO("Waiting period : {}", m_period);
} else {
LOGGER_INFO("Waiting for run trigger ...");
}
// Wait in small periods, just in case we change it, This should be // Wait in small periods, just in case we change it, This should be
// mutexed... // mutexed...
auto elapsed = m_period; auto elapsed = m_period;
while(elapsed > 0) { while(elapsed > 0) {
std::this_thread::sleep_for(std::chrono::seconds((int) (1))); std::this_thread::sleep_for(std::chrono::seconds((int) (1)));
elapsed -= 1; elapsed -= 1;
if(m_ftio_changed) { // reset elapsed value when new RPC comes in
if(m_ftio_run) {
elapsed = m_period; elapsed = m_period;
m_ftio_changed = false; m_ftio_run = false;
} }
} }
if(!m_ftio_run) {
continue;
}
LOGGER_INFO("Checking if there is work to do in {}", LOGGER_INFO("Checking if there is work to do in {}",
m_pending_transfer.m_sources); m_pending_transfer.m_sources);
...@@ -239,6 +250,12 @@ master_server::ftio_scheduling_ult() { ...@@ -239,6 +250,12 @@ master_server::ftio_scheduling_ult() {
fs->unlink(file.path()); fs->unlink(file.path());
} }
} }
if(m_period > 0) {
// always run whenever period is set
m_ftio_run = true;
} else {
m_ftio_run = false;
}
} }
LOGGER_INFO("Shutting down."); LOGGER_INFO("Shutting down.");
...@@ -636,7 +653,7 @@ master_server::transfer_statuses(const network::request& req, ...@@ -636,7 +653,7 @@ master_server::transfer_statuses(const network::request& req,
void void
master_server::ftio_int(const network::request& req, float conf, float prob, master_server::ftio_int(const network::request& req, float conf, float prob,
float period) { float period, bool run) {
using network::get_address; using network::get_address;
using network::rpc_info; using network::rpc_info;
using proto::generic_response; using proto::generic_response;
...@@ -646,12 +663,14 @@ master_server::ftio_int(const network::request& req, float conf, float prob, ...@@ -646,12 +663,14 @@ master_server::ftio_int(const network::request& req, float conf, float prob,
m_confidence = conf; m_confidence = conf;
m_probability = prob; m_probability = prob;
m_period = period; m_period = period;
m_ftio_changed = true; m_ftio_run = run;
if(m_period > 0)
m_ftio_run = true;
m_ftio = true; m_ftio = true;
LOGGER_INFO( LOGGER_INFO(
"rpc {:>} body: {{confidence: {}, probability: {}, period: {}}}", "rpc {:>} body: {{confidence: {}, probability: {}, period: {}, run: {}}}",
rpc, conf, prob, period); rpc, conf, prob, period, run);
const auto resp = generic_response{rpc.id(), error_code::success}; const auto resp = generic_response{rpc.id(), error_code::success};
......
...@@ -89,7 +89,7 @@ private: ...@@ -89,7 +89,7 @@ private:
void void
ftio_int(const network::request& req, float confidence, float probability, ftio_int(const network::request& req, float confidence, float probability,
float period); float period, bool run);
private: private:
// Dedicated execution stream for the MPI listener ULT // Dedicated execution stream for the MPI listener ULT
...@@ -104,7 +104,7 @@ private: ...@@ -104,7 +104,7 @@ private:
float m_confidence = -1.0f; float m_confidence = -1.0f;
float m_probability = -1.0f; float m_probability = -1.0f;
float m_period = -1.0f; float m_period = -1.0f;
bool m_ftio_changed = true; bool m_ftio_run = true;
// FTIO enabled flag, we need to call ftio once. // FTIO enabled flag, we need to call ftio once.
bool m_ftio = false; bool m_ftio = false;
......