Newer
Older
Copyright 2018-2020, Barcelona Supercomputing Center (BSC), Spain
Copyright 2015-2020, Johannes Gutenberg Universitaet Mainz, Germany
This software was partially supported by the
EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu).
This software was partially supported by the
ADA-FS project under the SPPEXA project funded by the DFG.
SPDX-License-Identifier: MIT
*/
#include <daemon/daemon.hpp>
#include <version.hpp>
#include <global/env_util.hpp>
Marc Vef
committed
#include <global/rpc/rpc_types.hpp>
#include <global/rpc/rpc_util.hpp>
#include <daemon/env.hpp>
Marc Vef
committed
#include <daemon/handler/rpc_defs.hpp>
#include <daemon/backend/metadata/db.hpp>
#include <daemon/backend/data/chunk_storage.hpp>
#include <daemon/util.hpp>
#include <boost/filesystem.hpp>
#include <boost/program_options.hpp>
#include <boost/tokenizer.hpp>
#include <iostream>
#include <fstream>
Marc Vef
committed
#include <csignal>
#include <condition_variable>
extern "C" {
#include <unistd.h>
#include <stdlib.h>
}
Marc Vef
committed
using namespace std;
namespace po = boost::program_options;
Marc Vef
committed
static condition_variable shutdown_please;
static mutex mtx;
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
void init_io_tasklet_pool() {
assert(gkfs::config::rpc::daemon_io_xstreams >= 0);
unsigned int xstreams_num = gkfs::config::rpc::daemon_io_xstreams;
//retrieve the pool of the just created scheduler
ABT_pool pool;
auto ret = ABT_pool_create_basic(ABT_POOL_FIFO_WAIT, ABT_POOL_ACCESS_MPMC, ABT_TRUE, &pool);
if (ret != ABT_SUCCESS) {
throw runtime_error("Failed to create I/O tasks pool");
}
//create all subsequent xstream and the associated scheduler, all tapping into the same pool
vector<ABT_xstream> xstreams(xstreams_num);
for (unsigned int i = 0; i < xstreams_num; ++i) {
ret = ABT_xstream_create_basic(ABT_SCHED_BASIC_WAIT, 1, &pool,
ABT_SCHED_CONFIG_NULL, &xstreams[i]);
if (ret != ABT_SUCCESS) {
throw runtime_error("Failed to create task execution streams for I/O operations");
}
}
RPC_DATA->io_streams(xstreams);
RPC_DATA->io_pool(pool);
}
/**
* Registers RPC handlers to Margo instance
* @param hg_class
*/
void register_server_rpcs(margo_instance_id mid) {
MARGO_REGISTER(mid, gkfs::rpc::tag::fs_config, void, rpc_config_out_t, rpc_srv_get_fs_config);
MARGO_REGISTER(mid, gkfs::rpc::tag::create, rpc_mk_node_in_t, rpc_err_out_t, rpc_srv_create);
MARGO_REGISTER(mid, gkfs::rpc::tag::stat, rpc_path_only_in_t, rpc_stat_out_t, rpc_srv_stat);
MARGO_REGISTER(mid, gkfs::rpc::tag::decr_size, rpc_trunc_in_t, rpc_err_out_t, rpc_srv_decr_size);
MARGO_REGISTER(mid, gkfs::rpc::tag::remove, rpc_rm_node_in_t, rpc_err_out_t, rpc_srv_remove);
MARGO_REGISTER(mid, gkfs::rpc::tag::update_metadentry, rpc_update_metadentry_in_t, rpc_err_out_t,
rpc_srv_update_metadentry);
MARGO_REGISTER(mid, gkfs::rpc::tag::get_metadentry_size, rpc_path_only_in_t, rpc_get_metadentry_size_out_t,
rpc_srv_get_metadentry_size);
MARGO_REGISTER(mid, gkfs::rpc::tag::update_metadentry_size, rpc_update_metadentry_size_in_t,
rpc_update_metadentry_size_out_t, rpc_srv_update_metadentry_size);
MARGO_REGISTER(mid, gkfs::rpc::tag::get_dirents, rpc_get_dirents_in_t, rpc_get_dirents_out_t,
rpc_srv_get_dirents);
#ifdef HAS_SYMLINKS
MARGO_REGISTER(mid, gkfs::rpc::tag::mk_symlink, rpc_mk_symlink_in_t, rpc_err_out_t, rpc_srv_mk_symlink);
#endif
MARGO_REGISTER(mid, gkfs::rpc::tag::write, rpc_write_data_in_t, rpc_data_out_t, rpc_srv_write);
MARGO_REGISTER(mid, gkfs::rpc::tag::read, rpc_read_data_in_t, rpc_data_out_t, rpc_srv_read);
MARGO_REGISTER(mid, gkfs::rpc::tag::truncate, rpc_trunc_in_t, rpc_err_out_t, rpc_srv_truncate);
MARGO_REGISTER(mid, gkfs::rpc::tag::get_chunk_stat, rpc_chunk_stat_in_t, rpc_chunk_stat_out_t,
rpc_srv_get_chunk_stat);
}
void init_rpc_server(const string& protocol_port) {
hg_addr_t addr_self;
hg_size_t addr_self_cstring_sz = 128;
char addr_self_cstring[128];
// IMPORTANT: this struct needs to be zeroed before use
struct hg_init_info hg_options = {};
#if USE_SHM
hg_options.auto_sm = HG_TRUE;
#else
hg_options.auto_sm = HG_FALSE;
#endif
hg_options.stats = HG_FALSE;
hg_options.na_class = nullptr;
// Start Margo (this will also initialize Argobots and Mercury internally)
auto mid = margo_init_opt(protocol_port.c_str(),
MARGO_SERVER_MODE,
&hg_options,
HG_TRUE,
gkfs::config::rpc::daemon_handler_xstreams);
if (mid == MARGO_INSTANCE_NULL) {
throw runtime_error("Failed to initialize the Margo RPC server");
}
// Figure out what address this server is listening on (must be freed when finished)
auto hret = margo_addr_self(mid, &addr_self);
if (hret != HG_SUCCESS) {
margo_finalize(mid);
throw runtime_error("Failed to retrieve server RPC address");
}
// Convert the address to a cstring (with \0 terminator).
hret = margo_addr_to_string(mid, addr_self_cstring, &addr_self_cstring_sz, addr_self);
if (hret != HG_SUCCESS) {
margo_addr_free(mid, addr_self);
margo_finalize(mid);
throw runtime_error("Failed to convert server RPC address to string");
}
margo_addr_free(mid, addr_self);
std::string addr_self_str(addr_self_cstring);
RPC_DATA->self_addr_str(addr_self_str);
GKFS_DATA->spdlogger()->info("{}() Accepting RPCs on address {}", __func__, addr_self_cstring);
// Put context and class into RPC_data object
RPC_DATA->server_rpc_mid(mid);
// register RPCs
register_server_rpcs(mid);
}
void init_environment() {
std::string metadata_path = GKFS_DATA->metadir() + "/rocksdb"s;
GKFS_DATA->spdlogger()->debug("{}() Initializing metadata DB: '{}'", __func__, metadata_path);
GKFS_DATA->mdb(std::make_shared<gkfs::metadata::MetadataDB>(metadata_path));
} catch (const std::exception& e) {
GKFS_DATA->spdlogger()->error("{}() Failed to initialize metadata DB: {}", __func__, e.what());
}
std::string chunk_storage_path = GKFS_DATA->rootdir() + "/data/chunks"s;
GKFS_DATA->spdlogger()->debug("{}() Initializing storage backend: '{}'", __func__, chunk_storage_path);
bfs::create_directories(chunk_storage_path);
try {
GKFS_DATA->storage(
std::make_shared<gkfs::data::ChunkStorage>(chunk_storage_path, gkfs::config::rpc::chunksize));
} catch (const std::exception& e) {
GKFS_DATA->spdlogger()->error("{}() Failed to initialize storage backend: {}", __func__, e.what());
GKFS_DATA->spdlogger()->debug("{}() Initializing RPC server: '{}'",
__func__, GKFS_DATA->bind_addr());
init_rpc_server(GKFS_DATA->bind_addr());
} catch (const std::exception& e) {
GKFS_DATA->spdlogger()->error("{}() Failed to initialize RPC server: {}", __func__, e.what());
}
// Init Argobots ESs to drive IO
GKFS_DATA->spdlogger()->debug("{}() Initializing I/O pool", __func__);
init_io_tasklet_pool();
} catch (const std::exception& e) {
GKFS_DATA->spdlogger()->error("{}() Failed to initialize Argobots pool for I/O: {}", __func__, e.what());
// TODO set metadata configurations. these have to go into a user configurable file that is parsed here
GKFS_DATA->atime_state(gkfs::config::metadata::use_atime);
GKFS_DATA->mtime_state(gkfs::config::metadata::use_mtime);
GKFS_DATA->ctime_state(gkfs::config::metadata::use_ctime);
GKFS_DATA->link_cnt_state(gkfs::config::metadata::use_link_cnt);
GKFS_DATA->blocks_state(gkfs::config::metadata::use_blocks);
gkfs::metadata::Metadata root_md{S_IFDIR | S_IRWXU | S_IRWXG | S_IRWXO};
gkfs::metadata::create("/", root_md);
} catch (const std::exception& e) {
throw runtime_error("Failed to write root metadentry to KV store: "s + e.what());
}
// setup hostfile to let clients know that a daemon is running on this host
if (!GKFS_DATA->hosts_file().empty()) {
gkfs::util::populate_hosts_file();
GKFS_DATA->spdlogger()->info("Startup successful. Daemon is ready.");
}
/**
* Destroys the margo, argobots, and mercury environments
*/
void destroy_enviroment() {
GKFS_DATA->spdlogger()->debug("{}() Removing mount directory", __func__);
boost::system::error_code ecode;
bfs::remove_all(GKFS_DATA->mountdir(), ecode);
GKFS_DATA->spdlogger()->debug("{}() Freeing I/O executions streams", __func__);
for (unsigned int i = 0; i < RPC_DATA->io_streams().size(); i++) {
ABT_xstream_join(RPC_DATA->io_streams().at(i));
ABT_xstream_free(&RPC_DATA->io_streams().at(i));
}
if (!GKFS_DATA->hosts_file().empty()) {
GKFS_DATA->spdlogger()->debug("{}() Removing hosts file", __func__);
gkfs::util::destroy_hosts_file();
} catch (const bfs::filesystem_error& e) {
GKFS_DATA->spdlogger()->debug("{}() hosts file not found", __func__);
if (RPC_DATA->server_rpc_mid() != nullptr) {
GKFS_DATA->spdlogger()->debug("{}() Finalizing margo RPC server", __func__);
margo_finalize(RPC_DATA->server_rpc_mid());
}
GKFS_DATA->spdlogger()->info("{}() Closing metadata DB", __func__);
GKFS_DATA->close_mdb();
Marc Vef
committed
void shutdown_handler(int dummy) {
GKFS_DATA->spdlogger()->info("{}() Received signal: '{}'", __func__, strsignal(dummy));
Marc Vef
committed
shutdown_please.notify_all();
}
std::string path = gkfs::config::log::daemon_log_path;
// Try to get log path from env variable
std::string env_path_key = DAEMON_ENV_PREFIX;
env_path_key += "DAEMON_LOG_PATH";
char* env_path = getenv(env_path_key.c_str());
if (env_path != nullptr) {
path = env_path;
}
spdlog::level::level_enum level = gkfs::log::get_level(gkfs::config::log::daemon_log_level);
// Try to get log path from env variable
std::string env_level_key = DAEMON_ENV_PREFIX;
env_level_key += "LOG_LEVEL";
char* env_level = getenv(env_level_key.c_str());
if (env_level != nullptr) {
level = gkfs::log::get_level(env_level);
"main",
"MetadataDB",
"ChunkStorage",
gkfs::log::setup(logger_names, level, path);
Marc Vef
committed
Marc Vef
committed
Marc Vef
committed
po::options_description desc("Allowed options");
desc.add_options()
("help,h", "Help message")
("mountdir,m", po::value<string>()->required(), "User Fuse mountdir")
("rootdir,r", po::value<string>()->required(), "data directory")
("metadir,i", po::value<string>(), "metadata directory, if not set rootdir is used for metadata ")
("listen,l", po::value<string>(), "Address or interface to bind the daemon on. Default: local hostname.\n"
"When used with ofi+verbs the FI_VERBS_IFACE environment variable is set accordingly "
"which associates the verbs device with the network interface. In case FI_VERBS_IFACE "
"is already defined, the argument is ignored. Default 'ib'.")
("hosts-file,H", po::value<string>(),
"Shared file used by deamons to register their "
"enpoints. (default './gkfs_hosts.txt')")
("version", "print version and exit");
Marc Vef
committed
po::variables_map vm;
po::store(po::parse_command_line(argc, argv, desc), vm);
if (vm.count("help")) {
cout << desc << "\n";
return 1;
}
if (vm.count("version")) {
cout << GKFS_VERSION_STRING << endl;
#ifndef NDEBUG
cout << "Debug: ON" << endl;
#else
cout << "Debug: OFF" << endl;
#endif
cout << "RPC protocol: " << RPC_PROTOCOL << endl;
#if USE_SHM
cout << "Shared-memory comm: ON" << endl;
#else
cout << "Shared-memory comm: OFF" << endl;
#endif
#if CREATE_CHECK_PARENTS
cout << "Create check parents: ON" << endl;
#else
cout << "Create check parents: OFF" << endl;
cout << "Chunk size: " << gkfs::config::rpc::chunksize << " bytes" << endl;
Marc Vef
committed
try {
po::notify(vm);
} catch (po::required_option& e) {
std::cerr << "Error: " << e.what() << "\n";
return 1;
}
GKFS_DATA->spdlogger(spdlog::get("main"));
if (vm.count("listen")) {
addr = vm["listen"].as<string>();
// ofi+verbs requires an empty addr to bind to the ib interface
if (RPC_PROTOCOL == string(gkfs::rpc::protocol::ofi_verbs)) {
/*
* FI_VERBS_IFACE : The prefix or the full name of the network interface associated with the verbs device (default: ib)
* Mercury does not allow to bind to an address when ofi+verbs is used
*/
if (!secure_getenv("FI_VERBS_IFACE"))
setenv("FI_VERBS_IFACE", addr.c_str(), 1);
addr = ""s;
}
if (RPC_PROTOCOL != string(gkfs::rpc::protocol::ofi_verbs))
addr = get_my_hostname(true);
GKFS_DATA->bind_addr(fmt::format("{}://{}", RPC_PROTOCOL, addr));
string hosts_file;
if (vm.count("hosts-file")) {
hosts_file = vm["hosts-file"].as<string>();
} else {
gkfs::env::get_var(gkfs::env::HOSTS_FILE, gkfs::config::hostfile_path);
GKFS_DATA->hosts_file(hosts_file);
GKFS_DATA->spdlogger()->info("{}() Initializing environment", __func__);
Marc Vef
committed
assert(vm.count("mountdir"));
auto mountdir = vm["mountdir"].as<string>();
// Create mountdir. We use this dir to get some information on the underlying fs with statfs in gkfs_statfs
GKFS_DATA->mountdir(bfs::canonical(mountdir).native());
auto rootdir_path = bfs::path(rootdir) / fmt::format_int(getpid()).str();
GKFS_DATA->spdlogger()->debug("{}() Root directory: '{}'",
__func__, rootdir_path.native());
bfs::create_directories(rootdir_path);
GKFS_DATA->rootdir(rootdir_path.native());
if (vm.count("metadir")) {
auto metadir = vm["metadir"].as<string>();
bfs::create_directories(metadir);
GKFS_DATA->metadir(bfs::canonical(metadir).native());
GKFS_DATA->metadir(GKFS_DATA->rootdir());
Marc Vef
committed
try {
init_environment();
} catch (const std::exception& e) {
auto emsg = fmt::format("Failed to initialize environment: {}", e.what());
GKFS_DATA->spdlogger()->error(emsg);
cerr << emsg << endl;
destroy_enviroment();
exit(EXIT_FAILURE);
Marc Vef
committed
}
signal(SIGINT, shutdown_handler);
signal(SIGTERM, shutdown_handler);
signal(SIGKILL, shutdown_handler);
Marc Vef
committed
unique_lock<mutex> lk(mtx);
// Wait for shutdown signal to initiate shutdown protocols
shutdown_please.wait(lk);
GKFS_DATA->spdlogger()->info("{}() Shutting down...", __func__);
destroy_enviroment();
GKFS_DATA->spdlogger()->info("{}() Complete. Exiting...", __func__);
Marc Vef
committed
return 0;