Line data Source code
1 : /*
2 : Copyright 2018-2024, Barcelona Supercomputing Center (BSC), Spain
3 : Copyright 2015-2024, Johannes Gutenberg Universitaet Mainz, Germany
4 :
5 : This software was partially supported by the
6 : EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu).
7 :
8 : This software was partially supported by the
9 : ADA-FS project under the SPPEXA project funded by the DFG.
10 :
11 : This file is part of GekkoFS' POSIX interface.
12 :
13 : GekkoFS' POSIX interface is free software: you can redistribute it and/or
14 : modify it under the terms of the GNU Lesser General Public License as
15 : published by the Free Software Foundation, either version 3 of the License,
16 : or (at your option) any later version.
17 :
18 : GekkoFS' POSIX interface is distributed in the hope that it will be useful,
19 : but WITHOUT ANY WARRANTY; without even the implied warranty of
20 : MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21 : GNU Lesser General Public License for more details.
22 :
23 : You should have received a copy of the GNU Lesser General Public License
24 : along with GekkoFS' POSIX interface. If not, see
25 : <https://www.gnu.org/licenses/>.
26 :
27 : SPDX-License-Identifier: LGPL-3.0-or-later
28 : */
29 :
#include <client/preload.hpp>
#include <client/path.hpp>
#include <client/logging.hpp>
#include <client/rpc/forward_management.hpp>
#include <client/preload_util.hpp>
#include <client/intercept.hpp>

#include <common/rpc/distributor.hpp>
#include <common/common_defs.hpp>

#include <algorithm>
#include <cstdint>
#include <ctime>
#include <cstdlib>
#include <fstream>
#include <iterator>

#include <pthread.h>

#include <hermes.hpp>
45 :
46 :
47 : using namespace std;
48 :
49 : std::unique_ptr<hermes::async_engine> ld_network_service; // extern variable
50 :
51 : namespace {
52 :
53 : // FORWARDING
54 : pthread_t mapper;
55 : bool forwarding_running;
56 :
57 : pthread_mutex_t remap_mutex;
58 : pthread_cond_t remap_signal;
59 : // END FORWARDING
60 :
61 : inline void
62 0 : exit_error_msg(int errcode, const string& msg) {
63 :
64 0 : LOG_ERROR("{}", msg);
65 0 : gkfs::log::logger::log_message(stderr, "{}\n", msg);
66 :
67 : // if we don't disable interception before calling ::exit()
68 : // syscall hooks may find an inconsistent in shared state
69 : // (e.g. the logger) and thus, crash
70 0 : if(CTX->interception_enabled()) {
71 0 : gkfs::preload::stop_interception();
72 0 : CTX->disable_interception();
73 : }
74 0 : ::exit(errcode);
75 : }
76 :
77 : /**
78 : * Initializes the Hermes client for a given transport prefix
79 : * @return true if successfully initialized; false otherwise
80 : */
81 : bool
82 269 : init_hermes_client() {
83 :
84 269 : try {
85 :
86 269 : hermes::engine_options opts{};
87 :
88 269 : if(CTX->auto_sm())
89 0 : opts |= hermes::use_auto_sm;
90 269 : if(gkfs::rpc::protocol::ofi_psm2 == CTX->rpc_protocol()) {
91 0 : opts |= hermes::force_no_block_progress;
92 : }
93 :
94 269 : opts |= hermes::process_may_fork;
95 :
96 807 : ld_network_service = std::make_unique<hermes::async_engine>(
97 538 : hermes::get_transport_type(CTX->rpc_protocol()), opts);
98 269 : ld_network_service->run();
99 0 : } catch(const std::exception& ex) {
100 0 : fmt::print(stderr, "Failed to initialize Hermes RPC client {}\n",
101 0 : ex.what());
102 0 : return false;
103 : }
104 269 : return true;
105 : }
106 :
107 : void*
108 21 : forwarding_mapper(void* p) {
109 21 : struct timespec timeout;
110 21 : clock_gettime(CLOCK_REALTIME, &timeout);
111 21 : timeout.tv_sec += 10; // 10 seconds
112 :
113 21 : int previous = -1;
114 :
115 42 : while(forwarding_running) {
116 21 : try {
117 21 : gkfs::utils::load_forwarding_map();
118 :
119 21 : if(previous != (int64_t) CTX->fwd_host_id()) {
120 21 : LOG(INFO, "{}() Forward to {}", __func__, CTX->fwd_host_id());
121 :
122 21 : previous = CTX->fwd_host_id();
123 : }
124 0 : } catch(std::exception& e) {
125 0 : exit_error_msg(EXIT_FAILURE,
126 0 : fmt::format("Unable set the forwarding host '{}'",
127 0 : e.what()));
128 : }
129 :
130 21 : pthread_mutex_lock(&remap_mutex);
131 21 : pthread_cond_timedwait(&remap_signal, &remap_mutex, &timeout);
132 21 : pthread_mutex_unlock(&remap_mutex);
133 : }
134 :
135 21 : return nullptr;
136 : }
137 :
138 : void
139 21 : init_forwarding_mapper() {
140 21 : forwarding_running = true;
141 :
142 21 : pthread_create(&mapper, NULL, forwarding_mapper, NULL);
143 21 : }
144 :
145 : void
146 21 : destroy_forwarding_mapper() {
147 21 : forwarding_running = false;
148 :
149 21 : pthread_cond_signal(&remap_signal);
150 :
151 21 : pthread_join(mapper, NULL);
152 21 : }
153 :
154 : void
155 269 : log_prog_name() {
156 269 : std::string line;
157 538 : std::ifstream cmdline("/proc/self/cmdline");
158 269 : if(!cmdline.is_open()) {
159 0 : LOG(ERROR, "Unable to open cmdline file");
160 0 : throw std::runtime_error("Unable to open cmdline file");
161 : }
162 269 : if(!getline(cmdline, line)) {
163 0 : throw std::runtime_error("Unable to read cmdline file");
164 : }
165 269 : std::replace(line.begin(), line.end(), '\0', ' ');
166 269 : line.erase(line.length() - 1, line.length());
167 269 : LOG(INFO, "Process cmdline: '{}'", line);
168 269 : cmdline.close();
169 269 : }
170 :
171 : } // namespace
172 :
173 : namespace gkfs::preload {
174 :
175 : /**
176 : * This function is only called in the preload constructor and initializes
177 : * the file system client
178 : */
179 : void
180 269 : init_environment() {
181 :
182 269 : vector<pair<string, string>> hosts{};
183 269 : try {
184 269 : LOG(INFO, "Loading peer addresses...");
185 538 : hosts = gkfs::utils::read_hosts_file();
186 0 : } catch(const std::exception& e) {
187 0 : exit_error_msg(EXIT_FAILURE,
188 0 : "Failed to load hosts addresses: "s + e.what());
189 : }
190 :
191 : // initialize Hermes interface to Mercury
192 269 : LOG(INFO, "Initializing RPC subsystem...");
193 :
194 269 : if(!init_hermes_client()) {
195 0 : exit_error_msg(EXIT_FAILURE, "Unable to initialize RPC subsystem");
196 : }
197 :
198 269 : try {
199 269 : gkfs::utils::connect_to_hosts(hosts);
200 0 : } catch(const std::exception& e) {
201 0 : exit_error_msg(EXIT_FAILURE,
202 0 : "Failed to connect to hosts: "s + e.what());
203 : }
204 :
205 : /* Setup distributor */
206 269 : auto forwarding_map_file = gkfs::env::get_var(
207 807 : gkfs::env::FORWARDING_MAP_FILE, gkfs::config::forwarding_file_path);
208 :
209 269 : if(!forwarding_map_file.empty()) {
210 21 : try {
211 21 : gkfs::utils::load_forwarding_map();
212 :
213 21 : LOG(INFO, "{}() Forward to {}", __func__, CTX->fwd_host_id());
214 0 : } catch(std::exception& e) {
215 0 : exit_error_msg(EXIT_FAILURE,
216 0 : fmt::format("Unable set the forwarding host '{}'",
217 0 : e.what()));
218 : }
219 :
220 21 : auto forwarder_dist = std::make_shared<gkfs::rpc::ForwarderDistributor>(
221 42 : CTX->fwd_host_id(), CTX->hosts().size());
222 63 : CTX->distributor(forwarder_dist);
223 : } else {
224 :
225 : #ifdef GKFS_USE_GUIDED_DISTRIBUTION
226 248 : auto distributor = std::make_shared<gkfs::rpc::GuidedDistributor>(
227 496 : CTX->local_host_id(), CTX->hosts().size());
228 : #else
229 : auto distributor = std::make_shared<gkfs::rpc::SimpleHashDistributor>(
230 : CTX->local_host_id(), CTX->hosts().size());
231 : #endif
232 744 : CTX->distributor(distributor);
233 : }
234 :
235 269 : LOG(INFO, "Retrieving file system configuration...");
236 :
237 269 : if(!gkfs::rpc::forward_get_fs_config()) {
238 0 : exit_error_msg(
239 : EXIT_FAILURE,
240 0 : "Unable to fetch file system configurations from daemon process through RPC.");
241 : }
242 : // Initialize random number generator and seed for replica selection
243 : // in case of failure, a new replica will be selected
244 269 : if(CTX->get_replicas() > 0) {
245 0 : srand(time(nullptr));
246 : }
247 :
248 269 : LOG(INFO, "Environment initialization successful.");
249 269 : }
250 :
251 : } // namespace gkfs::preload
252 :
253 : /**
254 : * Called initially ONCE when preload library is used with the LD_PRELOAD
255 : * environment variable
256 : */
257 : void
258 269 : init_preload() {
259 : // The original errno value will be restored after initialization to not
260 : // leak internal error codes
261 269 : auto oerrno = errno;
262 :
263 269 : CTX->enable_interception();
264 269 : gkfs::preload::start_self_interception();
265 :
266 269 : CTX->init_logging();
267 : // from here ownwards it is safe to print messages
268 269 : LOG(DEBUG, "Logging subsystem initialized");
269 :
270 : // Kernel modules such as ib_uverbs may create fds in kernel space and pass
271 : // them to user-space processes using ioctl()-like interfaces. if this
272 : // happens during our internal initialization, there's no way for us to
273 : // control this creation and the fd will be created in the
274 : // [0, MAX_USER_FDS) range rather than in our private
275 : // [MAX_USER_FDS, GKFS_MAX_OPEN_FDS) range.
276 : // with MAX_USER_FDS = GKFS_MAX_OPEN_FDS - GKFS_MAX_INTERNAL_FDS
277 : // To prevent this for our internal
278 : // initialization code, we forcefully occupy the user fd range to force
279 : // such modules to create fds in our private range.
280 269 : CTX->protect_user_fds();
281 :
282 269 : log_prog_name();
283 269 : gkfs::path::init_cwd();
284 :
285 269 : LOG(DEBUG, "Current working directory: '{}'", CTX->cwd());
286 269 : LOG(DEBUG, "Number of replicas : '{}'", CTX->get_replicas());
287 269 : gkfs::preload::init_environment();
288 269 : CTX->enable_interception();
289 :
290 269 : CTX->unprotect_user_fds();
291 :
292 269 : auto forwarding_map_file = gkfs::env::get_var(
293 538 : gkfs::env::FORWARDING_MAP_FILE, gkfs::config::forwarding_file_path);
294 269 : if(!forwarding_map_file.empty()) {
295 21 : init_forwarding_mapper();
296 : }
297 :
298 269 : gkfs::preload::start_interception();
299 269 : errno = oerrno;
300 269 : }
301 :
302 : /**
303 : * Called last when preload library is used with the LD_PRELOAD environment
304 : * variable
305 : */
306 : void
307 269 : destroy_preload() {
308 269 : auto forwarding_map_file = gkfs::env::get_var(
309 538 : gkfs::env::FORWARDING_MAP_FILE, gkfs::config::forwarding_file_path);
310 269 : if(!forwarding_map_file.empty()) {
311 21 : destroy_forwarding_mapper();
312 : }
313 269 : CTX->clear_hosts();
314 269 : LOG(DEBUG, "Peer information deleted");
315 :
316 269 : ld_network_service.reset();
317 269 : LOG(DEBUG, "RPC subsystem shut down");
318 :
319 269 : if(CTX->interception_enabled()) {
320 269 : gkfs::preload::stop_interception();
321 269 : CTX->disable_interception();
322 269 : LOG(DEBUG, "Syscall interception stopped");
323 : }
324 :
325 269 : LOG(INFO, "All subsystems shut down. Client shutdown complete.");
326 269 : }
327 :
328 :
329 : /**
330 : * @brief External functions to call linking the library
331 : *
332 : */
333 : extern "C" int
334 0 : gkfs_init() {
335 0 : CTX->init_logging();
336 :
337 : // from here ownwards it is safe to print messages
338 0 : LOG(DEBUG, "Logging subsystem initialized");
339 :
340 0 : gkfs::preload::init_environment();
341 :
342 0 : return 0;
343 : }
344 :
345 :
346 : extern "C" int
347 0 : gkfs_end() {
348 0 : CTX->clear_hosts();
349 0 : LOG(DEBUG, "Peer information deleted");
350 :
351 0 : ld_network_service.reset();
352 0 : LOG(DEBUG, "RPC subsystem shut down");
353 :
354 0 : LOG(INFO, "All subsystems shut down. Client shutdown complete.");
355 :
356 0 : return 0;
357 : }
|