Line data Source code
1 : /*
2 : Copyright 2018-2024, Barcelona Supercomputing Center (BSC), Spain
3 : Copyright 2015-2024, Johannes Gutenberg Universitaet Mainz, Germany
4 :
5 : This software was partially supported by the
6 : EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu).
7 :
8 : This software was partially supported by the
9 : ADA-FS project under the SPPEXA project funded by the DFG.
10 :
11 : This file is part of GekkoFS.
12 :
13 : GekkoFS is free software: you can redistribute it and/or modify
14 : it under the terms of the GNU General Public License as published by
15 : the Free Software Foundation, either version 3 of the License, or
16 : (at your option) any later version.
17 :
18 : GekkoFS is distributed in the hope that it will be useful,
19 : but WITHOUT ANY WARRANTY; without even the implied warranty of
20 : MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21 : GNU General Public License for more details.
22 :
23 : You should have received a copy of the GNU General Public License
24 : along with GekkoFS. If not, see <https://www.gnu.org/licenses/>.
25 :
26 : SPDX-License-Identifier: GPL-3.0-or-later
27 : */
28 : /**
29 : * @brief The main source file to launch the daemon.
30 : * @internal
31 : * This file includes the daemon's main() function and starts all daemon
32 : * subroutines. It deals with user input and waits on a signal to shut it down.
33 : * @endinternal
34 : */
35 :
36 : #include <daemon/daemon.hpp>
37 : #include <version.hpp>
38 : #include <common/log_util.hpp>
39 : #include <common/env_util.hpp>
40 : #include <common/rpc/rpc_types.hpp>
41 : #include <common/rpc/rpc_util.hpp>
42 : #include <common/statistics/stats.hpp>
43 :
44 : #include <daemon/env.hpp>
45 : #include <daemon/handler/rpc_defs.hpp>
46 : #include <daemon/ops/metadentry.hpp>
47 : #include <daemon/backend/metadata/db.hpp>
48 : #include <daemon/backend/data/chunk_storage.hpp>
49 : #include <daemon/util.hpp>
50 : #include <CLI/CLI.hpp>
51 :
52 : #ifdef GKFS_ENABLE_AGIOS
53 : #include <daemon/scheduler/agios.hpp>
54 : #endif
55 :
56 :
57 : #include <filesystem>
58 : #include <iostream>
59 : #include <fstream>
60 : #include <csignal>
61 : #include <condition_variable>
62 :
63 : extern "C" {
64 : #include <unistd.h>
65 : #include <cstdlib>
66 : }
67 :
68 : using namespace std;
69 : namespace fs = std::filesystem;
70 :
// Signaled by shutdown_handler() to wake the main thread for teardown.
static condition_variable shutdown_please; // handler for shutdown signaling
static mutex mtx; // mutex to wait on shutdown conditional variable
// When false (set via --clean-rootdir-finish), destroy_enviroment() removes
// rootdir and metadir on shutdown.
static bool keep_rootdir = true;

/**
 * @brief Plain value holder for all command line options parsed by CLI11.
 *
 * Each member is bound to one CLI option in main(); parse_input() validates
 * the values and transfers them into the GKFS_DATA singleton.
 */
struct cli_options {
    string mountdir;           // --mountdir,-m: client-visible mount point
    string rootdir;            // --rootdir,-r: local data directory
    string rootdir_suffix;     // --rootdir-suffix,-s: per-daemon subdirectory
    string metadir;            // --metadir,-i: metadata DB directory
    string listen;             // --listen,-l: bind address or interface
    string hosts_file;         // --hosts-file,-H: shared daemon registry file
    string rpc_protocol;       // --rpc-protocol,-P: ofi+{sockets,verbs,psm2}
    string dbbackend;          // --dbbackend,-d: rocksdb or parallaxdb
    string parallax_size;      // --parallaxsize: Parallax md file size in GB
    string stats_file;         // --output-stats: statistics output file
    string prometheus_gateway; // --prometheus-gateway: <ip:port> push gateway
};
88 :
89 : /**
90 : * @brief Initializes the Argobots execution streams for non-blocking I/O
91 : * @internal
92 : * The corresponding execution streams are defined in
93 : * gkfs::config::rpc::daemon_io_xstreams. A FIFO thread pool accomodates these
94 : * execution streams. Argobots tasklets are created from these pools during I/O
95 : * operations.
96 : * @endinternal
97 : */
98 : void
99 33 : init_io_tasklet_pool() {
100 33 : static_assert(gkfs::config::rpc::daemon_io_xstreams >= 0,
101 : "Daemon IO Execution streams must be higher than 0!");
102 33 : unsigned int xstreams_num = gkfs::config::rpc::daemon_io_xstreams;
103 :
104 : // retrieve the pool of the just created scheduler
105 33 : ABT_pool pool;
106 33 : auto ret = ABT_pool_create_basic(ABT_POOL_FIFO_WAIT, ABT_POOL_ACCESS_MPMC,
107 : ABT_TRUE, &pool);
108 33 : if(ret != ABT_SUCCESS) {
109 0 : throw runtime_error("Failed to create I/O tasks pool");
110 : }
111 :
112 : // create all subsequent xstream and the associated scheduler, all tapping
113 : // into the same pool
114 33 : vector<ABT_xstream> xstreams(xstreams_num);
115 297 : for(unsigned int i = 0; i < xstreams_num; ++i) {
116 528 : ret = ABT_xstream_create_basic(ABT_SCHED_BASIC_WAIT, 1, &pool,
117 264 : ABT_SCHED_CONFIG_NULL, &xstreams[i]);
118 264 : if(ret != ABT_SUCCESS) {
119 0 : throw runtime_error(
120 0 : "Failed to create task execution streams for I/O operations");
121 : }
122 : }
123 :
124 33 : RPC_DATA->io_streams(xstreams);
125 33 : RPC_DATA->io_pool(pool);
126 33 : }
127 :
128 : /**
129 : * @brief Registers RPC handlers to a given Margo instance.
130 : * @internal
131 : * Registering is done by associating a Margo instance id (mid) with the RPC
132 : * name and its handler function including defined input/out structs
133 : * @endinternal
134 : * @param margo_instance_id
135 : */
void
register_server_rpcs(margo_instance_id mid) {
    // Daemon configuration query (no input struct, hence `void`)
    MARGO_REGISTER(mid, gkfs::rpc::tag::fs_config, void, rpc_config_out_t,
                   rpc_srv_get_fs_config);
    // --- Metadata RPCs ---
    MARGO_REGISTER(mid, gkfs::rpc::tag::create, rpc_mk_node_in_t, rpc_err_out_t,
                   rpc_srv_create);
    MARGO_REGISTER(mid, gkfs::rpc::tag::stat, rpc_path_only_in_t,
                   rpc_stat_out_t, rpc_srv_stat);
    MARGO_REGISTER(mid, gkfs::rpc::tag::decr_size, rpc_trunc_in_t,
                   rpc_err_out_t, rpc_srv_decr_size);
    MARGO_REGISTER(mid, gkfs::rpc::tag::remove_metadata, rpc_rm_node_in_t,
                   rpc_rm_metadata_out_t, rpc_srv_remove_metadata);
    MARGO_REGISTER(mid, gkfs::rpc::tag::remove_data, rpc_rm_node_in_t,
                   rpc_err_out_t, rpc_srv_remove_data);
    MARGO_REGISTER(mid, gkfs::rpc::tag::update_metadentry,
                   rpc_update_metadentry_in_t, rpc_err_out_t,
                   rpc_srv_update_metadentry);
    MARGO_REGISTER(mid, gkfs::rpc::tag::get_metadentry_size, rpc_path_only_in_t,
                   rpc_get_metadentry_size_out_t, rpc_srv_get_metadentry_size);
    MARGO_REGISTER(mid, gkfs::rpc::tag::update_metadentry_size,
                   rpc_update_metadentry_size_in_t,
                   rpc_update_metadentry_size_out_t,
                   rpc_srv_update_metadentry_size);
    // --- Directory listing RPCs ---
    MARGO_REGISTER(mid, gkfs::rpc::tag::get_dirents, rpc_get_dirents_in_t,
                   rpc_get_dirents_out_t, rpc_srv_get_dirents);
    MARGO_REGISTER(mid, gkfs::rpc::tag::get_dirents_extended,
                   rpc_get_dirents_in_t, rpc_get_dirents_out_t,
                   rpc_srv_get_dirents_extended);
#ifdef HAS_SYMLINKS
    MARGO_REGISTER(mid, gkfs::rpc::tag::mk_symlink, rpc_mk_symlink_in_t,
                   rpc_err_out_t, rpc_srv_mk_symlink);
#endif
    // --- Data RPCs ---
    MARGO_REGISTER(mid, gkfs::rpc::tag::write, rpc_write_data_in_t,
                   rpc_data_out_t, rpc_srv_write);
    MARGO_REGISTER(mid, gkfs::rpc::tag::read, rpc_read_data_in_t,
                   rpc_data_out_t, rpc_srv_read);
    MARGO_REGISTER(mid, gkfs::rpc::tag::truncate, rpc_trunc_in_t, rpc_err_out_t,
                   rpc_srv_truncate);
    MARGO_REGISTER(mid, gkfs::rpc::tag::get_chunk_stat, rpc_chunk_stat_in_t,
                   rpc_chunk_stat_out_t, rpc_srv_get_chunk_stat);
}
177 :
178 : /**
179 : * @brief Initializes the daemon RPC server.
180 : * @throws std::runtime_error on failure
181 : */
182 : void
183 33 : init_rpc_server() {
184 33 : hg_addr_t addr_self = nullptr;
185 33 : hg_size_t addr_self_cstring_sz = 128;
186 33 : char addr_self_cstring[128];
187 33 : struct hg_init_info hg_options = HG_INIT_INFO_INITIALIZER;
188 33 : hg_options.auto_sm = GKFS_DATA->use_auto_sm() ? HG_TRUE : HG_FALSE;
189 33 : hg_options.stats = HG_FALSE;
190 33 : if(gkfs::rpc::protocol::ofi_psm2 == GKFS_DATA->rpc_protocol())
191 0 : hg_options.na_init_info.progress_mode = NA_NO_BLOCK;
192 : // Start Margo (this will also initialize Argobots and Mercury internally)
193 33 : auto margo_config = fmt::format(
194 : R"({{ "use_progress_thread" : true, "rpc_thread_count" : {} }})",
195 33 : gkfs::config::rpc::daemon_handler_xstreams);
196 33 : struct margo_init_info args = {nullptr};
197 33 : args.json_config = margo_config.c_str();
198 33 : args.hg_init_info = &hg_options;
199 33 : auto* mid = margo_init_ext(GKFS_DATA->bind_addr().c_str(),
200 : MARGO_SERVER_MODE, &args);
201 :
202 33 : if(mid == MARGO_INSTANCE_NULL) {
203 0 : throw runtime_error("Failed to initialize the Margo RPC server");
204 : }
205 : // Figure out what address this server is listening on (must be freed when
206 : // finished)
207 33 : auto hret = margo_addr_self(mid, &addr_self);
208 33 : if(hret != HG_SUCCESS) {
209 0 : margo_finalize(mid);
210 0 : throw runtime_error("Failed to retrieve server RPC address");
211 : }
212 : // Convert the address to a cstring (with \0 terminator).
213 33 : hret = margo_addr_to_string(mid, addr_self_cstring, &addr_self_cstring_sz,
214 : addr_self);
215 33 : if(hret != HG_SUCCESS) {
216 0 : margo_addr_free(mid, addr_self);
217 0 : margo_finalize(mid);
218 0 : throw runtime_error("Failed to convert server RPC address to string");
219 : }
220 33 : margo_addr_free(mid, addr_self);
221 :
222 66 : std::string addr_self_str(addr_self_cstring);
223 33 : RPC_DATA->self_addr_str(addr_self_str);
224 :
225 33 : GKFS_DATA->spdlogger()->info("{}() Accepting RPCs on address {}", __func__,
226 33 : addr_self_cstring);
227 :
228 : // Put context and class into RPC_data object
229 33 : RPC_DATA->server_rpc_mid(mid);
230 :
231 : // register RPCs
232 33 : register_server_rpcs(mid);
233 33 : }
234 :
235 : /**
236 : * @brief Initializes the daemon environment and setting up its subroutines.
237 : * @internal
238 : * This includes connecting to the KV store, starting the Argobots I/O execution
239 : * streams, initializing the metadata and data backends, and starting the RPC
240 : * server.
241 : *
242 : * Finally, the root metadata entry is created.
243 : * @endinternal
244 : * @throws std::runtime_error if any step fails
245 : */
void
init_environment() {
    // Initialize metadata db (KV store) under <metadir>/<config dir name>
    auto metadata_path = fmt::format("{}/{}", GKFS_DATA->metadir(),
                                     gkfs::config::metadata::dir);
    fs::create_directories(metadata_path);
    GKFS_DATA->spdlogger()->debug("{}() Initializing metadata DB: '{}'",
                                  __func__, metadata_path);
    try {
        GKFS_DATA->mdb(std::make_shared<gkfs::metadata::MetadataDB>(
                metadata_path, GKFS_DATA->dbbackend()));
    } catch(const std::exception& e) {
        // log and rethrow: main() prints the error and tears down partially
        // initialized state via destroy_enviroment()
        GKFS_DATA->spdlogger()->error(
                "{}() Failed to initialize metadata DB: {}", __func__,
                e.what());
        throw;
    }

    // Distributor decides which daemon owns a given chunk/path
    GKFS_DATA->spdlogger()->debug("{}() Initializing Distributor ", __func__);
    try {
#ifdef GKFS_USE_GUIDED_DISTRIBUTION
        auto distributor = std::make_shared<gkfs::rpc::GuidedDistributor>();
#else
        auto distributor = std::make_shared<gkfs::rpc::SimpleHashDistributor>();
#endif
        RPC_DATA->distributor(distributor);
    } catch(const std::exception& e) {
        GKFS_DATA->spdlogger()->error(
                "{}() Failed to initialize Distributor: {}", __func__,
                e.what());
        throw;
    }

#ifdef GKFS_ENABLE_FORWARDING
    GKFS_DATA->spdlogger()->debug("{}() Enable I/O forwarding mode", __func__);
#endif

#ifdef GKFS_ENABLE_AGIOS
    // Initialize AGIOS scheduler
    GKFS_DATA->spdlogger()->debug("{}() Initializing AGIOS scheduler: '{}'",
                                  __func__, "/tmp/agios.conf");
    try {
        agios_initialize();
    } catch(const std::exception& e) {
        GKFS_DATA->spdlogger()->error(
                "{}() Failed to initialize AGIOS scheduler: {}", __func__,
                e.what());
        throw;
    }
#endif

    // Initialize Stats (only when some form of collection was requested)
    if(GKFS_DATA->enable_stats() || GKFS_DATA->enable_chunkstats())
        GKFS_DATA->stats(std::make_shared<gkfs::utils::Stats>(
                GKFS_DATA->enable_chunkstats(), GKFS_DATA->enable_prometheus(),
                GKFS_DATA->stats_file(), GKFS_DATA->prometheus_gateway()));

    // Initialize data backend (chunk storage under <rootdir>/<chunk dir name>)
    auto chunk_storage_path = fmt::format("{}/{}", GKFS_DATA->rootdir(),
                                          gkfs::config::data::chunk_dir);
    GKFS_DATA->spdlogger()->debug("{}() Initializing storage backend: '{}'",
                                  __func__, chunk_storage_path);
    fs::create_directories(chunk_storage_path);
    try {
        GKFS_DATA->storage(std::make_shared<gkfs::data::ChunkStorage>(
                chunk_storage_path, gkfs::config::rpc::chunksize));
    } catch(const std::exception& e) {
        GKFS_DATA->spdlogger()->error(
                "{}() Failed to initialize storage backend: {}", __func__,
                e.what());
        throw;
    }

    // Init margo for RPC
    GKFS_DATA->spdlogger()->debug("{}() Initializing RPC server: '{}'",
                                  __func__, GKFS_DATA->bind_addr());
    try {
        init_rpc_server();
    } catch(const std::exception& e) {
        GKFS_DATA->spdlogger()->error(
                "{}() Failed to initialize RPC server: {}", __func__, e.what());
        throw;
    }

    // Init Argobots ESs to drive IO
    try {
        GKFS_DATA->spdlogger()->debug("{}() Initializing I/O pool", __func__);
        init_io_tasklet_pool();
    } catch(const std::exception& e) {
        GKFS_DATA->spdlogger()->error(
                "{}() Failed to initialize Argobots pool for I/O: {}", __func__,
                e.what());
        throw;
    }

    // TODO set metadata configurations. these have to go into a user
    // configurable file that is parsed here
    GKFS_DATA->atime_state(gkfs::config::metadata::use_atime);
    GKFS_DATA->mtime_state(gkfs::config::metadata::use_mtime);
    GKFS_DATA->ctime_state(gkfs::config::metadata::use_ctime);
    GKFS_DATA->link_cnt_state(gkfs::config::metadata::use_link_cnt);
    GKFS_DATA->blocks_state(gkfs::config::metadata::use_blocks);
    // Create metadentry for root directory (rwx for all)
    gkfs::metadata::Metadata root_md{S_IFDIR | S_IRWXU | S_IRWXG | S_IRWXO};
    try {
        gkfs::metadata::create("/", root_md);
    } catch(const gkfs::metadata::ExistsException& e) {
        // launched on existing directory which is no error
    } catch(const std::exception& e) {
        throw runtime_error("Failed to write root metadentry to KV store: "s +
                            e.what());
    }
    // setup hostfile to let clients know that a daemon is running on this host
    if(!GKFS_DATA->hosts_file().empty()) {
        gkfs::utils::populate_hosts_file();
    }
    GKFS_DATA->spdlogger()->info("Startup successful. Daemon is ready.");
}
364 :
365 : #ifdef GKFS_ENABLE_AGIOS
366 : /**
367 : * @brief Initialize the AGIOS scheduling library
368 : */
369 : void
370 : agios_initialize() {
371 : char configuration[] = "/tmp/agios.conf";
372 :
373 : if(!agios_init(NULL, NULL, configuration, 0)) {
374 : GKFS_DATA->spdlogger()->error(
375 : "{}() Failed to initialize AGIOS scheduler: '{}'", __func__,
376 : configuration);
377 :
378 : agios_exit();
379 :
380 : throw;
381 : }
382 : }
383 : #endif
384 :
385 : /**
386 : * @brief Destroys the daemon environment and gracefully shuts down all
387 : * subroutines.
388 : * @internal
389 : * Shutting down includes freeing Argobots execution streams, cleaning
390 : * hostsfile, and shutting down the Mercury RPC server.
391 : * @endinternal
392 : */
void
// NOTE(review): "enviroment" is a long-standing typo; kept because callers
// (main) use this exact name.
destroy_enviroment() {
    GKFS_DATA->spdlogger()->debug("{}() Removing mount directory", __func__);
    std::error_code ecode;
    // best effort: errors while removing the mountdir are ignored via ecode
    fs::remove_all(GKFS_DATA->mountdir(), ecode);
    GKFS_DATA->spdlogger()->debug("{}() Freeing I/O executions streams",
                                  __func__);
    // join and free every Argobots execution stream created in
    // init_io_tasklet_pool()
    for(unsigned int i = 0; i < RPC_DATA->io_streams().size(); i++) {
        ABT_xstream_join(RPC_DATA->io_streams().at(i));
        ABT_xstream_free(&RPC_DATA->io_streams().at(i));
    }

    if(!GKFS_DATA->hosts_file().empty()) {
        GKFS_DATA->spdlogger()->debug("{}() Removing hosts file", __func__);
        try {
            gkfs::utils::destroy_hosts_file();
        } catch(const fs::filesystem_error& e) {
            // a missing hosts file is not an error during shutdown
            GKFS_DATA->spdlogger()->debug("{}() hosts file not found",
                                          __func__);
        }
    }

    // mid is only set once init_rpc_server() succeeded; guard for the case
    // where teardown runs after a partial startup failure
    if(RPC_DATA->server_rpc_mid() != nullptr) {
        GKFS_DATA->spdlogger()->debug("{}() Finalizing margo RPC server",
                                      __func__);
        margo_finalize(RPC_DATA->server_rpc_mid());
    }

    GKFS_DATA->spdlogger()->info("{}() Closing metadata DB", __func__);
    GKFS_DATA->close_mdb();


    // Delete rootdir/metadir if requested (--clean-rootdir-finish)
    if(!keep_rootdir) {
        GKFS_DATA->spdlogger()->info("{}() Removing rootdir and metadir ...",
                                     __func__);
        fs::remove_all(GKFS_DATA->metadir(), ecode);
        fs::remove_all(GKFS_DATA->rootdir(), ecode);
    }
    GKFS_DATA->close_stats();
}
434 :
435 : /**
436 : * @brief Handler for daemon shutdown signal handling.
437 : * @internal
438 : * Notifies the waiting thread in main() to wake up.
439 : * @endinternal
440 : * @param dummy unused but required by signal() called in main()
441 : */
void
shutdown_handler(int dummy) {
    // NOTE(review): spdlog, strsignal() and condition_variable::notify_all()
    // are not on the POSIX async-signal-safe list, so calling them from a
    // signal handler is technically undefined behavior. A safer design would
    // set a volatile sig_atomic_t flag (or write to a self-pipe) and do the
    // logging/notification on the main thread — TODO confirm and refactor.
    GKFS_DATA->spdlogger()->info("{}() Received signal: '{}'", __func__,
                                 strsignal(dummy));
    shutdown_please.notify_all();
}
448 :
449 : /**
450 : * @brief Initializes the daemon logging environment.
451 : * @internal
452 : * Reads user input via environment variables regarding the
453 : * log path and log level.
454 : * @endinternal
455 : * Initializes three loggers: main, metadata module, and data module
456 : */
457 : void
458 33 : initialize_loggers() {
459 33 : std::string path = gkfs::config::log::daemon_log_path;
460 : // Try to get log path from env variable
461 66 : std::string env_path_key = DAEMON_ENV_PREFIX;
462 33 : env_path_key += "LOG_PATH";
463 33 : char* env_path = getenv(env_path_key.c_str());
464 33 : if(env_path != nullptr) {
465 33 : path = env_path;
466 : }
467 :
468 33 : spdlog::level::level_enum level =
469 33 : gkfs::log::get_level(gkfs::config::log::daemon_log_level);
470 : // Try to get log path from env variable
471 66 : std::string env_level_key = DAEMON_ENV_PREFIX;
472 33 : env_level_key += "LOG_LEVEL";
473 33 : char* env_level = getenv(env_level_key.c_str());
474 33 : if(env_level != nullptr) {
475 33 : level = gkfs::log::get_level(env_level);
476 : }
477 :
478 33 : auto logger_names = std::vector<std::string>{
479 : "main",
480 : "MetadataModule",
481 : "DataModule",
482 165 : };
483 :
484 33 : gkfs::log::setup(logger_names, level, path);
485 33 : }
486 :
487 : /**
488 : * @brief Parses command line arguments from user
489 : *
490 : * @param opts CLI values
491 : * @param desc CLI allowed options
492 : * @throws std::runtime_error
493 : */
494 : void
495 33 : parse_input(const cli_options& opts, const CLI::App& desc) {
496 33 : auto rpc_protocol = string(gkfs::rpc::protocol::ofi_sockets);
497 33 : if(desc.count("--rpc-protocol")) {
498 0 : rpc_protocol = opts.rpc_protocol;
499 0 : if(rpc_protocol != gkfs::rpc::protocol::ofi_verbs &&
500 0 : rpc_protocol != gkfs::rpc::protocol::ofi_sockets &&
501 0 : rpc_protocol != gkfs::rpc::protocol::ofi_psm2) {
502 0 : throw runtime_error(fmt::format(
503 : "Given RPC protocol '{}' not supported. Check --help for supported protocols.",
504 0 : rpc_protocol));
505 : }
506 : }
507 :
508 33 : auto use_auto_sm = desc.count("--auto-sm") != 0;
509 33 : GKFS_DATA->use_auto_sm(use_auto_sm);
510 33 : GKFS_DATA->spdlogger()->debug(
511 : "{}() Shared memory (auto_sm) for intra-node communication (IPCs) set to '{}'.",
512 33 : __func__, use_auto_sm);
513 :
514 66 : string addr{};
515 33 : if(desc.count("--listen")) {
516 33 : addr = opts.listen;
517 : // ofi+verbs requires an empty addr to bind to the ib interface
518 33 : if(rpc_protocol == gkfs::rpc::protocol::ofi_verbs) {
519 : /*
520 : * FI_VERBS_IFACE : The prefix or the full name of the network
521 : * interface associated with the verbs device (default: ib) Mercury
522 : * does not allow to bind to an address when ofi+verbs is used
523 : */
524 0 : if(!secure_getenv("FI_VERBS_IFACE"))
525 0 : setenv("FI_VERBS_IFACE", addr.c_str(), 1);
526 0 : addr = ""s;
527 : }
528 : } else {
529 0 : if(rpc_protocol != gkfs::rpc::protocol::ofi_verbs)
530 0 : addr = gkfs::rpc::get_my_hostname(true);
531 : }
532 :
533 33 : GKFS_DATA->rpc_protocol(rpc_protocol);
534 66 : GKFS_DATA->bind_addr(fmt::format("{}://{}", rpc_protocol, addr));
535 :
536 66 : string hosts_file;
537 33 : if(desc.count("--hosts-file")) {
538 0 : hosts_file = opts.hosts_file;
539 : } else {
540 66 : hosts_file = gkfs::env::get_var(gkfs::env::HOSTS_FILE,
541 66 : gkfs::config::hostfile_path);
542 : }
543 33 : GKFS_DATA->hosts_file(hosts_file);
544 :
545 66 : assert(desc.count("--mountdir"));
546 66 : auto mountdir = opts.mountdir;
547 : // Create mountdir. We use this dir to get some information on the
548 : // underlying fs with statfs in gkfs_statfs
549 33 : fs::create_directories(mountdir);
550 66 : GKFS_DATA->mountdir(fs::canonical(mountdir).native());
551 :
552 66 : assert(desc.count("--rootdir"));
553 66 : auto rootdir = opts.rootdir;
554 :
555 : #ifdef GKFS_ENABLE_FORWARDING
556 : // In forwarding mode, the backend is shared
557 0 : auto rootdir_path = fs::path(rootdir);
558 : #else
559 66 : auto rootdir_path = fs::path(rootdir);
560 33 : if(desc.count("--rootdir-suffix")) {
561 0 : if(opts.rootdir_suffix == gkfs::config::data::chunk_dir ||
562 0 : opts.rootdir_suffix == gkfs::config::metadata::dir)
563 0 : throw runtime_error(fmt::format(
564 : "rootdir_suffix '{}' is reserved and not allowed.",
565 0 : opts.rootdir_suffix));
566 0 : if(opts.rootdir_suffix.find('#') != string::npos)
567 0 : throw runtime_error(fmt::format(
568 0 : "The character '#' in the rootdir_suffix is reserved and not allowed."));
569 : // append path with a directory separator
570 0 : rootdir_path /= opts.rootdir_suffix;
571 0 : GKFS_DATA->rootdir_suffix(opts.rootdir_suffix);
572 : }
573 : #endif
574 :
575 33 : if(desc.count("--clean-rootdir")) {
576 : // may throw exception (caught in main)
577 0 : GKFS_DATA->spdlogger()->debug("{}() Cleaning rootdir '{}' ...",
578 0 : __func__, rootdir_path.native());
579 0 : fs::remove_all(rootdir_path.native());
580 0 : GKFS_DATA->spdlogger()->info("{}() rootdir cleaned.", __func__);
581 : }
582 :
583 33 : if(desc.count("--clean-rootdir-finish")) {
584 0 : keep_rootdir = false;
585 : }
586 :
587 33 : GKFS_DATA->spdlogger()->debug("{}() Root directory: '{}'", __func__,
588 33 : rootdir_path.native());
589 33 : fs::create_directories(rootdir_path);
590 33 : GKFS_DATA->rootdir(rootdir_path.native());
591 :
592 33 : if(desc.count("--metadir")) {
593 66 : auto metadir = opts.metadir;
594 :
595 : #ifdef GKFS_ENABLE_FORWARDING
596 0 : auto metadir_path = fs::path(metadir) / fmt::format_int(getpid()).str();
597 : #else
598 66 : auto metadir_path = fs::path(metadir);
599 : #endif
600 33 : if(desc.count("--clean-rootdir")) {
601 : // may throw exception (caught in main)
602 0 : GKFS_DATA->spdlogger()->debug("{}() Cleaning metadir '{}' ...",
603 0 : __func__, metadir_path.native());
604 0 : fs::remove_all(metadir_path.native());
605 0 : GKFS_DATA->spdlogger()->info("{}() metadir cleaned.", __func__);
606 : }
607 33 : fs::create_directories(metadir_path);
608 33 : GKFS_DATA->metadir(fs::canonical(metadir_path).native());
609 :
610 66 : GKFS_DATA->spdlogger()->debug("{}() Meta directory: '{}'", __func__,
611 33 : metadir_path.native());
612 : } else {
613 : // use rootdir as metadata dir
614 0 : auto metadir = opts.rootdir;
615 :
616 : #ifdef GKFS_ENABLE_FORWARDING
617 0 : auto metadir_path = fs::path(metadir) / fmt::format_int(getpid()).str();
618 0 : fs::create_directories(metadir_path);
619 0 : GKFS_DATA->metadir(fs::canonical(metadir_path).native());
620 : #else
621 0 : GKFS_DATA->metadir(GKFS_DATA->rootdir());
622 : #endif
623 : }
624 :
625 33 : if(desc.count("--dbbackend")) {
626 33 : if(opts.dbbackend == gkfs::metadata::rocksdb_backend ||
627 0 : opts.dbbackend == gkfs::metadata::parallax_backend) {
628 : #ifndef GKFS_ENABLE_PARALLAX
629 33 : if(opts.dbbackend == gkfs::metadata::parallax_backend) {
630 0 : throw runtime_error(fmt::format(
631 : "dbbackend '{}' was not compiled and is disabled. "
632 : "Pass -DGKFS_ENABLE_PARALLAX:BOOL=ON to CMake to enable.",
633 0 : opts.dbbackend));
634 : }
635 : #endif
636 : #ifndef GKFS_ENABLE_ROCKSDB
637 : if(opts.dbbackend == gkfs::metadata::rocksdb_backend) {
638 : throw runtime_error(fmt::format(
639 : "dbbackend '{}' was not compiled and is disabled. "
640 : "Pass -DGKFS_ENABLE_ROCKSDB:BOOL=ON to CMake to enable.",
641 : opts.dbbackend));
642 : }
643 : #endif
644 33 : GKFS_DATA->dbbackend(opts.dbbackend);
645 : } else {
646 0 : throw runtime_error(
647 0 : fmt::format("dbbackend '{}' is not valid. Consult `--help`",
648 0 : opts.dbbackend));
649 : }
650 :
651 : } else
652 0 : GKFS_DATA->dbbackend(gkfs::metadata::rocksdb_backend);
653 :
654 33 : if(desc.count("--parallaxsize")) { // Size in GB
655 0 : GKFS_DATA->parallax_size_md(stoi(opts.parallax_size));
656 : }
657 :
658 : /*
659 : * Statistics collection arguments
660 : */
661 33 : if(desc.count("--enable-collection")) {
662 33 : GKFS_DATA->enable_stats(true);
663 66 : GKFS_DATA->spdlogger()->info("{}() Statistic collection enabled",
664 33 : __func__);
665 : }
666 33 : if(desc.count("--enable-chunkstats")) {
667 33 : GKFS_DATA->enable_chunkstats(true);
668 66 : GKFS_DATA->spdlogger()->info("{}() Chunk statistic collection enabled",
669 33 : __func__);
670 : }
671 :
672 : #ifdef GKFS_ENABLE_PROMETHEUS
673 33 : if(desc.count("--enable-prometheus")) {
674 0 : GKFS_DATA->enable_prometheus(true);
675 0 : if(GKFS_DATA->enable_stats() || GKFS_DATA->enable_chunkstats())
676 0 : GKFS_DATA->spdlogger()->info(
677 0 : "{}() Statistics output to Prometheus enabled", __func__);
678 : else
679 0 : GKFS_DATA->spdlogger()->warn(
680 : "{}() Prometheus statistic output enabled but no stat collection is enabled. There will be no output to Prometheus",
681 0 : __func__);
682 : }
683 :
684 33 : if(desc.count("--prometheus-gateway")) {
685 0 : auto gateway = opts.prometheus_gateway;
686 0 : GKFS_DATA->prometheus_gateway(gateway);
687 0 : if(GKFS_DATA->enable_prometheus())
688 0 : GKFS_DATA->spdlogger()->info("{}() Prometheus gateway set to '{}'",
689 0 : __func__, gateway);
690 : else
691 0 : GKFS_DATA->spdlogger()->warn(
692 : "{}() Prometheus gateway was set but Prometheus is disabled.");
693 : }
694 : #endif
695 :
696 33 : if(desc.count("--output-stats")) {
697 66 : auto stats_file = opts.stats_file;
698 33 : GKFS_DATA->stats_file(stats_file);
699 33 : if(GKFS_DATA->enable_stats() || GKFS_DATA->enable_chunkstats())
700 66 : GKFS_DATA->spdlogger()->info(
701 : "{}() Statistics are written to file '{}'", __func__,
702 33 : stats_file);
703 : else
704 0 : GKFS_DATA->spdlogger()->warn(
705 : "{}() --output-stats argument used but no stat collection is enabled. There will be no output to file '{}'",
706 0 : __func__, stats_file);
707 : } else {
708 0 : GKFS_DATA->stats_file("");
709 0 : GKFS_DATA->spdlogger()->debug("{}() Statistics output disabled",
710 0 : __func__);
711 : }
712 33 : }
713 :
714 : /**
715 : * @brief The initial function called when launching the daemon.
716 : * @internal
717 : * Launches all subroutines and waits on a conditional variable to shut it down.
718 : * Daemon will react to the following signals:
719 : *
720 : * SIGINT - Interrupt from keyboard (ctrl-c)
721 : * SIGTERM - Termination signal (kill <daemon_pid>
722 : * SIGKILL - Kill signal (kill -9 <daemon_pid>
723 : * @endinternal
724 : * @param argc number of command line arguments
725 : * @param argv list of the command line arguments
726 : * @return exit status: EXIT_SUCCESS (0) or EXIT_FAILURE (1)
727 : */
int
main(int argc, const char* argv[]) {
    CLI::App desc{"Allowed options"};
    cli_options opts{};
    // clang-format off
    desc.add_option("--mountdir,-m", opts.mountdir,
                    "Virtual mounting directory where GekkoFS is available.")
            ->required();
    desc.add_option(
            "--rootdir,-r", opts.rootdir,
            "Local data directory where GekkoFS data for this daemon is stored.")
            ->required();
    desc.add_option(
            "--rootdir-suffix,-s", opts.rootdir_suffix,
            "Creates an additional directory within the rootdir, allowing multiple daemons on one node.");
    desc.add_option(
            "--metadir,-i", opts.metadir,
            "Metadata directory where GekkoFS RocksDB data directory is located. If not set, rootdir is used.");
    desc.add_option(
            "--listen,-l", opts.listen,
            "Address or interface to bind the daemon to. Default: local hostname.\n"
            "When used with ofi+verbs the FI_VERBS_IFACE environment variable is set accordingly "
            "which associates the verbs device with the network interface. In case FI_VERBS_IFACE "
            "is already defined, the argument is ignored. Default 'ib'.");
    desc.add_option("--hosts-file,-H", opts.hosts_file,
                    "Shared file used by deamons to register their "
                    "endpoints. (default './gkfs_hosts.txt')");
    desc.add_option(
            "--rpc-protocol,-P", opts.rpc_protocol,
            "Used RPC protocol for inter-node communication.\n"
            "Available: {ofi+sockets, ofi+verbs, ofi+psm2} for TCP, Infiniband, "
            "and Omni-Path, respectively. (Default ofi+sockets)\n"
            "Libfabric must have enabled support verbs or psm2.");
    desc.add_flag(
            "--auto-sm",
            "Enables intra-node communication (IPCs) via the `na+sm` (shared memory) protocol, "
            "instead of using the RPC protocol. (Default off)");
    desc.add_flag(
            "--clean-rootdir",
            "Cleans Rootdir >before< launching the deamon");
    desc.add_flag(
            "--clean-rootdir-finish,-c",
            "Cleans Rootdir >after< the deamon finishes");
    desc.add_option(
            "--dbbackend,-d", opts.dbbackend,
            "Metadata database backend to use. Available: {rocksdb, parallaxdb}\n"
            "RocksDB is default if not set. Parallax support is experimental.\n"
            "Note, parallaxdb creates a file called rocksdbx with 8GB created in metadir.");
    desc.add_option("--parallaxsize", opts.parallax_size,
                    "parallaxdb - metadata file size in GB (default 8GB), "
                    "used only with new files");
    desc.add_flag(
            "--enable-collection",
            "Enables collection of general statistics. "
            "Output requires either the --output-stats or --enable-prometheus argument.");
    desc.add_flag(
            "--enable-chunkstats",
            "Enables collection of data chunk statistics in I/O operations."
            "Output requires either the --output-stats or --enable-prometheus argument.");
    desc.add_option(
            "--output-stats", opts.stats_file,
            "Creates a thread that outputs the server stats each 10s to the specified file.");
#ifdef GKFS_ENABLE_PROMETHEUS
    desc.add_flag(
            "--enable-prometheus",
            "Enables prometheus output and a corresponding thread.");

    desc.add_option(
            "--prometheus-gateway", opts.prometheus_gateway,
            "Defines the prometheus gateway <ip:port> (Default 127.0.0.1:9091).");
#endif

    desc.add_flag("--version", "Print version and exit.");
    // clang-format on
    try {
        desc.parse(argc, argv);
    } catch(const CLI::ParseError& e) {
        // prints help/error and returns CLI11's exit code
        return desc.exit(e);
    }


    // --version: print build configuration and exit before any setup
    if(desc.count("--version")) {
        cout << GKFS_VERSION_STRING << endl;
#ifndef NDEBUG
        cout << "Debug: ON" << endl;
#else
        cout << "Debug: OFF" << endl;
#endif
#if GKFS_CREATE_CHECK_PARENTS
        cout << "Create check parents: ON" << endl;
#else
        cout << "Create check parents: OFF" << endl;
#endif
        cout << "Chunk size: " << gkfs::config::rpc::chunksize << " bytes"
             << endl;
        return EXIT_SUCCESS;
    }
    // initialize logging framework
    initialize_loggers();
    GKFS_DATA->spdlogger(spdlog::get("main"));

    // parse all input parameters and populate singleton structures
    try {
        parse_input(opts, desc);
    } catch(const std::exception& e) {
        cerr << fmt::format("Parsing arguments failed: '{}'. Exiting.",
                            e.what());
        return EXIT_FAILURE;
    }

    /*
     * Initialize environment and start daemon. Wait until signaled to cancel
     * before shutting down
     */
    try {
        GKFS_DATA->spdlogger()->info("{}() Initializing environment", __func__);
        init_environment();
    } catch(const std::exception& e) {
        auto emsg =
                fmt::format("Failed to initialize environment: {}", e.what());
        GKFS_DATA->spdlogger()->error(emsg);
        cerr << emsg << endl;
        // tear down whatever was already brought up before the failure
        destroy_enviroment();
        return EXIT_FAILURE;
    }

    // NOTE(review): per POSIX, SIGKILL can neither be caught nor ignored, so
    // the third signal() call has no effect — the handler only ever runs for
    // SIGINT and SIGTERM.
    signal(SIGINT, shutdown_handler);
    signal(SIGTERM, shutdown_handler);
    signal(SIGKILL, shutdown_handler);

    unique_lock<mutex> lk(mtx);
    // Wait for shutdown signal to initiate shutdown protocols
    // NOTE(review): this wait has no predicate, so a spurious wakeup triggers
    // shutdown early, and a signal delivered before this point would be lost
    // (the daemon would then hang). A shared flag checked in a wait predicate
    // would close both races — TODO confirm and refactor together with
    // shutdown_handler().
    shutdown_please.wait(lk);
    GKFS_DATA->spdlogger()->info("{}() Shutting down...", __func__);
    destroy_enviroment();
    GKFS_DATA->spdlogger()->info("{}() Complete. Exiting...", __func__);
    return EXIT_SUCCESS;
}
|