Program Listing for File proxy.cpp
↰ Return to documentation for file (src/proxy/proxy.cpp)
/*
Copyright 2018-2025, Barcelona Supercomputing Center (BSC), Spain
Copyright 2015-2025, Johannes Gutenberg Universitaet Mainz, Germany
This software was partially supported by the
EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu).
This software was partially supported by the
ADA-FS project under the SPPEXA project funded by the DFG.
This software was partially supported by
the European Union’s Horizon 2020 JTI-EuroHPC research and
innovation programme, by the project ADMIRE (Project ID: 956748,
admire-eurohpc.eu)
This project was partially promoted by the Ministry for Digital Transformation
and the Civil Service, within the framework of the Recovery,
Transformation and Resilience Plan - Funded by the European Union
-NextGenerationEU.
SPDX-License-Identifier: MIT
*/
#include <proxy/proxy.hpp>
#include <proxy/util.hpp>
#include <proxy/rpc/rpc_defs.hpp>
#include <common/log_util.hpp>
#include <common/env_util.hpp>
#include <common/rpc/rpc_types.hpp>
#include <common/rpc/distributor.hpp>
#include <common/rpc/rpc_util.hpp>
#include <CLI/CLI.hpp>
#include <iostream>
#include <csignal>
#include <condition_variable>
using namespace std;
// Condition variable that main() blocks on until shutdown_handler() notifies
// it after a termination signal has been received.
static std::condition_variable shutdown_please;
// Mutex paired with shutdown_please for the wait in main().
static std::mutex mtx;
/**
 * @brief Values filled in by the command line parser in main().
 *
 * A field is left empty when its option is not given on the command line.
 */
struct cli_options {
    // Target of the --hosts-file / -H option.
    std::string hosts_file;
    // Target of the --proxy-protocol / -p option.
    std::string proxy_protocol;
    // Target of the --pid-path / -P option.
    std::string pid_path;
};
/**
 * @brief Registers the IPC handlers served by the proxy on a Margo instance.
 *
 * Each MARGO_REGISTER call below names an RPC tag, its input and output
 * types, and the proxy_rpc_srv_* handler function associated with that tag.
 *
 * NOTE(review): the calls carry no trailing semicolon; presumably the
 * MARGO_REGISTER macro expands to a complete statement - confirm against the
 * Margo version in use before reformatting.
 *
 * @param mid Margo instance the handlers are registered with.
 */
void
register_server_ipcs(margo_instance_id mid) {
// Data path: write, read, truncate and chunk statistics.
MARGO_REGISTER(mid, gkfs::rpc::tag::client_proxy_write,
rpc_client_proxy_write_in_t, rpc_data_out_t,
proxy_rpc_srv_write)
MARGO_REGISTER(mid, gkfs::rpc::tag::client_proxy_read,
rpc_client_proxy_read_in_t, rpc_data_out_t,
proxy_rpc_srv_read)
MARGO_REGISTER(mid, gkfs::rpc::tag::client_proxy_truncate,
rpc_client_proxy_trunc_in_t, rpc_err_out_t,
proxy_rpc_srv_truncate)
MARGO_REGISTER(mid, gkfs::rpc::tag::client_proxy_chunk_stat,
rpc_chunk_stat_in_t, rpc_chunk_stat_out_t,
proxy_rpc_srv_chunk_stat)
// Metadata path: create, stat, remove and size bookkeeping.
MARGO_REGISTER(mid, gkfs::rpc::tag::client_proxy_create, rpc_mk_node_in_t,
rpc_err_out_t, proxy_rpc_srv_create)
MARGO_REGISTER(mid, gkfs::rpc::tag::client_proxy_stat, rpc_path_only_in_t,
rpc_stat_out_t, proxy_rpc_srv_stat)
MARGO_REGISTER(mid, gkfs::rpc::tag::client_proxy_remove, rpc_rm_node_in_t,
rpc_err_out_t, proxy_rpc_srv_remove)
MARGO_REGISTER(mid, gkfs::rpc::tag::client_proxy_decr_size, rpc_trunc_in_t,
rpc_err_out_t, proxy_rpc_srv_decr_size)
MARGO_REGISTER(mid, gkfs::rpc::tag::client_proxy_get_size,
rpc_path_only_in_t, rpc_get_metadentry_size_out_t,
proxy_rpc_srv_get_metadentry_size)
MARGO_REGISTER(mid, gkfs::rpc::tag::client_proxy_update_size,
rpc_update_metadentry_size_in_t,
rpc_update_metadentry_size_out_t,
proxy_rpc_srv_update_metadentry_size)
// Directory listing.
MARGO_REGISTER(mid, gkfs::rpc::tag::client_proxy_get_dirents_extended,
rpc_proxy_get_dirents_in_t, rpc_get_dirents_out_t,
proxy_rpc_srv_get_dirents_extended)
}
void
init_ipc_server() {
hg_addr_t addr_self;
hg_size_t addr_self_cstring_sz = 128;
char addr_self_cstring[128];
struct hg_init_info hg_options = HG_INIT_INFO_INITIALIZER;
hg_options.auto_sm = HG_FALSE;
hg_options.stats = HG_FALSE;
// Start Margo (this will also initialize Argobots and Mercury internally)
auto margo_config = fmt::format(
R"({{ "use_progress_thread" : true, "rpc_thread_count" : {} }})",
gkfs::config::rpc::proxy_handler_xstreams);
struct margo_init_info args = {nullptr};
args.json_config = margo_config.c_str();
args.hg_init_info = &hg_options;
auto* mid = margo_init_ext(gkfs::rpc::protocol::na_sm, MARGO_SERVER_MODE,
&args);
if(mid == MARGO_INSTANCE_NULL) {
throw runtime_error("Failed to initialize the Margo IPC server");
}
// Figure out what address this server is listening on (must be freed when
// finished)
auto hret = margo_addr_self(mid, &addr_self);
if(hret != HG_SUCCESS) {
margo_finalize(mid);
throw runtime_error("Failed to retrieve server IPC address");
}
// Convert the address to a cstring (with \0 terminator).
hret = margo_addr_to_string(mid, addr_self_cstring, &addr_self_cstring_sz,
addr_self);
if(hret != HG_SUCCESS) {
margo_addr_free(mid, addr_self);
margo_finalize(mid);
throw runtime_error("Failed to convert server IPC address to string");
}
margo_addr_free(mid, addr_self);
std::string addr_self_str(addr_self_cstring);
PROXY_DATA->server_self_addr(addr_self_str);
PROXY_DATA->log()->info("{}() Accepting IPCs on address {}", __func__,
addr_self_cstring);
// Put context and class into RPC_data object
PROXY_DATA->server_ipc_mid(mid);
// register RPCs
register_server_ipcs(mid);
}
/**
 * @brief Registers the RPCs the proxy issues towards the daemons.
 *
 * Every RPC is registered without a handler (NULL) and the id returned by
 * MARGO_REGISTER is stored in the matching field of
 * PROXY_DATA->rpc_client_ids().
 *
 * @param mid Margo client instance the RPCs are registered with.
 */
void
register_client_rpcs(margo_instance_id mid) {
    // All ids end up in the same structure; look it up once.
    auto& ids = PROXY_DATA->rpc_client_ids();

    // Data RPCs
    ids.rpc_write_id =
            MARGO_REGISTER(mid, gkfs::rpc::tag::proxy_daemon_write,
                           rpc_proxy_daemon_write_in_t, rpc_data_out_t, NULL);
    ids.rpc_read_id =
            MARGO_REGISTER(mid, gkfs::rpc::tag::proxy_daemon_read,
                           rpc_proxy_daemon_read_in_t, rpc_data_out_t, NULL);
    ids.rpc_truncate_id = MARGO_REGISTER(mid, gkfs::rpc::tag::truncate,
                                         rpc_trunc_in_t, rpc_err_out_t, NULL);
    ids.rpc_chunk_stat_id =
            MARGO_REGISTER(mid, gkfs::rpc::tag::get_chunk_stat,
                           rpc_chunk_stat_in_t, rpc_chunk_stat_out_t, NULL);

    // Metadata RPCs
    ids.rpc_create_id = MARGO_REGISTER(mid, gkfs::rpc::tag::create,
                                       rpc_mk_node_in_t, rpc_err_out_t, NULL);
    ids.rpc_stat_id = MARGO_REGISTER(mid, gkfs::rpc::tag::stat,
                                     rpc_path_only_in_t, rpc_stat_out_t, NULL);
    ids.rpc_remove_id =
            MARGO_REGISTER(mid, gkfs::rpc::tag::remove_metadata,
                           rpc_rm_node_in_t, rpc_rm_metadata_out_t, NULL);
    ids.rpc_decr_size_id = MARGO_REGISTER(mid, gkfs::rpc::tag::decr_size,
                                          rpc_trunc_in_t, rpc_err_out_t, NULL);
    ids.rpc_remove_data_id =
            MARGO_REGISTER(mid, gkfs::rpc::tag::remove_data, rpc_rm_node_in_t,
                           rpc_err_out_t, NULL);
    ids.rpc_get_metadentry_size_id = MARGO_REGISTER(
            mid, gkfs::rpc::tag::get_metadentry_size, rpc_path_only_in_t,
            rpc_get_metadentry_size_out_t, NULL);
    ids.rpc_update_metadentry_size_id =
            MARGO_REGISTER(mid, gkfs::rpc::tag::update_metadentry_size,
                           rpc_update_metadentry_size_in_t,
                           rpc_update_metadentry_size_out_t, NULL);

    // Directory listing RPC
    ids.rpc_get_dirents_extended_id =
            MARGO_REGISTER(mid, gkfs::rpc::tag::get_dirents_extended,
                           rpc_get_dirents_in_t, rpc_get_dirents_out_t, NULL);
}
/**
 * @brief Starts the Margo RPC client used to talk to the daemons.
 *
 * Initializes Margo in client mode for the given protocol, stores the
 * resulting instance in PROXY_DATA and registers the client RPC ids via
 * register_client_rpcs().
 *
 * @param protocol Mercury protocol string passed to margo_init_ext().
 * @throws std::runtime_error if the Margo client cannot be initialized.
 */
void
init_rpc_client(const string& protocol) {
    struct hg_init_info hg_options = HG_INIT_INFO_INITIALIZER;
    hg_options.auto_sm = PROXY_DATA->use_auto_sm() ? HG_TRUE : HG_FALSE;
    hg_options.stats = HG_FALSE;
    // Compare the protocol by content. Comparing against protocol.c_str()
    // would compare two C-string pointers, which never matches, so the psm2
    // progress mode would silently not be applied.
    if(protocol == gkfs::rpc::protocol::ofi_psm2)
        hg_options.na_init_info.progress_mode = NA_NO_BLOCK;
    // Start Margo (this will also initialize Argobots and Mercury internally)
    auto margo_config = fmt::format(
            R"({{ "use_progress_thread" : true, "rpc_thread_count" : {} }})",
            0);
    struct margo_init_info args = {nullptr};
    args.json_config = margo_config.c_str();
    args.hg_init_info = &hg_options;
    auto* mid = margo_init_ext(protocol.c_str(), MARGO_CLIENT_MODE, &args);
    if(mid == MARGO_INSTANCE_NULL) {
        throw runtime_error("Failed to initialize the Margo RPC client");
    }
    PROXY_DATA->log()->info(
            "{}() Margo RPC client initialized with protocol '{}'", __func__,
            protocol);
    PROXY_DATA->log()->info("{}() auto sm is set to '{}' for RPC client.",
                            __func__, PROXY_DATA->use_auto_sm());
    // Publish the instance before registering the RPC ids on it.
    PROXY_DATA->client_rpc_mid(mid);
    register_client_rpcs(mid);
}
/**
 * @brief Brings up everything the proxy needs before it can serve clients.
 *
 * Steps, in order: verify the hosts file exists, make sure no other proxy is
 * running, read the hosts file, start the IPC server, start the RPC client,
 * create the PID file, connect to the daemons listed in the hosts file and
 * install the hash distributor in PROXY_DATA.
 *
 * @param hostfile_path Path of the hosts file listing the daemons.
 * @param rpc_protocol  Protocol handed to init_rpc_client().
 * @throws std::runtime_error if any step fails; exceptions raised by a step
 *         are rethrown with a message naming that step.
 */
void
init_environment(const string& hostfile_path, const string& rpc_protocol) {
    // Nothing can be done without a hosts file, so check for it first.
    if(!gkfs::util::check_for_hosts_file(hostfile_path)) {
        throw runtime_error(fmt::format(
                "Host file '{}' does not exist. Exiting ...", hostfile_path));
    }

    // Refuse to start next to another proxy instance.
    PROXY_DATA->log()->info("{}() Checking for another proxy process...",
                            __func__);
    if(gkfs::util::is_proxy_already_running()) {
        throw runtime_error("Another proxy is already running. Exiting ...");
    }
    PROXY_DATA->log()->info("{}() No other proxy is running. Proceeding ...",
                            __func__);

    // Read the daemon entries from the hosts file.
    vector<pair<string, string>> daemon_hosts{};
    try {
        PROXY_DATA->log()->info("{}() Loading daemon hostsfile ...", __func__);
        daemon_hosts = gkfs::util::read_hosts_file(hostfile_path);
    } catch(const std::exception& e) {
        throw runtime_error(
                fmt::format("Failed to load hosts addresses: {}", e.what()));
    }

    // IPC server
    PROXY_DATA->log()->info("{}() Initializing IPC server...", __func__);
    try {
        init_ipc_server();
    } catch(const std::exception& e) {
        throw runtime_error(
                fmt::format("Failed to initialize IPC server: {}", e.what()));
    }

    // RPC client
    PROXY_DATA->log()->info("{}() Initializing RPC client...", __func__);
    try {
        init_rpc_client(rpc_protocol);
    } catch(const std::exception& e) {
        throw runtime_error(
                fmt::format("Failed to initialize RPC client: {}", e.what()));
    }

    // PID file
    PROXY_DATA->log()->info("{}() Creating PID file ...", __func__);
    try {
        gkfs::util::create_proxy_pid_file();
    } catch(const std::exception& e) {
        throw runtime_error(fmt::format(
                "Unexpected error: '{}' when creating PID file.", e.what()));
    }

    // Connect to the daemons read from the hosts file.
    try {
        PROXY_DATA->log()->info(
                "{}() Loading daemon addresses and looking up ...", __func__);
        gkfs::util::connect_to_hosts(daemon_hosts);
    } catch(const std::exception& e) {
        throw runtime_error(
                fmt::format("Failed to load hosts addresses: '{}'", e.what()));
    }

    // Hash distributor built from the local host id and the endpoint count.
    PROXY_DATA->log()->info(
            "{}() Setting up simple hash distributor with local_host_id '{}' #hosts '{}'...",
            __func__, PROXY_DATA->local_host_id(),
            PROXY_DATA->rpc_endpoints().size());
    // TODO this needs to be globally configured because client must have same
    // distribution
    auto distributor = std::make_shared<gkfs::rpc::SimpleHashDistributor>(
            PROXY_DATA->local_host_id(), PROXY_DATA->rpc_endpoints().size());
    PROXY_DATA->distributor(distributor);

    PROXY_DATA->log()->info("Startup successful. Proxy is ready.");
}
/**
 * @brief Releases what init_environment() set up.
 *
 * Frees every address stored in PROXY_DATA->rpc_endpoints(), finalizes the
 * IPC server and RPC client Margo instances if they are set, and removes the
 * proxy PID file.
 */
void
destroy_enviroment() {
    PROXY_DATA->log()->info("{}() Closing connections ...", __func__);
    for(auto& [host, addr] : PROXY_DATA->rpc_endpoints()) {
        const auto free_ret =
                margo_addr_free(PROXY_DATA->client_rpc_mid(), addr);
        if(free_ret != HG_SUCCESS) {
            // Only warn and keep going with the remaining endpoints.
            PROXY_DATA->log()->warn(
                    "{}() Unable to free RPC client's address: '{}'.", __func__,
                    host);
        }
    }

    // IPC server instance (unset if startup failed before it was created)
    if(auto ipc_mid = PROXY_DATA->server_ipc_mid(); ipc_mid != nullptr) {
        PROXY_DATA->log()->info("{}() Finalizing margo IPC server ...",
                                __func__);
        margo_finalize(ipc_mid);
    }

    // RPC client instance
    if(auto rpc_mid = PROXY_DATA->client_rpc_mid(); rpc_mid != nullptr) {
        PROXY_DATA->log()->info("{}() Finalizing margo RPC client ...",
                                __func__);
        margo_finalize(rpc_mid);
    }

    gkfs::util::remove_proxy_pid_file();
}
/**
 * @brief Signal handler that wakes up the thread waiting in main().
 *
 * Logs the name of the received signal and notifies every waiter on
 * shutdown_please.
 *
 * NOTE(review): logging and condition_variable::notify_all() are presumably
 * not async-signal-safe; confirm whether this may run in a real signal
 * context or should be replaced by a flag / sigwait-based scheme.
 *
 * @param dummy Number of the signal that was delivered.
 */
void
shutdown_handler(int dummy) {
    auto* signal_name = strsignal(dummy);
    PROXY_DATA->log()->info("{}() Received signal: '{}'", __func__,
                            signal_name);
    shutdown_please.notify_all();
}
void
initialize_loggers() {
std::string path = gkfs::config::log::proxy_log_path;
// Try to get log path from env variable
std::string env_path_key = PROXY_ENV_PREFIX;
env_path_key += "LOG_PATH";
char* env_path = getenv(env_path_key.c_str());
if(env_path != nullptr) {
path = env_path;
}
spdlog::level::level_enum level =
gkfs::log::get_level(gkfs::config::log::proxy_log_level);
// Try to get log path from env variable
std::string env_level_key = PROXY_ENV_PREFIX;
env_level_key += "LOG_LEVEL";
char* env_level = getenv(env_level_key.c_str());
if(env_level != nullptr) {
level = gkfs::log::get_level(env_level);
}
auto logger_names = std::vector<std::string>{
"main",
};
gkfs::log::setup(logger_names, level, path);
}
/**
 * @brief Entry point of the proxy.
 *
 * Parses the command line, sets up logging, initializes the environment and
 * then blocks until shutdown_handler() signals that the proxy should stop,
 * after which the environment is torn down again.
 *
 * @param argc Number of command line arguments.
 * @param argv Command line arguments.
 * @return 0 on a regular shutdown, EXIT_FAILURE if the environment could not
 *         be initialized, or the value of CLI::App::exit() on a parse error.
 */
int
main(int argc, const char* argv[]) {
    CLI::App desc{"Allowed options"};
    cli_options opts{};
    // clang-format off
    desc.add_option("--hosts-file,-H", opts.hosts_file,
                    "Path to the shared host file generated by daemons, including all daemon addresses to connect to. (default path './gkfs_hosts.txt')");
    desc.add_option("--proxy-protocol,-p", opts.proxy_protocol,
                    "Used protocol between proxy and daemon communication. Choose between: ofi+sockets, ofi+psm2, ofi+verbs. Default: ofi+sockets");
    desc.add_option("--pid-path,-P", opts.pid_path,
                    "Path to PID file where daemon registers itself for clients. Default: /tmp/gkfs_proxy.pid");
    // clang-format on
    try {
        desc.parse(argc, argv);
    } catch(const CLI::ParseError& e) {
        return desc.exit(e);
    }

    initialize_loggers();
    PROXY_DATA->log(spdlog::get("main"));

    // Command line values override the built-in defaults.
    string proxy_protocol = gkfs::rpc::protocol::ofi_sockets;
    if(desc.count("--proxy-protocol")) {
        proxy_protocol = opts.proxy_protocol;
    }
    string hosts_file = gkfs::config::hostfile_path;
    if(desc.count("--hosts-file")) {
        hosts_file = opts.hosts_file;
    }
    if(desc.count("--pid-path")) {
        PROXY_DATA->pid_file_path(opts.pid_path);
    }

    PROXY_DATA->log()->info("{}() Initializing environment", __func__);
    try {
        init_environment(hosts_file, proxy_protocol);
    } catch(const std::exception& e) {
        auto emsg =
                fmt::format("Failed to initialize environment: {}", e.what());
        PROXY_DATA->log()->error(emsg);
        cerr << emsg << endl;
        // Release whatever was set up before the failure.
        destroy_enviroment();
        // Return instead of exit() so that locals are destroyed as well.
        return EXIT_FAILURE;
    }

    // SIGKILL is deliberately not registered: it cannot be caught, so a
    // signal() call for it always fails.
    signal(SIGINT, shutdown_handler);
    signal(SIGTERM, shutdown_handler);

    unique_lock<mutex> lk(mtx);
    // Wait for shutdown signal to initiate shutdown protocols
    // NOTE(review): the wait has no predicate, so a spurious wakeup would
    // also start the shutdown; fixing this needs a flag set by
    // shutdown_handler().
    shutdown_please.wait(lk);

    PROXY_DATA->log()->info("{}() Shutting down...", __func__);
    destroy_enviroment();
    PROXY_DATA->log()->info("{}() Complete. Exiting...", __func__);
    return 0;
}