diff --git a/CHANGELOG.md b/CHANGELOG.md index e669d9123633daa55215dd1be0e54ecd928c1eb7..df0b9bc3d3738e2247e16371d80683618e5a49d6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,12 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] ### New +- Added file system expansion support ([!196](https://storage.bsc.es/gitlab/hpc/gekkofs/-/merge_requests/196)). + - Added the tool `gkfs_malleability` to issue start, status, and finalize requests for expansion operations. + - `-DGKFS_BUILD_TOOLS=ON` must be set for CMake to build the tool. + - Overhauled the `gkfs` run script to accommodate the new tool. + - During expansion, data redistribution is performed by the daemons themselves; an RPC client for daemons was therefore added. + - See the README for usage details. - Propagate PKG_CONFIG_PATH to dependency scripts ([!185](https://storage.bsc.es/gitlab/hpc/gekkofs/-/merge_requests/185)). - Added syscall support for listxattr family ([!186](https://storage.bsc.es/gitlab/hpc/gekkofs/-/merge_request/186)). - Remove optimization, removing one RPC per operation ([!195](https://storage.bsc.es/gitlab/hpc/gekkofs/-/merge_request/195)). diff --git a/README.md b/README.md index e57f23483ca2cb93d679a14d9d16de45ff704075..3c6004aefcb2073ee54df67acf31d8bce826c81a 100644 --- a/README.md +++ b/README.md @@ -159,7 +159,7 @@ to be empty. For MPI applications, the `LD_PRELOAD` variable can be passed with the `-x` argument for `mpirun/mpiexec`. -## Run GekkoFS daemons on multiple nodes (beta version!) +## Run GekkoFS daemons on multiple nodes The `scripts/run/gkfs` script can be used to simplify starting the GekkoFS daemon on one or multiple nodes. To start GekkoFS on multiple nodes, a Slurm environment that can execute `srun` is required. 
Users can further @@ -168,9 +168,9 @@ modify `scripts/run/gkfs.conf` to mold default configurations to their environment. The following options are available for `scripts/run/gkfs`: ```bash -usage: gkfs [-h/--help] [-r/--rootdir ] [-m/--mountdir ] [-a/--args ] [-f/--foreground ] - [--srun ] [-n/--numnodes ] [--cpuspertask <64>] [--numactl ] [-v/--verbose ] - {start,stop} +usage: gkfs [-h/--help] [-r/--rootdir ] [-m/--mountdir ] [-a/--args ] [--proxy ] [-f/--foreground ] + [--srun ] [-n/--numnodes ] [--cpuspertask <64>] [-v/--verbose ] + {start,expand,stop} This script simplifies the starting and stopping of GekkoFS daemons. If daemons are started on multiple nodes, @@ -178,21 +178,23 @@ usage: gkfs [-h/--help] [-r/--rootdir ] [-m/--mountdir ] [-a/--args additional permanent configurations can be set. positional arguments: - command Command to execute: 'start' and 'stop' + COMMAND Command to execute: 'start', 'expand', 'stop' optional arguments: -h, --help Shows this help message and exits - -r, --rootdir Providing the rootdir path for GekkoFS daemons. - -m, --mountdir Providing the mountdir path for GekkoFS daemons. - -a, --args + -r, --rootdir The rootdir path for GekkoFS daemons. + -m, --mountdir The mountdir path for GekkoFS daemons. + -d, --daemon_args Add various additional daemon arguments, e.g., "-l ib0 -P ofi+psm2". + --proxy Start proxy after the daemons are running. + -p, --proxy_args -f, --foreground Starts the script in the foreground. Daemons are stopped by pressing 'q'. --srun Use srun to start daemons on multiple nodes. -n, --numnodes GekkoFS daemons are started on n nodes. Nodelist is extracted from Slurm via the SLURM_JOB_ID env variable. --cpuspertask <#cores> Set the number of cores the daemons can use. Must use '--srun'. - --numactl Use numactl for the daemon. Modify gkfs.conf for further numactl configurations. -c, --config Path to configuration file. By default looks for a 'gkfs.conf' in this directory. 
+ -e, --expand_hostfile Path to the hostfile with the new nodes to which GekkoFS should be extended (one line per node). -v, --verbose Increase verbosity ``` @@ -415,6 +417,58 @@ Press 'q' to exit Please consult `include/config.hpp` for additional configuration options. Note, GekkoFS proxy does not support replication. +### File system expansion + +GekkoFS supports extending the current daemon configuration to additional compute nodes. This redistributes +the existing data and metadata and therefore scales both file system performance and capacity. Note +that it is the user's responsibility not to access the GekkoFS file system during redistribution. A corresponding +feature that is transparent to the user is planned. Note also that if the GekkoFS proxy is used, it must be manually +restarted after expansion. + +To enable this feature, the CMake flag `-DGKFS_BUILD_TOOLS=ON` is required to build the `gkfs_malleability` tool. +The tool is then available in the `build/tools` directory. Please consult +`-h` for its arguments. While the tool can be used manually to expand the file system, the `scripts/run/gkfs` script, +which invokes the `gkfs_malleability` tool internally, should be used instead. + +The only requirement for extending the file system is a hostfile containing the hostnames/IPs of the new nodes (one line +per host). Example of starting the file system, where `DAEMON_NODELIST` in `gkfs.conf` is set to a hostfile containing +the initial set of file system nodes: + +```bash +~/gekkofs/scripts/run/gkfs -c ~/run/gkfs_verbs_expandtest.conf start +* [gkfs] Starting GekkoFS daemons (4 nodes) ... +* [gkfs] GekkoFS daemons running +* [gkfs] Startup time: 10.853 seconds +``` + +... Some computation ... + +Expanding the file system: use `-e ` to specify the new nodes. Redistribution is performed automatically with a +progress bar. 
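To illustrate the hostfile format, a minimal sketch (the node names below are hypothetical placeholders, not part of the GekkoFS sources; any list of hostnames or IPs reachable from the initial nodes works):

```shell
# Build an expansion hostfile: one new node per line.
# 'node05' ... 'node08' are placeholder hostnames.
HOSTFILE_EXPAND=$(mktemp)
printf '%s\n' node05 node06 node07 node08 > "$HOSTFILE_EXPAND"
# The file is then passed to the run script, e.g.:
#   scripts/run/gkfs -c gkfs.conf -e "$HOSTFILE_EXPAND" expand
wc -l < "$HOSTFILE_EXPAND"   # one line per new node -> 4
```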
When finished, the file system is ready to use in the new configuration: + +```bash +~/gekkofs/scripts/run/gkfs -c ~/run/gkfs_verbs_expandtest.conf -e ~/hostfile_expand expand +* [gkfs] Starting GekkoFS daemons (8 nodes) ... +* [gkfs] GekkoFS daemons running +* [gkfs] Startup time: 1.058 seconds +Expansion process from 4 nodes to 12 nodes launched... +* [gkfs] Expansion progress: +[####################] 0/4 left +* [gkfs] Redistribution process done. Finalizing ... +* [gkfs] Expansion done. +``` + +Stop the file system: + +```bash +~/gekkofs/scripts/run/gkfs -c ~/run/gkfs_verbs_expandtest.conf stop +* [gkfs] Stopping daemon with pid 16462 +srun: sending Ctrl-C to StepId=282378.1 +* [gkfs] Stopping daemon with pid 16761 +srun: sending Ctrl-C to StepId=282378.2 +* [gkfs] Shutdown time: 1.032 seconds +``` + ## Acknowledgment This software was partially supported by the EC H2020 funded NEXTGenIO project (Project ID: 671951, www.nextgenio.eu). diff --git a/include/client/CMakeLists.txt b/include/client/CMakeLists.txt index 6815fc95321a27ecce55b407d10075cb53cad2ca..e4ce8983c2f1d91a77ea76ed9523b74f35696e32 100644 --- a/include/client/CMakeLists.txt +++ b/include/client/CMakeLists.txt @@ -72,6 +72,7 @@ target_sources( rpc/forward_management.hpp rpc/forward_metadata.hpp rpc/forward_data.hpp + rpc/forward_malleability.hpp syscalls/args.hpp syscalls/decoder.hpp syscalls/errno.hpp diff --git a/include/client/rpc/forward_malleability.hpp b/include/client/rpc/forward_malleability.hpp new file mode 100644 index 0000000000000000000000000000000000000000..770a8a5f112c4c53c6b993ea02f9e6370bc3331c --- /dev/null +++ b/include/client/rpc/forward_malleability.hpp @@ -0,0 +1,45 @@ +/* + Copyright 2018-2024, Barcelona Supercomputing Center (BSC), Spain + Copyright 2015-2024, Johannes Gutenberg Universitaet Mainz, Germany + + This software was partially supported by the + EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu). 
+ + This software was partially supported by the + ADA-FS project under the SPPEXA project funded by the DFG. + + This file is part of GekkoFS' POSIX interface. + + GekkoFS' POSIX interface is free software: you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public License as + published by the Free Software Foundation, either version 3 of the License, + or (at your option) any later version. + + GekkoFS' POSIX interface is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with GekkoFS' POSIX interface. If not, see + . + + SPDX-License-Identifier: LGPL-3.0-or-later +*/ + +#ifndef GEKKOFS_CLIENT_FORWARD_MALLEABILITY_HPP +#define GEKKOFS_CLIENT_FORWARD_MALLEABILITY_HPP + +namespace gkfs::malleable::rpc { + +int +forward_expand_start(int old_server_conf, int new_server_conf); + +int +forward_expand_status(); + +int +forward_expand_finalize(); +} // namespace gkfs::malleable::rpc + +#endif // GEKKOFS_CLIENT_FORWARD_MALLEABILITY_HPP diff --git a/include/client/rpc/rpc_types.hpp b/include/client/rpc/rpc_types.hpp index 485619e60ea432d64baa4e7d2b79e126c3c34b73..3bf7ca3dbd3bea808d48533e07d14701be8bc461 100644 --- a/include/client/rpc/rpc_types.hpp +++ b/include/client/rpc/rpc_types.hpp @@ -63,7 +63,9 @@ hg_proc_void_t(hg_proc_t proc, void* data) { } // namespace hermes::detail -namespace gkfs::rpc { +namespace gkfs { + +namespace rpc { //============================================================================== // definitions for fs_config @@ -3693,8 +3695,330 @@ struct get_dirents_extended_proxy { size_t m_dirents_size; }; }; +} // namespace rpc +namespace malleable::rpc { + +//============================================================================== +// definitions for 
expand_start +struct expand_start { + + // forward declarations of public input/output types for this RPC + class input; + + class output; + + // traits used so that the engine knows what to do with the RPC + using self_type = expand_start; + using handle_type = hermes::rpc_handle; + using input_type = input; + using output_type = output; + using mercury_input_type = rpc_expand_start_in_t; + using mercury_output_type = rpc_err_out_t; + + // RPC public identifier + // (N.B: we reuse the same IDs assigned by Margo so that the daemon + // understands Hermes RPCs) + constexpr static const uint64_t public_id = 50; + + // RPC internal Mercury identifier + constexpr static const hg_id_t mercury_id = 0; + + // RPC name + constexpr static const auto name = gkfs::malleable::rpc::tag::expand_start; + + // requires response? + constexpr static const auto requires_response = true; + + // Mercury callback to serialize input arguments + constexpr static const auto mercury_in_proc_cb = + HG_GEN_PROC_NAME(rpc_expand_start_in_t); + + // Mercury callback to serialize output arguments + constexpr static const auto mercury_out_proc_cb = + HG_GEN_PROC_NAME(rpc_err_out_t); + + class input { + + template + friend hg_return_t + hermes::detail::post_to_mercury(ExecutionContext*); + + public: + input(const uint32_t old_server_conf, uint32_t new_server_conf) + : m_old_server_conf(old_server_conf), + m_new_server_conf(new_server_conf) {} + + input(input&& rhs) = default; + + input(const input& other) = default; + + input& + operator=(input&& rhs) = default; + + input& + operator=(const input& other) = default; + + uint32_t + old_server_conf() const { + return m_old_server_conf; + } + + uint32_t + new_server_conf() const { + return m_new_server_conf; + } + + explicit input(const rpc_expand_start_in_t& other) + : m_old_server_conf(other.old_server_conf), + m_new_server_conf(other.new_server_conf) {} + + explicit operator rpc_expand_start_in_t() { + return {m_old_server_conf, m_new_server_conf}; 
+ } + + private: + uint32_t m_old_server_conf; + uint32_t m_new_server_conf; + }; + + class output { + + template + friend hg_return_t + hermes::detail::post_to_mercury(ExecutionContext*); + + public: + output() : m_err() {} + + output(int32_t err) : m_err(err) {} + + output(output&& rhs) = default; + + output(const output& other) = default; + + output& + operator=(output&& rhs) = default; + + output& + operator=(const output& other) = default; + + explicit output(const rpc_err_out_t& out) { + m_err = out.err; + } + + int32_t + err() const { + return m_err; + } + + private: + int32_t m_err; + }; +}; + +//============================================================================== +// definitions for expand_status +struct expand_status { + + // forward declarations of public input/output types for this RPC + class input; + + class output; + + // traits used so that the engine knows what to do with the RPC + using self_type = expand_status; + using handle_type = hermes::rpc_handle; + using input_type = input; + using output_type = output; + using mercury_input_type = hermes::detail::hg_void_t; + using mercury_output_type = rpc_err_out_t; + + // RPC public identifier + // (N.B: we reuse the same IDs assigned by Margo so that the daemon + // understands Hermes RPCs) + constexpr static const uint64_t public_id = 51; + + // RPC internal Mercury identifier + constexpr static const hg_id_t mercury_id = 0; + + // RPC name + constexpr static const auto name = gkfs::malleable::rpc::tag::expand_status; + + // requires response? 
+ constexpr static const auto requires_response = true; + + // Mercury callback to serialize input arguments + constexpr static const auto mercury_in_proc_cb = + hermes::detail::hg_proc_void_t; + + // Mercury callback to serialize output arguments + constexpr static const auto mercury_out_proc_cb = + HG_GEN_PROC_NAME(rpc_err_out_t); + + class input { + + template + friend hg_return_t + hermes::detail::post_to_mercury(ExecutionContext*); + + public: + input() {} + + input(input&& rhs) = default; + + input(const input& other) = default; + + input& + operator=(input&& rhs) = default; + + input& + operator=(const input& other) = default; + + explicit input(const hermes::detail::hg_void_t& other) {} + + explicit operator hermes::detail::hg_void_t() { + return {}; + } + }; + + class output { + + template + friend hg_return_t + hermes::detail::post_to_mercury(ExecutionContext*); + + public: + output() : m_err() {} + + output(int32_t err) : m_err(err) {} + + output(output&& rhs) = default; + + output(const output& other) = default; + + output& + operator=(output&& rhs) = default; + + output& + operator=(const output& other) = default; + + explicit output(const rpc_err_out_t& out) { + m_err = out.err; + } + + int32_t + err() const { + return m_err; + } + + private: + int32_t m_err; + }; +}; + +//============================================================================== +// definitions for expand_finalize +struct expand_finalize { + + // forward declarations of public input/output types for this RPC + class input; + + class output; + + // traits used so that the engine knows what to do with the RPC + using self_type = expand_finalize; + using handle_type = hermes::rpc_handle; + using input_type = input; + using output_type = output; + using mercury_input_type = hermes::detail::hg_void_t; + using mercury_output_type = rpc_err_out_t; + + // RPC public identifier + // (N.B: we reuse the same IDs assigned by Margo so that the daemon + // understands Hermes RPCs) + constexpr 
static const uint64_t public_id = 52; + + // RPC internal Mercury identifier + constexpr static const hg_id_t mercury_id = 0; + + // RPC name + constexpr static const auto name = + gkfs::malleable::rpc::tag::expand_finalize; + + // requires response? + constexpr static const auto requires_response = true; + + // Mercury callback to serialize input arguments + constexpr static const auto mercury_in_proc_cb = + hermes::detail::hg_proc_void_t; + + // Mercury callback to serialize output arguments + constexpr static const auto mercury_out_proc_cb = + HG_GEN_PROC_NAME(rpc_err_out_t); + + class input { + + template + friend hg_return_t + hermes::detail::post_to_mercury(ExecutionContext*); + + public: + input() {} + + input(input&& rhs) = default; + + input(const input& other) = default; + + input& + operator=(input&& rhs) = default; + + input& + operator=(const input& other) = default; + + explicit input(const hermes::detail::hg_void_t& other) {} + + explicit operator hermes::detail::hg_void_t() { + return {}; + } + }; + + class output { + + template + friend hg_return_t + hermes::detail::post_to_mercury(ExecutionContext*); + + public: + output() : m_err() {} + + output(int32_t err) : m_err(err) {} + + output(output&& rhs) = default; + + output(const output& other) = default; + + output& + operator=(output&& rhs) = default; + + output& + operator=(const output& other) = default; + + explicit output(const rpc_err_out_t& out) { + m_err = out.err; + } + + int32_t + err() const { + return m_err; + } + + private: + int32_t m_err; + }; +}; -} // namespace gkfs::rpc +} // namespace malleable::rpc +} // namespace gkfs #endif // GKFS_RPCS_TYPES_HPP diff --git a/include/client/user_functions.hpp b/include/client/user_functions.hpp index cc87b386e1f43d285d05d16ad3935a107aa9f450..5f96ab3c1927dfb5ffcd3233ccccf64ec3994e05 100644 --- a/include/client/user_functions.hpp +++ b/include/client/user_functions.hpp @@ -40,7 +40,8 @@ extern "C" { struct linux_dirent64; -namespace gkfs::syscall 
{ +namespace gkfs { +namespace syscall { int gkfs_open(const std::string& path, mode_t mode, int flags); @@ -77,7 +78,34 @@ gkfs_remove(const std::string& path); std::vector gkfs_get_file_list(const std::string& path); -} // namespace gkfs::syscall +} // namespace syscall +namespace malleable { + +/** + * @brief Start an expansion of the file system + * @param old_server_conf old number of nodes + * @param new_server_conf new number of nodes + * @return error code + */ +int +expand_start(int old_server_conf, int new_server_conf); + +/** + * @brief Check for the current status of the expansion process + * @return 0 when finished, positive numbers indicate how many daemons + * are still redistributing data + */ +int +expand_status(); + +/** + * @brief Finalize the expansion process + * @return error code + */ +int +expand_finalize(); +} // namespace malleable +} // namespace gkfs extern "C" int diff --git a/include/common/common_defs.hpp b/include/common/common_defs.hpp index 206ab7de192c4cf2741d0da16a63184d4bd0f1bb..58a67f17cd5074d1eb8c0900509a9c4f92ec2d09 100644 --- a/include/common/common_defs.hpp +++ b/include/common/common_defs.hpp @@ -29,10 +29,15 @@ #ifndef GEKKOFS_COMMON_DEFS_HPP #define GEKKOFS_COMMON_DEFS_HPP +namespace gkfs { +namespace client { +// This must be equivalent to the line set in the gkfs script +constexpr auto hostsfile_end_str = "#FS_INSTANCE_END"; + +} // namespace client +namespace rpc { // These constexpr set the RPC's identity and which handler the receiver end // should use -namespace gkfs::rpc { - using chnk_id_t = unsigned long; struct ChunkStat { unsigned long chunk_size; @@ -40,7 +45,6 @@ struct ChunkStat { unsigned long chunk_free; }; - namespace tag { constexpr auto fs_config = "rpc_srv_fs_config"; @@ -78,6 +82,7 @@ constexpr auto client_proxy_get_dirents_extended = // Specific RPCs between daemon and proxy constexpr auto proxy_daemon_write = "proxy_daemon_rpc_srv_write_data"; constexpr auto proxy_daemon_read = 
"proxy_daemon_rpc_srv_read_data"; + } // namespace tag namespace protocol { @@ -102,11 +107,19 @@ constexpr auto all_remote_protocols = {ofi_sockets, ofi_tcp, ofi_verbs, ucx_rc, ucx_ud}; #pragma GCC diagnostic pop } // namespace protocol -} // namespace gkfs::rpc +} // namespace rpc + +namespace malleable::rpc::tag { +constexpr auto expand_start = "rpc_srv_expand_start"; +constexpr auto expand_status = "rpc_srv_expand_status"; +constexpr auto expand_finalize = "rpc_srv_expand_finalize"; +// migrate data uses the write rpc +constexpr auto migrate_metadata = "rpc_srv_migrate_metadata"; +} // namespace malleable::rpc::tag -namespace gkfs::config::syscall::stat { +namespace config::syscall::stat { // Number 512-byte blocks allocated as it is in the linux kernel (struct_stat.h) constexpr auto st_nblocksize = 512; -} // namespace gkfs::config::syscall::stat - +} // namespace config::syscall::stat +} // namespace gkfs #endif // GEKKOFS_COMMON_DEFS_HPP diff --git a/include/common/rpc/distributor.hpp b/include/common/rpc/distributor.hpp index 68ac5ded0e5b0461ada89505673d4958e6836c3f..eb8bca77fd32e9855668278824159b5061c62ca6 100644 --- a/include/common/rpc/distributor.hpp +++ b/include/common/rpc/distributor.hpp @@ -56,6 +56,9 @@ public: virtual unsigned int hosts_size() const = 0; + virtual void + hosts_size(unsigned int size) = 0; + virtual host_t locate_data(const std::string& path, const chunkid_t& chnk_id, unsigned int hosts_size, const int num_copy) = 0; @@ -64,7 +67,7 @@ public: locate_file_metadata(const std::string& path, const int num_copy) const = 0; virtual std::vector - locate_directory_metadata(const std::string& path) const = 0; + locate_directory_metadata() const = 0; }; @@ -83,6 +86,9 @@ public: unsigned int hosts_size() const override; + void + hosts_size(unsigned int size) override; + host_t localhost() const override; @@ -99,7 +105,7 @@ public: const int num_copy) const override; std::vector - locate_directory_metadata(const std::string& path) const 
override; + locate_directory_metadata() const override; }; class LocalOnlyDistributor : public Distributor { @@ -116,6 +122,9 @@ public: unsigned int hosts_size() const override; + void + hosts_size(unsigned int size) override; + host_t locate_data(const std::string& path, const chunkid_t& chnk_id, const int num_copy) const override; @@ -125,7 +134,7 @@ public: const int num_copy) const override; std::vector - locate_directory_metadata(const std::string& path) const override; + locate_directory_metadata() const override; }; class ForwarderDistributor : public Distributor { @@ -144,6 +153,9 @@ public: unsigned int hosts_size() const override; + void + hosts_size(unsigned int size) override; + host_t locate_data(const std::string& path, const chunkid_t& chnk_id, const int num_copy) const override final; @@ -157,7 +169,7 @@ public: const int num_copy) const override; std::vector - locate_directory_metadata(const std::string& path) const override; + locate_directory_metadata() const override; }; /* @@ -197,6 +209,9 @@ public: unsigned int hosts_size() const override; + void + hosts_size(unsigned int size) override; + host_t locate_data(const std::string& path, const chunkid_t& chnk_id, const int num_copy) const override; @@ -210,7 +225,7 @@ public: const int num_copy) const override; std::vector - locate_directory_metadata(const std::string& path) const override; + locate_directory_metadata() const override; }; } // namespace gkfs::rpc diff --git a/include/common/rpc/rpc_types.hpp b/include/common/rpc/rpc_types.hpp index 40fc200057d679ee837c72debf6ad053333050fb..d86015be51fa5e4ce08a3297d7107507a12cc0d1 100644 --- a/include/common/rpc/rpc_types.hpp +++ b/include/common/rpc/rpc_types.hpp @@ -164,5 +164,14 @@ MERCURY_GEN_PROC(rpc_proxy_get_dirents_in_t, ((hg_const_string_t) (path))((int32_t) (server))( (hg_bulk_t) (bulk_handle))) +// malleability client <-> daemon + +MERCURY_GEN_PROC(rpc_expand_start_in_t, + ((uint32_t) (old_server_conf))((uint32_t) (new_server_conf))) + 
+// malleability daemon <-> daemon + +MERCURY_GEN_PROC(rpc_migrate_metadata_in_t, + ((hg_const_string_t) (key))((hg_const_string_t) (value))) #endif // LFS_RPC_TYPES_HPP diff --git a/include/daemon/CMakeLists.txt b/include/daemon/CMakeLists.txt index 26466ba16138af272d680b78b9f081a4f516c454..977c51cbd407bf48fdd7e70b48770e53bc416e93 100644 --- a/include/daemon/CMakeLists.txt +++ b/include/daemon/CMakeLists.txt @@ -36,6 +36,8 @@ target_sources( classes/rpc_data.hpp handler/rpc_defs.hpp handler/rpc_util.hpp + malleability/malleable_manager.hpp + malleability/rpc/forward_redistribution.hpp ) if(GKFS_ENABLE_AGIOS) diff --git a/include/daemon/backend/data/chunk_storage.hpp b/include/daemon/backend/data/chunk_storage.hpp index 14a4cf31edbc62d8f2083dfdcb930bd326af59fe..ce97c3a423b0cace0b0f26dc81f72124b9920749 100644 --- a/include/daemon/backend/data/chunk_storage.hpp +++ b/include/daemon/backend/data/chunk_storage.hpp @@ -39,6 +39,7 @@ #include #include #include +#include /* Forward declarations */ namespace spdlog { @@ -186,6 +187,12 @@ public: */ [[nodiscard]] ChunkStat chunk_stat() const; + + std::filesystem::recursive_directory_iterator + get_all_chunk_files(); + + std::string + get_chunk_directory(); }; } // namespace gkfs::data diff --git a/include/daemon/backend/metadata/db.hpp b/include/daemon/backend/metadata/db.hpp index f1f629e0a8f2f080615f9ff911de0d1d8999e548..00131cb2cf4a89b7df7bd08d3a0d7d451a4cca43 100644 --- a/include/daemon/backend/metadata/db.hpp +++ b/include/daemon/backend/metadata/db.hpp @@ -164,8 +164,14 @@ public: * @brief Iterate over complete database, note ONLY used for debugging and * is therefore unused. 
*/ - void + void* iterate_all() const; + + /** + * @brief Returns an estimated db size, i.e., number of KV pairs + */ + uint64_t + db_size() const; }; } // namespace gkfs::metadata diff --git a/include/daemon/backend/metadata/metadata_backend.hpp b/include/daemon/backend/metadata/metadata_backend.hpp index 84f7a8f41b8aec319c1c2d7fd159f4af79929083..584590354beacea18566803e1335988aa5a2a194 100644 --- a/include/daemon/backend/metadata/metadata_backend.hpp +++ b/include/daemon/backend/metadata/metadata_backend.hpp @@ -73,8 +73,11 @@ public: virtual std::vector> get_dirents_extended(const std::string& dir) const = 0; - virtual void + virtual void* iterate_all() const = 0; + + virtual uint64_t + db_size() const = 0; }; template @@ -137,9 +140,14 @@ public: return static_cast(*this).get_dirents_extended_impl(dir); } - void + void* iterate_all() const { - static_cast(*this).iterate_all_impl(); + return static_cast(*this).iterate_all_impl(); + } + + uint64_t + db_size() const { + return static_cast(*this).db_size_impl(); } }; diff --git a/include/daemon/backend/metadata/parallax_backend.hpp b/include/daemon/backend/metadata/parallax_backend.hpp index eda56690d65cdc26e808262d39d24716bac05196..c351c0fe6917557fa6e657828ae9d1d5878ef634 100644 --- a/include/daemon/backend/metadata/parallax_backend.hpp +++ b/include/daemon/backend/metadata/parallax_backend.hpp @@ -190,7 +190,7 @@ public: * Code example for iterating all entries in KV store. This is for debug * only as it is too expensive */ - void + void* iterate_all_impl() const; }; diff --git a/include/daemon/backend/metadata/rocksdb_backend.hpp b/include/daemon/backend/metadata/rocksdb_backend.hpp index a54472da16a5e22832aacad8c4e19df9f5e1d766..824ae6fc96d2c12ed909f79a346bb74d64a8d43d 100644 --- a/include/daemon/backend/metadata/rocksdb_backend.hpp +++ b/include/daemon/backend/metadata/rocksdb_backend.hpp @@ -174,8 +174,11 @@ public: * Code example for iterating all entries in KV store. 
This is for debug * only as it is too expensive */ - void + void* iterate_all_impl() const; + + uint64_t + db_size_impl() const; }; } // namespace gkfs::metadata diff --git a/include/daemon/classes/fs_data.hpp b/include/daemon/classes/fs_data.hpp index 4cde4170e1aa762bea5c7c75c21bb8df23995be9..a59cd184b4f5125aa6413fee7f3f0d7315bc3415 100644 --- a/include/daemon/classes/fs_data.hpp +++ b/include/daemon/classes/fs_data.hpp @@ -51,12 +51,18 @@ namespace utils { class Stats; } +namespace malleable { +class MalleableManager; +} + namespace daemon { class FsData { private: - FsData() = default; + FsData(); + + ~FsData(); // logger std::shared_ptr spdlogger_; @@ -104,6 +110,15 @@ private: // Prometheus std::string prometheus_gateway_ = gkfs::config::stats::prometheus_gateway; + // maintenance mode is used to prevent new RPCs to the filesystem and + // indicates for clients: try again. Is set to true when redist is running + bool maintenance_mode_ = false; + ABT_mutex maintenance_mode_mutex_; + // redist_running_ indicates to client that redistribution is running + bool redist_running_ = false; + + std::shared_ptr malleable_manager_; + public: static FsData* getInstance() { @@ -284,6 +299,25 @@ public: void prometheus_gateway(const std::string& prometheus_gateway_); + + bool + maintenance_mode() const; + + void + maintenance_mode(bool maintenance_mode); + + bool + redist_running() const; + + void + redist_running(bool redist_running); + + const std::shared_ptr& + malleable_manager() const; + + void + malleable_manager(const std::shared_ptr& + malleable_manager); }; diff --git a/include/daemon/classes/rpc_data.hpp b/include/daemon/classes/rpc_data.hpp index c98b7d3ad3807fdf10c11d8311ab5d0f521ef585..4ccc6e615d75af2b9fd07edf3775d71d1d02eea3 100644 --- a/include/daemon/classes/rpc_data.hpp +++ b/include/daemon/classes/rpc_data.hpp @@ -30,6 +30,7 @@ #define LFS_RPC_DATA_HPP #include +#include namespace gkfs { @@ -41,6 +42,11 @@ class Distributor; namespace daemon { +struct 
margo_client_ids { + hg_id_t migrate_metadata_id; + hg_id_t migrate_data_id; +}; + class RPCData { private: @@ -50,6 +56,13 @@ private: // contexts that were created at init time margo_instance_id server_rpc_mid_; margo_instance_id proxy_server_rpc_mid_; + // client + margo_instance_id client_rpc_mid_; + margo_client_ids rpc_client_ids_{}; + std::map rpc_endpoints_; + uint64_t hosts_size_; + uint64_t local_host_id_; + // Argobots I/O pools and execution streams ABT_pool io_pool_; @@ -83,7 +96,34 @@ public: proxy_server_rpc_mid(); void - proxy_server_rpc_mid(margo_instance* proxy_server_rpc_mid); + proxy_server_rpc_mid(margo_instance* client_rpc_mid); + + margo_instance* + client_rpc_mid(); + + void + client_rpc_mid(margo_instance* client_rpc_mid); + + margo_client_ids& + rpc_client_ids(); + + std::map& + rpc_endpoints(); + + void + rpc_endpoints(const std::map& rpc_endpoints); + + uint64_t + hosts_size() const; + + void + hosts_size(uint64_t hosts_size); + + uint64_t + local_host_id() const; + + void + local_host_id(uint64_t local_host_id); ABT_pool io_pool() const; diff --git a/include/daemon/handler/rpc_defs.hpp b/include/daemon/handler/rpc_defs.hpp index 371094966785986fc6161f59f5bd38775938d19f..8f3f47bad2c6c1c596cddbe501febbe7cccfab1f 100644 --- a/include/daemon/handler/rpc_defs.hpp +++ b/include/daemon/handler/rpc_defs.hpp @@ -84,4 +84,15 @@ DECLARE_MARGO_RPC_HANDLER(proxy_rpc_srv_read) DECLARE_MARGO_RPC_HANDLER(proxy_rpc_srv_write) +// malleability + +DECLARE_MARGO_RPC_HANDLER(rpc_srv_expand_start) + +DECLARE_MARGO_RPC_HANDLER(rpc_srv_expand_status) + +DECLARE_MARGO_RPC_HANDLER(rpc_srv_expand_finalize) + +DECLARE_MARGO_RPC_HANDLER(rpc_srv_migrate_metadata) + + #endif // GKFS_DAEMON_RPC_DEFS_HPP diff --git a/include/daemon/malleability/malleable_manager.hpp b/include/daemon/malleability/malleable_manager.hpp new file mode 100644 index 0000000000000000000000000000000000000000..12ac587f4a902e620a7c25f0c1915c896c160d28 --- /dev/null +++ 
b/include/daemon/malleability/malleable_manager.hpp @@ -0,0 +1,67 @@ +/* + Copyright 2018-2024, Barcelona Supercomputing Center (BSC), Spain + Copyright 2015-2024, Johannes Gutenberg Universitaet Mainz, Germany + + This software was partially supported by the + EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu). + + This software was partially supported by the + ADA-FS project under the SPPEXA project funded by the DFG. + + This file is part of GekkoFS. + + GekkoFS is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + GekkoFS is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GekkoFS. If not, see . + + SPDX-License-Identifier: GPL-3.0-or-later +*/ +#ifndef GEKKOFS_DAEMON_MALLEABLE_MANAGER_HPP +#define GEKKOFS_DAEMON_MALLEABLE_MANAGER_HPP + +#include + +namespace gkfs::malleable { + +class MalleableManager { +private: + ABT_thread redist_thread_; + + // TODO next 3 functions are mostly copy paste from preload_util. 
FIX + + std::vector> + load_hostfile(const std::string& path); + + std::vector> + read_hosts_file(); + + void + connect_to_hosts( + const std::vector>& hosts); + + int + redistribute_metadata(); + + void + redistribute_data(); + + static void + expand_abt(void* _arg); + +public: + void + expand_start(int old_server_conf, int new_server_conf); +}; +} // namespace gkfs::malleable + + +#endif // GEKKOFS_DAEMON_MALLEABLE_MANAGER_HPP diff --git a/include/daemon/malleability/rpc/forward_redistribution.hpp b/include/daemon/malleability/rpc/forward_redistribution.hpp new file mode 100644 index 0000000000000000000000000000000000000000..b450a65257fb355f70c56134ef42d0d82c322628 --- /dev/null +++ b/include/daemon/malleability/rpc/forward_redistribution.hpp @@ -0,0 +1,46 @@ +/* + Copyright 2018-2024, Barcelona Supercomputing Center (BSC), Spain + Copyright 2015-2024, Johannes Gutenberg Universitaet Mainz, Germany + + This software was partially supported by the + EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu). + + This software was partially supported by the + ADA-FS project under the SPPEXA project funded by the DFG. + + This file is part of GekkoFS. + + GekkoFS is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + GekkoFS is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GekkoFS. If not, see .
+ + SPDX-License-Identifier: GPL-3.0-or-later +*/ + +#ifndef GEKKOFS_DAEMON_FORWARD_REDISTRIBUTION_HPP +#define GEKKOFS_DAEMON_FORWARD_REDISTRIBUTION_HPP + +#include +#include + +namespace gkfs::malleable::rpc { + +int +forward_metadata(std::string& key, std::string& value, unsigned int dest_id); + +int +forward_data(const std::string& path, void* buf, const size_t count, + const uint64_t chnk_id, const uint64_t dest_id); + +} // namespace gkfs::malleable::rpc + +#endif // GEKKOFS_DAEMON_FORWARD_REDISTRIBUTION_HPP \ No newline at end of file diff --git a/scripts/run/gkfs b/scripts/run/gkfs index 502f3bc3b0e8cf993eabb440438a6e2e3f2c4d69..8582c025ebf771ad1af711c2e0f3c6766b4c53d4 100755 --- a/scripts/run/gkfs +++ b/scripts/run/gkfs @@ -15,25 +15,34 @@ fi C_AST_GREEN="${C_GREEN}*${C_NONE} [gkfs] " C_AST_YELLOW="${C_BYELLOW}*${C_NONE} [gkfs] " C_AST_RED="${C_BRED}*${C_NONE} [gkfs] " + +# Important const globals +FS_INSTANCE_MARKER_CONST="#FS_INSTANCE_END" ####################################### # Poll GekkoFS hostsfile until all daemons are started. # Exits with 1 if daemons cannot be started. # Globals: # HOSTSFILE # NODE_NUM +# NODE_CNT_EXPAND +# COMMAND # Arguments: # None # Outputs: # Writes error to stdout ####################################### wait_for_gkfs_daemons() { - sleep 2 + sleep 1 local server_wait_cnt=0 local nodes=1 if [[ -n ${NODE_NUM} ]]; then nodes=${NODE_NUM} fi - until [ $(($(wc -l "${HOSTSFILE}" 2> /dev/null | awk '{print $1}') + 0)) -eq "${nodes}" ] + # when expanding the total number of nodes is: initial nodelist + expand nodelist + if [[ ${COMMAND} == *"expand"* ]]; then + nodes=${NODE_CNT_EXPAND} + fi + until [ $(($(grep -cv '^#' "${HOSTSFILE}" 2> /dev/null | awk '{print $1}') + 0)) -eq "${nodes}" ] do #echo "Waiting for all servers to report connection. Try $server_wait_cnt" sleep 2 @@ -48,8 +57,8 @@ wait_for_gkfs_daemons() { # Creates a pid file for a given pid. If pid file exists, we check if its pids are still valid. 
# If valid, an additional line is added. Otherwise, the pid in the file is deleted. # Globals: -# SRUN_DAEMON_PID_FILE -# SRUN_PROXY_PID_FILE +# DAEMON_PID_FILE +# PROXY_PID_FILE # VERBOSE # Arguments: # path to pid file @@ -57,15 +66,15 @@ # Outputs: # Writes status to stdout if VERBOSE is true ####################################### -create_pid_file() { +write_pid_file() { local pid_file=${1} local pid=${2} if [[ ${VERBOSE} == true ]]; then echo -e "${C_AST_GREEN}Creating pid file at ${pid_file} with pid ${pid} ..." fi - # if PID file exists another daemon could run + # if PID file exists another daemon (or srun) could run if [[ -e ${pid_file} ]]; then - local pid_file_tmp=${SRUN_DAEMON_PID_FILE}.swp + local pid_file_tmp=${pid_file}.swp # create empty tmp file truncate -s 0 "${pid_file_tmp}" while IFS= read -r line @@ -99,10 +108,13 @@ create_pid_file() { # GKFS_DAEMON_LOG_PATH # GKFS_DAEMON_LOG_LEVEL # RUN_FOREGROUND +# DAEMON_BIN +# PROXY_BIN +# COMMAND # Outputs: # Writes status to stdout ####################################### -start_daemon() { +start_daemons() { local node_list local srun_daemon_cmd local srun_proxy_cmd @@ -160,10 +172,14 @@ start_daemon() { echo -e "${C_AST_GREEN}cpus_per_task: ${CPUS_PER_TASK}" [[ ${USE_PROXY} == true ]] && echo -e "${C_AST_GREEN}Proxy enabled" fi - if [[ ${VERBOSE} == true ]]; then - echo -e "${C_AST_GREEN}Cleaning host file ..." + # sanity checks before starting + if [[ ${COMMAND} == *"start"* ]]; then + # only clear hostfile when starting for the first time + if [[ ${VERBOSE} == true ]]; then + echo -e "${C_AST_GREEN}Cleaning host file ..."
+ fi + rm "${HOSTSFILE}" 2> /dev/null fi - rm "${HOSTSFILE}" 2> /dev/null # Setting up base daemon cmd local daemon_cmd="${DAEMON_BIN} -r ${ROOTDIR} -m ${MOUNTDIR} -H ${HOSTSFILE} ${DAEMON_ARGS_}" if [[ ${USE_PROXY} == true ]]; then @@ -173,24 +189,24 @@ start_daemon() { if [[ -n ${DAEMON_AFFINITY_} ]]; then daemon_cmd="${DAEMON_AFFINITY_} ${daemon_cmd}" fi # final daemon execute command daemon_execute="${srun_daemon_cmd} ${SRUN_DAEMON_ARGS} ${daemon_cmd}" # Setting up base proxy command if [[ ${USE_PROXY} == true ]]; then local proxy_cmd="${PROXY_BIN} -H ${HOSTSFILE} --pid-path ${PROXY_LOCAL_PID_FILE} ${PROXY_ARGS_}" # Set cpu affinity for proxy if [[ -n ${PROXY_AFFINITY_} ]]; then proxy_cmd="${PROXY_AFFINITY_} ${proxy_cmd}" fi # final proxy execute command proxy_execute="${srun_proxy_cmd} ${SRUN_PROXY_ARGS} ${proxy_cmd}" fi if [[ ${VERBOSE} == true ]]; then echo -e "${C_AST_GREEN}Full execute DAEMON command:" echo -e "${C_AST_GREEN}# $daemon_execute" [[ ${USE_PROXY} == true ]] && echo -e "${C_AST_GREEN}Full execute PROXY command:" [[ ${USE_PROXY} == true ]] && echo -e "${C_AST_GREEN}# $proxy_execute" fi # setup environment variables @@ -254,24 +270,24 @@ start_daemon() { fi done else - create_pid_file ${SRUN_DAEMON_PID_FILE} ${daemon_pid} + write_pid_file ${DAEMON_PID_FILE} ${daemon_pid} if [[ ${USE_PROXY} == true ]]; then - create_pid_file ${SRUN_PROXY_PID_FILE} ${proxy_pid} + write_pid_file ${PROXY_PID_FILE} ${proxy_pid} fi fi } ####################################### # Stops GekkoFS daemons for the configured pid file # Globals: -# SRUN_DAEMON_PID_FILE -# SRUN_PROXY_PID_FILE +# DAEMON_PID_FILE +# PROXY_PID_FILE # VERBOSE # Outputs: # Writes status to stdout ####################################### stop_daemons() { -
local pid_file=${SRUN_DAEMON_PID_FILE} - local proxy_pid_file=${SRUN_PROXY_PID_FILE} + local pid_file=${DAEMON_PID_FILE} + local proxy_pid_file=${PROXY_PID_FILE} # if no daemon or proxy pid file exists, exit if [[ ! -e ${pid_file} ]] && [[ ! -e ${proxy_pid_file} ]]; then echo -e "${C_AST_RED}No pid files found -> no daemon or proxy running. Exiting ..." @@ -301,6 +317,8 @@ stop_daemons() { if [[ -e ${pid_file} ]]; then while IFS= read -r line do + # skip comment lines starting with # + [[ ${line} =~ ^#.*$ ]] && continue if ps -p "${line}" > /dev/null; then echo -e "${C_AST_GREEN}Stopping daemon with pid ${line}" start_time="$(date -u +%s.%3N)" @@ -318,6 +336,142 @@ stop_daemons() { echo -e "${C_AST_GREEN}Shutdown time: ${elapsed} seconds" fi } + +####################################### +# Sets up the expand process for a later operation +# Globals: +# RUN_FOREGROUND +# EXPAND_NODELIST +# HOSTSFILE +# DAEMON_NODELIST +# USE_PROXY +# GKFS_MALLEABILITY_BIN_ +# VERBOSE +# Outputs: +# sets GKFS_MALLEABILITY_BIN_ if not already given by config +####################################### +expand_setup() { + # sanity checks + if [[ ${RUN_FOREGROUND} == true ]]; then + echo -e "${C_AST_RED}ERROR: Cannot run in foreground for expansion. Exiting ..." + exit 1 + fi + if [[ -z ${EXPAND_NODELIST} ]]; then + echo -e "${C_AST_RED}ERROR: No expand host file given. We need to know which nodes should be used. Exiting ..." + exit 1 + fi + # if proxy is enabled error out + # to support proxy, all proxies would need to be shut down during expansion and started again afterwards + # to get the new configuration. + if [[ ${USE_PROXY} == true ]]; then + echo -e "${C_AST_RED}ERROR: Proxy not supported for expansion. Exiting ..." + exit 1 + fi + # check that gkfs host file exists + if [[ ! -f ${HOSTSFILE} ]]; then + echo -e "${C_AST_RED}ERROR: No GekkoFS hostfile for expansion found at ${HOSTSFILE}. Exiting ..." + exit 1 + fi + # check that daemon pid file exists + if [[ !
-f ${DAEMON_PID_FILE} ]]; then + echo -e "${C_AST_RED}ERROR: No daemon pid file found at ${DAEMON_PID_FILE}." + echo -e "${C_AST_RED} Existing daemon must run in background for extension. Exiting ..." + exit 1 + fi + # modify all necessary environment variables from the config file to fit expand + # Set daemon node list based on given expand hostfile + DAEMON_NODELIST_=$(readlink -f ${EXPAND_NODELIST}) + # setup + # This must be equivalent to the line set in include/common/common_defs.hpp + echo "$FS_INSTANCE_MARKER_CONST" >> "${HOSTSFILE}" + # check that the gkfs_malleability binary exists in $PATH if not already set via config + if [[ -z ${GKFS_MALLEABILITY_BIN_} ]]; then + GKFS_MALLEABILITY_BIN_=$(command -v gkfs_malleability) + fi + # if not found check if it exists in the parent directory of the daemon bin + if [[ -z ${GKFS_MALLEABILITY_BIN_} ]]; then + # check that the gkfs_malleability binary exists somewhere in the parent directory where daemon bin is located + if [[ -f $(dirname ${DAEMON_BIN})/gkfs_malleability ]]; then + GKFS_MALLEABILITY_BIN_=$(readlink -f $(dirname ${DAEMON_BIN})/gkfs_malleability) + else + echo -e "${C_AST_RED}ERROR: gkfs_malleability binary not found. Exiting ..."
+ exit 1 + fi + fi +} + +####################################### +# Prints expansion progress +# Input: +# $1 current +# $2 total +# VERBOSE +# Outputs: +# Writes status to stdout ####################################### +show_expand_progress() { + local current="$1" + local total="$2" + local remaining=$((total - current)) + local progress=$(( (remaining * 100) / total )) + local bar_length=20 + local filled_length=$(( (progress * bar_length) / 100 )) + local empty_length=$(( bar_length - filled_length )) + + # Clear the entire line and move cursor to the beginning + tput el1; tput cr + + printf "[" + for ((i=0; i<filled_length; i++)); do printf "#"; done + for ((i=0; i<empty_length; i++)); do printf " "; done + printf "] ${progress}%%" +} usage_short() { echo " usage: gkfs [-h/--help] [-r/--rootdir ] [-m/--mountdir ] [-a/--args ] [--proxy ] [-f/--foreground ] [--srun ] [-n/--numnodes ] [--cpuspertask <64>] [-v/--verbose ] - {start,stop} + {start,expand,stop} " } ####################################### @@ -343,7 +497,7 @@ help_msg() { additional permanent configurations can be set. positional arguments: - command Command to execute: 'start' and 'stop' + COMMAND Command to execute: 'start', 'stop', 'expand' optional arguments: -h, --help Shows this help message and exits @@ -359,6 +513,7 @@ help_msg() { Nodelist is extracted from Slurm via the SLURM_JOB_ID env variable. --cpuspertask <#cores> Set the number of cores the daemons can use. Must use '--srun'. -c, --config Path to configuration file. By default looks for a 'gkfs.conf' in this directory. + -e, --expand_hostfile Path to a hostfile with the new nodes to which GekkoFS should be expanded (one node per line).
-v, --verbose Increase verbosity " } @@ -408,8 +563,10 @@ PROXY_BIN=$(readlink -f ${PROXY_BIN}) PRELOAD_LIB=$(readlink -f ${PRELOAD_LIB}) HOSTSFILE=$(readlink -f ${HOSTSFILE}) PROXY_LOCAL_PID_FILE=$(readlink -f ${PROXY_LOCAL_PID_FILE}) -SRUN_DAEMON_PID_FILE=$(readlink -f ${SRUN_DAEMON_PID_FILE}) -SRUN_PROXY_PID_FILE=$(readlink -f ${SRUN_PROXY_PID_FILE}) +DAEMON_PID_FILE=$(readlink -f ${DAEMON_PID_FILE}) +PROXY_PID_FILE=$(readlink -f ${PROXY_PID_FILE}) +EXPAND_NODELIST="" +GKFS_MALLEABILITY_BIN_=${GKFS_MALLEABILITY_BIN} # parse input POSITIONAL=() @@ -474,6 +631,11 @@ while [[ $# -gt 0 ]]; do shift # past argument shift # past value ;; + -e | --expand_hostfile) + EXPAND_NODELIST=$2 + shift # past argument + shift # past value + ;; -h | --help) help_msg exit @@ -496,18 +658,20 @@ if [[ -z ${1+x} ]]; then usage_short exit 1 fi -command="${1}" +COMMAND="${1}" # checking input -if [[ ${command} != *"start"* ]] && [[ ${command} != *"stop"* ]]; then - echo -e "${C_AST_RED}ERROR: command ${command} not supported" +if [[ ${COMMAND} != *"start"* ]] && [[ ${COMMAND} != *"stop"* ]] && [[ ${COMMAND} != *"expand"* ]]; then + echo -e "${C_AST_RED}ERROR: command '${COMMAND}' not supported" usage_short exit 1 fi # Run script -if [[ ${COMMAND} == "start" ]]; then - start_daemon -elif [[ ${COMMAND} == "stop" ]]; then +if [[ ${COMMAND} == "start" ]]; then + start_daemons +elif [[ ${COMMAND} == "stop" ]]; then stop_daemons +elif [[ ${COMMAND} == "expand" ]]; then + add_daemons fi if [[ ${VERBOSE} == true ]]; then echo -e "${C_AST_GREEN}Nothing left to do.
Exiting :)" diff --git a/scripts/run/gkfs.conf b/scripts/run/gkfs.conf index fc7922ddb28d2dd3a5d12c0fdf48411b78a53ed7..b71e85fd32bca58485331f6eb385bf9626fe4bfa 100644 --- a/scripts/run/gkfs.conf +++ b/scripts/run/gkfs.conf @@ -6,16 +6,23 @@ DAEMON_BIN=../../build/src/daemon/gkfs_daemon PROXY_BIN=../../build/src/proxy/gkfs_proxy # client configuration (needs to be set for all clients) -LIBGKFS_HOSTS_FILE=./gkfs_hostfile +LIBGKFS_HOSTS_FILE=/home/XXX/workdir/gkfs_hosts.txt + +# tools (if built) +GKFS_MALLEABILITY_BIN=../../build/tools/gkfs_malleability ## daemon configuration -DAEMON_ROOTDIR=/dev/shm/vef_gkfs_rootdir -DAEMON_MOUNTDIR=/dev/shm/vef_gkfs_mountdir +#DAEMON_ROOTDIR=/dev/shm/vef_gkfs_rootdir +DAEMON_ROOTDIR=/dev/shm/gkfs_rootdir +#DAEMON_MOUNTDIR=/dev/shm/vef_gkfs_mountdir +DAEMON_MOUNTDIR=/tmp/gkfs_mountdir # additional daemon arguments (see `gkfs_daemon -h`) # use numactl to pin daemon to socket DAEMON_ARGS="-l lo -c" # use cpu affinity. Set this eg to `taskset -c ...` DAEMON_AFFINITY="" +# used when run in background +DAEMON_PID_FILE=./gkfs_daemon.pid ## proxy configuration USE_PROXY=false @@ -24,6 +31,8 @@ PROXY_LOCAL_PID_FILE=/dev/shm/vef_gkfs_proxy.pid PROXY_ARGS="-p ofi+sockets" # use cpu affinity.
Set this eg to `taskset -c ...` PROXY_AFFINITY="" +# used when run in background +PROXY_PID_FILE=./gkfs_proxy.pid ## slurm configuration # Use Slurm's srun to start the daemons on multiple nodes and set specific srun args @@ -35,13 +44,10 @@ SRUN_ARGS="--overlap --ntasks-per-node=1 --overcommit --overlap --oversubscribe SRUN_DAEMON_ARGS="" # Specific srun args for proxy SRUN_PROXY_ARGS="" -# path to daemon pid file; created where the script is run -SRUN_DAEMON_PID_FILE=./gkfs_daemon.pid -SRUN_PROXY_PID_FILE=./gkfs_proxy.pid # logging -GKFS_DAEMON_LOG_LEVEL=info -GKFS_DAEMON_LOG_PATH=/dev/shm/gkfs_daemon.log +GKFS_DAEMON_LOG_LEVEL=trace +GKFS_DAEMON_LOG_PATH=/tmp/gkfs_daemon.log GKFS_PROXY_LOG_LEVEL=info GKFS_PROXY_LOG_PATH=/dev/shm/gkfs_proxy.log # Modify the following for the client diff --git a/scripts/run/gkfs_io500.conf b/scripts/run/gkfs_io500.conf index eaca98fde15d3a9fff6e63f4f09ac874965aac87..5a3cb688e1aded65f5417365d189724c84060870 100644 --- a/scripts/run/gkfs_io500.conf +++ b/scripts/run/gkfs_io500.conf @@ -18,6 +18,8 @@ DAEMON_MOUNTDIR=/dev/shm/vef_gkfs_mountdir DAEMON_ARGS="-P ofi+verbs -l ib0 -c" # use cpu affinity. Set this eg to `taskset -c ...` DAEMON_AFFINITY="taskset -c 0-63" +# used when run in background +DAEMON_PID_FILE=/lustre/project/nhr-admire/vef/run/io500/gkfs_daemon.pid ## proxy configuration USE_PROXY=false @@ -26,6 +28,8 @@ PROXY_LOCAL_PID_FILE=/dev/shm/vef_gkfs_proxy.pid PROXY_ARGS="-p ofi+verbs" # use cpu affinity. 
Set this eg to `taskset -c ...` PROXY_AFFINITY="taskset -c 0-63" +# used when run in background +PROXY_PID_FILE=/lustre/project/nhr-admire/vef/run/io500/gkfs_proxy.pid ## slurm configuration # Use Slurm's srun to start the daemons on multiple nodes and set specific srun args @@ -37,9 +41,6 @@ SRUN_ARGS="--overlap --ntasks-per-node=1 --overcommit --overlap --oversubscribe SRUN_DAEMON_ARGS="" # Specific srun args for proxy SRUN_PROXY_ARGS="" -# path to daemon pid file; created where the script is run -SRUN_DAEMON_PID_FILE=/lustre/project/nhr-admire/vef/run/io500/gkfs_daemon.pid -SRUN_PROXY_PID_FILE=/lustre/project/nhr-admire/vef/run/io500/gkfs_proxy.pid # logging configuration GKFS_DAEMON_LOG_LEVEL=info diff --git a/src/client/CMakeLists.txt b/src/client/CMakeLists.txt index d2415233daade3bb758c4d7f1527afb528f04bf6..e2a53a0826e59b91889a981bb51b1e100b4ae6b2 100644 --- a/src/client/CMakeLists.txt +++ b/src/client/CMakeLists.txt @@ -67,12 +67,14 @@ target_sources( preload.cpp preload_context.cpp preload_util.cpp + malleability.cpp rpc/rpc_types.cpp rpc/forward_data.cpp rpc/forward_data_proxy.cpp rpc/forward_management.cpp rpc/forward_metadata.cpp rpc/forward_metadata_proxy.cpp + rpc/forward_malleability.cpp syscalls/detail/syscall_info.c syscalls/util.S ) diff --git a/src/client/malleability.cpp b/src/client/malleability.cpp new file mode 100644 index 0000000000000000000000000000000000000000..0e2a2e8af8712fd98f134b349066c86f22bab788 --- /dev/null +++ b/src/client/malleability.cpp @@ -0,0 +1,80 @@ +/* + Copyright 2018-2024, Barcelona Supercomputing Center (BSC), Spain + Copyright 2015-2024, Johannes Gutenberg Universitaet Mainz, Germany + + This software was partially supported by the + EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu). + + This software was partially supported by the + ADA-FS project under the SPPEXA project funded by the DFG. + + This file is part of GekkoFS' POSIX interface. 
+ + GekkoFS' POSIX interface is free software: you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public License as + published by the Free Software Foundation, either version 3 of the License, + or (at your option) any later version. + + GekkoFS' POSIX interface is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with GekkoFS' POSIX interface. If not, see + . + + SPDX-License-Identifier: LGPL-3.0-or-later +*/ + +#include +#include +#include +#include + +#include + +using namespace std; + +namespace gkfs::malleable { + +int +expand_start(int old_server_conf, int new_server_conf) { + LOG(INFO, "{}() Expand operation enter", __func__); + // sanity checks + if(old_server_conf == new_server_conf) { + auto err_str = + "ERR: Old server configuration is the same as the new one"; + cerr << err_str << endl; + LOG(ERROR, "{}() {}", __func__, err_str); + return -1; + } + if(CTX->hosts().size() != static_cast(old_server_conf)) { + auto err_str = + "ERR: Old server configuration does not match the number of hosts in hostsfile"; + cerr << err_str << endl; + LOG(ERROR, "{}() {}", __func__, err_str); + return -1; + } + // TODO check that hostsfile contains endmarker + return gkfs::malleable::rpc::forward_expand_start(old_server_conf, + new_server_conf); +} + +int +expand_status() { + LOG(INFO, "{}() enter", __func__); + auto res = gkfs::malleable::rpc::forward_expand_status(); + LOG(INFO, "{}() '{}' nodes working on extend operation.", __func__, res); + return res; +} + +int +expand_finalize() { + LOG(INFO, "{}() enter", __func__); + auto res = gkfs::malleable::rpc::forward_expand_finalize(); + LOG(INFO, "{}() extend operation finalized. 
`, __func__); + return res; +} + +} // namespace gkfs::malleable \ No newline at end of file diff --git a/src/client/preload_util.cpp b/src/client/preload_util.cpp index 7c769b1cba70f825fce0db00f0331c2f6d79808d..806db66395d988387f7c1277df8786a2f9370585 100644 --- a/src/client/preload_util.cpp +++ b/src/client/preload_util.cpp @@ -161,11 +161,15 @@ load_hostfile(const std::string& path) { string uri; std::smatch match; while(getline(lf, line)) { + // if a line starts with #, it indicates the end of the current FS instance. + // Further hosts are not part of the file system instance yet and are + // therefore skipped. The hostfile is ordered, so nothing below this line + // can contain valid hosts + if(line.find(gkfs::client::hostsfile_end_str) != string::npos) + break; if(!regex_match(line, match, line_re)) { - LOG(ERROR, "Unrecognized line format: [path: '{}', line: '{}']", path, line); - throw runtime_error( fmt::format("unrecognized line format: '{}'", line)); } @@ -510,4 +514,4 @@ lookup_proxy_addr() { CTX->proxy_host(addr); } -} // namespace gkfs::utils +} // namespace gkfs::utils \ No newline at end of file diff --git a/src/client/rpc/forward_malleability.cpp b/src/client/rpc/forward_malleability.cpp new file mode 100644 index 0000000000000000000000000000000000000000..bf59dad5dbe6a31793f802c37af21d352ffd30a4 --- /dev/null +++ b/src/client/rpc/forward_malleability.cpp @@ -0,0 +1,222 @@ +/* + Copyright 2018-2024, Barcelona Supercomputing Center (BSC), Spain + Copyright 2015-2024, Johannes Gutenberg Universitaet Mainz, Germany + + This software was partially supported by the + EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu). + + This software was partially supported by the + ADA-FS project under the SPPEXA project funded by the DFG. + + This file is part of GekkoFS' POSIX interface.
+ + GekkoFS' POSIX interface is free software: you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public License as + published by the Free Software Foundation, either version 3 of the License, + or (at your option) any later version. + + GekkoFS' POSIX interface is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with GekkoFS' POSIX interface. If not, see + . + + SPDX-License-Identifier: LGPL-3.0-or-later +*/ +#include +#include +#include +#include +#include +#include + +namespace gkfs::malleable::rpc { + +int +forward_expand_start(int old_server_conf, int new_server_conf) { + LOG(INFO, "{}() enter", __func__); + auto const targets = CTX->distributor()->locate_directory_metadata(); + + auto err = 0; + // send async RPCs + std::vector> handles; + + for(std::size_t i = 0; i < targets.size(); ++i) { + + // Setup rpc input parameters for each host + auto endp = CTX->hosts().at(targets[i]); + + gkfs::malleable::rpc::expand_start::input in(old_server_conf, + new_server_conf); + + try { + LOG(DEBUG, "{}() Sending RPC to host: '{}'", __func__, targets[i]); + handles.emplace_back( + ld_network_service + ->post(endp, + in)); + } catch(const std::exception& ex) { + LOG(ERROR, + "{}() Unable to send non-blocking forward_expand_start() [peer: {}] err '{}'", + __func__, targets[i], ex.what()); + err = EBUSY; + break; // we need to gather responses from already sent RPCS + } + } + + LOG(INFO, "{}() send expand_start rpc to '{}' targets", __func__, + targets.size()); + + // wait for RPC responses + for(std::size_t i = 0; i < handles.size(); ++i) { + + gkfs::malleable::rpc::expand_start::output out; + + try { + out = handles[i].get().at(0); + + if(out.err() != 0) { + LOG(ERROR, + "{}() 
Failed to retrieve dir entries from host '{}'. Error '{}'", + __func__, targets[i], strerror(out.err())); + err = out.err(); + // We need to gather all responses before exiting + continue; + } + } catch(const std::exception& ex) { + LOG(ERROR, + "{}() Failed to get rpc output.. [target host: {}] err '{}'", + __func__, targets[i], ex.what()); + err = EBUSY; + // We need to gather all responses before exiting + continue; + } + } + return err; +} + +int +forward_expand_status() { + LOG(INFO, "{}() enter", __func__); + auto const targets = CTX->distributor()->locate_directory_metadata(); + + auto err = 0; + // send async RPCs + std::vector> + handles; + + for(std::size_t i = 0; i < targets.size(); ++i) { + + // Setup rpc input parameters for each host + auto endp = CTX->hosts().at(targets[i]); + + try { + LOG(DEBUG, "{}() Sending RPC to host: '{}'", __func__, targets[i]); + handles.emplace_back( + ld_network_service + ->post(endp)); + } catch(const std::exception& ex) { + LOG(ERROR, + "{}() Unable to send non-blocking forward_expand_status() [peer: {}] err '{}'", + __func__, targets[i], ex.what()); + err = EBUSY; + break; // we need to gather responses from already sent RPCS + } + } + + LOG(INFO, "{}() send expand_status rpc to '{}' targets", __func__, + targets.size()); + + // wait for RPC responses + for(std::size_t i = 0; i < handles.size(); ++i) { + gkfs::malleable::rpc::expand_status::output out; + try { + out = handles[i].get().at(0); + if(out.err() > 0) { + LOG(DEBUG, + "{}() Host '{}' not done yet with malleable operation.", + __func__, targets[i]); + err += out.err(); + } + if(out.err() < 0) { + // ignore. shouldn't happen for now + LOG(ERROR, + "{}() Host '{}' is unable to check for expansion progress. (shouldn't happen)", + __func__, targets[i]); + } + } catch(const std::exception& ex) { + LOG(ERROR, + "{}() Failed to get rpc output.. 
[target host: {}] err '{}'", + __func__, targets[i], ex.what()); + err = EBUSY; + // We need to gather all responses before exiting + continue; + } + } + return err; +} + +int +forward_expand_finalize() { + LOG(INFO, "{}() enter", __func__); + auto const targets = CTX->distributor()->locate_directory_metadata(); + + auto err = 0; + // send async RPCs + std::vector> + handles; + + for(std::size_t i = 0; i < targets.size(); ++i) { + + // Setup rpc input parameters for each host + auto endp = CTX->hosts().at(targets[i]); + + try { + LOG(DEBUG, "{}() Sending RPC to host: '{}'", __func__, targets[i]); + handles.emplace_back( + ld_network_service + ->post( + endp)); + } catch(const std::exception& ex) { + LOG(ERROR, + "{}() Unable to send non-blocking forward_expand_finalize() [peer: {}] err '{}'", + __func__, targets[i], ex.what()); + err = EBUSY; + break; // we need to gather responses from already sent RPCS + } + } + + LOG(INFO, "{}() send expand_finalize rpc to '{}' targets", __func__, + targets.size()); + + // wait for RPC responses + for(std::size_t i = 0; i < handles.size(); ++i) { + + gkfs::malleable::rpc::expand_finalize::output out; + + try { + out = handles[i].get().at(0); + + if(out.err() != 0) { + LOG(ERROR, "{}() Failed finalize on host '{}'. Error '{}'", + __func__, targets[i], strerror(out.err())); + err = out.err(); + // We need to gather all responses before exiting + continue; + } + } catch(const std::exception& ex) { + LOG(ERROR, + "{}() Failed to get rpc output.. 
[target host: {}] err '{}'", + __func__, targets[i], ex.what()); + err = EBUSY; + // We need to gather all responses before exiting + continue; + } + } + return err; +} + +} // namespace gkfs::malleable::rpc \ No newline at end of file diff --git a/src/client/rpc/forward_metadata.cpp b/src/client/rpc/forward_metadata.cpp index c5c30957352c4f517ff0d6234492f3cf304b5eed..4cfe6f60aceff189eef52a1638b1915eeee5dcc1 100644 --- a/src/client/rpc/forward_metadata.cpp +++ b/src/client/rpc/forward_metadata.cpp @@ -642,7 +642,7 @@ forward_get_dirents(const string& path) { LOG(DEBUG, "{}() enter for path '{}'", __func__, path) - auto const targets = CTX->distributor()->locate_directory_metadata(path); + auto const targets = CTX->distributor()->locate_directory_metadata(); /* preallocate receiving buffer. The actual size is not known yet. * @@ -793,7 +793,7 @@ forward_get_dirents_single(const string& path, int server) { LOG(DEBUG, "{}() enter for path '{}'", __func__, path) - auto const targets = CTX->distributor()->locate_directory_metadata(path); + auto const targets = CTX->distributor()->locate_directory_metadata(); /* preallocate receiving buffer. The actual size is not known yet. 
* diff --git a/src/client/rpc/rpc_types.cpp b/src/client/rpc/rpc_types.cpp index 8dc03c911671e78851d51dc9439fb7ea62ca0aa5..ed447bdd21831f5eaf6cacdceff0c3d177605ed4 100644 --- a/src/client/rpc/rpc_types.cpp +++ b/src/client/rpc/rpc_types.cpp @@ -60,6 +60,12 @@ hermes::detail::register_user_request_types(uint32_t provider_id) { (void) registered_requests().add(provider_id); (void) registered_requests().add( provider_id); + (void) registered_requests().add( + provider_id); + (void) registered_requests().add( + provider_id); + (void) registered_requests().add( + provider_id); } else { (void) registered_requests().add( provider_id); diff --git a/src/common/rpc/distributor.cpp b/src/common/rpc/distributor.cpp index ad558b9e95d31f820aaf5d07ad2fa8f5b13a4840..9fd4d90b540801c3fdb2787217bfac7580c3b5e8 100644 --- a/src/common/rpc/distributor.cpp +++ b/src/common/rpc/distributor.cpp @@ -52,6 +52,11 @@ SimpleHashDistributor::hosts_size() const { return hosts_size_; } +void +SimpleHashDistributor::hosts_size(unsigned int size) { + hosts_size_ = size; +} + host_t SimpleHashDistributor::locate_data(const string& path, const chunkid_t& chnk_id, const int num_copy) const { @@ -78,7 +83,7 @@ SimpleHashDistributor::locate_file_metadata(const string& path, } ::vector -SimpleHashDistributor::locate_directory_metadata(const string& path) const { +SimpleHashDistributor::locate_directory_metadata() const { return all_hosts_; } @@ -95,6 +100,11 @@ LocalOnlyDistributor::hosts_size() const { return hosts_size_; } +void +LocalOnlyDistributor::hosts_size(unsigned int size) { + hosts_size_ = size; +} + host_t LocalOnlyDistributor::locate_data(const string& path, const chunkid_t& chnk_id, const int num_copy) const { @@ -108,7 +118,7 @@ LocalOnlyDistributor::locate_file_metadata(const string& path, } ::vector -LocalOnlyDistributor::locate_directory_metadata(const string& path) const { +LocalOnlyDistributor::locate_directory_metadata() const { return {localhost_}; } @@ -128,6 +138,11 @@ 
ForwarderDistributor::hosts_size() const { return hosts_size_; } +void +ForwarderDistributor::hosts_size(unsigned int size) { + hosts_size_ = size; +} + host_t ForwarderDistributor::locate_data(const std::string& path, const chunkid_t& chnk_id, @@ -150,7 +165,7 @@ ForwarderDistributor::locate_file_metadata(const std::string& path, std::vector -ForwarderDistributor::locate_directory_metadata(const std::string& path) const { +ForwarderDistributor::locate_directory_metadata() const { return all_hosts_; } @@ -239,6 +254,11 @@ GuidedDistributor::hosts_size() const { return hosts_size_; } +void +GuidedDistributor::hosts_size(unsigned int size) { + hosts_size_ = size; +} + host_t GuidedDistributor::locate_data(const string& path, const chunkid_t& chnk_id, unsigned int hosts_size, const int num_copy) { @@ -282,7 +302,7 @@ GuidedDistributor::locate_file_metadata(const string& path, ::vector -GuidedDistributor::locate_directory_metadata(const string& path) const { +GuidedDistributor::locate_directory_metadata() const { return all_hosts_; } diff --git a/src/daemon/CMakeLists.txt b/src/daemon/CMakeLists.txt index 89af71c7624109df1d90b7c4bae1b29fd030b193..d71a36d547874c2b64d8995794d3abfc4754eee5 100644 --- a/src/daemon/CMakeLists.txt +++ b/src/daemon/CMakeLists.txt @@ -48,6 +48,9 @@ target_sources( classes/rpc_data.cpp handler/srv_metadata.cpp handler/srv_management.cpp + handler/srv_malleability.cpp + malleability/malleable_manager.cpp + malleability/rpc/forward_redistribution.cpp PUBLIC ${CMAKE_SOURCE_DIR}/include/config.hpp ${CMAKE_SOURCE_DIR}/include/version.hpp.in ) diff --git a/src/daemon/backend/CMakeLists.txt b/src/daemon/backend/CMakeLists.txt index daecf5514414352da34fca806786a5e5183c6a70..aab7312f396159e8d164640ebb6b2bcd62fbd73a 100644 --- a/src/daemon/backend/CMakeLists.txt +++ b/src/daemon/backend/CMakeLists.txt @@ -27,4 +27,4 @@ ################################################################################ add_subdirectory(metadata) -add_subdirectory(data) 
+add_subdirectory(data) \ No newline at end of file diff --git a/src/daemon/backend/data/chunk_storage.cpp b/src/daemon/backend/data/chunk_storage.cpp index 467bde8fdd6660ebb7b394eb32dbdc9c1016b2e0..dac7830c02dfcce3867956eac949259656943376 100644 --- a/src/daemon/backend/data/chunk_storage.cpp +++ b/src/daemon/backend/data/chunk_storage.cpp @@ -38,6 +38,7 @@ #include #include #include +#include #include #include @@ -340,4 +341,18 @@ ChunkStorage::chunk_stat() const { return {chunksize_, bytes_total / chunksize_, bytes_free / chunksize_}; } +fs::recursive_directory_iterator +ChunkStorage::get_all_chunk_files() { + auto chunk_dir = fs::path(root_path_); + if(!fs::exists(chunk_dir)) { + throw ChunkStorageException(ENOENT, "Chunk directory does not exist"); + } + return fs::recursive_directory_iterator(chunk_dir); +} + +std::string +ChunkStorage::get_chunk_directory() { + return root_path_; +} + } // namespace gkfs::data \ No newline at end of file diff --git a/src/daemon/backend/metadata/db.cpp b/src/daemon/backend/metadata/db.cpp index da015da43d529bbfc108a4c3bdc9cc96e222a5c7..a2e0e59b3e0f78375469b94ff359598469463644 100644 --- a/src/daemon/backend/metadata/db.cpp +++ b/src/daemon/backend/metadata/db.cpp @@ -173,16 +173,20 @@ MetadataDB::get_dirents_extended(const std::string& dir) const { return backend_->get_dirents_extended(root_path); } - /** * @internal * Code example for iterating all entries in KV store. This is for debug only as * it is too expensive. 
* @endinternal */ -void +void* MetadataDB::iterate_all() const { - backend_->iterate_all(); + return backend_->iterate_all(); +} + +uint64_t +MetadataDB::db_size() const { + return backend_->db_size(); } } // namespace gkfs::metadata diff --git a/src/daemon/backend/metadata/parallax_backend.cpp b/src/daemon/backend/metadata/parallax_backend.cpp index ee9e5e9ec31e9b03ea1e9aa78e2541bc8fd5d06b..16da6473824b0fdb909efc8faa45cd98abe39488 100644 --- a/src/daemon/backend/metadata/parallax_backend.cpp +++ b/src/daemon/backend/metadata/parallax_backend.cpp @@ -529,8 +529,10 @@ ParallaxBackend::get_dirents_extended_impl(const std::string& dir) const { * Code example for iterating all entries in KV store. This is for debug only as * it is too expensive */ -void -ParallaxBackend::iterate_all_impl() const {} +void* +ParallaxBackend::iterate_all_impl() const { + return nullptr; +} } // namespace gkfs::metadata diff --git a/src/daemon/backend/metadata/rocksdb_backend.cpp b/src/daemon/backend/metadata/rocksdb_backend.cpp index 508e7bd82df15a91806a44f68a7fb1e63a63b778..0c6fd55e6b3de9136a77d2c678ec06ebd4197baf 100644 --- a/src/daemon/backend/metadata/rocksdb_backend.cpp +++ b/src/daemon/backend/metadata/rocksdb_backend.cpp @@ -388,17 +388,26 @@ RocksDBBackend::get_dirents_extended_impl(const std::string& dir) const { * Code example for iterating all entries in KV store. 
This is for debug only as * it is too expensive */ -void +void* RocksDBBackend::iterate_all_impl() const { - std::string key; - std::string val; + // std::string key; + // std::string val; // Do RangeScan on parent inode - auto iter = db_->NewIterator(rdb::ReadOptions()); - for(iter->SeekToFirst(); iter->Valid(); iter->Next()) { - key = iter->key().ToString(); - val = iter->value().ToString(); - std::cout << key << std::endl; - } + // auto iter = db_->NewIterator(rdb::ReadOptions()); + // for(iter->SeekToFirst(); iter->Valid(); iter->Next()) { + // key = iter->key().ToString(); + // val = iter->value().ToString(); + // } + // TODO Fix this hacky solution. Returning void* is not a good idea :> + return static_cast(db_->NewIterator(rdb::ReadOptions())); +} + +uint64_t +RocksDBBackend::db_size_impl() const { + // TODO error handling + uint64_t num_keys = 0; + db_->GetAggregatedIntProperty("rocksdb.estimate-num-keys", &num_keys); + return num_keys; } /** diff --git a/src/daemon/classes/fs_data.cpp b/src/daemon/classes/fs_data.cpp index 909c26fe95338c9259009d22fec45c8b48284fdb..c2d09ffdd26b56fd7240e21c2a6deff9c41b06eb 100644 --- a/src/daemon/classes/fs_data.cpp +++ b/src/daemon/classes/fs_data.cpp @@ -33,6 +33,14 @@ namespace gkfs::daemon { +FsData::FsData() { + ABT_mutex_create(&maintenance_mode_mutex_); +} + +FsData::~FsData() { + ABT_mutex_free(&maintenance_mode_mutex_); +} + // getter/setter const std::shared_ptr& @@ -314,4 +322,44 @@ FsData::prometheus_gateway(const std::string& prometheus_gateway) { FsData::prometheus_gateway_ = prometheus_gateway; } +bool +FsData::maintenance_mode() const { + return maintenance_mode_; +} + +void +FsData::maintenance_mode(bool maintenance_mode) { + ABT_mutex_lock(maintenance_mode_mutex_); + if(maintenance_mode && maintenance_mode_) { + auto err_str = + "Critical error: Maintenance mode enabled twice, e.g., due to multiple expand requests. 
This is not allowed and should not happen."; + spdlogger()->error(err_str); + throw std::runtime_error(err_str); + } + maintenance_mode_ = maintenance_mode; + ABT_mutex_unlock(maintenance_mode_mutex_); +} + +bool +FsData::redist_running() const { + return redist_running_; +} + +void +FsData::redist_running(bool redist_running) { + redist_running_ = redist_running; +} + +const std::shared_ptr& +FsData::malleable_manager() const { + return malleable_manager_; +} + +void +FsData::malleable_manager( + const std::shared_ptr& + malleable_manager) { + malleable_manager_ = malleable_manager; +} + } // namespace gkfs::daemon diff --git a/src/daemon/classes/rpc_data.cpp b/src/daemon/classes/rpc_data.cpp index cf8ba8d873158350ac05eccbd76c473be290850f..6ec5c0673865524efd9bcc0f2db62c64d75be1ea 100644 --- a/src/daemon/classes/rpc_data.cpp +++ b/src/daemon/classes/rpc_data.cpp @@ -54,6 +54,50 @@ RPCData::proxy_server_rpc_mid(margo_instance* proxy_server_rpc_mid) { RPCData::proxy_server_rpc_mid_ = proxy_server_rpc_mid; } +margo_instance* +RPCData::client_rpc_mid() { + return client_rpc_mid_; +} + +void +RPCData::client_rpc_mid(margo_instance* client_rpc_mid) { + RPCData::client_rpc_mid_ = client_rpc_mid; +} + +margo_client_ids& +RPCData::rpc_client_ids() { + return rpc_client_ids_; +} + +std::map& +RPCData::rpc_endpoints() { + return rpc_endpoints_; +} + +void +RPCData::rpc_endpoints(const std::map& rpc_endpoints) { + rpc_endpoints_ = rpc_endpoints; +} + +uint64_t +RPCData::hosts_size() const { + return hosts_size_; +} +void +RPCData::hosts_size(uint64_t hosts_size) { + hosts_size_ = hosts_size; +} + +uint64_t +RPCData::local_host_id() const { + return local_host_id_; +} + +void +RPCData::local_host_id(uint64_t local_host_id) { + local_host_id_ = local_host_id; +} + ABT_pool RPCData::io_pool() const { return io_pool_; diff --git a/src/daemon/daemon.cpp b/src/daemon/daemon.cpp index 073fb60a1f2ef93a1253efacae1549524908a907..a00aa352cbf319694e81af0c20107b175cb1737a 100644 ---
a/src/daemon/daemon.cpp +++ b/src/daemon/daemon.cpp @@ -47,6 +47,7 @@ #include #include #include +#include #include #ifdef GKFS_ENABLE_AGIOS @@ -176,6 +177,16 @@ register_server_rpcs(margo_instance_id mid) { rpc_srv_truncate); MARGO_REGISTER(mid, gkfs::rpc::tag::get_chunk_stat, rpc_chunk_stat_in_t, rpc_chunk_stat_out_t, rpc_srv_get_chunk_stat); + // malleability + MARGO_REGISTER(mid, gkfs::malleable::rpc::tag::expand_start, + rpc_expand_start_in_t, rpc_err_out_t, rpc_srv_expand_start); + MARGO_REGISTER(mid, gkfs::malleable::rpc::tag::expand_status, void, + rpc_err_out_t, rpc_srv_expand_status); + MARGO_REGISTER(mid, gkfs::malleable::rpc::tag::expand_finalize, void, + rpc_err_out_t, rpc_srv_expand_finalize); + MARGO_REGISTER(mid, gkfs::malleable::rpc::tag::migrate_metadata, + rpc_migrate_metadata_in_t, rpc_err_out_t, + rpc_srv_migrate_metadata); } /** @@ -235,6 +246,59 @@ init_rpc_server() { register_server_rpcs(mid); } +/** + * @brief Registers RPC handlers to a given Margo instance. + * @internal + * Registering is done by associating a Margo instance id (mid) with the RPC + * name and its handler function including defined input/out structs + * @endinternal + * @param margo_instance_id + */ +void +register_client_rpcs(margo_instance_id mid) { + RPC_DATA->rpc_client_ids().migrate_metadata_id = + MARGO_REGISTER(mid, gkfs::malleable::rpc::tag::migrate_metadata, + rpc_migrate_metadata_in_t, rpc_err_out_t, NULL); + // this is just a write + RPC_DATA->rpc_client_ids().migrate_data_id = + MARGO_REGISTER(mid, gkfs::rpc::tag::write, rpc_write_data_in_t, + rpc_data_out_t, NULL); +} + +/** + * @brief Initializes the daemon RPC client. + * @throws std::runtime_error on failure + */ +void +init_rpc_client() { + struct hg_init_info hg_options = HG_INIT_INFO_INITIALIZER; + hg_options.auto_sm = GKFS_DATA->use_auto_sm() ? 
HG_TRUE : HG_FALSE; + hg_options.stats = HG_FALSE; + if(gkfs::rpc::protocol::ofi_psm2 == GKFS_DATA->rpc_protocol()) + hg_options.na_init_info.progress_mode = NA_NO_BLOCK; + // Start Margo (this will also initialize Argobots and Mercury internally) + auto margo_config = fmt::format( + R"({{ "use_progress_thread" : true, "rpc_thread_count" : {} }})", + 0); + // auto margo_config = "{}"; + struct margo_init_info args = {nullptr}; + args.json_config = margo_config.c_str(); + args.hg_init_info = &hg_options; + auto* mid = margo_init_ext(GKFS_DATA->rpc_protocol().c_str(), + MARGO_CLIENT_MODE, &args); + + if(mid == MARGO_INSTANCE_NULL) { + throw runtime_error("Failed to initialize the Margo RPC client"); + } + + GKFS_DATA->spdlogger()->info( + "{}() RPC client initialization successful for protocol {}", + __func__, GKFS_DATA->rpc_protocol()); + + RPC_DATA->client_rpc_mid(mid); + register_client_rpcs(mid); +} + void register_proxy_server_rpcs(margo_instance_id mid) { MARGO_REGISTER(mid, gkfs::rpc::tag::get_chunk_stat, rpc_chunk_stat_in_t, @@ -403,20 +467,6 @@ init_environment() { // init margo for proxy RPC if(!GKFS_DATA->bind_proxy_addr().empty()) { - GKFS_DATA->spdlogger()->debug("{}() Initializing Distributor ... 
", - __func__); - try { - auto distributor = - std::make_shared(); - RPC_DATA->distributor(distributor); - } catch(const std::exception& e) { - GKFS_DATA->spdlogger()->error( - "{}() Failed to initialize Distributor: {}", __func__, - e.what()); - throw; - } - GKFS_DATA->spdlogger()->debug("{}() Distributed running.", __func__); - GKFS_DATA->spdlogger()->debug( "{}() Initializing proxy RPC server: '{}'", __func__, GKFS_DATA->bind_proxy_addr()); @@ -464,6 +514,48 @@ init_environment() { if(!GKFS_DATA->hosts_file().empty()) { gkfs::utils::populate_hosts_file(); } + + // Init margo client + GKFS_DATA->spdlogger()->debug("{}() Initializing RPC client: '{}'", + __func__, GKFS_DATA->rpc_protocol()); + try { + init_rpc_client(); + } catch(const std::exception& e) { + GKFS_DATA->spdlogger()->error( + "{}() Failed to initialize RPC client: {}", __func__, e.what()); + throw; + } + GKFS_DATA->spdlogger()->debug("{}() RPC client running.", __func__); + + // Needed for client + GKFS_DATA->spdlogger()->debug("{}() Initializing Distributor ... ", + __func__); + try { + auto distributor = std::make_shared(); + RPC_DATA->distributor(distributor); + } catch(const std::exception& e) { + GKFS_DATA->spdlogger()->error( + "{}() Failed to initialize Distributor: {}", __func__, + e.what()); + throw; + } + GKFS_DATA->spdlogger()->debug("{}() Distributed running.", __func__); + + GKFS_DATA->spdlogger()->debug("{}() Initializing MalleableManager...", + __func__); + try { + auto malleable_manager = + std::make_shared(); + GKFS_DATA->malleable_manager(malleable_manager); + + } catch(const std::exception& e) { + GKFS_DATA->spdlogger()->error( + "{}() Failed to initialize MalleableManager: {}", __func__, + e.what()); + throw; + } + GKFS_DATA->spdlogger()->debug("{}() MalleableManager running.", __func__); + GKFS_DATA->spdlogger()->info("Startup successful. 
Daemon is ready."); } @@ -524,6 +616,12 @@ destroy_enviroment() { GKFS_DATA->spdlogger()->info("{}() Closing metadata DB", __func__); GKFS_DATA->close_mdb(); + if(RPC_DATA->client_rpc_mid() != nullptr) { + GKFS_DATA->spdlogger()->info("{}() Finalizing margo RPC client ...", + __func__); + margo_finalize(RPC_DATA->client_rpc_mid()); + } + // Delete rootdir/metadir if requested if(!keep_rootdir) { diff --git a/src/daemon/handler/srv_malleability.cpp b/src/daemon/handler/srv_malleability.cpp new file mode 100644 index 0000000000000000000000000000000000000000..6a732bee728570c5f2bee4ddc312efe4fc838005 --- /dev/null +++ b/src/daemon/handler/srv_malleability.cpp @@ -0,0 +1,149 @@ +/* + Copyright 2018-2024, Barcelona Supercomputing Center (BSC), Spain + Copyright 2015-2024, Johannes Gutenberg Universitaet Mainz, Germany + + This software was partially supported by the + EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu). + + This software was partially supported by the + ADA-FS project under the SPPEXA project funded by the DFG. + + This file is part of GekkoFS' POSIX interface. + + GekkoFS' POSIX interface is free software: you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public License as + published by the Free Software Foundation, either version 3 of the License, + or (at your option) any later version. + + GekkoFS' POSIX interface is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with GekkoFS' POSIX interface. If not, see + . 
+ + SPDX-License-Identifier: LGPL-3.0-or-later +*/ +#include +#include +#include +#include +#include + +#include + +extern "C" { +#include +} + +using namespace std; + +namespace { + +hg_return_t +rpc_srv_expand_start(hg_handle_t handle) { + rpc_expand_start_in_t in; + rpc_err_out_t out; + + auto ret = margo_get_input(handle, &in); + if(ret != HG_SUCCESS) { + GKFS_DATA->spdlogger()->error( + "{}() Failed to retrieve input from handle", __func__); + return gkfs::rpc::cleanup_respond(&handle, &in, &out); + } + GKFS_DATA->spdlogger()->debug( + "{}() Got RPC with old conf '{}' new conf '{}'", __func__, + in.old_server_conf, in.new_server_conf); + try { + // if maintenance mode is already set, error is thrown -- not allowed + GKFS_DATA->maintenance_mode(true); + GKFS_DATA->malleable_manager()->expand_start(in.old_server_conf, + in.new_server_conf); + out.err = 0; + } catch(const std::exception& e) { + GKFS_DATA->spdlogger()->error("{}() Failed to start expansion: '{}' ", + __func__, e.what()); + GKFS_DATA->maintenance_mode(false); + out.err = -1; + } + + GKFS_DATA->spdlogger()->debug("{}() Sending output err '{}'", __func__, + out.err); + return gkfs::rpc::cleanup_respond(&handle, &in, &out); +} + +hg_return_t +rpc_srv_expand_status(hg_handle_t handle) { + rpc_err_out_t out; + GKFS_DATA->spdlogger()->debug("{}() Got RPC ", __func__); + try { + // return 1 if redistribution is running, 0 otherwise. + out.err = GKFS_DATA->redist_running() ? 
1 : 0; + } catch(const std::exception& e) { + GKFS_DATA->spdlogger()->error( + "{}() Failed to check status for expansion: '{}'", __func__, + e.what()); + out.err = -1; + } + GKFS_DATA->spdlogger()->debug("{}() Sending output err '{}'", __func__, + out.err); + return gkfs::rpc::cleanup_respond(&handle, &out); +} + +hg_return_t +rpc_srv_expand_finalize(hg_handle_t handle) { + rpc_err_out_t out; + GKFS_DATA->spdlogger()->debug("{}() Got RPC ", __func__); + try { + GKFS_DATA->maintenance_mode(false); + out.err = 0; + } catch(const std::exception& e) { + GKFS_DATA->spdlogger()->error("{}() Failed to finalize expansion: '{}'", + __func__, e.what()); + out.err = -1; + } + + GKFS_DATA->spdlogger()->debug("{}() Sending output err '{}'", __func__, + out.err); + return gkfs::rpc::cleanup_respond(&handle, &out); +} + +hg_return_t +rpc_srv_migrate_metadata(hg_handle_t handle) { + rpc_migrate_metadata_in_t in{}; + rpc_err_out_t out{}; + + auto ret = margo_get_input(handle, &in); + if(ret != HG_SUCCESS) { + GKFS_DATA->spdlogger()->error( + "{}() Failed to retrieve input from handle", __func__); + return gkfs::rpc::cleanup_respond(&handle, &in, &out); + } + GKFS_DATA->spdlogger()->debug("{}() Got RPC with key '{}' value '{}'", + __func__, in.key, in.value); + try { + // create metadentry + GKFS_DATA->mdb()->put(in.key, in.value); + out.err = 0; + } catch(const std::exception& e) { + GKFS_DATA->spdlogger()->error("{}() Failed to create KV entry: '{}'", + __func__, e.what()); + out.err = -1; + } + + GKFS_DATA->spdlogger()->debug("{}() Sending output err '{}'", __func__, + out.err); + return gkfs::rpc::cleanup_respond(&handle, &in, &out); +} + +} // namespace + +DEFINE_MARGO_RPC_HANDLER(rpc_srv_expand_start) + +DEFINE_MARGO_RPC_HANDLER(rpc_srv_expand_status) + +DEFINE_MARGO_RPC_HANDLER(rpc_srv_expand_finalize) + +DEFINE_MARGO_RPC_HANDLER(rpc_srv_migrate_metadata) diff --git a/src/daemon/malleability/malleable_manager.cpp b/src/daemon/malleability/malleable_manager.cpp new file mode 
100644 index 0000000000000000000000000000000000000000..01290463a078f31ce34447f5fe0c9bbcda92061f --- /dev/null +++ b/src/daemon/malleability/malleable_manager.cpp @@ -0,0 +1,361 @@ +/* + Copyright 2018-2024, Barcelona Supercomputing Center (BSC), Spain + Copyright 2015-2024, Johannes Gutenberg Universitaet Mainz, Germany + + This software was partially supported by the + EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu). + + This software was partially supported by the + ADA-FS project under the SPPEXA project funded by the DFG. + + This file is part of GekkoFS. + + GekkoFS is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + GekkoFS is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GekkoFS. If not, see . 
+ + SPDX-License-Identifier: GPL-3.0-or-later +*/ + +#include +#include +#include +#include + +#include + +#include +#include +#include +#include +#include + +extern "C" { +#include +#include +#include +#include +} + +using namespace std; +namespace fs = std::filesystem; + +namespace gkfs::malleable { + +// TODO The following three functions are almost identical to the proxy code +// They should be moved to a common file and shared between the proxy and the daemon +vector> +MalleableManager::load_hostfile(const std::string& path) { + + GKFS_DATA->spdlogger()->info("{}() Loading hosts file '{}'", __func__, + path); + + ifstream lf(path); + if(!lf) { + throw runtime_error(fmt::format("Failed to open hosts file '{}': {}", + path, strerror(errno))); + } + vector> hosts; + const regex line_re("^(\\S+)\\s+(\\S+)\\s*(\\S*)$", + regex::ECMAScript | regex::optimize); + string line; + string host; + string uri; + std::smatch match; + while(getline(lf, line)) { + // if line starts with #, it indicates the end of current FS instance + // It is therefore skipped + if(line[0] == '#') + continue; + if(!regex_match(line, match, line_re)) { + GKFS_DATA->spdlogger()->error( + "{}() Unrecognized line format: [path: '{}', line: '{}']", + __func__, path, line); + throw runtime_error( + fmt::format("unrecognized line format: '{}'", line)); + } + host = match[1]; + uri = match[2]; + hosts.emplace_back(host, uri); + } + if(hosts.empty()) { + throw runtime_error( + "Hosts file found but no suitable addresses could be extracted"); + } + // sort hosts so that data always hashes to the same place + std::sort(hosts.begin(), hosts.end()); + // remove rootdir suffix from host after sorting as no longer required + for(auto& h : hosts) { + auto idx = h.first.rfind("#"); + if(idx != string::npos) + h.first.erase(idx, h.first.length()); + } + return hosts; +} + +vector> +MalleableManager::read_hosts_file() { + auto hostfile = GKFS_DATA->hosts_file(); + GKFS_DATA->spdlogger()->info("{}() Reading hosts file...",
__func__); + + vector> hosts; + try { + hosts = load_hostfile(hostfile); + } catch(const exception& e) { + auto emsg = fmt::format("Failed to load hosts file: {}", e.what()); + throw runtime_error(emsg); + } + + if(hosts.empty()) { + throw runtime_error(fmt::format("Hostfile empty: '{}'", hostfile)); + } + GKFS_DATA->spdlogger()->info("{}() Number of hosts after expansion '{}'", + __func__, hosts.size()); + return hosts; +} + +void +MalleableManager::connect_to_hosts( + const vector>& hosts) { + auto local_hostname = gkfs::rpc::get_my_hostname(true); + bool local_host_found = false; + + RPC_DATA->hosts_size(hosts.size()); + vector host_ids(hosts.size()); + // populate vector with [0, ..., host_size - 1] + ::iota(::begin(host_ids), ::end(host_ids), 0); + /* + * Shuffle hosts to balance addr lookups to all hosts + * Too many concurrent lookups send to same host + * could overwhelm the server, + * returning error when addr lookup + */ + ::random_device rd; // obtain a random number from hardware + ::mt19937 g(rd()); // seed the random generator + ::shuffle(host_ids.begin(), host_ids.end(), g); // Shuffle hosts vector + // lookup addresses and put abstract server addresses into rpc_addresses + for(const auto& id : host_ids) { + const auto& hostname = hosts.at(id).first; + const auto& uri = hosts.at(id).second; + + hg_addr_t svr_addr = HG_ADDR_NULL; + + // try to look up 3 times before erroring out + hg_return_t ret; + for(uint32_t i = 0; i < 4; i++) { + ret = margo_addr_lookup(RPC_DATA->client_rpc_mid(), uri.c_str(), + &svr_addr); + if(ret != HG_SUCCESS) { + // still not working after 5 tries. 
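`connect_to_hosts()` retries each `margo_addr_lookup()` with a randomized, growing backoff before giving up. A condensed, Margo-free sketch of that pattern, with the flaky operation abstracted behind a `std::function` (names are illustrative):

```cpp
#include <chrono>
#include <functional>
#include <random>
#include <thread>

// Retry `op` up to `max_attempts` times. Between attempts, sleep a random
// 50..50*(i+2) ms, mirroring the daemon's address-lookup loop; the jitter
// spreads concurrent lookups so a single host is not overwhelmed.
inline bool
retry_with_jitter(const std::function<bool()>& op, unsigned int max_attempts) {
    std::random_device rd;
    std::mt19937 eng(rd());
    for(unsigned int i = 0; i < max_attempts; i++) {
        if(op())
            return true;
        if(i + 1 == max_attempts)
            break; // out of attempts; the caller decides how to fail
        std::uniform_int_distribution<> delay_ms(50, 50 * (i + 2));
        std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms(eng)));
    }
    return false;
}
```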
+ if(i == 3) { + auto err_msg = + fmt::format("{}() Unable to lookup address '{}'", + __func__, uri); + throw runtime_error(err_msg); + } + // Wait a random amount of time and try again + ::mt19937 eng(rd()); // seed the random generator + ::uniform_int_distribution<> distr( + 50, 50 * (i + 2)); // define the range + ::this_thread::sleep_for(std::chrono::milliseconds(distr(eng))); + } else { + break; + } + } + if(svr_addr == HG_ADDR_NULL) { + auto err_msg = fmt::format( + "{}() looked up address is NULL for address '{}'", __func__, + uri); + throw runtime_error(err_msg); + } + RPC_DATA->rpc_endpoints().insert(make_pair(id, svr_addr)); + + if(!local_host_found && hostname == local_hostname) { + GKFS_DATA->spdlogger()->debug("{}() Found local host: {}", __func__, + hostname); + RPC_DATA->local_host_id(id); + local_host_found = true; + } + GKFS_DATA->spdlogger()->debug("{}() Found daemon: id '{}' uri '{}'", + __func__, id, uri); + } + if(!local_host_found) { + auto err_msg = fmt::format( + "{}() Local host '{}' not found in hosts file. This should not happen.", + __func__, local_hostname); + throw runtime_error(err_msg); + } +} + +int +MalleableManager::redistribute_metadata() { + uint64_t count = 0; + auto estimate_db_size = GKFS_DATA->mdb()->db_size(); + auto percent_interval = estimate_db_size / 100; + GKFS_DATA->spdlogger()->info( + "{}() Starting metadata redistribution for '{}' estimated number of KV pairs...", + __func__, estimate_db_size); + int migration_err = 0; + string key, value; + auto iter = + static_cast(GKFS_DATA->mdb()->iterate_all()); + // TODO parallelize + for(iter->SeekToFirst(); iter->Valid(); iter->Next()) { + key = iter->key().ToString(); + value = iter->value().ToString(); + if(key == "/") { + continue; + } + auto dest_id = RPC_DATA->distributor()->locate_file_metadata(key, 0); + GKFS_DATA->spdlogger()->trace( + "{}() Migration: key {} and value {}. 
From host {} to host {}", + __func__, key, value, RPC_DATA->local_host_id(), dest_id); + if(dest_id == RPC_DATA->local_host_id()) { + GKFS_DATA->spdlogger()->trace("{}() SKIP", __func__); + continue; + } + auto err = gkfs::malleable::rpc::forward_metadata(key, value, dest_id); + if(err != 0) { + GKFS_DATA->spdlogger()->error( + "{}() Failed to migrate metadata for key '{}'", __func__, + key); + migration_err++; + } + GKFS_DATA->mdb()->remove(key); + count++; + if(percent_interval > 0 && count % percent_interval == 0) { + GKFS_DATA->spdlogger()->info( + "{}() Metadata migration {}%/100% completed...", __func__, + count / percent_interval); + } + } + GKFS_DATA->spdlogger()->info("{}() Metadata redistribution completed.", + __func__); + return migration_err; +} + +void +MalleableManager::redistribute_data() { + GKFS_DATA->spdlogger()->info("{}() Starting data redistribution...", + __func__); + + auto chunk_dir = fs::path(GKFS_DATA->storage()->get_chunk_directory()); + auto dir_iterator = GKFS_DATA->storage()->get_all_chunk_files(); + + // TODO this can be parallelized, e.g., async chunk I/O + for(const auto& entry : dir_iterator) { + if(!entry.is_regular_file()) { + continue; + } + // path under chunkdir as placed in the rootdir + auto rel_chunk_dir = fs::relative(entry, chunk_dir); + // chunk id from this entry used for determining destination + uint64_t chunk_id = stoul(rel_chunk_dir.filename().string()); + // mountdir gekkofs path used for determining destination + auto gkfs_path = rel_chunk_dir.parent_path().string(); + ::replace(gkfs_path.begin(), gkfs_path.end(), ':', '/'); + gkfs_path = "/" + gkfs_path; + auto dest_id = + RPC_DATA->distributor()->locate_data(gkfs_path, chunk_id, 0); + GKFS_DATA->spdlogger()->trace( + "{}() Migrating chunkfile: {} for gkfs file {} chnkid {} destid {}", + __func__, rel_chunk_dir.string(), gkfs_path, chunk_id, dest_id); + if(dest_id == RPC_DATA->local_host_id()) { + GKFS_DATA->spdlogger()->trace("{}() SKIPPERS", __func__); + 
continue; + } + auto fd = open(entry.path().c_str(), O_RDONLY); + if(fd < 0) { + GKFS_DATA->spdlogger()->error("{}() Failed to open chunkfile: {}", + __func__, entry.path().c_str()); + continue; + } + auto buf = new char[entry.file_size()]; + auto bytes_read = read(fd, buf, entry.file_size()); + if(bytes_read < 0) { + GKFS_DATA->spdlogger()->error("{}() Failed to read chunkfile: {}", + __func__, entry.path().c_str()); + continue; + } + auto err = gkfs::malleable::rpc::forward_data( + gkfs_path, buf, bytes_read, chunk_id, dest_id); + if(err != 0) { + GKFS_DATA->spdlogger()->error( + "{}() Failed to migrate data for chunkfile: {}", __func__, + entry.path().c_str()); + } + close(fd); + GKFS_DATA->spdlogger()->trace( + "{}() Data migration completed for chunkfile: {}. Removing ...", + __func__, entry.path().c_str()); + // remove file after migration + auto entry_dir = entry.path().parent_path(); + try { + fs::remove(entry); + if(fs::is_empty(entry_dir)) { + fs::remove(entry_dir); + } + } catch(const fs::filesystem_error& e) { + GKFS_DATA->spdlogger()->error("{}() Failed to remove chunkfile: {}", + __func__, entry.path().c_str()); + } + GKFS_DATA->spdlogger()->trace("{}() Done for chunkfile: {}", __func__, + entry.path().c_str()); + } + + GKFS_DATA->spdlogger()->info("{}() Data redistribution completed.", + __func__); +} + +void +MalleableManager::expand_abt(void* _arg) { + GKFS_DATA->spdlogger()->info("{}() Starting expansion process...", + __func__); + GKFS_DATA->redist_running(true); + GKFS_DATA->malleable_manager()->redistribute_metadata(); + try { + GKFS_DATA->malleable_manager()->redistribute_data(); + } catch(const gkfs::data::ChunkStorageException& e) { + GKFS_DATA->spdlogger()->error("{}() Failed to redistribute data: '{}'", + __func__, e.what()); + } + GKFS_DATA->redist_running(false); + GKFS_DATA->spdlogger()->info( + "{}() Expansion process successfully finished.", __func__); +} + +// PUBLIC + +void +MalleableManager::expand_start(int old_server_conf, int 
new_server_conf) { + auto hosts = read_hosts_file(); + if(hosts.size() != static_cast(new_server_conf)) { + throw runtime_error( + fmt::format("MalleableManager::{}() Something is wrong. " + "Number of hosts in hosts file ({}) " + "does not match new server configuration ({})", + __func__, hosts.size(), new_server_conf)); + } + connect_to_hosts(hosts); + RPC_DATA->distributor()->hosts_size(hosts.size()); + auto abt_err = + ABT_thread_create(RPC_DATA->io_pool(), expand_abt, + ABT_THREAD_ATTR_NULL, nullptr, &redist_thread_); + if(abt_err != ABT_SUCCESS) { + auto err_str = fmt::format( + "MalleableManager::{}() Failed to create ABT thread with abt_err '{}'", + __func__, abt_err); + throw runtime_error(err_str); + } +} + +} // namespace gkfs::malleable diff --git a/src/daemon/malleability/rpc/forward_redistribution.cpp b/src/daemon/malleability/rpc/forward_redistribution.cpp new file mode 100644 index 0000000000000000000000000000000000000000..9bc7d05dc62697394d06e98d1818cb12a04bb7cc --- /dev/null +++ b/src/daemon/malleability/rpc/forward_redistribution.cpp @@ -0,0 +1,158 @@ +/* + Copyright 2018-2024, Barcelona Supercomputing Center (BSC), Spain + Copyright 2015-2024, Johannes Gutenberg Universitaet Mainz, Germany + + This software was partially supported by the + EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu). + + This software was partially supported by the + ADA-FS project under the SPPEXA project funded by the DFG. + + This file is part of GekkoFS. + + GekkoFS is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + GekkoFS is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. 
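`redistribute_metadata()` and `redistribute_data()` re-run the distributor's placement with the enlarged `hosts_size` and forward every entry that no longer hashes to the local daemon. A toy stand-in (`std::hash` modulo host count, not GekkoFS's actual distributor) illustrates why a large share of entries moves after an expansion:

```cpp
#include <functional>
#include <string>

// Toy placement: hash a pseudo path onto the current number of hosts.
inline unsigned int
locate(const std::string& path, unsigned int hosts_size) {
    return std::hash<std::string>{}(path) % hosts_size;
}

// Count how many of n sample paths change owner when the host set grows
// from old_size to new_size. Entries whose owner changes are the ones
// the daemons would migrate.
inline unsigned int
count_moved(unsigned int n, unsigned int old_size, unsigned int new_size) {
    unsigned int moved = 0;
    for(unsigned int i = 0; i < n; i++) {
        auto path = "/file" + std::to_string(i);
        if(locate(path, old_size) != locate(path, new_size))
            moved++;
    }
    return moved;
}
```

With plain modulo placement roughly half of the keys move when doubling from 2 to 4 hosts, which is one reason the expansion runs in a background Argobots thread rather than inline in the RPC handler.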
+ + You should have received a copy of the GNU General Public License + along with GekkoFS. If not, see . + + SPDX-License-Identifier: GPL-3.0-or-later +*/ + +#include +#include +#include "common/rpc/rpc_util.hpp" + + +namespace gkfs::malleable::rpc { + +int +forward_metadata(std::string& key, std::string& value, unsigned int dest_id) { + hg_handle_t rpc_handle = nullptr; + rpc_migrate_metadata_in_t in{}; + rpc_err_out_t out{}; + int err; + // set input + in.key = key.c_str(); + in.value = value.c_str(); + // Create handle + GKFS_DATA->spdlogger()->debug("{}() Creating Margo handle ...", __func__); + auto endp = RPC_DATA->rpc_endpoints().at(dest_id); + auto ret = margo_create(RPC_DATA->client_rpc_mid(), endp, + RPC_DATA->rpc_client_ids().migrate_metadata_id, + &rpc_handle); + if(ret != HG_SUCCESS) { + GKFS_DATA->spdlogger()->error( + "{}() Critical error. Cannot create margo handle", __func__); + return EBUSY; + } + ret = margo_forward(rpc_handle, &in); + if(ret == HG_SUCCESS) { + // Get response + GKFS_DATA->spdlogger()->trace("{}() Waiting for response", __func__); + ret = margo_get_output(rpc_handle, &out); + if(ret == HG_SUCCESS) { + GKFS_DATA->spdlogger()->debug("{}() Got response success: {}", + __func__, out.err); + err = out.err; + margo_free_output(rpc_handle, &out); + } else { + // something is wrong + err = EBUSY; + GKFS_DATA->spdlogger()->error("{}() while getting rpc output", + __func__); + } + } else { + // something is wrong + err = EBUSY; + GKFS_DATA->spdlogger()->error("{}() sending rpc failed", __func__); + } + + /* clean up resources consumed by this rpc */ + margo_destroy(rpc_handle); + return err; +} + +int +forward_data(const std::string& path, void* buf, const size_t count, + const uint64_t chnk_id, const uint64_t dest_id) { + hg_handle_t rpc_handle = nullptr; + rpc_write_data_in_t in{}; + rpc_data_out_t out{}; + int err = 0; + in.path = path.c_str(); + in.offset = 0; // relative to chunkfile not gkfs file + in.host_id = dest_id; + 
in.host_size = RPC_DATA->distributor()->hosts_size(); + in.chunk_n = 1; + in.chunk_start = chnk_id; + in.chunk_end = chnk_id; + in.total_chunk_size = count; + std::vector write_ops_vect = {1}; + in.wbitset = gkfs::rpc::compress_bitset(write_ops_vect).c_str(); + + hg_bulk_t bulk_handle = nullptr; + // register local target buffer for bulk access + auto bulk_buf = buf; + auto size = std::make_shared(count); // XXX Why shared ptr? + auto ret = margo_bulk_create(RPC_DATA->client_rpc_mid(), 1, &bulk_buf, + size.get(), HG_BULK_READ_ONLY, &bulk_handle); + if(ret != HG_SUCCESS) { + GKFS_DATA->spdlogger()->error("{}() Failed to create rpc bulk handle", + __func__); + return EBUSY; + } + in.bulk_handle = bulk_handle; + GKFS_DATA->spdlogger()->trace( + "{}() Sending non-blocking RPC to '{}': path '{}' offset '{}' chunk_n '{}' chunk_start '{}' chunk_end '{}' total_chunk_size '{}'", + __func__, dest_id, in.path, in.offset, in.chunk_n, in.chunk_start, + in.chunk_end, in.total_chunk_size); + ret = margo_create(RPC_DATA->client_rpc_mid(), + RPC_DATA->rpc_endpoints().at(dest_id), + RPC_DATA->rpc_client_ids().migrate_data_id, &rpc_handle); + if(ret != HG_SUCCESS) { + margo_destroy(rpc_handle); + margo_bulk_free(bulk_handle); + return EBUSY; + } + // Send RPC + ret = margo_forward(rpc_handle, &in); + if(ret != HG_SUCCESS) { + GKFS_DATA->spdlogger()->error( + "{}() Unable to send blocking rpc for path {} and recipient {}", + __func__, path, dest_id); + margo_destroy(rpc_handle); + margo_bulk_free(bulk_handle); + return EBUSY; + } + GKFS_DATA->spdlogger()->debug("{}() '1' RPCs sent, waiting for reply ...", + __func__); + ssize_t out_size = 0; + ret = margo_get_output(rpc_handle, &out); + if(ret != HG_SUCCESS) { + GKFS_DATA->spdlogger()->error( + "{}() Failed to get rpc output for path {} recipient {}", + __func__, path, dest_id); + err = EBUSY; + } + GKFS_DATA->spdlogger()->debug( + "{}() Got response from target '{}': err '{}' with io_size '{}'", + __func__, dest_id, out.err, 
out.io_size); + if(out.err != 0) + err = out.err; + else + out_size += static_cast<ssize_t>(out.io_size); + margo_free_output(rpc_handle, &out); + margo_destroy(rpc_handle); + margo_bulk_free(bulk_handle); + return err; +} + +} // namespace gkfs::malleable::rpc \ No newline at end of file diff --git a/src/daemon/util.cpp b/src/daemon/util.cpp index 7dc39cd8eb6dc49b356ab8eba2744fcc5657b8da..e49c0aa67348d7b5124a73630f6b31cfcf0fbac9 100644 --- a/src/daemon/util.cpp +++ b/src/daemon/util.cpp @@ -31,6 +31,13 @@ #include +#include // Added for file existence check +#include // Added for sleep (if needed) +#include +#include +#include +#include + using namespace std; namespace gkfs::utils { @@ -47,18 +54,47 @@ namespace gkfs::utils { * access is simultaneous. * @endinternal */ +// void +// populate_hosts_file() { +// const auto& hosts_file = GKFS_DATA->hosts_file(); +// const auto& daemon_addr = RPC_DATA->self_addr_str(); +// const auto& proxy_addr = RPC_DATA->self_proxy_addr_str(); +// GKFS_DATA->spdlogger()->debug("{}() Populating hosts file: '{}'", +// __func__, +// hosts_file); +// ofstream lfstream(hosts_file, ios::out | ios::app); +// if(!lfstream) { +// throw runtime_error(fmt::format("Failed to open hosts file '{}': {}", +// hosts_file, strerror(errno))); +// } +// // if rootdir_suffix is used, append it to hostname +// auto hostname = +// GKFS_DATA->rootdir_suffix().empty() +// ? 
gkfs::rpc::get_my_hostname(true) +// : fmt::format("{}#{}", gkfs::rpc::get_my_hostname(true), +// GKFS_DATA->rootdir_suffix()); +// auto line_out = fmt::format("{} {}", hostname, daemon_addr); +// if(!proxy_addr.empty()) +// line_out = fmt::format("{} {}", line_out, proxy_addr); +// lfstream << line_out << std::endl; +// +// if(!lfstream) { +// throw runtime_error( +// fmt::format("Failed to write on hosts file '{}': {}", +// hosts_file, strerror(errno))); +// } +// lfstream.close(); +// } + + void populate_hosts_file() { const auto& hosts_file = GKFS_DATA->hosts_file(); const auto& daemon_addr = RPC_DATA->self_addr_str(); const auto& proxy_addr = RPC_DATA->self_proxy_addr_str(); + GKFS_DATA->spdlogger()->debug("{}() Populating hosts file: '{}'", __func__, hosts_file); - ofstream lfstream(hosts_file, ios::out | ios::app); - if(!lfstream) { - throw runtime_error(fmt::format("Failed to open hosts file '{}': {}", - hosts_file, strerror(errno))); - } // if rootdir_suffix is used, append it to hostname auto hostname = GKFS_DATA->rootdir_suffix().empty() @@ -68,16 +104,60 @@ populate_hosts_file() { auto line_out = fmt::format("{} {}", hostname, daemon_addr); if(!proxy_addr.empty()) line_out = fmt::format("{} {}", line_out, proxy_addr); - lfstream << line_out << std::endl; + // Constants for retry mechanism + const int MAX_RETRIES = 5; // Maximum number of retry attempts + const std::chrono::milliseconds RETRY_DELAY( + 3); // Delay between retries (in milliseconds) - if(!lfstream) { - throw runtime_error( - fmt::format("Failed to write on hosts file '{}': {}", - hosts_file, strerror(errno))); + for(int attempt = 1; attempt <= MAX_RETRIES; attempt++) { + { // New scope to close the file after each write attempt + std::ofstream lfstream(hosts_file, std::ios::out | std::ios::app); + if(!lfstream) { + throw std::runtime_error( + fmt::format("Failed to open hosts file '{}': {}", + hosts_file, strerror(errno))); + } + lfstream << line_out << std::endl; + if(!lfstream) { + throw 
runtime_error( + fmt::format("Failed to write on hosts file '{}': {}", + hosts_file, strerror(errno))); + } + lfstream.close(); + } // lfstream closed here + + // Check if the line is in the file + std::ifstream checkstream(hosts_file); + std::string line; + bool line_found = false; + while(std::getline(checkstream, line)) { + if(line == line_out) { + line_found = true; + break; + } + } + checkstream.close(); + + if(line_found) { + GKFS_DATA->spdlogger()->debug( + "{}() Host successfully written to hosts file", + __func__); + return; // Success, exit the function + } else { + GKFS_DATA->spdlogger()->warn( + "{}() Host not found after attempt {}, retrying...", + __func__, attempt); + std::this_thread::sleep_for(RETRY_DELAY); // Wait before retrying + } } - lfstream.close(); + + // Failed after all retries + throw std::runtime_error(fmt::format( + "Failed to write line to hosts file '{}' after {} retries", + hosts_file, MAX_RETRIES)); } + /** * @internal * This function removes the entire hosts file even if just one daemon is diff --git a/src/proxy/util.cpp b/src/proxy/util.cpp index c3f5c6045a2fef81ed8481d8b12b2ed18107a4e2..ec0e762ce86946e7948138cdf8151c81d5159612 100644 --- a/src/proxy/util.cpp +++ b/src/proxy/util.cpp @@ -53,6 +53,8 @@ load_hostfile(const std::string& lfpath) { string uri; std::smatch match; while(getline(lf, line)) { + if(line.empty() || line[0] == '#') + continue; if(!regex_match(line, match, line_re)) { PROXY_DATA->log()->debug( "{}() Unrecognized line format: [path: '{}', line: '{}']", diff --git a/tools/CMakeLists.txt b/tools/CMakeLists.txt index 9c3e16519551b976c85b23601083fc1f8ee35bbe..f9d7d6b8ec7067cc0b900f6b62774af32d87d6a8 100644 --- a/tools/CMakeLists.txt +++ b/tools/CMakeLists.txt @@ -1,6 +1,6 @@ ################################################################################ -# Copyright 2018-2023, Barcelona Supercomputing Center (BSC), Spain # -# Copyright 2015-2023, Johannes Gutenberg Universitaet Mainz, Germany # +# Copyright 2018-2024, Barcelona 
Supercomputing Center (BSC), Spain # +# Copyright 2015-2024, Johannes Gutenberg Universitaet Mainz, Germany # # # # This software was partially supported by the # # EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu). # @@ -35,4 +35,11 @@ if (GKFS_ENABLE_CLIENT_METRICS) ) target_link_libraries(gkfs_clientmetrics2json PUBLIC msgpack_util nlohmann_json::nlohmann_json) install(TARGETS gkfs_clientmetrics2json RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR}) -endif () \ No newline at end of file +endif () + +add_executable(gkfs_malleability malleability.cpp) +target_link_libraries(gkfs_malleability + PUBLIC + gkfs_user_lib + CLI11::CLI11) +install(TARGETS gkfs_malleability RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR}) \ No newline at end of file diff --git a/tools/malleability.cpp b/tools/malleability.cpp new file mode 100644 index 0000000000000000000000000000000000000000..c39ab7395a90b07e948aefe672478e348c18ffbe --- /dev/null +++ b/tools/malleability.cpp @@ -0,0 +1,155 @@ +/* + Copyright 2018-2024, Barcelona Supercomputing Center (BSC), Spain + Copyright 2015-2024, Johannes Gutenberg Universitaet Mainz, Germany + + This software was partially supported by the + EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu). + + This software was partially supported by the + ADA-FS project under the SPPEXA project funded by the DFG. + + This file is part of GekkoFS. + + GekkoFS is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + GekkoFS is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GekkoFS. If not, see . 
. + + SPDX-License-Identifier: GPL-3.0-or-later +*/ + +#include +#include +#include +#include + +#include +#include + + +using namespace std; + +struct cli_options { + bool verbose = false; + bool machine_readable = false; + string action; + string subcommand; +}; + +std::pair<int, int> +get_expansion_host_num() { + // get hosts file and read how much should be expanded + auto hosts_file_path = std::getenv("LIBGKFS_HOSTS_FILE"); + if(!hosts_file_path) { + std::cerr + << "Error: LIBGKFS_HOSTS_FILE environment variable not set.\n"; + return {-1, -1}; + } + std::ifstream file(hosts_file_path); + if(!file) { + std::cerr << "Error: Unable to open file at " << hosts_file_path + << ".\n"; + return {-1, -1}; // Indicate an error + } + auto initialHostCount = 0; + auto finalHostCount = 0; + auto foundSeparator = false; + std::string line; + + while(std::getline(file, line)) { + if(line == "#FS_INSTANCE_END") { + if(foundSeparator) { + cerr << "Error: #FS_INSTANCE_END marker found twice. This is not allowed.\n"; + return {-1, -1}; + } + foundSeparator = true; + initialHostCount = finalHostCount; + continue; + } + if(!line.empty()) { + finalHostCount++; + } + } + if(!foundSeparator) { + initialHostCount = finalHostCount; + } + return {initialHostCount, finalHostCount}; +} + +int +main(int argc, const char* argv[]) { + CLI::App desc{"Allowed options"}; + cli_options opts; + + // Global verbose flag + desc.add_flag("--verbose,-v", opts.verbose, "Verbose output"); + desc.add_flag("--machine-readable,-m", opts.machine_readable, + "Machine-readable output"); + + + auto expand_args = + desc.add_subcommand("expand", "Expansion-related actions"); + expand_args->add_option("action", opts.action, "Action to perform") + ->required() + ->check(CLI::IsMember({"start", "status", "finalize"})); + try { + desc.parse(argc, argv); + } catch(const CLI::ParseError& e) { + return desc.exit(e); + } + + if(opts.verbose) { // Check the verbose flag from the main options + std::cout << "Verbose mode is on." 
<< std::endl; + } + int res; + gkfs_init(); + + if(opts.action == "start") { + auto [current_instance, expanded_instance] = get_expansion_host_num(); + if(current_instance == -1 || expanded_instance == -1) { + gkfs_end(); + return 1; + } + res = gkfs::malleable::expand_start(current_instance, + expanded_instance); + if(res) { + cout << "Expand start failed. Exiting...\n"; + gkfs_end(); + return 1; + } else { + cout << "Expansion process from " << current_instance + << " nodes to " << expanded_instance << " nodes launched...\n"; + } + } else if(opts.action == "status") { + res = gkfs::malleable::expand_status(); + if(res > 0) { + if(opts.machine_readable) { + cout << res; + } else { + cout << "Expansion in progress: " << res + << " nodes not finished.\n"; + } + } else { + if(opts.machine_readable) { + cout << res; + } else { + cout << "No expansion running/finished.\n"; + } + } + } else if(opts.action == "finalize") { + res = gkfs::malleable::expand_finalize(); + if(opts.machine_readable) { + cout << res; + } else { + cout << "Expand finalize " << res << endl; + } + } + gkfs_end(); +} \ No newline at end of file
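The hosts-file parsing in `tools/malleability.cpp` above encodes the expansion contract: hosts listed before the `#FS_INSTANCE_END` marker form the currently running instance, and everything after it is the expansion target. A minimal standalone sketch of that counting rule (the `count_hosts` helper and its `std::istream` parameter are our own illustration, not part of the patch):

```cpp
#include <istream>
#include <string>
#include <utility>

// Illustrative re-implementation of the counting loop in
// get_expansion_host_num(): returns {initial, final} host counts,
// or {-1, -1} if the separator marker appears more than once.
std::pair<int, int>
count_hosts(std::istream& hosts) {
    int initial = 0;
    int final_count = 0;
    bool found_separator = false;
    std::string line;
    while(std::getline(hosts, line)) {
        if(line == "#FS_INSTANCE_END") {
            if(found_separator)
                return {-1, -1}; // duplicate marker is rejected
            found_separator = true;
            initial = final_count; // hosts seen so far form the old instance
            continue;
        }
        if(!line.empty())
            ++final_count; // every non-empty line is one daemon entry
    }
    if(!found_separator)
        initial = final_count; // no marker: no expansion pending
    return {initial, final_count};
}
```

For a hosts file listing two daemons, the marker, then two more daemons, this yields `{2, 4}`; without a marker both counts are equal, so `expand start` has nothing to do.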