From c44a20c47fae747ba85e974985354f6d323a720e Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Tue, 19 May 2026 18:27:17 +0200 Subject: [PATCH 1/5] first shrink attempt --- README.md | 73 +++++++ include/client/rpc/forward_malleability.hpp | 12 ++ include/client/user_functions.hpp | 26 +++ include/common/common_defs.hpp | 3 + include/common/rpc/rpc_types_thallium.hpp | 11 ++ include/daemon/handler/rpc_defs.hpp | 10 + .../daemon/malleability/malleable_manager.hpp | 7 +- src/client/malleability.cpp | 39 ++++ src/client/rpc/forward_malleability.cpp | 149 ++++++++++++++ src/daemon/daemon.cpp | 6 + src/daemon/handler/srv_malleability.cpp | 64 ++++++ src/daemon/malleability/malleable_manager.cpp | 44 ++++- .../malleability/test_malleability_tool.py | 91 ++++++++- tools/malleability.cpp | 185 +++++++++++++++--- 14 files changed, 681 insertions(+), 39 deletions(-) diff --git a/README.md b/README.md index 51b50b610..183829293 100644 --- a/README.md +++ b/README.md @@ -33,6 +33,7 @@ to I/O, which reduces interferences and improves performance. - [Server-side statistics via Prometheus](#server-side-statistics-via-prometheus) - [GekkoFS proxy](#gekkofs-proxy) - [File system expansion](#file-system-expansion) + - [File system shrinking](#file-system-shrinking) - [Miscellaneous](#miscellaneous) - [External functions](#external-functions) - [Data placement](#data-placement) @@ -508,6 +509,78 @@ srun: sending Ctrl-C to StepId=282378.2 * [gkfs] Shutdown time: 1.032 seconds ``` +## File system shrinking + +GekkoFS supports **shrinking** the current daemon configuration, removing one or more nodes from the cluster while +safely redistributing all existing data and metadata to the remaining nodes. As with expansion, it is the user's +responsibility not to access the file system during redistribution. + +The same `gkfs_malleability` tool (built with `-DGKFS_BUILD_TOOLS=ON`) is used. Shrinking requires two hostfiles: + +| File | Description | +|---|---| +| `gkfs_hosts.txt` | Current (old) hostfile — set via `LIBGKFS_HOSTS_FILE` | +| `gkfs_hosts_new.txt` | New hostfile listing **only** the surviving nodes | + +### Step-by-step + +**1. Create the new hostfile** containing only the nodes that should remain after shrink. +The format is identical to `gkfs_hosts.txt`. The order does not matter — any nodes present in the old file +but absent from the new file will be removed. + +```bash +# Example: remove node4, keep node1–node3 +grep -v node4 gkfs_hosts.txt > gkfs_hosts_new.txt +``` + +**2. Start the shrink** process. Each surviving node redistributes the data that was owned by the removed nodes, +and the removed nodes forward all their data before stopping: + +```bash +LIBGKFS_HOSTS_FILE=gkfs_hosts.txt \ + gkfs_malleability shrink --new-hosts-file gkfs_hosts_new.txt start +Shrink process from 4 nodes to 3 nodes launched... +``` + +The old and new node counts are **auto-detected** from the respective hostfiles. They can be overridden with +`--old-nodes ` and `--new-nodes ` if needed. + +**3. Poll status** until all nodes have finished: + +```bash +LIBGKFS_HOSTS_FILE=gkfs_hosts.txt gkfs_malleability shrink status +No shrink running/finished. +``` + +When active: `Shrink in progress: 2 nodes not finished.` + +**4. Finalize** the shrink. This disables maintenance mode on all remaining daemons and atomically replaces +`gkfs_hosts.txt` with `gkfs_hosts_new.txt`: + +```bash +LIBGKFS_HOSTS_FILE=gkfs_hosts.txt \ + gkfs_malleability shrink --new-hosts-file gkfs_hosts_new.txt finalize +Shrink finalize 0 +Hosts file updated: gkfs_hosts_new.txt -> gkfs_hosts.txt +``` + +After finalize, `gkfs_hosts.txt` contains only the surviving nodes and all clients automatically use the +updated configuration on their next initialization. + +**5. Shut down the removed daemons** (they have already forwarded all data but are still running): + +```bash +# Send SIGTERM to each daemon on the removed nodes +pdsh -w node4 'kill $(cat /tmp/gkfs_daemon.pid)' +``` + +### Environment variables + +| Variable | Description | +|---|---| +| `LIBGKFS_HOSTS_FILE` | Path to the **current** (old) hosts file | +| `LIBGKFS_HOSTS_FILE_NEW` | Alternative to `--new-hosts-file` for the new hosts file | + # Miscellaneous ## External functions diff --git a/include/client/rpc/forward_malleability.hpp b/include/client/rpc/forward_malleability.hpp index 3c3ce5808..d15226469 100644 --- a/include/client/rpc/forward_malleability.hpp +++ b/include/client/rpc/forward_malleability.hpp @@ -37,6 +37,8 @@ SPDX-License-Identifier: LGPL-3.0-or-later */ +#include + #ifndef GEKKOFS_CLIENT_FORWARD_MALLEABILITY_HPP #define GEKKOFS_CLIENT_FORWARD_MALLEABILITY_HPP @@ -50,6 +52,16 @@ forward_expand_status(); int forward_expand_finalize(); + +int +forward_shrink_start(int old_server_conf, int new_server_conf, + const std::string& new_hosts_file); + +int +forward_shrink_status(); + +int +forward_shrink_finalize(); } // namespace gkfs::malleable::rpc #endif // GEKKOFS_CLIENT_FORWARD_MALLEABILITY_HPP diff --git a/include/client/user_functions.hpp b/include/client/user_functions.hpp index 29ab4a324..b505cfa5c 100644 --- a/include/client/user_functions.hpp +++ b/include/client/user_functions.hpp @@ -179,6 +179,32 @@ expand_status(); */ int expand_finalize(); + +/** + * @brief Start a shrinking of the file system + * @param old_server_conf old number of nodes + * @param new_server_conf new number of nodes + * @param new_hosts_file path to hostfile containing only the surviving nodes + * @return error code + */ +int +shrink_start(int old_server_conf, int new_server_conf, + const std::string& new_hosts_file); + +/** + * @brief Check for the current status of the shrinking process + * @return 0 when finished, positive numbers indicate how many daemons + * are still redistributing data + */ +int +shrink_status(); + +/** + * @brief Finalize the shrinking process + * @return error code + */ +int +shrink_finalize(); } // namespace malleable } // namespace gkfs diff --git a/include/common/common_defs.hpp b/include/common/common_defs.hpp index edde5cc54..d92b80c60 100644 --- a/include/common/common_defs.hpp +++ b/include/common/common_defs.hpp @@ -129,6 +129,9 @@ namespace malleable::rpc::tag { constexpr auto expand_start = "rpc_srv_expand_start"; constexpr auto expand_status = "rpc_srv_expand_status"; constexpr auto expand_finalize = "rpc_srv_expand_finalize"; +constexpr auto shrink_start = "rpc_srv_shrink_start"; +constexpr auto shrink_status = "rpc_srv_shrink_status"; +constexpr auto shrink_finalize = "rpc_srv_shrink_finalize"; // migrate data uses the write rpc constexpr auto migrate_metadata = "rpc_srv_migrate_metadata"; } // namespace malleable::rpc::tag diff --git a/include/common/rpc/rpc_types_thallium.hpp b/include/common/rpc/rpc_types_thallium.hpp index b78ea7765..95f2122b6 100644 --- a/include/common/rpc/rpc_types_thallium.hpp +++ b/include/common/rpc/rpc_types_thallium.hpp @@ -433,6 +433,17 @@ struct rpc_expand_start_in_t { } }; +struct rpc_shrink_start_in_t { + uint32_t old_server_conf; + uint32_t new_server_conf; + std::string new_hosts_file; + template + void + serialize(Archive& ar) { + ar(old_server_conf, new_server_conf, new_hosts_file); + } +}; + struct rpc_migrate_metadata_in_t { std::string key; std::string value; diff --git a/include/daemon/handler/rpc_defs.hpp b/include/daemon/handler/rpc_defs.hpp index d5c19e2b7..748d0d65a 100644 --- a/include/daemon/handler/rpc_defs.hpp +++ b/include/daemon/handler/rpc_defs.hpp @@ -143,6 +143,16 @@ rpc_srv_expand_status(const tl::request& req); void rpc_srv_expand_finalize(const tl::request& req); +void +rpc_srv_shrink_start(const tl::request& req, + const gkfs::rpc::rpc_shrink_start_in_t& in); + +void +rpc_srv_shrink_status(const tl::request& req); + +void +rpc_srv_shrink_finalize(const tl::request& req); + void rpc_srv_migrate_metadata(const tl::request& req, const gkfs::rpc::rpc_migrate_metadata_in_t& in); diff --git a/include/daemon/malleability/malleable_manager.hpp b/include/daemon/malleability/malleable_manager.hpp index 92eba3bf2..1c6b33b39 100644 --- a/include/daemon/malleability/malleable_manager.hpp +++ b/include/daemon/malleability/malleable_manager.hpp @@ -56,7 +56,8 @@ private: void connect_to_hosts( - const std::vector>& hosts); + const std::vector>& hosts, + bool is_shrink = false); int redistribute_metadata(); @@ -70,6 +71,10 @@ private: public: void expand_start(int old_server_conf, int new_server_conf); + + void + shrink_start(int old_server_conf, int new_server_conf, + const std::string& new_hosts_file); }; } // namespace gkfs::malleable diff --git a/src/client/malleability.cpp b/src/client/malleability.cpp index ca41ea3d7..aaa0378c5 100644 --- a/src/client/malleability.cpp +++ b/src/client/malleability.cpp @@ -87,4 +87,43 @@ expand_finalize() { return res; } +int +shrink_start(int old_server_conf, int new_server_conf, + const std::string& new_hosts_file) { + LOG(INFO, "{}() Shrink operation enter", __func__); + // sanity checks + if(old_server_conf == new_server_conf) { + auto err_str = + "ERR: Old server configuration is the same as the new one"; + cerr << err_str << endl; + LOG(ERROR, "{}() {}", __func__, err_str); + return -1; + } + if(CTX->hosts().size() != static_cast(old_server_conf)) { + auto err_str = + "ERR: Old server configuration does not match the number of hosts in hostsfile"; + cerr << err_str << endl; + LOG(ERROR, "{}() {}", __func__, err_str); + return -1; + } + return gkfs::malleable::rpc::forward_shrink_start( + old_server_conf, new_server_conf, new_hosts_file); +} + +int +shrink_status() { + LOG(INFO, "{}() enter", __func__); + auto res = gkfs::malleable::rpc::forward_shrink_status(); + LOG(INFO, "{}() '{}' nodes working on shrink operation.", __func__, res); + return res; +} + +int +shrink_finalize() { + LOG(INFO, "{}() enter", __func__); + auto res = gkfs::malleable::rpc::forward_shrink_finalize(); + LOG(INFO, "{}() shrink operation finalized. ", __func__); + return res; +} + } // namespace gkfs::malleable \ No newline at end of file diff --git a/src/client/rpc/forward_malleability.cpp b/src/client/rpc/forward_malleability.cpp index c1d51f3cb..0fedf4aa6 100644 --- a/src/client/rpc/forward_malleability.cpp +++ b/src/client/rpc/forward_malleability.cpp @@ -181,4 +181,153 @@ forward_expand_finalize() { return err; } +int +forward_shrink_start(int old_server_conf, int new_server_conf, + const std::string& new_hosts_file) { + LOG(INFO, "{}() enter", __func__); + const auto& targets = CTX->distributor()->locate_directory_metadata(); + + auto err = 0; + std::vector waiters; + waiters.reserve(targets.size()); + std::vector waiter_targets; + waiter_targets.reserve(targets.size()); + + // define rpc + auto shrink_start_rpc = + CTX->rpc_engine()->define(gkfs::malleable::rpc::tag::shrink_start); + + for(std::size_t i = 0; i < targets.size(); ++i) { + auto target = targets[i]; + try { + LOG(DEBUG, "{}() Sending RPC to host: '{}'", __func__, target); + + gkfs::rpc::rpc_shrink_start_in_t in; + in.old_server_conf = old_server_conf; + in.new_server_conf = new_server_conf; + in.new_hosts_file = new_hosts_file; + + waiters.push_back( + shrink_start_rpc.on(CTX->hosts().at(target)).async(in)); + waiter_targets.push_back(target); + } catch(const std::exception& ex) { + LOG(ERROR, "Failed to send RPC to host {}: {}", target, ex.what()); + } + } + + LOG(INFO, "{}() send shrink_start rpc to '{}' targets", __func__, + waiters.size()); + + // wait for RPC responses + for(std::size_t i = 0; i < waiters.size(); ++i) { + try { + gkfs::rpc::rpc_err_out_t out = waiters[i].wait(); + if(out.err != 0) { + err = out.err; + } + } catch(const std::exception& ex) { + LOG(ERROR, "RPC wait failed for target {}: {}", waiter_targets[i], + ex.what()); + err = EBUSY; + } + } + return err; +} + +int +forward_shrink_status() { + LOG(INFO, "{}() enter", __func__); + const auto& targets = CTX->distributor()->locate_directory_metadata(); + + auto err = 0; + std::vector waiters; + waiters.reserve(targets.size()); + std::vector waiter_targets; + waiter_targets.reserve(targets.size()); + + auto shrink_status_rpc = + CTX->rpc_engine()->define(gkfs::malleable::rpc::tag::shrink_status); + + for(std::size_t i = 0; i < targets.size(); ++i) { + auto target = targets[i]; + try { + LOG(DEBUG, "{}() Sending RPC to host: '{}'", __func__, target); + waiters.push_back( + shrink_status_rpc.on(CTX->hosts().at(target)).async()); + waiter_targets.push_back(target); + } catch(const std::exception& ex) { + LOG(ERROR, "Failed to send RPC to host {}: {}", target, ex.what()); + } + } + + LOG(INFO, "{}() send shrink_status rpc to '{}' targets", __func__, + waiters.size()); + + // wait for RPC responses + for(std::size_t i = 0; i < waiters.size(); ++i) { + try { + gkfs::rpc::rpc_err_out_t out = waiters[i].wait(); + if(out.err > 0) { + LOG(DEBUG, "{}() Host '{}' not done yet.", __func__, + waiter_targets[i]); + err += out.err; + } else if(out.err < 0) { + LOG(ERROR, "{}() Host '{}' error.", __func__, + waiter_targets[i]); + } + } catch(const std::exception& ex) { + LOG(ERROR, "RPC wait failed for target {}: {}", waiter_targets[i], + ex.what()); + err = EBUSY; + } + } + return err; +} + +int +forward_shrink_finalize() { + LOG(INFO, "{}() enter", __func__); + const auto& targets = CTX->distributor()->locate_directory_metadata(); + + auto err = 0; + std::vector waiters; + waiters.reserve(targets.size()); + std::vector waiter_targets; + waiter_targets.reserve(targets.size()); + + auto shrink_finalize_rpc = CTX->rpc_engine()->define( + gkfs::malleable::rpc::tag::shrink_finalize); + + for(std::size_t i = 0; i < targets.size(); ++i) { + auto target = targets[i]; + try { + LOG(DEBUG, "{}() Sending RPC to host: '{}'", __func__, target); + waiters.push_back( + shrink_finalize_rpc.on(CTX->hosts().at(target)).async()); + waiter_targets.push_back(target); + } catch(std::exception& ex) { + LOG(ERROR, "Failed to send RPC to host {}: {}", target, ex.what()); + } + } + + LOG(INFO, "{}() send shrink_finalize rpc to '{}' targets", __func__, + waiters.size()); + + // wait for RPC responses + for(std::size_t i = 0; i < waiters.size(); ++i) { + try { + gkfs::rpc::rpc_err_out_t out = waiters[i].wait(); + if(out.err != 0) { + LOG(ERROR, "Failed finalize on host '{}'", waiter_targets[i]); + err = out.err; + } + } catch(const std::exception& ex) { + LOG(ERROR, "RPC wait failed for target {}: {}", waiter_targets[i], + ex.what()); + err = EBUSY; + } + } + return err; +} + } // namespace gkfs::malleable::rpc \ No newline at end of file diff --git a/src/daemon/daemon.cpp b/src/daemon/daemon.cpp index 2fc5e21ef..450a42e55 100644 --- a/src/daemon/daemon.cpp +++ b/src/daemon/daemon.cpp @@ -221,6 +221,12 @@ register_server_rpcs(std::shared_ptr engine) { rpc_srv_expand_status); engine->define(gkfs::malleable::rpc::tag::expand_finalize, rpc_srv_expand_finalize); + engine->define(gkfs::malleable::rpc::tag::shrink_start, + rpc_srv_shrink_start); + engine->define(gkfs::malleable::rpc::tag::shrink_status, + rpc_srv_shrink_status); + engine->define(gkfs::malleable::rpc::tag::shrink_finalize, + rpc_srv_shrink_finalize); engine->define(gkfs::malleable::rpc::tag::migrate_metadata, rpc_srv_migrate_metadata); } diff --git a/src/daemon/handler/srv_malleability.cpp b/src/daemon/handler/srv_malleability.cpp index 7e68c2704..88bae039c 100644 --- a/src/daemon/handler/srv_malleability.cpp +++ b/src/daemon/handler/srv_malleability.cpp @@ -112,6 +112,67 @@ rpc_srv_expand_finalize(const tl::request& req) { req.respond(out); } +void +rpc_srv_shrink_start(const tl::request& req, + const gkfs::rpc::rpc_shrink_start_in_t& in) { + gkfs::rpc::rpc_err_out_t out; + + GKFS_DATA->spdlogger()->debug( + "{}() Got RPC with old conf '{}' new conf '{}' new_hosts_file '{}'", + __func__, in.old_server_conf, in.new_server_conf, + in.new_hosts_file); + try { + GKFS_DATA->maintenance_mode(true); + GKFS_DATA->malleable_manager()->shrink_start( + in.old_server_conf, in.new_server_conf, in.new_hosts_file); + out.err = 0; + } catch(const std::exception& e) { + GKFS_DATA->spdlogger()->error("{}() Failed to start shrink: '{}' ", + __func__, e.what()); + GKFS_DATA->maintenance_mode(false); + out.err = -1; + } + + GKFS_DATA->spdlogger()->debug("{}() Sending output err '{}'", __func__, + out.err); + req.respond(out); +} + +void +rpc_srv_shrink_status(const tl::request& req) { + gkfs::rpc::rpc_err_out_t out; + GKFS_DATA->spdlogger()->debug("{}() Got RPC ", __func__); + try { + out.err = GKFS_DATA->redist_running() ? 1 : 0; + } catch(const std::exception& e) { + GKFS_DATA->spdlogger()->error( + "{}() Failed to check status for shrink: '{}'", __func__, + e.what()); + out.err = -1; + } + GKFS_DATA->spdlogger()->debug("{}() Sending output err '{}'", __func__, + out.err); + req.respond(out); +} + +void +rpc_srv_shrink_finalize(const tl::request& req) { + gkfs::rpc::rpc_err_out_t out; + GKFS_DATA->spdlogger()->debug("{}() Got RPC ", __func__); + try { + GKFS_DATA->maintenance_mode(false); + out.err = 0; + } catch(const std::exception& e) { + GKFS_DATA->spdlogger()->error("{}() Failed to finalize shrink: '{}'", + __func__, e.what()); + out.err = -1; + } + + GKFS_DATA->spdlogger()->debug("{}() Sending output err '{}'", __func__, + out.err); + req.respond(out); +} + void rpc_srv_migrate_metadata(const tl::request& req, const gkfs::rpc::rpc_migrate_metadata_in_t& in) { @@ -139,4 +200,7 @@ rpc_srv_migrate_metadata(const tl::request& req, // DEFINE_MARGO_RPC_HANDLER(rpc_srv_expand_start) // DEFINE_MARGO_RPC_HANDLER(rpc_srv_expand_status) // DEFINE_MARGO_RPC_HANDLER(rpc_srv_expand_finalize) +// DEFINE_MARGO_RPC_HANDLER(rpc_srv_shrink_start) +// DEFINE_MARGO_RPC_HANDLER(rpc_srv_shrink_status) +// DEFINE_MARGO_RPC_HANDLER(rpc_srv_shrink_finalize) // DEFINE_MARGO_RPC_HANDLER(rpc_srv_migrate_metadata) diff --git a/src/daemon/malleability/malleable_manager.cpp b/src/daemon/malleability/malleable_manager.cpp index dba18d339..51e2b6b9a 100644 --- a/src/daemon/malleability/malleable_manager.cpp +++ b/src/daemon/malleability/malleable_manager.cpp @@ -137,7 +137,8 @@ MalleableManager::read_hosts_file() { void MalleableManager::connect_to_hosts( - const vector>& hosts) { + const vector>& hosts, + bool is_shrink) { auto local_hostname = gkfs::rpc::get_my_hostname(true); bool local_host_found = false; @@ -191,10 +192,17 @@ MalleableManager::connect_to_hosts( __func__, id, uri); } if(!local_host_found) { - auto err_msg = fmt::format( - "{}() Local host '{}' not found in hosts file. This should not happen.", - __func__, local_hostname); - throw runtime_error(err_msg); + if(is_shrink) { + GKFS_DATA->spdlogger()->info( + "{}() Local host '{}' not found in hosts file. This node is being removed.", + __func__, local_hostname); + RPC_DATA->local_host_id(std::numeric_limits::max()); + } else { + auto err_msg = fmt::format( + "{}() Local host '{}' not found in hosts file. This should not happen.", + __func__, local_hostname); + throw runtime_error(err_msg); + } } } @@ -359,4 +367,30 @@ MalleableManager::expand_start(int old_server_conf, int new_server_conf) { } } +void +MalleableManager::shrink_start(int old_server_conf, int new_server_conf, + const std::string& new_hosts_file) { + GKFS_DATA->spdlogger()->info("{}() Loading new hosts file '{}' for shrink", + __func__, new_hosts_file); + auto hosts = load_hostfile(new_hosts_file); + if(hosts.size() != static_cast(new_server_conf)) { + throw runtime_error( + fmt::format("MalleableManager::{}() Something is wrong. " + "Number of hosts in new hosts file ({}) " + "does not match new server configuration ({})", + __func__, hosts.size(), new_server_conf)); + } + connect_to_hosts(hosts, true); + RPC_DATA->distributor()->hosts_size(hosts.size()); + auto abt_err = + ABT_thread_create(RPC_DATA->io_pool(), expand_abt, + ABT_THREAD_ATTR_NULL, nullptr, &redist_thread_); + if(abt_err != ABT_SUCCESS) { + auto err_str = fmt::format( + "MalleableManager::{}() Failed to create ABT thread with abt_err '{}'", + __func__, abt_err); + throw runtime_error(err_str); + } +} + } // namespace gkfs::malleable diff --git a/tests/integration/malleability/test_malleability_tool.py b/tests/integration/malleability/test_malleability_tool.py index ee7a2b767..7e91822f8 100644 --- a/tests/integration/malleability/test_malleability_tool.py +++ b/tests/integration/malleability/test_malleability_tool.py @@ -105,5 +105,92 @@ def test_malleability_failures(gkfwd_daemon_factory, gkfs_client, gkfs_shell): assert cmd.exit_code != 0 assert cmd.stderr.decode() == "ERR: Old server configuration is the same as the new one\n" d00.shutdown() - - \ No newline at end of file + +def test_shrink_malleability(gkfwd_daemon_factory, gkfs_client, gkfs_shell): + """Test that shrinking from 2 nodes to 1 redistributes data correctly.""" + import time + d00 = gkfwd_daemon_factory.create() + d01 = gkfwd_daemon_factory.create() + + time.sleep(5) + + # Create several files across the 2-node cluster so that chunks land on both nodes + for i in range(8): + f = d00.mountdir / f"shrink_file_{i}" + ret = gkfs_client.open(f, os.O_CREAT | os.O_WRONLY, + stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) + assert ret.retval != -1, f"open failed for {f}" + ret = gkfs_client.write_validate(f, 1024 * 1024) + assert ret.retval == 0, f"write_validate failed for {f}" + + # Build gkfs_hosts_new.txt containing only d00 (remove d01). + old_hostfile = Path(d00.hostfile) + new_hostfile = old_hostfile.parent / "gkfs_hosts_new.txt" + with open(old_hostfile) as hf_in, open(new_hostfile, "w") as hf_out: + for line in hf_in: + # Keep lines that belong to d00; skip d01 and comment lines + if d00.hostname in line and not line.startswith("#"): + hf_out.write(line) + assert new_hostfile.exists(), "New hosts file was not created" + + libdirs = gkfs_shell._patched_env["LD_LIBRARY_PATH"] + search_path = ":".join(str(p) for p in gkfs_shell._search_paths) + malleability_bin = shutil.which("gkfs_malleability", path=search_path) + + # Check status before starting (should report nothing running) + cmd_str = ( + f"LD_LIBRARY_PATH={libdirs} " + f"LIBGKFS_HOSTS_FILE={old_hostfile} " + f"{malleability_bin} shrink --new-hosts-file {new_hostfile} status" + ) + cmd = gkfs_shell.script(cmd_str, intercept_shell=False) + assert cmd.exit_code == 0 + assert "No shrink running/finished." in cmd.stderr.decode() + + # Start shrink; node counts are auto-detected from both hostfiles + cmd_str = ( + f"LD_LIBRARY_PATH={libdirs} " + f"LIBGKFS_HOSTS_FILE={old_hostfile} " + f"{malleability_bin} shrink --new-hosts-file {new_hostfile} start" + ) + cmd = gkfs_shell.script(cmd_str, intercept_shell=False, timeout=340) + assert cmd.exit_code == 0, f"shrink start failed: {cmd.stderr.decode()}" + assert "Shrink process from 2 nodes to 1 nodes launched" in cmd.stderr.decode() + + # Wait for redistribution to complete (poll with timeout) + deadline = time.time() + 120 + while time.time() < deadline: + cmd_str = ( + f"LD_LIBRARY_PATH={libdirs} " + f"LIBGKFS_HOSTS_FILE={old_hostfile} " + f"{malleability_bin} shrink --new-hosts-file {new_hostfile} status" + ) + cmd = gkfs_shell.script(cmd_str, intercept_shell=False) + if "No shrink running/finished." in cmd.stderr.decode(): + break + time.sleep(2) + else: + pytest.fail("Shrink redistribution did not finish within 120 s") + + # Finalize: gkfs_hosts_new.txt is renamed to gkfs_hosts.txt (old_hostfile) + cmd_str = ( + f"LD_LIBRARY_PATH={libdirs} " + f"LIBGKFS_HOSTS_FILE={old_hostfile} " + f"{malleability_bin} shrink --new-hosts-file {new_hostfile} finalize" + ) + cmd = gkfs_shell.script(cmd_str, intercept_shell=False) + assert cmd.exit_code == 0, f"shrink finalize failed: {cmd.stderr.decode()}" + assert "Hosts file updated" in cmd.stderr.decode() + + # Postcondition: new file is gone, old path now holds the new config + assert not new_hostfile.exists(), "gkfs_hosts_new.txt should have been renamed" + assert old_hostfile.exists(), "hosts file should still exist with updated content" + + # Verify all data is still accessible on the surviving node d00 + for i in range(8): + f = d00.mountdir / f"shrink_file_{i}" + ret = gkfs_client.stat(f) + assert ret.retval == 0, f"stat failed for {f} after shrink" + + d00.shutdown() + d01.shutdown() diff --git a/tools/malleability.cpp b/tools/malleability.cpp index 175bf7d9b..aaccf55b3 100644 --- a/tools/malleability.cpp +++ b/tools/malleability.cpp @@ -2,6 +2,7 @@ #include #include #include +#include #include #include @@ -9,6 +10,7 @@ using namespace std; +namespace fs = std::filesystem; struct cli_options { bool verbose = false; @@ -57,6 +59,26 @@ get_expansion_host_num() { return {initialHostCount, finalHostCount}; } +/** + * Count non-empty, non-comment lines in a hosts file. + * Returns -1 on error. + */ +int +count_hosts_in_file(const std::string& path) { + std::ifstream f(path); + if(!f) { + cerr << "Error: Unable to open file at " << path << ".\n"; + return -1; + } + int count = 0; + std::string line; + while(std::getline(f, line)) { + if(!line.empty() && line[0] != '#') + count++; + } + return count; +} + int main(int argc, const char* argv[]) { CLI::App desc{"Allowed options"}; @@ -73,57 +95,158 @@ main(int argc, const char* argv[]) { expand_args->add_option("action", opts.action, "Action to perform") ->required() ->check(CLI::IsMember({"start", "status", "finalize"})); + + int old_nodes = -1; + int new_nodes = -1; + string new_hosts_file; + auto shrink_args = + desc.add_subcommand("shrink", "Shrink-related actions"); + shrink_args->add_option("action", opts.action, "Action to perform") + ->required() + ->check(CLI::IsMember({"start", "status", "finalize"})); + shrink_args + ->add_option("--new-hosts-file", new_hosts_file, + "Path to new hosts file listing only the surviving " + "nodes (e.g. gkfs_hosts_new.txt)") + ->envname("LIBGKFS_HOSTS_FILE_NEW"); + shrink_args->add_option("--old-nodes", old_nodes, + "Old number of nodes (auto-detected from " + "LIBGKFS_HOSTS_FILE if not set)"); + shrink_args->add_option("--new-nodes", new_nodes, + "New number of nodes (auto-detected from " + "--new-hosts-file if not set)"); + try { desc.parse(argc, argv); } catch(const CLI::ParseError& e) { return desc.exit(e); } - if(opts.verbose) { // Check the verbose flag from the main options + if(opts.verbose) { std::cout << "Verbose mode is on." << std::endl; } int res; gkfs_init(); - if(opts.action == "start") { - auto [current_instance, expanded_instance] = get_expansion_host_num(); - if(current_instance == -1 || expanded_instance == -1) { - return 1; - } - res = gkfs::malleable::expand_start(current_instance, - expanded_instance); - if(res) { - cout << "Expand start failed. Exiting...\n"; - gkfs_end(); - cout.flush(); - return -1; - } else { - cerr << "Expansion process from " << current_instance - << " nodes to " << expanded_instance << " nodes launched...\n"; - } - } else if(opts.action == "status") { - res = gkfs::malleable::expand_status(); - if(res > 0) { + if(expand_args->parsed()) { + if(opts.action == "start") { + auto [current_instance, expanded_instance] = + get_expansion_host_num(); + if(current_instance == -1 || expanded_instance == -1) { + return 1; + } + res = gkfs::malleable::expand_start(current_instance, + expanded_instance); + if(res) { + cout << "Expand start failed. Exiting...\n"; + gkfs_end(); + cout.flush(); + return -1; + } else { + cerr << "Expansion process from " << current_instance + << " nodes to " << expanded_instance + << " nodes launched...\n"; + } + } else if(opts.action == "status") { + res = gkfs::malleable::expand_status(); + if(res > 0) { + if(opts.machine_readable) { + cerr << res; + } else { + cerr << "Expansion in progress: " << res + << " nodes not finished.\n"; + } + } else { + if(opts.machine_readable) { + cerr << res; + } else { + cerr << "No expansion running/finished.\n"; + } + } + } else if(opts.action == "finalize") { + res = gkfs::malleable::expand_finalize(); if(opts.machine_readable) { cerr << res; } else { - cerr << "Expansion in progress: " << res - << " nodes not finished.\n"; + cerr << "Expand finalize " << res << endl; + } + } + } else if(shrink_args->parsed()) { + if(opts.action == "start") { + // new_hosts_file is required for start + if(new_hosts_file.empty()) { + cerr << "Error: --new-hosts-file (or LIBGKFS_HOSTS_FILE_NEW) " + "is required for shrink start.\n"; + return 1; + } + // Auto-detect old_nodes from LIBGKFS_HOSTS_FILE if not given + if(old_nodes == -1) { + auto hf = std::getenv("LIBGKFS_HOSTS_FILE"); + if(!hf) { + cerr << "Error: --old-nodes not set and LIBGKFS_HOSTS_FILE " + "is not available.\n"; + return 1; + } + old_nodes = count_hosts_in_file(hf); + if(old_nodes < 0) + return 1; + } + // Auto-detect new_nodes from new_hosts_file if not given + if(new_nodes == -1) { + new_nodes = count_hosts_in_file(new_hosts_file); + if(new_nodes < 0) + return 1; + } + res = gkfs::malleable::shrink_start(old_nodes, new_nodes, + new_hosts_file); + if(res) { + cout << "Shrink start failed. Exiting...\n"; + gkfs_end(); + cout.flush(); + return -1; + } else { + cerr << "Shrink process from " << old_nodes << " nodes to " + << new_nodes << " nodes launched...\n"; + } + } else if(opts.action == "status") { + res = gkfs::malleable::shrink_status(); + if(res > 0) { + if(opts.machine_readable) { + cerr << res; + } else { + cerr << "Shrink in progress: " << res + << " nodes not finished.\n"; + } + } else { + if(opts.machine_readable) { + cerr << res; + } else { + cerr << "No shrink running/finished.\n"; + } } - } else { + } else if(opts.action == "finalize") { + res = gkfs::malleable::shrink_finalize(); if(opts.machine_readable) { cerr << res; } else { - cerr << "No expansion running/finished.\n"; + cerr << "Shrink finalize " << res << endl; + } + // On success: move gkfs_hosts_new.txt -> gkfs_hosts.txt + if(res == 0 && !new_hosts_file.empty()) { + auto hf = std::getenv("LIBGKFS_HOSTS_FILE"); + if(hf) { + try { + fs::rename(new_hosts_file, hf); + cerr << "Hosts file updated: " << new_hosts_file + << " -> " << hf << "\n"; + } catch(const fs::filesystem_error& e) { + cerr << "Warning: Failed to rename hosts file: " + << e.what() << "\n"; + } + } } - } - } else if(opts.action == "finalize") { - res = gkfs::malleable::expand_finalize(); - if(opts.machine_readable) { - cerr << res; - } else { - cerr << "Expand finalize " << res << endl; } } + return 0; } \ No newline at end of file -- GitLab From c42df28841f34018b307ac4028aec927ff7467f9 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Wed, 20 May 2026 07:19:07 +0200 Subject: [PATCH 2/5] add hostname --- tests/integration/harness/gkfs.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/tests/integration/harness/gkfs.py b/tests/integration/harness/gkfs.py index 947c93626..86decfa34 100644 --- a/tests/integration/harness/gkfs.py +++ b/tests/integration/harness/gkfs.py @@ -1532,6 +1532,17 @@ class FwdDaemon: def hostfile(self): return self._hostfile + @property + def address(self): + return self._address + + @property + def hostname(self): + """The network address this daemon registered with (e.g. 'lo:54321'). + Useful for filtering hosts-file lines that belong to this daemon. + """ + return self._address + class FwdClient: """ A class to represent a GekkoFS client process with a patched LD_PRELOAD. -- GitLab From af2d5a97035bcad93ab29f82b14d9c0660f138b5 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Wed, 20 May 2026 07:40:53 +0200 Subject: [PATCH 3/5] port issue --- tests/integration/malleability/test_malleability_tool.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/tests/integration/malleability/test_malleability_tool.py b/tests/integration/malleability/test_malleability_tool.py index 7e91822f8..8f3a3296e 100644 --- a/tests/integration/malleability/test_malleability_tool.py +++ b/tests/integration/malleability/test_malleability_tool.py @@ -124,14 +124,21 @@ def test_shrink_malleability(gkfwd_daemon_factory, gkfs_client, gkfs_shell): assert ret.retval == 0, f"write_validate failed for {f}" # Build gkfs_hosts_new.txt containing only d00 (remove d01). + # d00.address is "iface:PORT" (e.g. "lo:15680"); extract the port and + # match it against the URI written in the hosts file + # ("ofi+sockets://127.0.0.1:PORT ..."). Each daemon gets a unique + # ephemeral port so this is a reliable discriminator. old_hostfile = Path(d00.hostfile) new_hostfile = old_hostfile.parent / "gkfs_hosts_new.txt" + d00_port = d00.address.split(":")[-1] with open(old_hostfile) as hf_in, open(new_hostfile, "w") as hf_out: for line in hf_in: # Keep lines that belong to d00; skip d01 and comment lines - if d00.hostname in line and not line.startswith("#"): + if not line.startswith("#") and f":{d00_port}" in line: hf_out.write(line) assert new_hostfile.exists(), "New hosts file was not created" + assert new_hostfile.stat().st_size > 0, \ + f"New hosts file is empty; d00 port '{d00_port}' not found in {old_hostfile}" libdirs = gkfs_shell._patched_env["LD_LIBRARY_PATH"] search_path = ":".join(str(p) for p in gkfs_shell._search_paths) -- GitLab From 703a34e01de359409608b5cf00370292cdc53f9d Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Wed, 20 May 2026 12:11:11 +0200 Subject: [PATCH 4/5] workaround --- src/daemon/malleability/malleable_manager.cpp | 22 +++++++++++++------ 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/src/daemon/malleability/malleable_manager.cpp b/src/daemon/malleability/malleable_manager.cpp index 51e2b6b9a..c3bd3d546 100644 --- a/src/daemon/malleability/malleable_manager.cpp +++ b/src/daemon/malleability/malleable_manager.cpp @@ -140,6 +140,10 @@ MalleableManager::connect_to_hosts( const vector>& hosts, bool is_shrink) { auto local_hostname = gkfs::rpc::get_my_hostname(true); + // Use the daemon's own RPC URI as the primary identifier; it is always + // unique even when multiple daemons share the same hostname (e.g. single + // machine deployments without a rootdir_suffix). + const auto& local_uri = RPC_DATA->self_addr_str(); bool local_host_found = false; RPC_DATA->hosts_size(hosts.size()); @@ -182,9 +186,13 @@ MalleableManager::connect_to_hosts( } } - if(!local_host_found && hostname == local_hostname) { - GKFS_DATA->spdlogger()->debug("{}() Found local host: {}", __func__, - hostname); + // Prefer URI match: it is unique even when multiple daemons share a + // hostname (single-machine deployments without --rootdir-suffix). + // Fall back to hostname match for backward compatibility. + if(!local_host_found && + (!local_uri.empty() ? (uri == local_uri) : (hostname == local_hostname))) { + GKFS_DATA->spdlogger()->debug("{}() Found local host: {} (uri: {})", + __func__, hostname, uri); RPC_DATA->local_host_id(id); local_host_found = true; } @@ -194,13 +202,13 @@ MalleableManager::connect_to_hosts( if(!local_host_found) { if(is_shrink) { GKFS_DATA->spdlogger()->info( - "{}() Local host '{}' not found in hosts file. This node is being removed.", - __func__, local_hostname); + "{}() Local host '{}' (uri: '{}') not found in new hosts file. This node is being removed.", + __func__, local_hostname, local_uri); RPC_DATA->local_host_id(std::numeric_limits::max()); } else { auto err_msg = fmt::format( - "{}() Local host '{}' not found in hosts file. This should not happen.", - __func__, local_hostname); + "{}() Local host '{}' (uri: '{}') not found in hosts file. This should not happen.", + __func__, local_hostname, local_uri); throw runtime_error(err_msg); } } -- GitLab From 410556ee5ab844ea0cb13924f7c7d6548e89f74a Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Wed, 10 Jun 2026 14:38:15 +0200 Subject: [PATCH 5/5] changelog updated --- CHANGELOG.md | 1 + src/daemon/malleability/malleable_manager.cpp | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6e31396da..ab565a450 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -37,6 +37,7 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). - Fork support in the client library ([!299](https://storage.bsc.es/gitlab/hpc/gekkofs/-/merge_requests/299)) - Use LIBGKFS_ENABLE_FORK=1 to enable fork support in the client library. - This is used for example in DLIO. + - Shrink capabilities added ([!304](https://storage.bsc.es/gitlab/hpc/gekkofs/-/merge_requests/304)) diff --git a/src/daemon/malleability/malleable_manager.cpp b/src/daemon/malleability/malleable_manager.cpp index c3bd3d546..48b18b039 100644 --- a/src/daemon/malleability/malleable_manager.cpp +++ b/src/daemon/malleability/malleable_manager.cpp @@ -190,7 +190,8 @@ MalleableManager::connect_to_hosts( // hostname (single-machine deployments without --rootdir-suffix). // Fall back to hostname match for backward compatibility. if(!local_host_found && - (!local_uri.empty() ? (uri == local_uri) : (hostname == local_hostname))) { + (!local_uri.empty() ? (uri == local_uri) + : (hostname == local_hostname))) { GKFS_DATA->spdlogger()->debug("{}() Found local host: {} (uri: {})", __func__, hostname, uri); RPC_DATA->local_host_id(id); -- GitLab