Program Listing for File util.cpp
↰ Return to documentation for file (src/proxy/util.cpp)
/*
Copyright 2018-2025, Barcelona Supercomputing Center (BSC), Spain
Copyright 2015-2025, Johannes Gutenberg Universitaet Mainz, Germany
This software was partially supported by the
EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu).
This software was partially supported by the
ADA-FS project under the SPPEXA project funded by the DFG.
This software was partially supported by the
the European Union’s Horizon 2020 JTI-EuroHPC research and
innovation programme, by the project ADMIRE (Project ID: 956748,
admire-eurohpc.eu)
This project was partially promoted by the Ministry for Digital Transformation
and the Civil Service, within the framework of the Recovery,
Transformation and Resilience Plan - Funded by the European Union
-NextGenerationEU.
SPDX-License-Identifier: MIT
*/
#include <proxy/util.hpp>
#include <proxy/proxy.hpp>
#include <proxy/env.hpp>
#include <common/env_util.hpp>
#include <common/rpc/rpc_util.hpp>
#include <filesystem>
#include <csignal>
#include <regex>
#include <random>
#include <fstream>
#include <thread>
extern "C" {
#include <unistd.h>
}
using namespace std;
namespace fs = std::filesystem;
namespace {
vector<pair<string, string>>
load_hostfile(const std::string& lfpath) {
PROXY_DATA->log()->debug("{}() Loading hosts file: '{}'", __func__, lfpath);
ifstream lf(lfpath);
if(!lf) {
throw runtime_error(fmt::format("Failed to open hosts file '{}': {}",
lfpath, strerror(errno)));
}
vector<pair<string, string>> hosts;
const regex line_re("^(\\S+)\\s+(\\S+)\\s*(\\S*)$",
regex::ECMAScript | regex::optimize);
string line;
string host;
string uri;
std::smatch match;
while(getline(lf, line)) {
if(line[0] == '#')
continue;
if(!regex_match(line, match, line_re)) {
PROXY_DATA->log()->debug(
"{}() Unrecognized line format: [path: '{}', line: '{}']",
__func__, lfpath, line);
throw runtime_error(
fmt::format("unrecognized line format: '{}'", line));
}
host = match[1];
if(match.size() < 3) {
throw runtime_error(fmt::format(
"hostfile does not have three columns for daemon RPC proxy server"));
}
uri = match[3];
if(!PROXY_DATA->use_auto_sm() &&
uri.find("na+sm") != std::string::npos) {
PROXY_DATA->use_auto_sm(true);
PROXY_DATA->log()->info(
"{}() auto_sm detected in daemon hosefile. Enabling it on proxy ...",
__func__);
}
hosts.emplace_back(host, uri);
}
return hosts;
}
} // namespace
namespace gkfs::util {
bool
is_proxy_already_running() {
const auto& pid_path = PROXY_DATA->pid_file_path();
// check if another proxy is already running
if(fs::exists(pid_path)) {
ifstream ifs(pid_path, ::ifstream::in);
if(ifs) {
string running_pid{};
if(getline(ifs, running_pid) && !running_pid.empty()) {
// check if process exists without killing it. Signal 0 doesn't
// kill
if(0 == ::kill(::stoi(running_pid), 0))
return true;
}
} else {
throw runtime_error(
"FATAL: pid file of another proxy already exists, but cannot be opened. Exiting ...");
}
ifs.close();
fs::remove(pid_path);
}
return false;
}
void
create_proxy_pid_file() {
/*
* - na+sm pid address
* - file name socket (numa node) getcpu() call in #include <linux/getcpu.h>
* - only allow one per socket
*/
const auto& pid_path = PROXY_DATA->pid_file_path();
auto my_pid = getpid();
if(my_pid == -1) {
throw runtime_error("Unable to get own pid for proxy pid file");
}
ofstream ofs(pid_path, ::ofstream::trunc);
if(ofs) {
ofs << to_string(my_pid);
ofs << "\n";
ofs << PROXY_DATA->server_self_addr();
} else {
throw runtime_error("Unable to create proxy pid file");
}
}
void
remove_proxy_pid_file() {
const auto& pid_path = PROXY_DATA->pid_file_path();
fs::remove(pid_path);
}
bool
check_for_hosts_file(const std::string& hostfile) {
return fs::exists(hostfile);
}
vector<pair<string, string>>
read_hosts_file(const std::string& hostfile) {
vector<pair<string, string>> hosts;
try {
hosts = load_hostfile(hostfile);
} catch(const exception& e) {
auto emsg = fmt::format("Failed to load hosts file: {}", e.what());
throw runtime_error(emsg);
}
if(hosts.empty()) {
throw runtime_error(fmt::format("Hostfile empty: '{}'", hostfile));
}
PROXY_DATA->log()->info("{}() Daemon pool size: '{}'", __func__,
hosts.size());
return hosts;
}
void
connect_to_hosts(const vector<pair<string, string>>& hosts) {
auto local_hostname = gkfs::rpc::get_my_hostname(true);
bool local_host_found = false;
PROXY_DATA->hosts_size(hosts.size());
vector<uint64_t> host_ids(hosts.size());
// populate vector with [0, ..., host_size - 1]
::iota(::begin(host_ids), ::end(host_ids), 0);
/*
* Shuffle hosts to balance addr lookups to all hosts
* Too many concurrent lookups send to same host
* could overwhelm the server,
* returning error when addr lookup
*/
::random_device rd; // obtain a random number from hardware
::mt19937 g(rd()); // seed the random generator
::shuffle(host_ids.begin(), host_ids.end(), g); // Shuffle hosts vector
// lookup addresses and put abstract server addresses into rpc_addresses
for(const auto& id : host_ids) {
const auto& hostname = hosts.at(id).first;
const auto& uri = hosts.at(id).second;
hg_addr_t svr_addr = HG_ADDR_NULL;
// try to look up 3 times before erroring out
hg_return_t ret;
for(uint32_t i = 0; i < 4; i++) {
ret = margo_addr_lookup(PROXY_DATA->client_rpc_mid(), uri.c_str(),
&svr_addr);
if(ret != HG_SUCCESS) {
// still not working after 5 tries.
if(i == 4) {
auto err_msg =
fmt::format("{}() Unable to lookup address '{}'",
__func__, uri);
throw runtime_error(err_msg);
}
// Wait a random amount of time and try again
::mt19937 eng(rd()); // seed the random generator
::uniform_int_distribution<> distr(
50, 50 * (i + 2)); // define the range
::this_thread::sleep_for(std::chrono::milliseconds(distr(eng)));
} else {
break;
}
}
if(svr_addr == HG_ADDR_NULL) {
auto err_msg = fmt::format(
"{}() looked up address is NULL for address '{}'", __func__,
uri);
throw runtime_error(err_msg);
}
PROXY_DATA->rpc_endpoints().insert(make_pair(id, svr_addr));
if(!local_host_found && hostname == local_hostname) {
PROXY_DATA->log()->debug("{}() Found local host: {}", __func__,
hostname);
PROXY_DATA->local_host_id(id);
local_host_found = true;
}
PROXY_DATA->log()->debug("{}() Found daemon: id '{}' uri '{}'",
__func__, id, uri);
}
if(!local_host_found) {
PROXY_DATA->log()->warn(
"{}() Failed to find local host. Using host '0' as local host",
__func__);
PROXY_DATA->local_host_id(0);
}
}
} // namespace gkfs::util