Line data Source code
1 : /*
2 : Copyright 2018-2024, Barcelona Supercomputing Center (BSC), Spain
3 : Copyright 2015-2024, Johannes Gutenberg Universitaet Mainz, Germany
4 :
5 : This software was partially supported by the
6 : EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu).
7 :
8 : This software was partially supported by the
9 : ADA-FS project under the SPPEXA project funded by the DFG.
10 :
11 : This file is part of GekkoFS.
12 :
13 : GekkoFS is free software: you can redistribute it and/or modify
14 : it under the terms of the GNU General Public License as published by
15 : the Free Software Foundation, either version 3 of the License, or
16 : (at your option) any later version.
17 :
18 : GekkoFS is distributed in the hope that it will be useful,
19 : but WITHOUT ANY WARRANTY; without even the implied warranty of
20 : MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21 : GNU General Public License for more details.
22 :
23 : You should have received a copy of the GNU General Public License
24 : along with GekkoFS. If not, see <https://www.gnu.org/licenses/>.
25 :
26 : SPDX-License-Identifier: GPL-3.0-or-later
27 : */
28 : /**
29 : * @brief Provides all Margo RPC handler definitions called by Mercury on client
30 : * request for all file system metadata operations.
31 : * @internal
32 : * The end of the file defines the associates the Margo RPC handler functions
33 : * and associates them with their corresponding GekkoFS handler functions.
34 : * @endinternal
35 : */
36 :
37 : #include <daemon/handler/rpc_defs.hpp>
38 : #include <daemon/handler/rpc_util.hpp>
39 : #include <daemon/backend/metadata/db.hpp>
40 : #include <daemon/backend/data/chunk_storage.hpp>
41 : #include <daemon/ops/metadentry.hpp>
42 :
43 : #include <common/rpc/rpc_types.hpp>
44 : #include <common/statistics/stats.hpp>
45 :
46 : using namespace std;
47 :
48 : namespace {
49 :
50 : /**
51 : * @brief Serves a file/directory create request or returns an error to the
52 : * client if the object already exists.
53 : * @internal
54 : * The create request creates or updates a corresponding entry in the KV store.
55 : * If the object already exists, the RPC output struct includes an EEXIST error
56 : * code. This is not a hard error. Other unexpected errors are placed in the
57 : * output struct as well.
58 : *
59 : * All exceptions must be caught here and dealt with accordingly.
60 : * @endinteral
61 : * @param handle Mercury RPC handle
62 : * @return Mercury error code to Mercury
63 : */
64 : hg_return_t
65 1067 : rpc_srv_create(hg_handle_t handle) {
66 1067 : rpc_mk_node_in_t in;
67 1067 : rpc_err_out_t out;
68 :
69 1067 : auto ret = margo_get_input(handle, &in);
70 1067 : if(ret != HG_SUCCESS)
71 0 : GKFS_DATA->spdlogger()->error(
72 0 : "{}() Failed to retrieve input from handle", __func__);
73 1067 : assert(ret == HG_SUCCESS);
74 1067 : GKFS_DATA->spdlogger()->debug("{}() Got RPC with path '{}'", __func__,
75 1067 : in.path);
76 1067 : gkfs::metadata::Metadata md(in.mode);
77 1067 : try {
78 : // create metadentry
79 2134 : gkfs::metadata::create(in.path, md);
80 1064 : out.err = 0;
81 3 : } catch(const gkfs::metadata::ExistsException& e) {
82 3 : out.err = EEXIST;
83 0 : } catch(const std::exception& e) {
84 0 : GKFS_DATA->spdlogger()->error("{}() Failed to create metadentry: '{}'",
85 0 : __func__, e.what());
86 0 : out.err = -1;
87 : }
88 :
89 1067 : GKFS_DATA->spdlogger()->debug("{}() Sending output err '{}'", __func__,
90 1067 : out.err);
91 1067 : auto hret = margo_respond(handle, &out);
92 1067 : if(hret != HG_SUCCESS) {
93 0 : GKFS_DATA->spdlogger()->error("{}() Failed to respond", __func__);
94 : }
95 :
96 : // Destroy handle when finished
97 1067 : margo_free_input(handle, &in);
98 1067 : margo_destroy(handle);
99 1067 : if(GKFS_DATA->enable_stats()) {
100 1067 : GKFS_DATA->stats()->add_value_iops(
101 : gkfs::utils::Stats::IopsOp::iops_create);
102 : }
103 2134 : return HG_SUCCESS;
104 : }
105 :
106 : /**
107 : * @brief Serves a stat request or returns an error to the
108 : * client if the object does not exist.
109 : * @internal
110 : * The stat request reads the corresponding entry in the KV store. The value
111 : * string is directly passed to the client. It sets an error code if the object
112 : * does not exist or in other unexpected errors.
113 : *
114 : * All exceptions must be caught here and dealt with accordingly.
115 : * @endinteral
116 : * @param handle Mercury RPC handle
117 : * @return Mercury error code to Mercury
118 : */
119 : hg_return_t
120 1359 : rpc_srv_stat(hg_handle_t handle) {
121 1359 : rpc_path_only_in_t in{};
122 1359 : rpc_stat_out_t out{};
123 1359 : auto ret = margo_get_input(handle, &in);
124 1359 : if(ret != HG_SUCCESS)
125 0 : GKFS_DATA->spdlogger()->error(
126 0 : "{}() Failed to retrieve input from handle", __func__);
127 1359 : assert(ret == HG_SUCCESS);
128 1359 : GKFS_DATA->spdlogger()->debug("{}() path: '{}'", __func__, in.path);
129 1359 : std::string val;
130 :
131 1359 : try {
132 : // get the metadata
133 1383 : val = gkfs::metadata::get_str(in.path);
134 1335 : out.db_val = val.c_str();
135 1335 : out.err = 0;
136 2670 : GKFS_DATA->spdlogger()->debug("{}() Sending output mode '{}'", __func__,
137 1335 : out.db_val);
138 24 : } catch(const gkfs::metadata::NotFoundException& e) {
139 24 : GKFS_DATA->spdlogger()->debug("{}() Entry not found: '{}'", __func__,
140 24 : in.path);
141 24 : out.err = ENOENT;
142 0 : } catch(const std::exception& e) {
143 0 : GKFS_DATA->spdlogger()->error(
144 : "{}() Failed to get metadentry from DB: '{}'", __func__,
145 0 : e.what());
146 0 : out.err = EBUSY;
147 : }
148 :
149 1359 : auto hret = margo_respond(handle, &out);
150 1359 : if(hret != HG_SUCCESS) {
151 0 : GKFS_DATA->spdlogger()->error("{}() Failed to respond", __func__);
152 : }
153 :
154 : // Destroy handle when finished
155 1359 : margo_free_input(handle, &in);
156 1359 : margo_destroy(handle);
157 :
158 1359 : if(GKFS_DATA->enable_stats()) {
159 1359 : GKFS_DATA->stats()->add_value_iops(
160 : gkfs::utils::Stats::IopsOp::iops_stats);
161 : }
162 1359 : return HG_SUCCESS;
163 : }
164 :
165 : /**
166 : * @brief Serves a request to decrease the file size in the object's KV store
167 : * entry.
168 : * @internal
169 : * All exceptions must be caught here and dealt with accordingly. Any errors are
170 : * placed in the response.
171 : * @endinteral
172 : * @param handle Mercury RPC handle
173 : * @return Mercury error code to Mercury
174 : */
175 : hg_return_t
176 3 : rpc_srv_decr_size(hg_handle_t handle) {
177 3 : rpc_trunc_in_t in{};
178 3 : rpc_err_out_t out{};
179 :
180 3 : auto ret = margo_get_input(handle, &in);
181 3 : if(ret != HG_SUCCESS) {
182 0 : GKFS_DATA->spdlogger()->error(
183 0 : "{}() Failed to retrieve input from handle", __func__);
184 0 : throw runtime_error("Failed to retrieve input from handle");
185 : }
186 :
187 3 : GKFS_DATA->spdlogger()->debug("{}() path: '{}', length: '{}'", __func__,
188 3 : in.path, in.length);
189 :
190 3 : try {
191 6 : GKFS_DATA->mdb()->decrease_size(in.path, in.length);
192 3 : out.err = 0;
193 0 : } catch(const std::exception& e) {
194 0 : GKFS_DATA->spdlogger()->error("{}() Failed to decrease size: '{}'",
195 0 : __func__, e.what());
196 0 : out.err = EIO;
197 : }
198 :
199 3 : GKFS_DATA->spdlogger()->debug("{}() Sending output '{}'", __func__,
200 3 : out.err);
201 3 : auto hret = margo_respond(handle, &out);
202 3 : if(hret != HG_SUCCESS) {
203 0 : GKFS_DATA->spdlogger()->error("{}() Failed to respond", __func__);
204 0 : throw runtime_error("Failed to respond");
205 : }
206 : // Destroy handle when finished
207 3 : margo_free_input(handle, &in);
208 3 : margo_destroy(handle);
209 3 : return HG_SUCCESS;
210 : }
211 :
212 : /**
213 : * @brief Serves a request to remove a file/directory metadata.
214 : * @internal
215 : * The handler triggers the removal of the KV store entry but still returns the
216 : * file mode and size information to the client. This is because the size is
217 : * needed to remove all data chunks. The metadata is removed first to ensure
218 : * data isn't removed while the metadata is still available. This could cause
219 : * issues because a stat request would say that the file still exists.
220 : *
221 : * gkfs::config::metadata::implicit_data_removal offers an optimization to
222 : * implicitly remove the data chunks on the metadata node. This can increase
223 : * remove performance for small files.
224 : *
225 : * All exceptions must be caught here and dealt with accordingly. Any errors are
226 : * placed in the response.
227 : * @endinteral
228 : * @param handle Mercury RPC handle
229 : * @return Mercury error code to Mercury
230 : */
231 : hg_return_t
232 8 : rpc_srv_remove_metadata(hg_handle_t handle) {
233 8 : rpc_rm_node_in_t in{};
234 8 : rpc_rm_metadata_out_t out{};
235 :
236 8 : auto ret = margo_get_input(handle, &in);
237 8 : if(ret != HG_SUCCESS)
238 0 : GKFS_DATA->spdlogger()->error(
239 0 : "{}() Failed to retrieve input from handle", __func__);
240 8 : assert(ret == HG_SUCCESS);
241 8 : GKFS_DATA->spdlogger()->debug("{}() Got remove metadata RPC with path '{}'",
242 8 : __func__, in.path);
243 :
244 : // Remove metadentry if exists on the node
245 8 : try {
246 24 : auto md = gkfs::metadata::get(in.path);
247 16 : gkfs::metadata::remove(in.path);
248 8 : out.err = 0;
249 8 : out.mode = md.mode();
250 8 : out.size = md.size();
251 8 : if constexpr(gkfs::config::metadata::implicit_data_removal) {
252 8 : if(S_ISREG(md.mode()) && (md.size() != 0))
253 6 : GKFS_DATA->storage()->destroy_chunk_space(in.path);
254 : }
255 :
256 0 : } catch(const gkfs::metadata::DBException& e) {
257 0 : GKFS_DATA->spdlogger()->error("{}(): path '{}' message '{}'", __func__,
258 0 : in.path, e.what());
259 0 : out.err = EIO;
260 0 : } catch(const gkfs::data::ChunkStorageException& e) {
261 0 : GKFS_DATA->spdlogger()->error(
262 : "{}(): path '{}' errcode '{}' message '{}'", __func__, in.path,
263 0 : e.code().value(), e.what());
264 0 : out.err = e.code().value();
265 0 : } catch(const std::exception& e) {
266 0 : GKFS_DATA->spdlogger()->error("{}() path '{}' message '{}'", __func__,
267 0 : in.path, e.what());
268 0 : out.err = EBUSY;
269 : }
270 :
271 8 : GKFS_DATA->spdlogger()->debug("{}() Sending output '{}'", __func__,
272 8 : out.err);
273 8 : auto hret = margo_respond(handle, &out);
274 8 : if(hret != HG_SUCCESS) {
275 0 : GKFS_DATA->spdlogger()->error("{}() Failed to respond", __func__);
276 : }
277 : // Destroy handle when finished
278 8 : margo_free_input(handle, &in);
279 8 : margo_destroy(handle);
280 8 : if(GKFS_DATA->enable_stats()) {
281 8 : GKFS_DATA->stats()->add_value_iops(
282 : gkfs::utils::Stats::IopsOp::iops_remove);
283 : }
284 8 : return HG_SUCCESS;
285 : }
286 :
287 : /**
288 : * @brief Serves a request to remove all file data chunks on this daemon.
289 : * @internal
290 : * The handler simply issues the removal of all chunk files on the local file
291 : * system.
292 : *
293 : * All exceptions must be caught here and dealt with accordingly. Any errors are
294 : * placed in the response.
295 : * @endinteral
296 : * @param handle Mercury RPC handle
297 : * @return Mercury error code to Mercury
298 : */
299 : hg_return_t
300 3 : rpc_srv_remove_data(hg_handle_t handle) {
301 3 : rpc_rm_node_in_t in{};
302 3 : rpc_err_out_t out{};
303 :
304 3 : auto ret = margo_get_input(handle, &in);
305 3 : if(ret != HG_SUCCESS)
306 0 : GKFS_DATA->spdlogger()->error(
307 0 : "{}() Failed to retrieve input from handle", __func__);
308 3 : assert(ret == HG_SUCCESS);
309 3 : GKFS_DATA->spdlogger()->debug("{}() Got remove data RPC with path '{}'",
310 3 : __func__, in.path);
311 :
312 : // Remove all chunks for that file
313 3 : try {
314 6 : GKFS_DATA->storage()->destroy_chunk_space(in.path);
315 3 : out.err = 0;
316 0 : } catch(const gkfs::data::ChunkStorageException& e) {
317 0 : GKFS_DATA->spdlogger()->error(
318 : "{}(): path '{}' errcode '{}' message '{}'", __func__, in.path,
319 0 : e.code().value(), e.what());
320 0 : out.err = e.code().value();
321 0 : } catch(const std::exception& e) {
322 0 : GKFS_DATA->spdlogger()->error("{}() path '{}' message '{}'", __func__,
323 0 : in.path, e.what());
324 0 : out.err = EBUSY;
325 : }
326 :
327 3 : GKFS_DATA->spdlogger()->debug("{}() Sending output '{}'", __func__,
328 3 : out.err);
329 3 : auto hret = margo_respond(handle, &out);
330 3 : if(hret != HG_SUCCESS) {
331 0 : GKFS_DATA->spdlogger()->error("{}() Failed to respond", __func__);
332 : }
333 : // Destroy handle when finished
334 3 : margo_free_input(handle, &in);
335 3 : margo_destroy(handle);
336 3 : return HG_SUCCESS;
337 : }
338 :
339 : /**
340 : * @brief Serves a request to update the metadata. This function is UNUSED.
341 : * @internal
342 : * All exceptions must be caught here and dealt with accordingly. Any errors are
343 : * placed in the response.
344 : * @endinteral
345 : * @param handle Mercury RPC handle
346 : * @return Mercury error code to Mercury
347 : */
348 : hg_return_t
349 11 : rpc_srv_update_metadentry(hg_handle_t handle) {
350 : // Note: Currently this handler is not called by the client.
351 11 : rpc_update_metadentry_in_t in{};
352 11 : rpc_err_out_t out{};
353 :
354 :
355 11 : auto ret = margo_get_input(handle, &in);
356 11 : if(ret != HG_SUCCESS)
357 0 : GKFS_DATA->spdlogger()->error(
358 0 : "{}() Failed to retrieve input from handle", __func__);
359 11 : assert(ret == HG_SUCCESS);
360 11 : GKFS_DATA->spdlogger()->debug(
361 11 : "{}() Got update metadentry RPC with path '{}'", __func__, in.path);
362 :
363 : // do update
364 11 : try {
365 33 : gkfs::metadata::Metadata md = gkfs::metadata::get(in.path);
366 11 : if(in.block_flag == HG_TRUE)
367 11 : md.blocks(in.blocks);
368 11 : if(in.nlink_flag == HG_TRUE)
369 0 : md.link_count(in.nlink);
370 11 : if(in.size_flag == HG_TRUE)
371 6 : md.size(in.size);
372 11 : if(in.atime_flag == HG_TRUE)
373 10 : md.atime(in.atime);
374 11 : if(in.mtime_flag == HG_TRUE)
375 10 : md.mtime(in.mtime);
376 11 : if(in.ctime_flag == HG_TRUE)
377 10 : md.ctime(in.ctime);
378 22 : gkfs::metadata::update(in.path, md);
379 11 : out.err = 0;
380 0 : } catch(const std::exception& e) {
381 : // TODO handle NotFoundException
382 0 : GKFS_DATA->spdlogger()->error("{}() Failed to update entry", __func__);
383 0 : out.err = 1;
384 : }
385 :
386 11 : GKFS_DATA->spdlogger()->debug("{}() Sending output '{}'", __func__,
387 11 : out.err);
388 11 : auto hret = margo_respond(handle, &out);
389 11 : if(hret != HG_SUCCESS) {
390 0 : GKFS_DATA->spdlogger()->error("{}() Failed to respond", __func__);
391 : }
392 :
393 : // Destroy handle when finished
394 11 : margo_free_input(handle, &in);
395 11 : margo_destroy(handle);
396 11 : return HG_SUCCESS;
397 : }
398 :
399 : /**
400 : * @brief Serves a request to update the file size to a given value in the KV
401 : * store.
402 : * @internal
403 : * All exceptions must be caught here and dealt with accordingly. Any errors are
404 : * placed in the response.
405 : * @endinteral
406 : * @param handle Mercury RPC handle
407 : * @return Mercury error code to Mercury
408 : */
409 : hg_return_t
410 41 : rpc_srv_update_metadentry_size(hg_handle_t handle) {
411 41 : rpc_update_metadentry_size_in_t in{};
412 41 : rpc_update_metadentry_size_out_t out{};
413 :
414 41 : auto ret = margo_get_input(handle, &in);
415 41 : if(ret != HG_SUCCESS)
416 0 : GKFS_DATA->spdlogger()->error(
417 0 : "{}() Failed to retrieve input from handle", __func__);
418 41 : assert(ret == HG_SUCCESS);
419 41 : GKFS_DATA->spdlogger()->debug(
420 : "{}() path: '{}', size: '{}', offset: '{}', append: '{}'", __func__,
421 41 : in.path, in.size, in.offset, in.append);
422 :
423 41 : try {
424 41 : out.ret_offset = gkfs::metadata::update_size(
425 82 : in.path, in.size, in.offset, (in.append == HG_TRUE));
426 41 : out.err = 0;
427 0 : } catch(const gkfs::metadata::NotFoundException& e) {
428 0 : GKFS_DATA->spdlogger()->debug("{}() Entry not found: '{}'", __func__,
429 0 : in.path);
430 0 : out.err = ENOENT;
431 0 : } catch(const std::exception& e) {
432 0 : GKFS_DATA->spdlogger()->error(
433 : "{}() Failed to update metadentry size on DB: '{}'", __func__,
434 0 : e.what());
435 0 : out.err = EBUSY;
436 : }
437 :
438 41 : GKFS_DATA->spdlogger()->debug("{}() Sending output '{}'", __func__,
439 41 : out.err);
440 41 : auto hret = margo_respond(handle, &out);
441 41 : if(hret != HG_SUCCESS) {
442 0 : GKFS_DATA->spdlogger()->error("{}() Failed to respond", __func__);
443 : }
444 :
445 : // Destroy handle when finished
446 41 : margo_free_input(handle, &in);
447 41 : margo_destroy(handle);
448 41 : return HG_SUCCESS;
449 : }
450 :
451 : /**
452 : * @brief Serves a request to return the current file size.
453 : * @internal
454 : * All exceptions must be caught here and dealt with accordingly. Any errors are
455 : * placed in the response.
456 : * @endinteral
457 : * @param handle Mercury RPC handle
458 : * @return Mercury error code to Mercury
459 : */
460 : hg_return_t
461 5 : rpc_srv_get_metadentry_size(hg_handle_t handle) {
462 5 : rpc_path_only_in_t in{};
463 5 : rpc_get_metadentry_size_out_t out{};
464 :
465 :
466 5 : auto ret = margo_get_input(handle, &in);
467 5 : if(ret != HG_SUCCESS)
468 0 : GKFS_DATA->spdlogger()->error(
469 0 : "{}() Failed to retrieve input from handle", __func__);
470 5 : assert(ret == HG_SUCCESS);
471 5 : GKFS_DATA->spdlogger()->debug(
472 : "{}() Got update metadentry size RPC with path '{}'", __func__,
473 5 : in.path);
474 :
475 : // do update
476 5 : try {
477 5 : out.ret_size = gkfs::metadata::get_size(in.path);
478 5 : out.err = 0;
479 0 : } catch(const gkfs::metadata::NotFoundException& e) {
480 0 : GKFS_DATA->spdlogger()->debug("{}() Entry not found: '{}'", __func__,
481 0 : in.path);
482 0 : out.err = ENOENT;
483 0 : } catch(const std::exception& e) {
484 0 : GKFS_DATA->spdlogger()->error(
485 : "{}() Failed to get metadentry size from DB: '{}'", __func__,
486 0 : e.what());
487 0 : out.err = EBUSY;
488 : }
489 :
490 5 : GKFS_DATA->spdlogger()->debug("{}() Sending output '{}'", __func__,
491 5 : out.err);
492 5 : auto hret = margo_respond(handle, &out);
493 5 : if(hret != HG_SUCCESS) {
494 0 : GKFS_DATA->spdlogger()->error("{}() Failed to respond", __func__);
495 : }
496 :
497 : // Destroy handle when finished
498 5 : margo_free_input(handle, &in);
499 5 : margo_destroy(handle);
500 5 : return HG_SUCCESS;
501 : }
502 :
503 : /**
504 : * @brief Serves a request to return all file system objects in a directory.
505 : * @internal
506 : * This handler triggers a KV store scan starting at the given path prefix that
507 : * represents a directory. All KV store entries are returned via a bulk transfer
508 : * as it can involve an arbitrary number of entries.
509 : *
510 : * Note, the bulk buffer size is decided by the client statically although it
511 : * doesn't know if it the space is sufficient to accomodate all entries. This is
512 : * planned to be fixed in the future.
513 : *
514 : * All exceptions must be caught here and dealt with accordingly. Any errors are
515 : * placed in the response.
516 : * @endinteral
517 : * @param handle Mercury RPC handle
518 : * @return Mercury error code to Mercury
519 : */
520 : hg_return_t
521 25 : rpc_srv_get_dirents(hg_handle_t handle) {
522 25 : rpc_get_dirents_in_t in{};
523 25 : rpc_get_dirents_out_t out{};
524 25 : out.err = EIO;
525 25 : out.dirents_size = 0;
526 25 : hg_bulk_t bulk_handle = nullptr;
527 :
528 : // Get input parmeters
529 25 : auto ret = margo_get_input(handle, &in);
530 25 : if(ret != HG_SUCCESS) {
531 0 : GKFS_DATA->spdlogger()->error(
532 : "{}() Could not get RPC input data with err '{}'", __func__,
533 0 : ret);
534 0 : out.err = EBUSY;
535 25 : return gkfs::rpc::cleanup_respond(&handle, &in, &out);
536 : }
537 :
538 : // Retrieve size of source buffer
539 25 : auto hgi = margo_get_info(handle);
540 25 : auto mid = margo_hg_info_get_instance(hgi);
541 25 : auto bulk_size = margo_bulk_get_size(in.bulk_handle);
542 25 : GKFS_DATA->spdlogger()->debug("{}() Got RPC: path '{}' bulk_size '{}' ",
543 25 : __func__, in.path, bulk_size);
544 :
545 : // Get directory entries from local DB
546 25 : vector<pair<string, bool>> entries{};
547 25 : try {
548 50 : entries = gkfs::metadata::get_dirents(in.path);
549 0 : } catch(const ::exception& e) {
550 0 : GKFS_DATA->spdlogger()->error("{}() Error during get_dirents(): '{}'",
551 0 : __func__, e.what());
552 0 : return gkfs::rpc::cleanup_respond(&handle, &in, &out);
553 : }
554 :
555 25 : GKFS_DATA->spdlogger()->trace(
556 : "{}() path '{}' Read database with '{}' entries", __func__, in.path,
557 25 : entries.size());
558 :
559 25 : if(entries.empty()) {
560 10 : out.err = 0;
561 10 : return gkfs::rpc::cleanup_respond(&handle, &in, &out);
562 : }
563 :
564 : // Calculate total output size
565 : // TODO OPTIMIZATION: this can be calculated inside db_get_dirents
566 15 : size_t tot_names_size = 0;
567 1044 : for(auto const& e : entries) {
568 1029 : tot_names_size += e.first.size();
569 : }
570 :
571 : // tot_names_size (# characters in entry) + # entries * (bool size + char
572 : // size for \0 character)
573 15 : size_t out_size =
574 15 : tot_names_size + entries.size() * (sizeof(bool) + sizeof(char));
575 15 : if(bulk_size < out_size) {
576 : // Source buffer is smaller than total output size
577 0 : GKFS_DATA->spdlogger()->error(
578 : "{}() Entries do not fit source buffer. bulk_size '{}' < out_size '{}' must be satisfied!",
579 0 : __func__, bulk_size, out_size);
580 0 : out.err = ENOBUFS;
581 25 : return gkfs::rpc::cleanup_respond(&handle, &in, &out);
582 : }
583 :
584 15 : void* bulk_buf; // buffer for bulk transfer
585 : // create bulk handle and allocated memory for buffer with out_size
586 : // information
587 15 : ret = margo_bulk_create(mid, 1, nullptr, &out_size, HG_BULK_READ_ONLY,
588 : &bulk_handle);
589 15 : if(ret != HG_SUCCESS) {
590 0 : GKFS_DATA->spdlogger()->error("{}() Failed to create bulk handle",
591 0 : __func__);
592 0 : return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle);
593 : }
594 : // access the internally allocated memory buffer and put it into bulk_buf
595 15 : uint32_t actual_count; // number of segments. we use one here because we
596 : // push the whole buffer at once
597 15 : ret = margo_bulk_access(bulk_handle, 0, out_size, HG_BULK_READ_ONLY, 1,
598 : &bulk_buf, &out_size, &actual_count);
599 15 : if(ret != HG_SUCCESS || actual_count != 1) {
600 0 : GKFS_DATA->spdlogger()->error(
601 : "{}() Failed to access allocated buffer from bulk handle",
602 0 : __func__);
603 0 : return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle);
604 : }
605 :
606 15 : GKFS_DATA->spdlogger()->trace(
607 : "{}() path '{}' entries '{}' out_size '{}'. Set up local read only bulk handle and allocated buffer with size '{}'",
608 15 : __func__, in.path, entries.size(), out_size, out_size);
609 :
610 : // Serialize output data on local buffer
611 15 : auto out_buff_ptr = static_cast<char*>(bulk_buf);
612 15 : auto bool_ptr = reinterpret_cast<bool*>(out_buff_ptr);
613 15 : auto names_ptr = out_buff_ptr + entries.size();
614 :
615 1044 : for(auto const& e : entries) {
616 1029 : if(e.first.empty()) {
617 0 : GKFS_DATA->spdlogger()->warn(
618 : "{}() Entry in readdir() empty. If this shows up, something else is very wrong.",
619 0 : __func__);
620 : }
621 1029 : *bool_ptr = e.second;
622 1029 : bool_ptr++;
623 :
624 1029 : const auto name = e.first.c_str();
625 1029 : ::strcpy(names_ptr, name);
626 : // number of characters + \0 terminator
627 1029 : names_ptr += e.first.size() + 1;
628 : }
629 :
630 15 : GKFS_DATA->spdlogger()->trace(
631 : "{}() path '{}' entries '{}' out_size '{}'. Copied data to bulk_buffer. NEXT bulk_transfer",
632 15 : __func__, in.path, entries.size(), out_size);
633 :
634 15 : ret = margo_bulk_transfer(mid, HG_BULK_PUSH, hgi->addr, in.bulk_handle, 0,
635 : bulk_handle, 0, out_size);
636 15 : if(ret != HG_SUCCESS) {
637 0 : GKFS_DATA->spdlogger()->error(
638 : "{}() Failed to push '{}' dirents on path '{}' to client with bulk size '{}' and out_size '{}'",
639 0 : __func__, entries.size(), in.path, bulk_size, out_size);
640 0 : out.err = EBUSY;
641 0 : return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle);
642 : }
643 :
644 15 : out.dirents_size = entries.size();
645 15 : out.err = 0;
646 15 : GKFS_DATA->spdlogger()->debug(
647 : "{}() Sending output response err '{}' dirents_size '{}'. DONE",
648 15 : __func__, out.err, out.dirents_size);
649 15 : if(GKFS_DATA->enable_stats()) {
650 15 : GKFS_DATA->stats()->add_value_iops(
651 : gkfs::utils::Stats::IopsOp::iops_dirent);
652 : }
653 15 : return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle);
654 : }
655 :
656 : /* Sends the name-size-ctime of a specific directory
657 : * Used to accelerate find
658 : * It mimics get_dirents, but uses a tuple
659 : */
660 :
661 : /**
662 : * @brief Serves a request to return all file system objects in a directory
663 : * including their size and create timestamp.
664 : * @internal
665 : * This is an extension to the above rpc_srv_get_dirents. However, this handler
666 : * is an optimization which needs to be refactored and merged with with
667 : * rpc_srv_get_dirents due to redundant code (TODO).
668 : *
669 : * Note, the bulk buffer size is decided by the client statically although it
670 : * doesn't know if it the space is sufficient to accommodate all entries. This
671 : * is planned to be fixed in the future (TODO).
672 : *
673 : * All exceptions must be caught here and dealt with accordingly. Any errors are
674 : * placed in the response.
675 : * @endinteral
676 : * @param handle Mercury RPC handle
677 : * @return Mercury error code to Mercury
678 : */
679 : hg_return_t
680 4 : rpc_srv_get_dirents_extended(hg_handle_t handle) {
681 4 : rpc_get_dirents_in_t in{};
682 4 : rpc_get_dirents_out_t out{};
683 4 : out.err = EIO;
684 4 : out.dirents_size = 0;
685 4 : hg_bulk_t bulk_handle = nullptr;
686 :
687 : // Get input parmeters
688 4 : auto ret = margo_get_input(handle, &in);
689 4 : if(ret != HG_SUCCESS) {
690 0 : GKFS_DATA->spdlogger()->error(
691 : "{}() Could not get RPC input data with err '{}'", __func__,
692 0 : ret);
693 0 : out.err = EBUSY;
694 4 : return gkfs::rpc::cleanup_respond(&handle, &in, &out);
695 : }
696 :
697 : // Retrieve size of source buffer
698 4 : auto hgi = margo_get_info(handle);
699 4 : auto mid = margo_hg_info_get_instance(hgi);
700 4 : auto bulk_size = margo_bulk_get_size(in.bulk_handle);
701 4 : GKFS_DATA->spdlogger()->debug("{}() Got RPC: path '{}' bulk_size '{}' ",
702 4 : __func__, in.path, bulk_size);
703 :
704 : // Get directory entries from local DB
705 4 : vector<tuple<string, bool, size_t, time_t>> entries{};
706 4 : try {
707 8 : entries = gkfs::metadata::get_dirents_extended(in.path);
708 0 : } catch(const ::exception& e) {
709 0 : GKFS_DATA->spdlogger()->error("{}() Error during get_dirents(): '{}'",
710 0 : __func__, e.what());
711 0 : return gkfs::rpc::cleanup_respond(&handle, &in, &out);
712 : }
713 :
714 4 : GKFS_DATA->spdlogger()->trace(
715 : "{}() path '{}' Read database with '{}' entries", __func__, in.path,
716 4 : entries.size());
717 :
718 4 : if(entries.empty()) {
719 2 : out.err = 0;
720 2 : return gkfs::rpc::cleanup_respond(&handle, &in, &out);
721 : }
722 :
723 : // Calculate total output size
724 : // TODO OPTIMIZATION: this can be calculated inside db_get_dirents
725 2 : size_t tot_names_size = 0;
726 6 : for(auto const& e : entries) {
727 4 : tot_names_size += (get<0>(e)).size();
728 : }
729 :
730 : // tot_names_size (# characters in entry) + # entries * (bool size + char
731 : // size for \0 character)
732 2 : size_t out_size =
733 2 : tot_names_size + entries.size() * (sizeof(bool) + sizeof(char) +
734 2 : sizeof(size_t) + sizeof(time_t));
735 2 : if(bulk_size < out_size) {
736 : // Source buffer is smaller than total output size
737 0 : GKFS_DATA->spdlogger()->error(
738 : "{}() Entries do not fit source buffer. bulk_size '{}' < out_size '{}' must be satisfied!",
739 0 : __func__, bulk_size, out_size);
740 0 : out.err = ENOBUFS;
741 4 : return gkfs::rpc::cleanup_respond(&handle, &in, &out);
742 : }
743 :
744 2 : void* bulk_buf; // buffer for bulk transfer
745 : // create bulk handle and allocated memory for buffer with out_size
746 : // information
747 2 : ret = margo_bulk_create(mid, 1, nullptr, &out_size, HG_BULK_READ_ONLY,
748 : &bulk_handle);
749 2 : if(ret != HG_SUCCESS) {
750 0 : GKFS_DATA->spdlogger()->error("{}() Failed to create bulk handle",
751 0 : __func__);
752 0 : return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle);
753 : }
754 : // access the internally allocated memory buffer and put it into bulk_buf
755 2 : uint32_t actual_count; // number of segments. we use one here because we
756 : // push the whole buffer at once
757 2 : ret = margo_bulk_access(bulk_handle, 0, out_size, HG_BULK_READ_ONLY, 1,
758 : &bulk_buf, &out_size, &actual_count);
759 2 : if(ret != HG_SUCCESS || actual_count != 1) {
760 0 : GKFS_DATA->spdlogger()->error(
761 : "{}() Failed to access allocated buffer from bulk handle",
762 0 : __func__);
763 0 : return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle);
764 : }
765 :
766 2 : GKFS_DATA->spdlogger()->trace(
767 : "{}() path '{}' entries '{}' out_size '{}'. Set up local read only bulk handle and allocated buffer with size '{}'",
768 2 : __func__, in.path, entries.size(), out_size, out_size);
769 :
770 : // Serialize output data on local buffer
771 : // The parenthesis are extremely important, if not the + will be size_t or
772 : // time_t size and not char
773 2 : auto out_buff_ptr = static_cast<char*>(bulk_buf);
774 2 : auto bool_ptr = reinterpret_cast<bool*>(out_buff_ptr);
775 2 : auto size_ptr = reinterpret_cast<size_t*>((out_buff_ptr) +
776 2 : (entries.size() * sizeof(bool)));
777 2 : auto ctime_ptr = reinterpret_cast<time_t*>(
778 : (out_buff_ptr) +
779 2 : (entries.size() * (sizeof(bool) + sizeof(size_t))));
780 2 : auto names_ptr =
781 : out_buff_ptr +
782 2 : (entries.size() * (sizeof(bool) + sizeof(size_t) + sizeof(time_t)));
783 :
784 6 : for(auto const& e : entries) {
785 4 : if((get<0>(e)).empty()) {
786 0 : GKFS_DATA->spdlogger()->warn(
787 : "{}() Entry in readdir() empty. If this shows up, something else is very wrong.",
788 0 : __func__);
789 : }
790 4 : *bool_ptr = (get<1>(e));
791 4 : bool_ptr++;
792 :
793 4 : *size_ptr = (get<2>(e));
794 4 : size_ptr++;
795 :
796 4 : *ctime_ptr = (get<3>(e));
797 4 : ctime_ptr++;
798 :
799 4 : const auto name = (get<0>(e)).c_str();
800 4 : ::strcpy(names_ptr, name);
801 : // number of characters + \0 terminator
802 4 : names_ptr += ((get<0>(e)).size() + 1);
803 : }
804 :
805 2 : GKFS_DATA->spdlogger()->trace(
806 : "{}() path '{}' entries '{}' out_size '{}'. Copied data to bulk_buffer. NEXT bulk_transfer",
807 2 : __func__, in.path, entries.size(), out_size);
808 2 : ret = margo_bulk_transfer(mid, HG_BULK_PUSH, hgi->addr, in.bulk_handle, 0,
809 : bulk_handle, 0, out_size);
810 2 : if(ret != HG_SUCCESS) {
811 0 : GKFS_DATA->spdlogger()->error(
812 : "{}() Failed to push '{}' dirents on path '{}' to client with bulk size '{}' and out_size '{}'",
813 0 : __func__, entries.size(), in.path, bulk_size, out_size);
814 0 : out.err = EBUSY;
815 0 : return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle);
816 : }
817 :
818 2 : out.dirents_size = entries.size();
819 2 : out.err = 0;
820 2 : GKFS_DATA->spdlogger()->debug(
821 : "{}() Sending output response err '{}' dirents_size '{}'. DONE",
822 2 : __func__, out.err, out.dirents_size);
823 2 : return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle);
824 : }
825 :
826 : #if defined(HAS_SYMLINKS) || defined(HAS_RENAME)
827 : /**
828 : * @brief Serves a request create a symbolic link and supports rename
829 : * @internal
830 : * The state of this function is unclear and requires a complete refactor.
831 : *
832 : * All exceptions must be caught here and dealt with accordingly. Any errors are
833 : * placed in the response.
834 : * @endinteral
835 : * @param handle Mercury RPC handle
836 : * @return Mercury error code to Mercury
837 : */
838 : hg_return_t
839 20 : rpc_srv_mk_symlink(hg_handle_t handle) {
840 20 : rpc_mk_symlink_in_t in{};
841 20 : rpc_err_out_t out{};
842 :
843 20 : auto ret = margo_get_input(handle, &in);
844 20 : if(ret != HG_SUCCESS) {
845 0 : GKFS_DATA->spdlogger()->error(
846 0 : "{}() Failed to retrieve input from handle", __func__);
847 : }
848 20 : GKFS_DATA->spdlogger()->debug(
849 : "{}() Got RPC with path '{}' and target path '{}'", __func__,
850 20 : in.path, in.target_path);
851 : // do update
852 20 : try {
853 60 : gkfs::metadata::Metadata md = gkfs::metadata::get(in.path);
854 : #ifdef HAS_RENAME
855 20 : if(md.blocks() == -1) {
856 : // We need to fill the rename path as this is an inverse path
857 : // old -> new
858 20 : md.rename_path(in.target_path);
859 : } else {
860 : #endif // HAS_RENAME
861 20 : md.target_path(in.target_path);
862 : #ifdef HAS_RENAME
863 : }
864 : #endif // HAS_RENAME
865 40 : GKFS_DATA->spdlogger()->debug(
866 : "{}() Updating path '{}' with metadata '{}'", __func__, in.path,
867 20 : md.serialize());
868 40 : gkfs::metadata::update(in.path, md);
869 20 : out.err = 0;
870 0 : } catch(const std::exception& e) {
871 : // TODO handle NotFoundException
872 0 : GKFS_DATA->spdlogger()->error("{}() Failed to update entry", __func__);
873 0 : out.err = 1;
874 : }
875 :
876 20 : GKFS_DATA->spdlogger()->debug("{}() Sending output err '{}'", __func__,
877 20 : out.err);
878 20 : auto hret = margo_respond(handle, &out);
879 20 : if(hret != HG_SUCCESS) {
880 0 : GKFS_DATA->spdlogger()->error("{}() Failed to respond", __func__);
881 : }
882 :
883 : // Destroy handle when finished
884 20 : margo_free_input(handle, &in);
885 20 : margo_destroy(handle);
886 20 : return HG_SUCCESS;
887 : }
888 :
889 : #endif // HAS_SYMLINKS || HAS_RENAME
890 :
891 : } // namespace
892 :
893 2134 : DEFINE_MARGO_RPC_HANDLER(rpc_srv_create)
894 :
895 2718 : DEFINE_MARGO_RPC_HANDLER(rpc_srv_stat)
896 :
897 6 : DEFINE_MARGO_RPC_HANDLER(rpc_srv_decr_size)
898 :
899 16 : DEFINE_MARGO_RPC_HANDLER(rpc_srv_remove_metadata)
900 :
901 6 : DEFINE_MARGO_RPC_HANDLER(rpc_srv_remove_data)
902 :
903 22 : DEFINE_MARGO_RPC_HANDLER(rpc_srv_update_metadentry)
904 :
905 82 : DEFINE_MARGO_RPC_HANDLER(rpc_srv_update_metadentry_size)
906 :
907 10 : DEFINE_MARGO_RPC_HANDLER(rpc_srv_get_metadentry_size)
908 :
909 50 : DEFINE_MARGO_RPC_HANDLER(rpc_srv_get_dirents)
910 :
911 8 : DEFINE_MARGO_RPC_HANDLER(rpc_srv_get_dirents_extended)
912 : #ifdef HAS_SYMLINKS
913 :
914 40 : DEFINE_MARGO_RPC_HANDLER(rpc_srv_mk_symlink)
915 :
916 : #endif
|