LCOV - code coverage report
Current view: top level - src/daemon/handler - srv_metadata.cpp (source / functions) Hit Total Coverage
Test: coverage.info Lines: 334 462 72.3 %
Date: 2024-04-23 00:09:24 Functions: 33 33 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :   Copyright 2018-2024, Barcelona Supercomputing Center (BSC), Spain
       3             :   Copyright 2015-2024, Johannes Gutenberg Universitaet Mainz, Germany
       4             : 
       5             :   This software was partially supported by the
       6             :   EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu).
       7             : 
       8             :   This software was partially supported by the
       9             :   ADA-FS project under the SPPEXA project funded by the DFG.
      10             : 
      11             :   This file is part of GekkoFS.
      12             : 
      13             :   GekkoFS is free software: you can redistribute it and/or modify
      14             :   it under the terms of the GNU General Public License as published by
      15             :   the Free Software Foundation, either version 3 of the License, or
      16             :   (at your option) any later version.
      17             : 
      18             :   GekkoFS is distributed in the hope that it will be useful,
      19             :   but WITHOUT ANY WARRANTY; without even the implied warranty of
      20             :   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
      21             :   GNU General Public License for more details.
      22             : 
      23             :   You should have received a copy of the GNU General Public License
      24             :   along with GekkoFS.  If not, see <https://www.gnu.org/licenses/>.
      25             : 
      26             :   SPDX-License-Identifier: GPL-3.0-or-later
      27             : */
      28             : /**
      29             :  * @brief Provides all Margo RPC handler definitions called by Mercury on client
      30             :  * request for all file system metadata operations.
      31             :  * @internal
      32             :  * The end of the file defines the Margo RPC handler functions
      33             :  * and associates them with their corresponding GekkoFS handler functions.
      34             :  * @endinternal
      35             :  */
      36             : 
      37             : #include <daemon/handler/rpc_defs.hpp>
      38             : #include <daemon/handler/rpc_util.hpp>
      39             : #include <daemon/backend/metadata/db.hpp>
      40             : #include <daemon/backend/data/chunk_storage.hpp>
      41             : #include <daemon/ops/metadentry.hpp>
      42             : 
      43             : #include <common/rpc/rpc_types.hpp>
      44             : #include <common/statistics/stats.hpp>
      45             : 
      46             : using namespace std;
      47             : 
      48             : namespace {
      49             : 
      50             : /**
      51             :  * @brief Serves a file/directory create request or returns an error to the
      52             :  * client if the object already exists.
      53             :  * @internal
      54             :  * A Metadata object is built from the requested mode and handed to
      55             :  * gkfs::metadata::create(). An ExistsException is reported to the client as
      56             :  * EEXIST (not a hard error); any other std::exception is logged and reported
      57             :  * as -1 in the output struct.
      58             :  *
      59             :  * All exceptions must be caught here and dealt with accordingly.
      60             :  * @endinternal
      61             :  * @param handle Mercury RPC handle
      62             :  * @return Always HG_SUCCESS; errors are placed in the RPC response
      63             :  */
      64             : hg_return_t
      65        1067 : rpc_srv_create(hg_handle_t handle) {
      66        1067 :     rpc_mk_node_in_t in;
      67        1067 :     rpc_err_out_t out;
      68             : 
      69        1067 :     auto ret = margo_get_input(handle, &in);
      70        1067 :     if(ret != HG_SUCCESS)
      71           0 :         GKFS_DATA->spdlogger()->error(
      72           0 :                 "{}() Failed to retrieve input from handle", __func__);
      73        1067 :     assert(ret == HG_SUCCESS);
      74        1067 :     GKFS_DATA->spdlogger()->debug("{}() Got RPC with path '{}'", __func__,
      75        1067 :                                   in.path);
      76        1067 :     gkfs::metadata::Metadata md(in.mode);
      77        1067 :     try {
      78             :         // create the metadentry for in.path from the mode-only Metadata
      79        2134 :         gkfs::metadata::create(in.path, md);
      80        1064 :         out.err = 0;
      81           3 :     } catch(const gkfs::metadata::ExistsException& e) {
      82           3 :         out.err = EEXIST;
      83           0 :     } catch(const std::exception& e) {
      84           0 :         GKFS_DATA->spdlogger()->error("{}() Failed to create metadentry: '{}'",
      85           0 :                                       __func__, e.what());
      86           0 :         out.err = -1;
      87             :     }
      88             : 
      89        1067 :     GKFS_DATA->spdlogger()->debug("{}() Sending output err '{}'", __func__,
      90        1067 :                                   out.err);
      91        1067 :     auto hret = margo_respond(handle, &out);
      92        1067 :     if(hret != HG_SUCCESS) {
      93           0 :         GKFS_DATA->spdlogger()->error("{}() Failed to respond", __func__);
      94             :     }
      95             : 
      96             :     // Free the input and destroy the handle when finished
      97        1067 :     margo_free_input(handle, &in);
      98        1067 :     margo_destroy(handle);
      99        1067 :     if(GKFS_DATA->enable_stats()) {
     100        1067 :         GKFS_DATA->stats()->add_value_iops(
     101             :                 gkfs::utils::Stats::IopsOp::iops_create);
     102             :     }
     103        2134 :     return HG_SUCCESS;
     104             : }
     105             : 
     106             : /**
     107             :  * @brief Serves a stat request or returns an error to the
     108             :  * client if the object does not exist.
     109             :  * @internal
     110             :  * The stat request reads the corresponding entry in the KV store. The value
     111             :  * string is directly passed to the client. A NotFoundException is reported
     112             :  * as ENOENT and any other std::exception as EBUSY.
     113             :  *
     114             :  * All exceptions must be caught here and dealt with accordingly.
     115             :  * @endinternal
     116             :  * @param handle Mercury RPC handle
     117             :  * @return Always HG_SUCCESS; errors are placed in the RPC response
     118             :  */
     119             : hg_return_t
     120        1359 : rpc_srv_stat(hg_handle_t handle) {
     121        1359 :     rpc_path_only_in_t in{};
     122        1359 :     rpc_stat_out_t out{};
     123        1359 :     auto ret = margo_get_input(handle, &in);
     124        1359 :     if(ret != HG_SUCCESS)
     125           0 :         GKFS_DATA->spdlogger()->error(
     126           0 :                 "{}() Failed to retrieve input from handle", __func__);
     127        1359 :     assert(ret == HG_SUCCESS);
     128        1359 :     GKFS_DATA->spdlogger()->debug("{}() path: '{}'", __func__, in.path);
     129        1359 :     std::string val;
     130             : 
     131        1359 :     try {
     132             :         // fetch the metadata string; out.db_val borrows the buffer of val
     133        1383 :         val = gkfs::metadata::get_str(in.path);
     134        1335 :         out.db_val = val.c_str();
     135        1335 :         out.err = 0;
     136        2670 :         GKFS_DATA->spdlogger()->debug("{}() Sending output mode '{}'", __func__,
     137        1335 :                                       out.db_val);
     138          24 :     } catch(const gkfs::metadata::NotFoundException& e) {
     139          24 :         GKFS_DATA->spdlogger()->debug("{}() Entry not found: '{}'", __func__,
     140          24 :                                       in.path);
     141          24 :         out.err = ENOENT;
     142           0 :     } catch(const std::exception& e) {
     143           0 :         GKFS_DATA->spdlogger()->error(
     144             :                 "{}() Failed to get metadentry from DB: '{}'", __func__,
     145           0 :                 e.what());
     146           0 :         out.err = EBUSY;
     147             :     }
     148             : 
     149        1359 :     auto hret = margo_respond(handle, &out);
     150        1359 :     if(hret != HG_SUCCESS) {
     151           0 :         GKFS_DATA->spdlogger()->error("{}() Failed to respond", __func__);
     152             :     }
     153             : 
     154             :     // Free the input and destroy the handle when finished
     155        1359 :     margo_free_input(handle, &in);
     156        1359 :     margo_destroy(handle);
     157             : 
     158        1359 :     if(GKFS_DATA->enable_stats()) {
     159        1359 :         GKFS_DATA->stats()->add_value_iops(
     160             :                 gkfs::utils::Stats::IopsOp::iops_stats);
     161             :     }
     162        1359 :     return HG_SUCCESS;
     163             : }
     164             : 
     165             : /**
     166             :  * @brief Serves a request to decrease the file size in the object's KV store
     167             :  * entry.
     168             :  * @internal
     169             :  * Failures of decrease_size() are caught and reported as EIO in the response.
     170             :  * NOTE(review): both throw paths skip margo_free_input()/margo_destroy().
     171             :  * @endinternal
     172             :  * @param handle Mercury RPC handle
     173             :  * @return HG_SUCCESS unless one of the runtime_error paths is taken
     174             :  */
     175             : hg_return_t
     176           3 : rpc_srv_decr_size(hg_handle_t handle) {
     177           3 :     rpc_trunc_in_t in{};
     178           3 :     rpc_err_out_t out{};
     179             : 
     180           3 :     auto ret = margo_get_input(handle, &in);
     181           3 :     if(ret != HG_SUCCESS) {
     182           0 :         GKFS_DATA->spdlogger()->error(
     183           0 :                 "{}() Failed to retrieve input from handle", __func__);
     184           0 :         throw runtime_error("Failed to retrieve input from handle");
     185             :     }
     186             : 
     187           3 :     GKFS_DATA->spdlogger()->debug("{}() path: '{}', length: '{}'", __func__,
     188           3 :                                   in.path, in.length);
     189             : 
     190           3 :     try {
     191           6 :         GKFS_DATA->mdb()->decrease_size(in.path, in.length);
     192           3 :         out.err = 0;
     193           0 :     } catch(const std::exception& e) {
     194           0 :         GKFS_DATA->spdlogger()->error("{}() Failed to decrease size: '{}'",
     195           0 :                                       __func__, e.what());
     196           0 :         out.err = EIO;
     197             :     }
     198             : 
     199           3 :     GKFS_DATA->spdlogger()->debug("{}() Sending output '{}'", __func__,
     200           3 :                                   out.err);
     201           3 :     auto hret = margo_respond(handle, &out);
     202           3 :     if(hret != HG_SUCCESS) {
     203           0 :         GKFS_DATA->spdlogger()->error("{}() Failed to respond", __func__);
     204           0 :         throw runtime_error("Failed to respond");
     205             :     }
     206             :     // Free the input and destroy the handle (only reached if nothing threw)
     207           3 :     margo_free_input(handle, &in);
     208           3 :     margo_destroy(handle);
     209           3 :     return HG_SUCCESS;
     210             : }
     211             : 
     212             : /**
     213             :  * @brief Serves a request to remove a file/directory metadata.
     214             :  * @internal
     215             :  * The handler triggers the removal of the KV store entry but still returns the
     216             :  * file mode and size information to the client. This is because the size is
     217             :  * needed to remove all data chunks. The metadata is removed first to ensure
     218             :  * data isn't removed while the metadata is still available. This could cause
     219             :  * issues because a stat request would say that the file still exists.
     220             :  *
     221             :  * gkfs::config::metadata::implicit_data_removal offers an optimization to
     222             :  * implicitly remove the data chunks on the metadata node. This can increase
     223             :  * remove performance for small files.
     224             :  *
     225             :  * All exceptions must be caught here and dealt with accordingly. Any errors are
     226             :  * placed in the response.
     227             :  * @endinternal
     228             :  * @param handle Mercury RPC handle
     229             :  * @return Always HG_SUCCESS; errors are placed in the RPC response
     230             :  */
     231             : hg_return_t
     232           8 : rpc_srv_remove_metadata(hg_handle_t handle) {
     233           8 :     rpc_rm_node_in_t in{};
     234           8 :     rpc_rm_metadata_out_t out{};
     235             : 
     236           8 :     auto ret = margo_get_input(handle, &in);
     237           8 :     if(ret != HG_SUCCESS)
     238           0 :         GKFS_DATA->spdlogger()->error(
     239           0 :                 "{}() Failed to retrieve input from handle", __func__);
     240           8 :     assert(ret == HG_SUCCESS);
     241           8 :     GKFS_DATA->spdlogger()->debug("{}() Got remove metadata RPC with path '{}'",
     242           8 :                                   __func__, in.path);
     243             : 
     244             :     // Fetch the metadentry first (mode and size are returned), then remove it
     245           8 :     try {
     246          24 :         auto md = gkfs::metadata::get(in.path);
     247          16 :         gkfs::metadata::remove(in.path);
     248           8 :         out.err = 0;
     249           8 :         out.mode = md.mode();
     250           8 :         out.size = md.size();
     251           8 :         if constexpr(gkfs::config::metadata::implicit_data_removal) {
     252           8 :             if(S_ISREG(md.mode()) && (md.size() != 0))
     253           6 :                 GKFS_DATA->storage()->destroy_chunk_space(in.path);
     254             :         }
     255             : 
     256           0 :     } catch(const gkfs::metadata::DBException& e) {
     257           0 :         GKFS_DATA->spdlogger()->error("{}(): path '{}' message '{}'", __func__,
     258           0 :                                       in.path, e.what());
     259           0 :         out.err = EIO;
     260           0 :     } catch(const gkfs::data::ChunkStorageException& e) {
     261           0 :         GKFS_DATA->spdlogger()->error(
     262             :                 "{}(): path '{}' errcode '{}' message '{}'", __func__, in.path,
     263           0 :                 e.code().value(), e.what());
     264           0 :         out.err = e.code().value();
     265           0 :     } catch(const std::exception& e) {
     266           0 :         GKFS_DATA->spdlogger()->error("{}() path '{}' message '{}'", __func__,
     267           0 :                                       in.path, e.what());
     268           0 :         out.err = EBUSY;
     269             :     }
     270             : 
     271           8 :     GKFS_DATA->spdlogger()->debug("{}() Sending output '{}'", __func__,
     272           8 :                                   out.err);
     273           8 :     auto hret = margo_respond(handle, &out);
     274           8 :     if(hret != HG_SUCCESS) {
     275           0 :         GKFS_DATA->spdlogger()->error("{}() Failed to respond", __func__);
     276             :     }
     277             :     // Free the input and destroy the handle when finished
     278           8 :     margo_free_input(handle, &in);
     279           8 :     margo_destroy(handle);
     280           8 :     if(GKFS_DATA->enable_stats()) {
     281           8 :         GKFS_DATA->stats()->add_value_iops(
     282             :                 gkfs::utils::Stats::IopsOp::iops_remove);
     283             :     }
     284           8 :     return HG_SUCCESS;
     285             : }
     286             : 
     287             : /**
     288             :  * @brief Serves a request to remove all file data chunks on this daemon.
     289             :  * @internal
     290             :  * The handler simply issues the removal of all chunk files on the local file
     291             :  * system.
     292             :  *
     293             :  * A ChunkStorageException is reported with its own error code, any other
     294             :  * std::exception as EBUSY. Any errors are placed in the response.
     295             :  * @endinternal
     296             :  * @param handle Mercury RPC handle
     297             :  * @return Always HG_SUCCESS; errors are placed in the RPC response
     298             :  */
     299             : hg_return_t
     300           3 : rpc_srv_remove_data(hg_handle_t handle) {
     301           3 :     rpc_rm_node_in_t in{};
     302           3 :     rpc_err_out_t out{};
     303             : 
     304           3 :     auto ret = margo_get_input(handle, &in);
     305           3 :     if(ret != HG_SUCCESS)
     306           0 :         GKFS_DATA->spdlogger()->error(
     307           0 :                 "{}() Failed to retrieve input from handle", __func__);
     308           3 :     assert(ret == HG_SUCCESS);
     309           3 :     GKFS_DATA->spdlogger()->debug("{}() Got remove data RPC with path '{}'",
     310           3 :                                   __func__, in.path);
     311             : 
     312             :     // Remove all chunks for that file
     313           3 :     try {
     314           6 :         GKFS_DATA->storage()->destroy_chunk_space(in.path);
     315           3 :         out.err = 0;
     316           0 :     } catch(const gkfs::data::ChunkStorageException& e) {
     317           0 :         GKFS_DATA->spdlogger()->error(
     318             :                 "{}(): path '{}' errcode '{}' message '{}'", __func__, in.path,
     319           0 :                 e.code().value(), e.what());
     320           0 :         out.err = e.code().value();
     321           0 :     } catch(const std::exception& e) {
     322           0 :         GKFS_DATA->spdlogger()->error("{}() path '{}' message '{}'", __func__,
     323           0 :                                       in.path, e.what());
     324           0 :         out.err = EBUSY;
     325             :     }
     326             : 
     327           3 :     GKFS_DATA->spdlogger()->debug("{}() Sending output '{}'", __func__,
     328           3 :                                   out.err);
     329           3 :     auto hret = margo_respond(handle, &out);
     330           3 :     if(hret != HG_SUCCESS) {
     331           0 :         GKFS_DATA->spdlogger()->error("{}() Failed to respond", __func__);
     332             :     }
     333             :     // Free the input and destroy the handle when finished
     334           3 :     margo_free_input(handle, &in);
     335           3 :     margo_destroy(handle);
     336           3 :     return HG_SUCCESS;
     337             : }
     338             : 
     339             : /**
     340             :  * @brief Serves a request to update the metadata. This function is UNUSED.
     341             :  * @internal
     342             :  * Only fields whose flag in the input is HG_TRUE are overwritten before the
     343             :  * entry is written back. Any std::exception is reported as error code 1.
     344             :  * @endinternal
     345             :  * @param handle Mercury RPC handle
     346             :  * @return Always HG_SUCCESS; errors are placed in the RPC response
     347             :  */
     348             : hg_return_t
     349          11 : rpc_srv_update_metadentry(hg_handle_t handle) {
     350             :     // NOTE(review): said to be uncalled by the client, yet this listing shows hits - confirm.
     351          11 :     rpc_update_metadentry_in_t in{};
     352          11 :     rpc_err_out_t out{};
     353             : 
     354             : 
     355          11 :     auto ret = margo_get_input(handle, &in);
     356          11 :     if(ret != HG_SUCCESS)
     357           0 :         GKFS_DATA->spdlogger()->error(
     358           0 :                 "{}() Failed to retrieve input from handle", __func__);
     359          11 :     assert(ret == HG_SUCCESS);
     360          11 :     GKFS_DATA->spdlogger()->debug(
     361          11 :             "{}() Got update metadentry RPC with path '{}'", __func__, in.path);
     362             : 
     363             :     // load the entry, apply the flagged fields, then store it back
     364          11 :     try {
     365          33 :         gkfs::metadata::Metadata md = gkfs::metadata::get(in.path);
     366          11 :         if(in.block_flag == HG_TRUE)
     367          11 :             md.blocks(in.blocks);
     368          11 :         if(in.nlink_flag == HG_TRUE)
     369           0 :             md.link_count(in.nlink);
     370          11 :         if(in.size_flag == HG_TRUE)
     371           6 :             md.size(in.size);
     372          11 :         if(in.atime_flag == HG_TRUE)
     373          10 :             md.atime(in.atime);
     374          11 :         if(in.mtime_flag == HG_TRUE)
     375          10 :             md.mtime(in.mtime);
     376          11 :         if(in.ctime_flag == HG_TRUE)
     377          10 :             md.ctime(in.ctime);
     378          22 :         gkfs::metadata::update(in.path, md);
     379          11 :         out.err = 0;
     380           0 :     } catch(const std::exception& e) {
     381             :         // TODO handle NotFoundException
     382           0 :         GKFS_DATA->spdlogger()->error("{}() Failed to update entry", __func__);
     383           0 :         out.err = 1;
     384             :     }
     385             : 
     386          11 :     GKFS_DATA->spdlogger()->debug("{}() Sending output '{}'", __func__,
     387          11 :                                   out.err);
     388          11 :     auto hret = margo_respond(handle, &out);
     389          11 :     if(hret != HG_SUCCESS) {
     390           0 :         GKFS_DATA->spdlogger()->error("{}() Failed to respond", __func__);
     391             :     }
     392             : 
     393             :     // Free the input and destroy the handle when finished
     394          11 :     margo_free_input(handle, &in);
     395          11 :     margo_destroy(handle);
     396          11 :     return HG_SUCCESS;
     397             : }
     398             : 
     399             : /**
     400             :  * @brief Serves a request to update the file size to a given value in the KV
     401             :  * store.
     402             :  * @internal
     403             :  * The value returned by update_size() is passed back as ret_offset. A missing
     404             :  * entry is reported as ENOENT and any other std::exception as EBUSY.
     405             :  * @endinternal
     406             :  * @param handle Mercury RPC handle
     407             :  * @return Always HG_SUCCESS; errors are placed in the RPC response
     408             :  */
     409             : hg_return_t
     410          41 : rpc_srv_update_metadentry_size(hg_handle_t handle) {
     411          41 :     rpc_update_metadentry_size_in_t in{};
     412          41 :     rpc_update_metadentry_size_out_t out{};
     413             : 
     414          41 :     auto ret = margo_get_input(handle, &in);
     415          41 :     if(ret != HG_SUCCESS)
     416           0 :         GKFS_DATA->spdlogger()->error(
     417           0 :                 "{}() Failed to retrieve input from handle", __func__);
     418          41 :     assert(ret == HG_SUCCESS);
     419          41 :     GKFS_DATA->spdlogger()->debug(
     420             :             "{}() path: '{}', size: '{}', offset: '{}', append: '{}'", __func__,
     421          41 :             in.path, in.size, in.offset, in.append);
     422             : 
     423          41 :     try {
     424          41 :         out.ret_offset = gkfs::metadata::update_size(
     425          82 :                 in.path, in.size, in.offset, (in.append == HG_TRUE));
     426          41 :         out.err = 0;
     427           0 :     } catch(const gkfs::metadata::NotFoundException& e) {
     428           0 :         GKFS_DATA->spdlogger()->debug("{}() Entry not found: '{}'", __func__,
     429           0 :                                       in.path);
     430           0 :         out.err = ENOENT;
     431           0 :     } catch(const std::exception& e) {
     432           0 :         GKFS_DATA->spdlogger()->error(
     433             :                 "{}() Failed to update metadentry size on DB: '{}'", __func__,
     434           0 :                 e.what());
     435           0 :         out.err = EBUSY;
     436             :     }
     437             : 
     438          41 :     GKFS_DATA->spdlogger()->debug("{}() Sending output '{}'", __func__,
     439          41 :                                   out.err);
     440          41 :     auto hret = margo_respond(handle, &out);
     441          41 :     if(hret != HG_SUCCESS) {
     442           0 :         GKFS_DATA->spdlogger()->error("{}() Failed to respond", __func__);
     443             :     }
     444             : 
     445             :     // Free the input and destroy the handle when finished
     446          41 :     margo_free_input(handle, &in);
     447          41 :     margo_destroy(handle);
     448          41 :     return HG_SUCCESS;
     449             : }
     450             : 
     451             : /**
     452             :  * @brief Serves a request to return the current file size.
     453             :  * @internal
     454             :  * The value returned by get_size() is passed back as ret_size. A missing
     455             :  * entry is reported as ENOENT and any other std::exception as EBUSY.
     456             :  * @endinternal
     457             :  * @param handle Mercury RPC handle
     458             :  * @return Always HG_SUCCESS; errors are placed in the RPC response
     459             :  */
     460             : hg_return_t
     461           5 : rpc_srv_get_metadentry_size(hg_handle_t handle) {
     462           5 :     rpc_path_only_in_t in{};
     463           5 :     rpc_get_metadentry_size_out_t out{};
     464             : 
     465             : 
     466           5 :     auto ret = margo_get_input(handle, &in);
     467           5 :     if(ret != HG_SUCCESS)
     468           0 :         GKFS_DATA->spdlogger()->error(
     469           0 :                 "{}() Failed to retrieve input from handle", __func__);
     470           5 :     assert(ret == HG_SUCCESS);
     471           5 :     GKFS_DATA->spdlogger()->debug(
     472             :             "{}() Got update metadentry size RPC with path '{}'", __func__,
     473           5 :             in.path);
     474             : 
     475             :     // look up the current size (this handler issues no update)
     476           5 :     try {
     477           5 :         out.ret_size = gkfs::metadata::get_size(in.path);
     478           5 :         out.err = 0;
     479           0 :     } catch(const gkfs::metadata::NotFoundException& e) {
     480           0 :         GKFS_DATA->spdlogger()->debug("{}() Entry not found: '{}'", __func__,
     481           0 :                                       in.path);
     482           0 :         out.err = ENOENT;
     483           0 :     } catch(const std::exception& e) {
     484           0 :         GKFS_DATA->spdlogger()->error(
     485             :                 "{}() Failed to get metadentry size from DB: '{}'", __func__,
     486           0 :                 e.what());
     487           0 :         out.err = EBUSY;
     488             :     }
     489             : 
     490           5 :     GKFS_DATA->spdlogger()->debug("{}() Sending output '{}'", __func__,
     491           5 :                                   out.err);
     492           5 :     auto hret = margo_respond(handle, &out);
     493           5 :     if(hret != HG_SUCCESS) {
     494           0 :         GKFS_DATA->spdlogger()->error("{}() Failed to respond", __func__);
     495             :     }
     496             : 
     497             :     // Free the input and destroy the handle when finished
     498           5 :     margo_free_input(handle, &in);
     499           5 :     margo_destroy(handle);
     500           5 :     return HG_SUCCESS;
     501             : }
     502             : 
     503             : /**
     504             :  * @brief Serves a request to return all file system objects in a directory.
     505             :  * @internal
     506             :  * This handler triggers a KV store scan starting at the given path prefix that
     507             :  * represents a directory. All KV store entries are returned via a bulk transfer
     508             :  * as it can involve an arbitrary number of entries.
     509             :  *
     510             :  * Note, the bulk buffer size is decided by the client statically although it
     511             :  * doesn't know if the space is sufficient to accommodate all entries. This is
     512             :  * planned to be fixed in the future.
     513             :  *
     514             :  * All exceptions must be caught here and dealt with accordingly. Any errors are
     515             :  * placed in the response.
     516             :  * @endinternal
     517             :  * @param handle Mercury RPC handle
     518             :  * @return Mercury error code to Mercury
     519             :  */
     520             : hg_return_t
     521          25 : rpc_srv_get_dirents(hg_handle_t handle) {
     522          25 :     rpc_get_dirents_in_t in{};
     523          25 :     rpc_get_dirents_out_t out{};
     524          25 :     out.err = EIO;
     525          25 :     out.dirents_size = 0;
     526          25 :     hg_bulk_t bulk_handle = nullptr;
     527             : 
     528             :     // Get input parmeters
     529          25 :     auto ret = margo_get_input(handle, &in);
     530          25 :     if(ret != HG_SUCCESS) {
     531           0 :         GKFS_DATA->spdlogger()->error(
     532             :                 "{}() Could not get RPC input data with err '{}'", __func__,
     533           0 :                 ret);
     534           0 :         out.err = EBUSY;
     535          25 :         return gkfs::rpc::cleanup_respond(&handle, &in, &out);
     536             :     }
     537             : 
     538             :     // Retrieve size of source buffer
     539          25 :     auto hgi = margo_get_info(handle);
     540          25 :     auto mid = margo_hg_info_get_instance(hgi);
     541          25 :     auto bulk_size = margo_bulk_get_size(in.bulk_handle);
     542          25 :     GKFS_DATA->spdlogger()->debug("{}() Got RPC: path '{}' bulk_size '{}' ",
     543          25 :                                   __func__, in.path, bulk_size);
     544             : 
     545             :     // Get directory entries from local DB
     546          25 :     vector<pair<string, bool>> entries{};
     547          25 :     try {
     548          50 :         entries = gkfs::metadata::get_dirents(in.path);
     549           0 :     } catch(const ::exception& e) {
     550           0 :         GKFS_DATA->spdlogger()->error("{}() Error during get_dirents(): '{}'",
     551           0 :                                       __func__, e.what());
     552           0 :         return gkfs::rpc::cleanup_respond(&handle, &in, &out);
     553             :     }
     554             : 
     555          25 :     GKFS_DATA->spdlogger()->trace(
     556             :             "{}() path '{}' Read database with '{}' entries", __func__, in.path,
     557          25 :             entries.size());
     558             : 
     559          25 :     if(entries.empty()) {
     560          10 :         out.err = 0;
     561          10 :         return gkfs::rpc::cleanup_respond(&handle, &in, &out);
     562             :     }
     563             : 
     564             :     // Calculate total output size
     565             :     // TODO OPTIMIZATION: this can be calculated inside db_get_dirents
     566          15 :     size_t tot_names_size = 0;
     567        1044 :     for(auto const& e : entries) {
     568        1029 :         tot_names_size += e.first.size();
     569             :     }
     570             : 
     571             :     // tot_names_size (# characters in entry) + # entries * (bool size + char
     572             :     // size for \0 character)
     573          15 :     size_t out_size =
     574          15 :             tot_names_size + entries.size() * (sizeof(bool) + sizeof(char));
     575          15 :     if(bulk_size < out_size) {
     576             :         // Source buffer is smaller than total output size
     577           0 :         GKFS_DATA->spdlogger()->error(
     578             :                 "{}() Entries do not fit source buffer. bulk_size '{}' < out_size '{}' must be satisfied!",
     579           0 :                 __func__, bulk_size, out_size);
     580           0 :         out.err = ENOBUFS;
     581          25 :         return gkfs::rpc::cleanup_respond(&handle, &in, &out);
     582             :     }
     583             : 
     584          15 :     void* bulk_buf; // buffer for bulk transfer
     585             :     // create bulk handle and allocated memory for buffer with out_size
     586             :     // information
     587          15 :     ret = margo_bulk_create(mid, 1, nullptr, &out_size, HG_BULK_READ_ONLY,
     588             :                             &bulk_handle);
     589          15 :     if(ret != HG_SUCCESS) {
     590           0 :         GKFS_DATA->spdlogger()->error("{}() Failed to create bulk handle",
     591           0 :                                       __func__);
     592           0 :         return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle);
     593             :     }
     594             :     // access the internally allocated memory buffer and put it into bulk_buf
     595          15 :     uint32_t actual_count; // number of segments. we use one here because we
     596             :                            // push the whole buffer at once
     597          15 :     ret = margo_bulk_access(bulk_handle, 0, out_size, HG_BULK_READ_ONLY, 1,
     598             :                             &bulk_buf, &out_size, &actual_count);
     599          15 :     if(ret != HG_SUCCESS || actual_count != 1) {
     600           0 :         GKFS_DATA->spdlogger()->error(
     601             :                 "{}() Failed to access allocated buffer from bulk handle",
     602           0 :                 __func__);
     603           0 :         return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle);
     604             :     }
     605             : 
     606          15 :     GKFS_DATA->spdlogger()->trace(
     607             :             "{}() path '{}' entries '{}' out_size '{}'. Set up local read only bulk handle and allocated buffer with size '{}'",
     608          15 :             __func__, in.path, entries.size(), out_size, out_size);
     609             : 
     610             :     // Serialize output data on local buffer
     611          15 :     auto out_buff_ptr = static_cast<char*>(bulk_buf);
     612          15 :     auto bool_ptr = reinterpret_cast<bool*>(out_buff_ptr);
     613          15 :     auto names_ptr = out_buff_ptr + entries.size();
     614             : 
     615        1044 :     for(auto const& e : entries) {
     616        1029 :         if(e.first.empty()) {
     617           0 :             GKFS_DATA->spdlogger()->warn(
     618             :                     "{}() Entry in readdir() empty. If this shows up, something else is very wrong.",
     619           0 :                     __func__);
     620             :         }
     621        1029 :         *bool_ptr = e.second;
     622        1029 :         bool_ptr++;
     623             : 
     624        1029 :         const auto name = e.first.c_str();
     625        1029 :         ::strcpy(names_ptr, name);
     626             :         // number of characters + \0 terminator
     627        1029 :         names_ptr += e.first.size() + 1;
     628             :     }
     629             : 
     630          15 :     GKFS_DATA->spdlogger()->trace(
     631             :             "{}() path '{}' entries '{}' out_size '{}'. Copied data to bulk_buffer. NEXT bulk_transfer",
     632          15 :             __func__, in.path, entries.size(), out_size);
     633             : 
     634          15 :     ret = margo_bulk_transfer(mid, HG_BULK_PUSH, hgi->addr, in.bulk_handle, 0,
     635             :                               bulk_handle, 0, out_size);
     636          15 :     if(ret != HG_SUCCESS) {
     637           0 :         GKFS_DATA->spdlogger()->error(
     638             :                 "{}() Failed to push '{}' dirents on path '{}' to client with bulk size '{}' and out_size '{}'",
     639           0 :                 __func__, entries.size(), in.path, bulk_size, out_size);
     640           0 :         out.err = EBUSY;
     641           0 :         return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle);
     642             :     }
     643             : 
     644          15 :     out.dirents_size = entries.size();
     645          15 :     out.err = 0;
     646          15 :     GKFS_DATA->spdlogger()->debug(
     647             :             "{}() Sending output response err '{}' dirents_size '{}'. DONE",
     648          15 :             __func__, out.err, out.dirents_size);
     649          15 :     if(GKFS_DATA->enable_stats()) {
     650          15 :         GKFS_DATA->stats()->add_value_iops(
     651             :                 gkfs::utils::Stats::IopsOp::iops_dirent);
     652             :     }
     653          15 :     return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle);
     654             : }
     655             : 
     656             : /* Sends the name-size-ctime of a specific directory
     657             :  * Used to accelerate find
     658             :  * It mimics get_dirents, but uses a tuple
     659             :  */
     660             : 
     661             : /**
     662             :  * @brief Serves a request to return all file system objects in a directory
     663             :  * including their size and create timestamp.
     664             :  * @internal
     665             :  * This is an extension to the above rpc_srv_get_dirents. However, this handler
     666             :  * is an optimization which needs to be refactored and merged with with
     667             :  * rpc_srv_get_dirents due to redundant code (TODO).
     668             :  *
     669             :  * Note, the bulk buffer size is decided by the client statically although it
     670             :  * doesn't know if it the space is sufficient to accommodate all entries. This
     671             :  * is planned to be fixed in the future (TODO).
     672             :  *
     673             :  * All exceptions must be caught here and dealt with accordingly. Any errors are
     674             :  * placed in the response.
     675             :  * @endinteral
     676             :  * @param handle Mercury RPC handle
     677             :  * @return Mercury error code to Mercury
     678             :  */
     679             : hg_return_t
     680           4 : rpc_srv_get_dirents_extended(hg_handle_t handle) {
     681           4 :     rpc_get_dirents_in_t in{};
     682           4 :     rpc_get_dirents_out_t out{};
     683           4 :     out.err = EIO;
     684           4 :     out.dirents_size = 0;
     685           4 :     hg_bulk_t bulk_handle = nullptr;
     686             : 
     687             :     // Get input parmeters
     688           4 :     auto ret = margo_get_input(handle, &in);
     689           4 :     if(ret != HG_SUCCESS) {
     690           0 :         GKFS_DATA->spdlogger()->error(
     691             :                 "{}() Could not get RPC input data with err '{}'", __func__,
     692           0 :                 ret);
     693           0 :         out.err = EBUSY;
     694           4 :         return gkfs::rpc::cleanup_respond(&handle, &in, &out);
     695             :     }
     696             : 
     697             :     // Retrieve size of source buffer
     698           4 :     auto hgi = margo_get_info(handle);
     699           4 :     auto mid = margo_hg_info_get_instance(hgi);
     700           4 :     auto bulk_size = margo_bulk_get_size(in.bulk_handle);
     701           4 :     GKFS_DATA->spdlogger()->debug("{}() Got RPC: path '{}' bulk_size '{}' ",
     702           4 :                                   __func__, in.path, bulk_size);
     703             : 
     704             :     // Get directory entries from local DB
     705           4 :     vector<tuple<string, bool, size_t, time_t>> entries{};
     706           4 :     try {
     707           8 :         entries = gkfs::metadata::get_dirents_extended(in.path);
     708           0 :     } catch(const ::exception& e) {
     709           0 :         GKFS_DATA->spdlogger()->error("{}() Error during get_dirents(): '{}'",
     710           0 :                                       __func__, e.what());
     711           0 :         return gkfs::rpc::cleanup_respond(&handle, &in, &out);
     712             :     }
     713             : 
     714           4 :     GKFS_DATA->spdlogger()->trace(
     715             :             "{}() path '{}' Read database with '{}' entries", __func__, in.path,
     716           4 :             entries.size());
     717             : 
     718           4 :     if(entries.empty()) {
     719           2 :         out.err = 0;
     720           2 :         return gkfs::rpc::cleanup_respond(&handle, &in, &out);
     721             :     }
     722             : 
     723             :     // Calculate total output size
     724             :     // TODO OPTIMIZATION: this can be calculated inside db_get_dirents
     725           2 :     size_t tot_names_size = 0;
     726           6 :     for(auto const& e : entries) {
     727           4 :         tot_names_size += (get<0>(e)).size();
     728             :     }
     729             : 
     730             :     // tot_names_size (# characters in entry) + # entries * (bool size + char
     731             :     // size for \0 character)
     732           2 :     size_t out_size =
     733           2 :             tot_names_size + entries.size() * (sizeof(bool) + sizeof(char) +
     734           2 :                                                sizeof(size_t) + sizeof(time_t));
     735           2 :     if(bulk_size < out_size) {
     736             :         // Source buffer is smaller than total output size
     737           0 :         GKFS_DATA->spdlogger()->error(
     738             :                 "{}() Entries do not fit source buffer. bulk_size '{}' < out_size '{}' must be satisfied!",
     739           0 :                 __func__, bulk_size, out_size);
     740           0 :         out.err = ENOBUFS;
     741           4 :         return gkfs::rpc::cleanup_respond(&handle, &in, &out);
     742             :     }
     743             : 
     744           2 :     void* bulk_buf; // buffer for bulk transfer
     745             :     // create bulk handle and allocated memory for buffer with out_size
     746             :     // information
     747           2 :     ret = margo_bulk_create(mid, 1, nullptr, &out_size, HG_BULK_READ_ONLY,
     748             :                             &bulk_handle);
     749           2 :     if(ret != HG_SUCCESS) {
     750           0 :         GKFS_DATA->spdlogger()->error("{}() Failed to create bulk handle",
     751           0 :                                       __func__);
     752           0 :         return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle);
     753             :     }
     754             :     // access the internally allocated memory buffer and put it into bulk_buf
     755           2 :     uint32_t actual_count; // number of segments. we use one here because we
     756             :                            // push the whole buffer at once
     757           2 :     ret = margo_bulk_access(bulk_handle, 0, out_size, HG_BULK_READ_ONLY, 1,
     758             :                             &bulk_buf, &out_size, &actual_count);
     759           2 :     if(ret != HG_SUCCESS || actual_count != 1) {
     760           0 :         GKFS_DATA->spdlogger()->error(
     761             :                 "{}() Failed to access allocated buffer from bulk handle",
     762           0 :                 __func__);
     763           0 :         return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle);
     764             :     }
     765             : 
     766           2 :     GKFS_DATA->spdlogger()->trace(
     767             :             "{}() path '{}' entries '{}' out_size '{}'. Set up local read only bulk handle and allocated buffer with size '{}'",
     768           2 :             __func__, in.path, entries.size(), out_size, out_size);
     769             : 
     770             :     // Serialize output data on local buffer
     771             :     // The parenthesis are extremely important, if not the + will be size_t or
     772             :     // time_t size and not char
     773           2 :     auto out_buff_ptr = static_cast<char*>(bulk_buf);
     774           2 :     auto bool_ptr = reinterpret_cast<bool*>(out_buff_ptr);
     775           2 :     auto size_ptr = reinterpret_cast<size_t*>((out_buff_ptr) +
     776           2 :                                               (entries.size() * sizeof(bool)));
     777           2 :     auto ctime_ptr = reinterpret_cast<time_t*>(
     778             :             (out_buff_ptr) +
     779           2 :             (entries.size() * (sizeof(bool) + sizeof(size_t))));
     780           2 :     auto names_ptr =
     781             :             out_buff_ptr +
     782           2 :             (entries.size() * (sizeof(bool) + sizeof(size_t) + sizeof(time_t)));
     783             : 
     784           6 :     for(auto const& e : entries) {
     785           4 :         if((get<0>(e)).empty()) {
     786           0 :             GKFS_DATA->spdlogger()->warn(
     787             :                     "{}() Entry in readdir() empty. If this shows up, something else is very wrong.",
     788           0 :                     __func__);
     789             :         }
     790           4 :         *bool_ptr = (get<1>(e));
     791           4 :         bool_ptr++;
     792             : 
     793           4 :         *size_ptr = (get<2>(e));
     794           4 :         size_ptr++;
     795             : 
     796           4 :         *ctime_ptr = (get<3>(e));
     797           4 :         ctime_ptr++;
     798             : 
     799           4 :         const auto name = (get<0>(e)).c_str();
     800           4 :         ::strcpy(names_ptr, name);
     801             :         // number of characters + \0 terminator
     802           4 :         names_ptr += ((get<0>(e)).size() + 1);
     803             :     }
     804             : 
     805           2 :     GKFS_DATA->spdlogger()->trace(
     806             :             "{}() path '{}' entries '{}' out_size '{}'. Copied data to bulk_buffer. NEXT bulk_transfer",
     807           2 :             __func__, in.path, entries.size(), out_size);
     808           2 :     ret = margo_bulk_transfer(mid, HG_BULK_PUSH, hgi->addr, in.bulk_handle, 0,
     809             :                               bulk_handle, 0, out_size);
     810           2 :     if(ret != HG_SUCCESS) {
     811           0 :         GKFS_DATA->spdlogger()->error(
     812             :                 "{}() Failed to push '{}' dirents on path '{}' to client with bulk size '{}' and out_size '{}'",
     813           0 :                 __func__, entries.size(), in.path, bulk_size, out_size);
     814           0 :         out.err = EBUSY;
     815           0 :         return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle);
     816             :     }
     817             : 
     818           2 :     out.dirents_size = entries.size();
     819           2 :     out.err = 0;
     820           2 :     GKFS_DATA->spdlogger()->debug(
     821             :             "{}() Sending output response err '{}' dirents_size '{}'. DONE",
     822           2 :             __func__, out.err, out.dirents_size);
     823           2 :     return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle);
     824             : }
     825             : 
     826             : #if defined(HAS_SYMLINKS) || defined(HAS_RENAME)
     827             : /**
     828             :  * @brief Serves a request create a symbolic link and supports rename
     829             :  * @internal
     830             :  * The state of this function is unclear and requires a complete refactor.
     831             :  *
     832             :  * All exceptions must be caught here and dealt with accordingly. Any errors are
     833             :  * placed in the response.
     834             :  * @endinteral
     835             :  * @param handle Mercury RPC handle
     836             :  * @return Mercury error code to Mercury
     837             :  */
     838             : hg_return_t
     839          20 : rpc_srv_mk_symlink(hg_handle_t handle) {
     840          20 :     rpc_mk_symlink_in_t in{};
     841          20 :     rpc_err_out_t out{};
     842             : 
     843          20 :     auto ret = margo_get_input(handle, &in);
     844          20 :     if(ret != HG_SUCCESS) {
     845           0 :         GKFS_DATA->spdlogger()->error(
     846           0 :                 "{}() Failed to retrieve input from handle", __func__);
     847             :     }
     848          20 :     GKFS_DATA->spdlogger()->debug(
     849             :             "{}() Got RPC with path '{}' and target path '{}'", __func__,
     850          20 :             in.path, in.target_path);
     851             :     // do update
     852          20 :     try {
     853          60 :         gkfs::metadata::Metadata md = gkfs::metadata::get(in.path);
     854             : #ifdef HAS_RENAME
     855          20 :         if(md.blocks() == -1) {
     856             :             // We need to fill the rename path as this is an inverse path
     857             :             // old -> new
     858          20 :             md.rename_path(in.target_path);
     859             :         } else {
     860             : #endif // HAS_RENAME
     861          20 :             md.target_path(in.target_path);
     862             : #ifdef HAS_RENAME
     863             :         }
     864             : #endif // HAS_RENAME
     865          40 :         GKFS_DATA->spdlogger()->debug(
     866             :                 "{}() Updating path '{}' with metadata '{}'", __func__, in.path,
     867          20 :                 md.serialize());
     868          40 :         gkfs::metadata::update(in.path, md);
     869          20 :         out.err = 0;
     870           0 :     } catch(const std::exception& e) {
     871             :         // TODO handle NotFoundException
     872           0 :         GKFS_DATA->spdlogger()->error("{}() Failed to update entry", __func__);
     873           0 :         out.err = 1;
     874             :     }
     875             : 
     876          20 :     GKFS_DATA->spdlogger()->debug("{}() Sending output err '{}'", __func__,
     877          20 :                                   out.err);
     878          20 :     auto hret = margo_respond(handle, &out);
     879          20 :     if(hret != HG_SUCCESS) {
     880           0 :         GKFS_DATA->spdlogger()->error("{}() Failed to respond", __func__);
     881             :     }
     882             : 
     883             :     // Destroy handle when finished
     884          20 :     margo_free_input(handle, &in);
     885          20 :     margo_destroy(handle);
     886          20 :     return HG_SUCCESS;
     887             : }
     888             : 
     889             : #endif // HAS_SYMLINKS || HAS_RENAME
     890             : 
     891             : } // namespace
     892             : 
                       // Invoke DEFINE_MARGO_RPC_HANDLER once per metadata RPC handler function.
                       // Presumably this generates the entry point that Margo calls for each
                       // function - see the Margo documentation for the exact expansion.
                       //
                       // NOTE(review): rpc_srv_mk_symlink is compiled when HAS_SYMLINKS or
                       // HAS_RENAME is defined, but its handler below is only defined under
                       // HAS_SYMLINKS - confirm that a HAS_RENAME-only build is not expected to
                       // expose this RPC.
     893        2134 : DEFINE_MARGO_RPC_HANDLER(rpc_srv_create)
     894             : 
     895        2718 : DEFINE_MARGO_RPC_HANDLER(rpc_srv_stat)
     896             : 
     897           6 : DEFINE_MARGO_RPC_HANDLER(rpc_srv_decr_size)
     898             : 
     899          16 : DEFINE_MARGO_RPC_HANDLER(rpc_srv_remove_metadata)
     900             : 
     901           6 : DEFINE_MARGO_RPC_HANDLER(rpc_srv_remove_data)
     902             : 
     903          22 : DEFINE_MARGO_RPC_HANDLER(rpc_srv_update_metadentry)
     904             : 
     905          82 : DEFINE_MARGO_RPC_HANDLER(rpc_srv_update_metadentry_size)
     906             : 
     907          10 : DEFINE_MARGO_RPC_HANDLER(rpc_srv_get_metadentry_size)
     908             : 
     909          50 : DEFINE_MARGO_RPC_HANDLER(rpc_srv_get_dirents)
     910             : 
     911           8 : DEFINE_MARGO_RPC_HANDLER(rpc_srv_get_dirents_extended)
     912             : #ifdef HAS_SYMLINKS
     913             : 
     914          40 : DEFINE_MARGO_RPC_HANDLER(rpc_srv_mk_symlink)
     915             : 
     916             : #endif

Generated by: LCOV version 1.16