From 9f9520bafc0f9b913a4ea56f408e46c966242ca2 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Fri, 21 Nov 2025 13:49:12 +0100 Subject: [PATCH 01/18] first implementation --- include/client/rpc/forward_metadata.hpp | 28 +++++ include/client/rpc/rpc_types.hpp | 142 +++++++++++++++++++++ include/common/common_defs.hpp | 4 + include/common/metadata.hpp | 7 +- include/common/rpc/rpc_types.hpp | 14 +++ include/config.hpp | 2 + include/daemon/backend/metadata/merge.hpp | 32 ++++- include/daemon/handler/rpc_defs.hpp | 4 + src/client/gkfs_functions.cpp | 42 +++++++ src/client/rpc/forward_metadata.cpp | 66 ++++++++++ src/client/rpc/rpc_types.cpp | 4 + src/common/metadata.cpp | 20 +++ src/daemon/backend/metadata/merge.cpp | 65 ++++++++++ src/daemon/daemon.cpp | 4 + src/daemon/handler/srv_metadata.cpp | 145 +++++++++++++++++++++- 15 files changed, 576 insertions(+), 3 deletions(-) diff --git a/include/client/rpc/forward_metadata.hpp b/include/client/rpc/forward_metadata.hpp index 7ab81e62b..341aa671a 100644 --- a/include/client/rpc/forward_metadata.hpp +++ b/include/client/rpc/forward_metadata.hpp @@ -105,6 +105,34 @@ forward_mk_symlink(const std::string& path, const std::string& target_path); #endif +/** + * @brief Send an RPC request to write a small amount of data directly + * to the metadata server (inline). + * + * @param path The file path. + * @param buf Pointer to the data buffer. + * @param offset The file offset. + * @param write_size The number of bytes to write. + * @return int Error code (0 on success). + */ +int +forward_write_inline(const std::string& path, const void* buf, off64_t offset, + size_t write_size); + +/** + * @brief Send an RPC request to read a small amount of data directly + * from the metadata server (inline). + * + * @param path The file path. + * @param buf Pointer to the destination buffer. + * @param offset The file offset. + * @param read_size The number of bytes to read. + * @return std::pair Error code and bytes read. 
+ */
+std::pair<int, size_t>
+forward_read_inline(const std::string& path, void* buf, off64_t offset,
+                    size_t read_size);
+
 } // namespace rpc
 } // namespace gkfs
 
diff --git a/include/client/rpc/rpc_types.hpp b/include/client/rpc/rpc_types.hpp
index d093f4455..42be849da 100644
--- a/include/client/rpc/rpc_types.hpp
+++ b/include/client/rpc/rpc_types.hpp
@@ -2452,6 +2452,146 @@ struct chunk_stat {
     };
 };
 
+struct write_data_inline {
+    class input;
+    class output;
+
+    using self_type = write_data_inline;
+    using handle_type = hermes::rpc_handle<self_type>;
+    using input_type = input;
+    using output_type = output;
+    using mercury_input_type = rpc_write_inline_in_t;
+    using mercury_output_type = rpc_err_out_t;
+
+    constexpr static const uint64_t public_id = 60; // Unique ID
+    constexpr static const hg_id_t mercury_id = 0;
+    constexpr static const auto name = gkfs::rpc::tag::write_data_inline;
+    constexpr static const auto requires_response = true;
+    constexpr static const auto mercury_in_proc_cb =
+            HG_GEN_PROC_NAME(rpc_write_inline_in_t);
+    constexpr static const auto mercury_out_proc_cb =
+            HG_GEN_PROC_NAME(rpc_err_out_t);
+
+    class input {
+        template <typename ExecutionContext>
+        friend hg_return_t
+        hermes::detail::post_to_mercury(ExecutionContext*);
+
+    public:
+        input(const std::string& path, uint64_t offset, const std::string& data,
+              uint64_t count)
+            : m_path(path), m_offset(offset), m_data(data), m_count(count) {}
+
+        explicit input(const rpc_write_inline_in_t& other)
+            : m_path(other.path), m_offset(other.offset), m_data(other.data),
+              m_count(other.count) {}
+
+        explicit
+        operator rpc_write_inline_in_t() {
+            return {m_path.c_str(), m_offset, m_data.c_str(), m_count};
+        }
+
+    private:
+        std::string m_path;
+        uint64_t m_offset;
+        std::string m_data;
+        uint64_t m_count;
+    };
+
+    class output {
+        template <typename ExecutionContext>
+        friend hg_return_t
+        hermes::detail::post_to_mercury(ExecutionContext*);
+
+    public:
+        output() : m_err(0) {}
+        explicit output(const rpc_err_out_t& out) : m_err(out.err) {}
+        int32_t
+        err() const {
+            return m_err;
+        }
+
+    private:
+        int32_t m_err;
+    };
+};
+
+//==============================================================================
+// definitions for read_data_inline
+struct read_data_inline {
+    class input;
+    class output;
+
+    using self_type = read_data_inline;
+    using handle_type = hermes::rpc_handle<self_type>;
+    using input_type = input;
+    using output_type = output;
+    using mercury_input_type = rpc_read_inline_in_t;
+    using mercury_output_type = rpc_read_inline_out_t;
+
+    constexpr static const uint64_t public_id = 61; // Unique ID
+    constexpr static const hg_id_t mercury_id = 0;
+    constexpr static const auto name = gkfs::rpc::tag::read_data_inline;
+    constexpr static const auto requires_response = true;
+    constexpr static const auto mercury_in_proc_cb =
+            HG_GEN_PROC_NAME(rpc_read_inline_in_t);
+    constexpr static const auto mercury_out_proc_cb =
+            HG_GEN_PROC_NAME(rpc_read_inline_out_t);
+
+    class input {
+        template <typename ExecutionContext>
+        friend hg_return_t
+        hermes::detail::post_to_mercury(ExecutionContext*);
+
+    public:
+        input(const std::string& path, uint64_t offset, uint64_t count)
+            : m_path(path), m_offset(offset), m_count(count) {}
+
+        explicit input(const rpc_read_inline_in_t& other)
+            : m_path(other.path), m_offset(other.offset), m_count(other.count) {
+        }
+
+        explicit
+        operator rpc_read_inline_in_t() {
+            return {m_path.c_str(), m_offset, m_count};
+        }
+
+    private:
+        std::string m_path;
+        uint64_t m_offset;
+        uint64_t m_count;
+    };
+
+    class output {
+        template <typename ExecutionContext>
+        friend hg_return_t
+        hermes::detail::post_to_mercury(ExecutionContext*);
+
+ public: + output() : m_err(0), m_count(0) {} + explicit output(const rpc_read_inline_out_t& out) + : m_err(out.err), m_data(out.data ? out.data : ""), + m_count(out.count) {} + + int32_t + err() const { + return m_err; + } + const std::string& + data() const { + return m_data; + } + uint64_t + count() const { + return m_count; + } + + private: + int32_t m_err; + std::string m_data; + uint64_t m_count; + }; +}; //============================================================================== // definitions for write_data struct write_data_proxy { @@ -4180,6 +4320,8 @@ struct expand_finalize { }; } // namespace malleable::rpc + + } // namespace gkfs diff --git a/include/common/common_defs.hpp b/include/common/common_defs.hpp index 70e50ca3a..619492f91 100644 --- a/include/common/common_defs.hpp +++ b/include/common/common_defs.hpp @@ -97,6 +97,10 @@ constexpr auto client_proxy_get_dirents_extended = constexpr auto proxy_daemon_write = "proxy_daemon_rpc_srv_write_data"; constexpr auto proxy_daemon_read = "proxy_daemon_rpc_srv_read_data"; +// inline data operations +constexpr auto write_data_inline = "rpc_srv_write_data_inline"; +constexpr auto read_data_inline = "rpc_srv_read_data_inline"; + } // namespace tag namespace protocol { diff --git a/include/common/metadata.hpp b/include/common/metadata.hpp index 714592208..923beec3f 100644 --- a/include/common/metadata.hpp +++ b/include/common/metadata.hpp @@ -72,7 +72,7 @@ private: // renamed path #endif #endif - + std::string inline_data_; void init_time(); @@ -163,6 +163,11 @@ public: #endif // HAS_RENAME #endif // HAS_SYMLINKS + + std::string + inline_data() const; + void + inline_data(const std::string& data); }; } // namespace gkfs::metadata diff --git a/include/common/rpc/rpc_types.hpp b/include/common/rpc/rpc_types.hpp index dd1f6f091..f6b18538f 100644 --- a/include/common/rpc/rpc_types.hpp +++ b/include/common/rpc/rpc_types.hpp @@ -190,4 +190,18 @@ MERCURY_GEN_PROC(rpc_expand_start_in_t, MERCURY_GEN_PROC(rpc_migrate_metadata_in_t, ((hg_const_string_t) (key))((hg_const_string_t) (value))) +// Inline write operations +MERCURY_GEN_PROC(rpc_write_inline_in_t, + ((hg_const_string_t) (path))((hg_uint64_t) (offset))( + (hg_const_string_t) (data))((hg_uint64_t) (count))) + +// Input: path, offset, read length +MERCURY_GEN_PROC(rpc_read_inline_in_t, + ((hg_const_string_t) (path))((hg_uint64_t) (offset))( + (hg_uint64_t) (count))) + +// Output: err, data buffer, bytes read +MERCURY_GEN_PROC( + rpc_read_inline_out_t, + ((hg_int32_t) (err))((hg_const_string_t) (data))((hg_uint64_t) (count))) #endif // LFS_RPC_TYPES_HPP diff --git a/include/config.hpp b/include/config.hpp index 6f2f7cfb5..3ca38a0c5 100644 --- a/include/config.hpp +++ b/include/config.hpp @@ -134,6 +134,8 @@ constexpr auto implicit_data_removal = true; // Check for existence of file metadata before create. 
This done on RocksDB // level constexpr auto create_exist_check = true; +constexpr auto use_inline_data = true; +constexpr auto inline_data_size = 4096; // in bytes } // namespace metadata namespace data { // directory name below rootdir where chunks are placed diff --git a/include/daemon/backend/metadata/merge.hpp b/include/daemon/backend/metadata/merge.hpp index 73d0f71b9..1d2b66b82 100644 --- a/include/daemon/backend/metadata/merge.hpp +++ b/include/daemon/backend/metadata/merge.hpp @@ -56,7 +56,8 @@ enum class OperandID : char { increase_size = 'i', decrease_size = 'd', create = 'c', - update_time = 't' + update_time = 't', + write_inline = 'w' }; @@ -244,6 +245,35 @@ public: AllowSingleOperand() const override; }; +class WriteInlineOperand : public MergeOperand { +private: + constexpr const static char serialize_sep = ':'; + + size_t offset_; + std::string data_; + +public: + WriteInlineOperand(size_t offset, const std::string& data); + + explicit WriteInlineOperand(const rdb::Slice& serialized_op); + + OperandID + id() const override; + + std::string + serialize_params() const override; + + size_t + offset() const { + return offset_; + } + + const std::string& + data() const { + return data_; + } +}; + } // namespace gkfs::metadata #endif // DB_MERGE_HPP diff --git a/include/daemon/handler/rpc_defs.hpp b/include/daemon/handler/rpc_defs.hpp index c79734d21..de6f84f27 100644 --- a/include/daemon/handler/rpc_defs.hpp +++ b/include/daemon/handler/rpc_defs.hpp @@ -107,5 +107,9 @@ DECLARE_MARGO_RPC_HANDLER(rpc_srv_expand_finalize) DECLARE_MARGO_RPC_HANDLER(rpc_srv_migrate_metadata) +// inline data operations +DECLARE_MARGO_RPC_HANDLER(rpc_srv_write_data_inline) + +DECLARE_MARGO_RPC_HANDLER(rpc_srv_read_data_inline) #endif // GKFS_DAEMON_RPC_DEFS_HPP diff --git a/src/client/gkfs_functions.cpp b/src/client/gkfs_functions.cpp index 962dee8ec..36058e59b 100644 --- a/src/client/gkfs_functions.cpp +++ b/src/client/gkfs_functions.cpp @@ -1060,6 +1060,23 @@ gkfs_do_write(gkfs::filemap::OpenFile& file, const char* buf, size_t count, auto num_replicas = CTX->get_replicas(); LOG(DEBUG, "{}() path: '{}', count: '{}', offset: '{}', is_append: '{}'", __func__, *path, count, offset, is_append); + + + // OPTIMIZATION: Inline Write + if(gkfs::config::metadata::use_inline_data && offset == 0 && + count <= gkfs::config::metadata::inline_data_size) { + + // Attempt inline write via Metadata RPC + auto err = gkfs::rpc::forward_write_inline(file.path(), buf, offset, + count); + + if(err == 0) { + if(update_pos) + file.pos(offset + count); + return count; + } + } + if(CTX->use_write_size_cache() && !is_append) { auto [size_update_cnt, cached_size] = CTX->write_size_cache()->record(*path, offset + count); @@ -1289,6 +1306,31 @@ gkfs_do_read(const gkfs::filemap::OpenFile& file, char* buf, size_t count, memset(buf, 0, sizeof(char) * count); } + if(gkfs::config::metadata::use_inline_data && + (offset + count) <= gkfs::config::metadata::inline_data_size) { + + // Forward the read request to the Metadata Server instead of Data + // Server + auto ret = + gkfs::rpc::forward_read_inline(file.path(), buf, offset, count); + auto err = ret.first; + + if(err == 0) { + // Success, return the number of bytes read + return ret.second; + } + + if(err != EAGAIN) { + errno = err; + return -1; + } + // If err == EAGAIN, it means the file exists but data is not inline + // (it migrated to chunks), so proceed to standard read below. 
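+        // (EAGAIN is set by rpc_srv_read_data_inline in srv_metadata.cpp when
+        // the stored size exceeds inline_data_size, i.e. data lives in chunks.)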
+        LOG(DEBUG,
+            "{}() Inline read missed (EAGAIN), falling back to chunk read",
+            __func__);
+    }
+
     pair<int, ssize_t> ret;
     if(gkfs::config::proxy::fwd_io && CTX->use_proxy() &&
        count > gkfs::config::proxy::fwd_io_count_threshold) {
diff --git a/src/client/rpc/forward_metadata.cpp b/src/client/rpc/forward_metadata.cpp
index 6bd9203ea..a78a0d45d 100644
--- a/src/client/rpc/forward_metadata.cpp
+++ b/src/client/rpc/forward_metadata.cpp
@@ -1152,4 +1152,70 @@ forward_mk_symlink(const std::string& path, const std::string& target_path) {
 
 #endif
 
+
+int
+forward_write_inline(const std::string& path, const void* buf, off64_t offset,
+                     size_t write_size) {
+
+    auto endp = CTX->hosts().at(
+            CTX->distributor()->locate_file_metadata(path, 0));
+
+    try {
+        LOG(DEBUG, "Sending RPC (Inline Write) path: '{}' size: '{}' ...", path, write_size);
+
+        // Construct std::string from buffer for serialization
+        // Note: This copies data, which is acceptable for small inline writes
+        std::string data(static_cast<const char*>(buf), write_size);
+
+        auto out = ld_network_service
+                ->post<gkfs::rpc::write_data_inline>(endp, path, offset, data, write_size)
+                .get()
+                .at(0);
+
+        if (out.err() != 0) {
+            LOG(ERROR, "{}() Daemon reported error: {}", __func__, out.err());
+            return out.err();
+        }
+
+        return 0;
+
+    } catch(const std::exception& ex) {
+        LOG(ERROR, "{}() Exception: '{}'", __func__, ex.what());
+        return EBUSY;
+    }
+}
+
+std::pair<int, size_t>
+forward_read_inline(const std::string& path, void* buf, off64_t offset,
+                    size_t read_size) {
+
+    auto endp = CTX->hosts().at(
+            CTX->distributor()->locate_file_metadata(path, 0));
+
+    try {
+        LOG(DEBUG, "Sending RPC (Inline Read) path: '{}' size: '{}' ...", path, read_size);
+
+        auto out = ld_network_service
+                ->post<gkfs::rpc::read_data_inline>(endp, path, offset, read_size)
+                .get()
+                .at(0);
+
+        if (out.err() != 0) {
+            LOG(DEBUG, "{}() Daemon reported error: {}", __func__, out.err());
+            return {out.err(), 0};
+        }
+
+        // Copy data from string response to user buffer
+        if (out.count() > 0 && !out.data().empty()) {
+            std::memcpy(buf, out.data().c_str(), out.count());
+        }
+
+        return {0, out.count()};
+
+    } catch(const std::exception& ex) {
+        LOG(ERROR, "{}() Exception: '{}'", __func__, ex.what());
+        return {EBUSY, 0};
+    }
+}
+
 } // namespace gkfs::rpc
 
diff --git a/src/client/rpc/rpc_types.cpp b/src/client/rpc/rpc_types.cpp
index 1f34d0d5a..5b5f89193 100644
--- a/src/client/rpc/rpc_types.cpp
+++ b/src/client/rpc/rpc_types.cpp
@@ -79,6 +79,10 @@ hermes::detail::register_user_request_types(uint32_t provider_id) {
                 provider_id);
         (void) registered_requests().add(
                 provider_id);
+        (void) registered_requests().add<gkfs::rpc::write_data_inline>(
+                provider_id);
+        (void) registered_requests().add<gkfs::rpc::read_data_inline>(
+                provider_id);
     } else {
         (void) registered_requests().add(
                 provider_id);
diff --git a/src/common/metadata.cpp b/src/common/metadata.cpp
index e4f751e48..8fd7aa718 100644
--- a/src/common/metadata.cpp
+++ b/src/common/metadata.cpp
@@ -184,6 +184,13 @@ Metadata::Metadata(const std::string& binary_str) {
 #endif // HAS_RENAME
 #endif // HAS_SYMLINKS
 
+    if(ptr < binary_str.data() + binary_str.size()) {
+        assert(*ptr == MSP);
+        ptr++;
+        // The rest of the string is the binary data
+        inline_data_ = std::string(ptr);
+    }
+
     // we consumed all the binary string
     assert(*ptr == '\0');
 }
@@ -225,6 +232,10 @@ Metadata::serialize() const {
 #endif // HAS_RENAME
 #endif // HAS_SYMLINKS
 
+    if(gkfs::config::metadata::use_inline_data && !inline_data_.empty()) {
+        s += MSP; // Separator
+        s += inline_data_;
+    }
     return s;
 }
 
@@ -343,4 +354,13 @@ Metadata::rename_path(const std::string& rename_path) {
 #endif // HAS_RENAME
 #endif // 
HAS_SYMLINKS +std::string +Metadata::inline_data() const { + return inline_data_; +} + +void +Metadata::inline_data(const std::string& data) { + inline_data_ = data; +} } // namespace gkfs::metadata diff --git a/src/daemon/backend/metadata/merge.cpp b/src/daemon/backend/metadata/merge.cpp index bf8d58692..400149127 100644 --- a/src/daemon/backend/metadata/merge.cpp +++ b/src/daemon/backend/metadata/merge.cpp @@ -167,6 +167,43 @@ UpdateTimeOperand::serialize_params() const { return ::to_string(mtime_); } + +WriteInlineOperand::WriteInlineOperand(const size_t offset, + const std::string& data) + : offset_(offset), data_(data) {} + +WriteInlineOperand::WriteInlineOperand(const rdb::Slice& serialized_op) { + // We expect format: "offset:data" + // Since data is binary, we cannot rely on simple string conversion if nulls + // are present, but we can find the first separator because offset is a + // number. + + // Convert to string view or string to find the separator + std::string_view s(serialized_op.data(), serialized_op.size()); + auto pos = s.find(serialize_sep); + + if(pos == std::string::npos) { + // Fallback/Error case + offset_ = 0; + } else { + // Parse offset + offset_ = std::stoul(std::string(s.substr(0, pos))); + // Everything after separator is data + data_ = std::string(s.substr(pos + 1)); + } +} + +OperandID +WriteInlineOperand::id() const { + return OperandID::write_inline; +} + +std::string +WriteInlineOperand::serialize_params() const { + return fmt::format("{}{}{}", offset_, serialize_sep, data_); +} + + /** * @internal * Merges all operands in chronological order for the same key. @@ -233,6 +270,34 @@ MetadataMergeOperator::FullMergeV2(const MergeOperationInput& merge_in, } else if(operand_id == OperandID::update_time) { auto op = UpdateTimeOperand(parameters); md.mtime(op.mtime()); + } else if(operand_id == OperandID::write_inline) { + + auto op = WriteInlineOperand(parameters); + + // 1. Get a copy of the string (not a reference, as md.inline_data() + // returns by value) + std::string current_data = md.inline_data(); + + // 2. Use the variables from the operand class + size_t offset = op.offset(); + const std::string& data_buffer = op.data(); + size_t data_len = data_buffer.size(); + + // 3. Resize if writing beyond current inline data size + if(offset + data_len > current_data.size()) { + current_data.resize(offset + data_len, '\0'); + } + + // 4. Overwrite data + current_data.replace(offset, data_len, data_buffer); + + // 5. Store updated string back into Metadata object + md.inline_data(current_data); + + // 6. 
Update file size if we wrote beyond previous EOF + if(current_data.size() > fsize) { + fsize = current_data.size(); + } } else { throw ::runtime_error("Unrecognized merge operand ID: " + (char) operand_id); diff --git a/src/daemon/daemon.cpp b/src/daemon/daemon.cpp index 25b970b58..45d630c92 100644 --- a/src/daemon/daemon.cpp +++ b/src/daemon/daemon.cpp @@ -201,6 +201,10 @@ register_server_rpcs(margo_instance_id mid) { MARGO_REGISTER(mid, gkfs::malleable::rpc::tag::migrate_metadata, rpc_migrate_metadata_in_t, rpc_err_out_t, rpc_srv_migrate_metadata); + MARGO_REGISTER(mid, gkfs::rpc::tag::write_data_inline, rpc_write_inline_in_t, + rpc_data_out_t, rpc_srv_write_data_inline); + MARGO_REGISTER(mid, gkfs::rpc::tag::read_data_inline, rpc_read_inline_in_t, + rpc_read_inline_out_t, rpc_srv_read_data_inline); } /** diff --git a/src/daemon/handler/srv_metadata.cpp b/src/daemon/handler/srv_metadata.cpp index 42b66114f..b463e9f9e 100644 --- a/src/daemon/handler/srv_metadata.cpp +++ b/src/daemon/handler/srv_metadata.cpp @@ -1028,6 +1028,145 @@ rpc_srv_rename(hg_handle_t handle) { #endif // HAS_RENAME +/** + * @brief Serves a write request for inline data (stored in RocksDB). + * @return Mercury error code + */ +hg_return_t +rpc_srv_write_data_inline(hg_handle_t handle) { + rpc_write_inline_in_t in{}; + rpc_err_out_t out{}; + out.err = EIO; + + auto ret = margo_get_input(handle, &in); + if(ret != HG_SUCCESS) { + GKFS_DATA->spdlogger()->error( + "{}() Failed to retrieve input from handle", __func__); + return gkfs::rpc::cleanup_respond(&handle, &in, &out); + } + + GKFS_DATA->spdlogger()->debug("{}() path: '{}', size: '{}', offset: '{}'", + __func__, in.path, in.count, in.offset); + + try { + // 1. Check limits + if(in.offset + in.count > gkfs::config::metadata::inline_data_size) { + out.err = EFBIG; // File too large for inline + } else { + // 2. Prepare data string from input buffer + // Note: in.data is a hg_const_string_t (char*) + std::string data_buffer(in.data, in.count); + + // 3. Persist to RocksDB + auto md = gkfs::metadata::get(in.path); + std::string current_data = md.inline_data(); + + // Extend if necessary (Sparse write support) + if(current_data.size() < (in.offset + in.count)) { + current_data.resize(in.offset + in.count, '\0'); + } + + // Apply write + current_data.replace(in.offset, in.count, data_buffer); + md.inline_data(current_data); + + // Update size if file grew + if(current_data.size() > md.size()) { + md.size(current_data.size()); + } + + gkfs::metadata::update(in.path, md); + out.err = 0; + } + } catch(const gkfs::metadata::NotFoundException& e) { + out.err = ENOENT; + } catch(const std::exception& e) { + GKFS_DATA->spdlogger()->error("{}() Failed to write inline data: '{}'", + __func__, e.what()); + out.err = EIO; + } + + auto hret = margo_respond(handle, &out); + if(hret != HG_SUCCESS) { + GKFS_DATA->spdlogger()->error("{}() Failed to respond", __func__); + } + + margo_free_input(handle, &in); + margo_destroy(handle); + return HG_SUCCESS; +} + +/** + * @brief Serves a read request for inline data (stored in RocksDB). 
+ * @return Mercury error code + */ +hg_return_t +rpc_srv_read_data_inline(hg_handle_t handle) { + rpc_read_inline_in_t in{}; + rpc_read_inline_out_t out{}; + out.err = EIO; + out.count = 0; + out.data = nullptr; // Important initialization + + auto ret = margo_get_input(handle, &in); + if(ret != HG_SUCCESS) { + GKFS_DATA->spdlogger()->error( + "{}() Failed to retrieve input from handle", __func__); + return gkfs::rpc::cleanup_respond(&handle, &in, &out); + } + + GKFS_DATA->spdlogger()->debug("{}() path: '{}', size: '{}', offset: '{}'", + __func__, in.path, in.count, in.offset); + + std::string data_buffer; // Keep scope alive until respond + + try { + auto md = gkfs::metadata::get(in.path); + + // Check if the file actually has inline data or if it's stored on disk + // We assume here that if the size is larger than the limit, it's NOT + // inline and the client should have checked, but we double check. + if(md.size() > gkfs::config::metadata::inline_data_size) { + out.err = EAGAIN; // Signal client to use Chunk path + } else { + const std::string& stored_data = md.inline_data(); + + if(in.offset >= stored_data.size()) { + // EOF + out.count = 0; + out.err = 0; + } else { + size_t available = stored_data.size() - in.offset; + size_t read_amt = + std::min(static_cast(in.count), available); + + // Substring to return + data_buffer = stored_data.substr(in.offset, read_amt); + + out.data = data_buffer.c_str(); + out.count = read_amt; + out.err = 0; + } + } + } catch(const gkfs::metadata::NotFoundException& e) { + out.err = ENOENT; + } catch(const std::exception& e) { + GKFS_DATA->spdlogger()->error("{}() Failed to read inline data: '{}'", + __func__, e.what()); + out.err = EIO; + } + + auto hret = margo_respond(handle, &out); + if(hret != HG_SUCCESS) { + GKFS_DATA->spdlogger()->error("{}() Failed to respond", __func__); + } + + margo_free_input(handle, &in); + margo_destroy(handle); + return HG_SUCCESS; +} + + } // namespace DEFINE_MARGO_RPC_HANDLER(rpc_srv_create) @@ -1056,4 +1195,8 @@ DEFINE_MARGO_RPC_HANDLER(rpc_srv_mk_symlink) #endif #ifdef HAS_RENAME DEFINE_MARGO_RPC_HANDLER(rpc_srv_rename) -#endif \ No newline at end of file +#endif + +DEFINE_MARGO_RPC_HANDLER(rpc_srv_write_data_inline) + +DEFINE_MARGO_RPC_HANDLER(rpc_srv_read_data_inline) \ No newline at end of file -- GitLab From 6863b2e7ad268df3d303d2db6870f792fbf538d0 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Mon, 24 Nov 2025 14:29:00 +0100 Subject: [PATCH 02/18] updates for trunc --- include/client/rpc/forward_metadata.hpp | 8 +- include/client/rpc/rpc_types.hpp | 31 ++++-- include/common/metadata.hpp | 5 + include/common/rpc/rpc_types.hpp | 8 +- src/client/gkfs_functions.cpp | 57 ++++++---- src/client/rpc/forward_metadata.cpp | 47 +++++---- src/common/metadata.cpp | 123 ++++++++++++---------- src/daemon/backend/data/chunk_storage.cpp | 4 + src/daemon/backend/metadata/merge.cpp | 9 ++ src/daemon/daemon.cpp | 5 +- src/daemon/handler/srv_metadata.cpp | 18 +++- 11 files changed, 197 insertions(+), 118 deletions(-) diff --git a/include/client/rpc/forward_metadata.hpp b/include/client/rpc/forward_metadata.hpp index 341aa671a..a457f7f48 100644 --- a/include/client/rpc/forward_metadata.hpp +++ b/include/client/rpc/forward_metadata.hpp @@ -112,12 +112,12 @@ forward_mk_symlink(const std::string& path, const std::string& target_path); * @param path The file path. * @param buf Pointer to the data buffer. * @param offset The file offset. - * @param write_size The number of bytes to write. - * @return int Error code (0 on success). 
+ * @param append_flag Whether to append to the file. + * @return std::pair Error code and offset written. */ -int +std::pair forward_write_inline(const std::string& path, const void* buf, off64_t offset, - size_t write_size); + size_t write_size, bool append_flag); /** * @brief Send an RPC request to read a small amount of data directly diff --git a/include/client/rpc/rpc_types.hpp b/include/client/rpc/rpc_types.hpp index 42be849da..5055e2899 100644 --- a/include/client/rpc/rpc_types.hpp +++ b/include/client/rpc/rpc_types.hpp @@ -2461,7 +2461,7 @@ struct write_data_inline { using input_type = input; using output_type = output; using mercury_input_type = rpc_write_inline_in_t; - using mercury_output_type = rpc_err_out_t; + using mercury_output_type = rpc_write_inline_out_t; constexpr static const uint64_t public_id = 60; // Unique ID constexpr static const hg_id_t mercury_id = 0; @@ -2470,7 +2470,7 @@ struct write_data_inline { constexpr static const auto mercury_in_proc_cb = HG_GEN_PROC_NAME(rpc_write_inline_in_t); constexpr static const auto mercury_out_proc_cb = - HG_GEN_PROC_NAME(rpc_err_out_t); + HG_GEN_PROC_NAME(rpc_write_inline_out_t); class input { template @@ -2479,16 +2479,18 @@ struct write_data_inline { public: input(const std::string& path, uint64_t offset, const std::string& data, - uint64_t count) - : m_path(path), m_offset(offset), m_data(data), m_count(count) {} + uint64_t count, bool append) + : m_path(path), m_offset(offset), m_data(data), m_count(count), + m_append(append) {} explicit input(const rpc_write_inline_in_t& other) : m_path(other.path), m_offset(other.offset), m_data(other.data), - m_count(other.count) {} + m_count(other.count), m_append(other.append) {} explicit operator rpc_write_inline_in_t() { - return {m_path.c_str(), m_offset, m_data.c_str(), m_count}; + return {m_path.c_str(), m_offset, m_data.c_str(), m_count, + static_cast(m_append)}; } private: @@ -2496,6 +2498,7 @@ struct write_data_inline { uint64_t m_offset; std::string m_data; uint64_t m_count; + bool m_append; }; class output { @@ -2504,15 +2507,27 @@ struct write_data_inline { hermes::detail::post_to_mercury(ExecutionContext*); public: - output() : m_err(0) {} - explicit output(const rpc_err_out_t& out) : m_err(out.err) {} + output() : m_err(0), m_ret_offset(0), m_io_size(0) {} + explicit output(const rpc_write_inline_out_t& out) + : m_err(out.err), m_ret_offset(out.ret_offset), + m_io_size(out.io_size) {} int32_t err() const { return m_err; } + int64_t + ret_offset() const { + return m_ret_offset; + } + size_t + io_size() const { + return m_io_size; + } private: int32_t m_err; + int64_t m_ret_offset; + size_t m_io_size; }; }; diff --git a/include/common/metadata.hpp b/include/common/metadata.hpp index 923beec3f..22d599736 100644 --- a/include/common/metadata.hpp +++ b/include/common/metadata.hpp @@ -85,6 +85,11 @@ public: Metadata(mode_t mode, const std::string& target_path); +#ifdef HAS_RENAME + Metadata(mode_t mode, const std::string& target_path, + const std::string& rename_path); +#endif + #endif // Construct from a binary representation of the object diff --git a/include/common/rpc/rpc_types.hpp b/include/common/rpc/rpc_types.hpp index f6b18538f..649b80cdc 100644 --- a/include/common/rpc/rpc_types.hpp +++ b/include/common/rpc/rpc_types.hpp @@ -193,7 +193,13 @@ MERCURY_GEN_PROC(rpc_migrate_metadata_in_t, // Inline write operations MERCURY_GEN_PROC(rpc_write_inline_in_t, ((hg_const_string_t) (path))((hg_uint64_t) (offset))( - (hg_const_string_t) (data))((hg_uint64_t) (count))) + 
(hg_const_string_t) (data))((hg_uint64_t) (count))( + (hg_bool_t) (append))) + +// Output: err, offset (for append), bytes written +MERCURY_GEN_PROC( + rpc_write_inline_out_t, + ((hg_int32_t) (err))((hg_int64_t) (ret_offset))((hg_size_t) (io_size))) // Input: path, offset, read length MERCURY_GEN_PROC(rpc_read_inline_in_t, diff --git a/src/client/gkfs_functions.cpp b/src/client/gkfs_functions.cpp index 36058e59b..32fe0f585 100644 --- a/src/client/gkfs_functions.cpp +++ b/src/client/gkfs_functions.cpp @@ -968,6 +968,9 @@ gkfs_truncate(const std::string& path, off_t length) { while(!md.value().target_path().empty() and md.value().blocks() != -1) { new_path = md.value().target_path(); md = gkfs::utils::get_metadata(md.value().target_path()); + if(!md) { + return -1; + } } // This could be optimized auto size = md->size(); @@ -1063,20 +1066,45 @@ gkfs_do_write(gkfs::filemap::OpenFile& file, const char* buf, size_t count, // OPTIMIZATION: Inline Write - if(gkfs::config::metadata::use_inline_data && offset == 0 && - count <= gkfs::config::metadata::inline_data_size) { + if(gkfs::config::metadata::use_inline_data && + (is_append || + (offset + count) <= gkfs::config::metadata::inline_data_size)) { // Attempt inline write via Metadata RPC - auto err = gkfs::rpc::forward_write_inline(file.path(), buf, offset, - count); - + auto ret_inline = gkfs::rpc::forward_write_inline( + file.path(), buf, offset, count, is_append); + auto err = ret_inline.first; if(err == 0) { if(update_pos) - file.pos(offset + count); + file.pos(ret_inline.second + count); return count; } } + if(is_append) { + auto ret_offset = + gkfs::utils::update_file_size(*path, count, offset, is_append); + err = ret_offset.first; + if(err) { + LOG(ERROR, "update_metadentry_size() failed with err '{}'", err); + errno = err; + return -1; + } + // When append is set the EOF is set to the offset + // forward_update_metadentry_size returns. This is because + // it is an atomic operation on the server and reserves the + // space for this append + if(ret_offset.second == -1) { + LOG(ERROR, + "update_metadentry_size() received -1 as starting offset. " + "This occurs when the staring offset could not be extracted " + "from RocksDB's merge operations. Inform GekkoFS devs."); + errno = EIO; + return -1; + } + offset = ret_offset.second; + } + if(CTX->use_write_size_cache() && !is_append) { auto [size_update_cnt, cached_size] = CTX->write_size_cache()->record(*path, offset + count); @@ -1090,7 +1118,7 @@ gkfs_do_write(gkfs::filemap::OpenFile& file, const char* buf, size_t count, return -1; } } - } else { + } else if(!is_append) { auto ret_offset = gkfs::utils::update_file_size(*path, count, offset, is_append); err = ret_offset.first; @@ -1099,21 +1127,6 @@ gkfs_do_write(gkfs::filemap::OpenFile& file, const char* buf, size_t count, errno = err; return -1; } - if(is_append) { - // When append is set the EOF is set to the offset - // forward_update_metadentry_size returns. This is because - // it is an atomic operation on the server and reserves the - // space for this append - if(ret_offset.second == -1) { - LOG(ERROR, - "update_metadentry_size() received -1 as starting offset. " - "This occurs when the staring offset could not be extracted " - "from RocksDB's merge operations. 
Inform GekkoFS devs."); - errno = EIO; - return -1; - } - offset = ret_offset.second; - } } pair ret_write; diff --git a/src/client/rpc/forward_metadata.cpp b/src/client/rpc/forward_metadata.cpp index a78a0d45d..5f7b80a6b 100644 --- a/src/client/rpc/forward_metadata.cpp +++ b/src/client/rpc/forward_metadata.cpp @@ -1153,35 +1153,38 @@ forward_mk_symlink(const std::string& path, const std::string& target_path) { #endif -int +std::pair forward_write_inline(const std::string& path, const void* buf, off64_t offset, - size_t write_size) { - - auto endp = CTX->hosts().at( - CTX->distributor()->locate_file_metadata(path, 0)); + size_t write_size, bool append_flag) { + + auto endp = + CTX->hosts().at(CTX->distributor()->locate_file_metadata(path, 0)); try { - LOG(DEBUG, "Sending RPC (Inline Write) path: '{}' size: '{}' ...", path, write_size); + LOG(DEBUG, "Sending RPC (Inline Write) path: '{}' size: '{}' ...", path, + write_size); // Construct std::string from buffer for serialization // Note: This copies data, which is acceptable for small inline writes std::string data(static_cast(buf), write_size); auto out = ld_network_service - ->post(endp, path, offset, data, write_size) - .get() - .at(0); + ->post( + endp, path, offset, data, write_size, + bool_to_merc_bool(append_flag)) + .get() + .at(0); - if (out.err() != 0) { + if(out.err() != 0) { LOG(ERROR, "{}() Daemon reported error: {}", __func__, out.err()); - return out.err(); + return {out.err(), 0}; } - return 0; + return {0, out.ret_offset()}; } catch(const std::exception& ex) { LOG(ERROR, "{}() Exception: '{}'", __func__, ex.what()); - return EBUSY; + return {EBUSY, 0}; } } @@ -1189,24 +1192,26 @@ std::pair forward_read_inline(const std::string& path, void* buf, off64_t offset, size_t read_size) { - auto endp = CTX->hosts().at( - CTX->distributor()->locate_file_metadata(path, 0)); + auto endp = + CTX->hosts().at(CTX->distributor()->locate_file_metadata(path, 0)); try { - LOG(DEBUG, "Sending RPC (Inline Read) path: '{}' size: '{}' ...", path, read_size); + LOG(DEBUG, "Sending RPC (Inline Read) path: '{}' size: '{}' ...", path, + read_size); auto out = ld_network_service - ->post(endp, path, offset, read_size) - .get() - .at(0); + ->post( + endp, path, offset, read_size) + .get() + .at(0); - if (out.err() != 0) { + if(out.err() != 0) { LOG(DEBUG, "{}() Daemon reported error: {}", __func__, out.err()); return {out.err(), 0}; } // Copy data from string response to user buffer - if (out.count() > 0 && !out.data().empty()) { + if(out.count() > 0 && !out.data().empty()) { std::memcpy(buf, out.data().c_str(), out.count()); } diff --git a/src/common/metadata.cpp b/src/common/metadata.cpp index 8fd7aa718..699e27e19 100644 --- a/src/common/metadata.cpp +++ b/src/common/metadata.cpp @@ -40,6 +40,7 @@ #include #include +#include extern "C" { #include @@ -110,89 +111,102 @@ Metadata::Metadata(const mode_t mode, const std::string& target_path) assert(target_path_.empty() || target_path_[0] == '/'); init_time(); } +#endif +#ifdef HAS_RENAME +#ifdef HAS_SYMLINKS +Metadata::Metadata(const mode_t mode, const std::string& target_path, + const std::string& rename_path) + : mode_(mode), link_count_(0), size_(0), blocks_(0), + target_path_(target_path), rename_path_(rename_path) { + assert(S_ISLNK(mode_) || S_ISDIR(mode_) || S_ISREG(mode_)); + assert(target_path_.empty() || S_ISLNK(mode_)); + assert(target_path_.empty() || target_path_[0] == '/'); + init_time(); +} +#endif #endif Metadata::Metadata(const std::string& binary_str) { size_t read = 0; + const char* 
ptr = binary_str.data(); + const char* end = ptr + binary_str.size(); - auto ptr = binary_str.data(); + // mode mode_ = static_cast(std::stoul(ptr, &read)); - // we read something assert(read > 0); ptr += read; + assert(ptr < end && *ptr == MSP); + ++ptr; // skip separator - // last parsed char is the separator char - assert(*ptr == MSP); - // yet we have some character to parse - - size_ = std::stol(++ptr, &read); + // size + size_ = std::stol(ptr, &read); assert(read > 0); ptr += read; - // The order is important. don't change. + // optional fields in fixed order if constexpr(gkfs::config::metadata::use_atime) { - assert(*ptr == MSP); - atime_ = static_cast(std::stol(++ptr, &read)); + assert(ptr < end && *ptr == MSP); + ++ptr; + atime_ = static_cast(std::stol(ptr, &read)); assert(read > 0); ptr += read; } if constexpr(gkfs::config::metadata::use_mtime) { - assert(*ptr == MSP); - mtime_ = static_cast(std::stol(++ptr, &read)); + assert(ptr < end && *ptr == MSP); + ++ptr; + mtime_ = static_cast(std::stol(ptr, &read)); assert(read > 0); ptr += read; } if constexpr(gkfs::config::metadata::use_ctime) { - assert(*ptr == MSP); - ctime_ = static_cast(std::stol(++ptr, &read)); + assert(ptr < end && *ptr == MSP); + ++ptr; + ctime_ = static_cast(std::stol(ptr, &read)); assert(read > 0); ptr += read; } if constexpr(gkfs::config::metadata::use_link_cnt) { - assert(*ptr == MSP); - link_count_ = static_cast(std::stoul(++ptr, &read)); + assert(ptr < end && *ptr == MSP); + ++ptr; + link_count_ = static_cast(std::stoul(ptr, &read)); assert(read > 0); ptr += read; } - if constexpr(gkfs::config::metadata::use_blocks) { // last one will not - // encounter a - // delimiter anymore - assert(*ptr == MSP); - blocks_ = static_cast(std::stol(++ptr, &read)); + if constexpr(gkfs::config::metadata::use_blocks) { + assert(ptr < end && *ptr == MSP); + ++ptr; + blocks_ = static_cast(std::stol(ptr, &read)); assert(read > 0); ptr += read; } + // symlink target #ifdef HAS_SYMLINKS - // Read target_path - assert(*ptr == MSP); - target_path_ = ++ptr; - // target_path should be there only if this is a link - ptr += target_path_.size(); + if(ptr < end && *ptr == MSP) { + ++ptr; // skip separator + const char* start = ptr; + const char* sep = std::find(ptr, end, MSP); + target_path_.assign(start, sep - start); + ptr = sep; + } #ifdef HAS_RENAME - // Read rename target, we had captured '|' so we need to recover it - if(!target_path_.empty()) { - auto index = target_path_.find_last_of(MSP); - auto size = target_path_.size(); - target_path_ = target_path_.substr(0, index); - ptr -= (size - index); + if(ptr < end && *ptr == MSP) { + ++ptr; // skip separator + const char* start = ptr; + const char* sep = std::find(ptr, end, MSP); + rename_path_.assign(start, sep - start); + ptr = sep; } - assert(*ptr == MSP); - rename_path_ = ++ptr; - ptr += rename_path_.size(); #endif // HAS_RENAME #endif // HAS_SYMLINKS - if(ptr < binary_str.data() + binary_str.size()) { - assert(*ptr == MSP); - ptr++; - // The rest of the string is the binary data - inline_data_ = std::string(ptr); + // inline data + if(gkfs::config::metadata::use_inline_data && ptr < end) { + if(*ptr == MSP) + ++ptr; // optional separator before payload + inline_data_ = std::string(ptr, end - ptr); } - - // we consumed all the binary string - assert(*ptr == '\0'); } std::string @@ -239,20 +253,17 @@ Metadata::serialize() const { return s; } +// Getter/Setter implementations (unchanged) ... 
+ void Metadata::update_atime_now() { - auto time = std::time(nullptr); - atime_ = time; + atime_ = std::time(nullptr); } - void Metadata::update_mtime_now() { - auto time = std::time(nullptr); - mtime_ = time; + mtime_ = std::time(nullptr); } -//-------------------------------------------- GETTER/SETTER - time_t Metadata::atime() const { return atime_; @@ -260,7 +271,7 @@ Metadata::atime() const { void Metadata::atime(time_t atime) { - Metadata::atime_ = atime; + atime_ = atime; } time_t @@ -270,7 +281,7 @@ Metadata::mtime() const { void Metadata::mtime(time_t mtime) { - Metadata::mtime_ = mtime; + mtime_ = mtime; } time_t @@ -280,7 +291,7 @@ Metadata::ctime() const { void Metadata::ctime(time_t ctime) { - Metadata::ctime_ = ctime; + ctime_ = ctime; } mode_t @@ -290,7 +301,7 @@ Metadata::mode() const { void Metadata::mode(mode_t mode) { - Metadata::mode_ = mode; + mode_ = mode; } nlink_t @@ -310,7 +321,7 @@ Metadata::size() const { void Metadata::size(size_t size) { - Metadata::size_ = size; + size_ = size; } blkcnt_t @@ -320,7 +331,7 @@ Metadata::blocks() const { void Metadata::blocks(blkcnt_t blocks) { - Metadata::blocks_ = blocks; + blocks_ = blocks; } #ifdef HAS_SYMLINKS diff --git a/src/daemon/backend/data/chunk_storage.cpp b/src/daemon/backend/data/chunk_storage.cpp index b03e07bad..6ef511de2 100644 --- a/src/daemon/backend/data/chunk_storage.cpp +++ b/src/daemon/backend/data/chunk_storage.cpp @@ -287,6 +287,8 @@ ChunkStorage::trim_chunk_space(const string& file_path, gkfs::rpc::chnk_id_t chunk_start) { auto chunk_dir = absolute(get_chunks_dir(file_path)); + if(!fs::exists(chunk_dir)) + return; const fs::directory_iterator end; auto err_flag = false; for(fs::directory_iterator chunk_file(chunk_dir); chunk_file != end; @@ -319,6 +321,8 @@ ChunkStorage::truncate_chunk_file(const string& file_path, static_cast(length) <= chunksize_); auto ret = truncate(chunk_path.c_str(), length); if(ret == -1) { + if(errno == ENOENT) + return; auto err_str = fmt::format( "Failed to truncate chunk file. 
File: '{}', Error: '{}'", chunk_path, ::strerror(errno)); diff --git a/src/daemon/backend/metadata/merge.cpp b/src/daemon/backend/metadata/merge.cpp index 400149127..051a1988e 100644 --- a/src/daemon/backend/metadata/merge.cpp +++ b/src/daemon/backend/metadata/merge.cpp @@ -265,6 +265,15 @@ MetadataMergeOperator::FullMergeV2(const MergeOperationInput& merge_in, auto op = DecreaseSizeOperand(parameters); assert(op.size() <= fsize); // we assume no concurrency here fsize = op.size(); + + // We need to handle inline_data + if(gkfs::config::metadata::use_inline_data) { + auto inline_data = md.inline_data(); + if(inline_data.size() > fsize) { + inline_data.resize(fsize); + md.inline_data(inline_data); + } + } } else if(operand_id == OperandID::create) { continue; } else if(operand_id == OperandID::update_time) { diff --git a/src/daemon/daemon.cpp b/src/daemon/daemon.cpp index 45d630c92..9872376dc 100644 --- a/src/daemon/daemon.cpp +++ b/src/daemon/daemon.cpp @@ -201,8 +201,9 @@ register_server_rpcs(margo_instance_id mid) { MARGO_REGISTER(mid, gkfs::malleable::rpc::tag::migrate_metadata, rpc_migrate_metadata_in_t, rpc_err_out_t, rpc_srv_migrate_metadata); - MARGO_REGISTER(mid, gkfs::rpc::tag::write_data_inline, rpc_write_inline_in_t, - rpc_data_out_t, rpc_srv_write_data_inline); + MARGO_REGISTER(mid, gkfs::rpc::tag::write_data_inline, + rpc_write_inline_in_t, rpc_write_inline_out_t, + rpc_srv_write_data_inline); MARGO_REGISTER(mid, gkfs::rpc::tag::read_data_inline, rpc_read_inline_in_t, rpc_read_inline_out_t, rpc_srv_read_data_inline); } diff --git a/src/daemon/handler/srv_metadata.cpp b/src/daemon/handler/srv_metadata.cpp index b463e9f9e..b7ee208df 100644 --- a/src/daemon/handler/srv_metadata.cpp +++ b/src/daemon/handler/srv_metadata.cpp @@ -1035,8 +1035,10 @@ rpc_srv_rename(hg_handle_t handle) { hg_return_t rpc_srv_write_data_inline(hg_handle_t handle) { rpc_write_inline_in_t in{}; - rpc_err_out_t out{}; + rpc_write_inline_out_t out{}; out.err = EIO; + out.ret_offset = 0; + out.io_size = 0; auto ret = margo_get_input(handle, &in); if(ret != HG_SUCCESS) { @@ -1045,10 +1047,17 @@ rpc_srv_write_data_inline(hg_handle_t handle) { return gkfs::rpc::cleanup_respond(&handle, &in, &out); } - GKFS_DATA->spdlogger()->debug("{}() path: '{}', size: '{}', offset: '{}'", - __func__, in.path, in.count, in.offset); + GKFS_DATA->spdlogger()->debug( + "{}() path: '{}', size: '{}', offset: '{}', append: '{}'", __func__, + in.path, in.count, in.offset, in.append); try { + auto md = gkfs::metadata::get(in.path); + auto current_size = md.size(); + if(in.append) { + in.offset = current_size; + } + // 1. Check limits if(in.offset + in.count > gkfs::config::metadata::inline_data_size) { out.err = EFBIG; // File too large for inline @@ -1058,7 +1067,6 @@ rpc_srv_write_data_inline(hg_handle_t handle) { std::string data_buffer(in.data, in.count); // 3. 
Persist to RocksDB - auto md = gkfs::metadata::get(in.path); std::string current_data = md.inline_data(); // Extend if necessary (Sparse write support) @@ -1077,6 +1085,8 @@ rpc_srv_write_data_inline(hg_handle_t handle) { gkfs::metadata::update(in.path, md); out.err = 0; + out.ret_offset = in.offset; + out.io_size = in.count; } } catch(const gkfs::metadata::NotFoundException& e) { out.err = ENOENT; -- GitLab From 575a9583d502d3e575bc131a12efd848296e1a21 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Mon, 24 Nov 2025 15:12:46 +0100 Subject: [PATCH 03/18] migrate inline if exceeds size --- src/client/gkfs_functions.cpp | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/src/client/gkfs_functions.cpp b/src/client/gkfs_functions.cpp index 32fe0f585..8b3850109 100644 --- a/src/client/gkfs_functions.cpp +++ b/src/client/gkfs_functions.cpp @@ -1081,6 +1081,27 @@ gkfs_do_write(gkfs::filemap::OpenFile& file, const char* buf, size_t count, } } + // If we are here, we are writing to chunks. + // Check if we need to migrate existing inline data to chunks. + // This is necessary if the file has inline data but we are now writing + // beyond the inline limit (or appending). + if(gkfs::config::metadata::use_inline_data && (is_append || offset > 0)) { + auto md = gkfs::utils::get_metadata(*path); + if(md && !md->inline_data().empty()) { + LOG(DEBUG, "{}() Migrating inline data to chunks. Size: {}", + __func__, md->size()); + // Write inline data to chunks + auto err_migration = gkfs::rpc::forward_write( + *path, md->inline_data().c_str(), 0, md->size(), 0); + if(err_migration.first) { + LOG(ERROR, "{}() Failed to migrate inline data to chunks", + __func__); + errno = err_migration.first; + return -1; + } + } + } + if(is_append) { auto ret_offset = gkfs::utils::update_file_size(*path, count, offset, is_append); -- GitLab From ffd3301001a4682eb2d1229add180f4d1a5c87e6 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Mon, 24 Nov 2025 16:52:04 +0100 Subject: [PATCH 04/18] new test --- tests/integration/data/test_inline_data.py | 168 +++++++++++++++++++++ 1 file changed, 168 insertions(+) create mode 100644 tests/integration/data/test_inline_data.py diff --git a/tests/integration/data/test_inline_data.py b/tests/integration/data/test_inline_data.py new file mode 100644 index 000000000..4b681bb53 --- /dev/null +++ b/tests/integration/data/test_inline_data.py @@ -0,0 +1,168 @@ +import pytest +import os +import stat +from harness.logger import logger + +def test_inline_append(gkfs_daemon, gkfs_client): + """Test inline data append operations""" + file = gkfs_daemon.mountdir / "file_inline_append" + + # Open file + ret = gkfs_client.open(file, + os.O_CREAT | os.O_WRONLY, + stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) + assert ret.retval != -1 + + # Write initial data (inline) + buf1 = b'A' * 100 + ret = gkfs_client.write(file, buf1, len(buf1)) + assert ret.retval == len(buf1) + + # Close and reopen for append + gkfs_client.close(file) # Assuming close exists, or just open new fd + # Actually gkfs_client.open returns an fd but the harness wrapper might abstract it. + # Looking at test_write_operations.py, it just calls open again. 
+ + ret = gkfs_client.open(file, + os.O_WRONLY | os.O_APPEND, + stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) + assert ret.retval != -1 + + # Append data (inline) + buf2 = b'B' * 100 + ret = gkfs_client.write(file, buf2, len(buf2)) # write with O_APPEND implies append + assert ret.retval == len(buf2) + + # Verify size + ret = gkfs_client.stat(file) + assert ret.retval == 0 + assert ret.statbuf.st_size == 200 + + # Verify content + ret = gkfs_client.open(file, + os.O_RDONLY, + stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) + assert ret.retval != -1 + + ret = gkfs_client.read(file, 200) + assert ret.retval == 200 + assert ret.buf == buf1 + buf2 + +def test_inline_pwrite(gkfs_daemon, gkfs_client): + """Test inline data overwrite using pwrite""" + file = gkfs_daemon.mountdir / "file_inline_pwrite" + + ret = gkfs_client.open(file, + os.O_CREAT | os.O_WRONLY, + stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) + assert ret.retval != -1 + + # Write initial data + buf1 = b'A' * 100 + ret = gkfs_client.write(file, buf1, len(buf1)) + assert ret.retval == len(buf1) + + # Overwrite middle part + buf2 = b'B' * 50 + ret = gkfs_client.pwrite(file, buf2, len(buf2), 25) + assert ret.retval == len(buf2) + + # Verify size (should be same) + ret = gkfs_client.stat(file) + assert ret.retval == 0 + assert ret.statbuf.st_size == 100 + + # Verify content + expected = b'A' * 25 + b'B' * 50 + b'A' * 25 + + ret = gkfs_client.open(file, + os.O_RDONLY, + stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) + assert ret.retval != -1 + + ret = gkfs_client.read(file, 100) + assert ret.retval == 100 + assert ret.buf == expected + +def test_inline_overflow_append(gkfs_daemon, gkfs_client): + """Test appending data that overflows inline limit (migration to chunks)""" + file = gkfs_daemon.mountdir / "file_inline_overflow" + + ret = gkfs_client.open(file, + os.O_CREAT | os.O_WRONLY, + stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) + assert ret.retval != -1 + + # Write almost full inline data + buf1 = b'A' * 4000 + ret = gkfs_client.write(file, buf1, len(buf1)) + assert ret.retval == len(buf1) + + # Reopen for append + ret = gkfs_client.open(file, + os.O_WRONLY | os.O_APPEND, + stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) + assert ret.retval != -1 + + # Append enough to overflow 4096 + buf2 = b'B' * 200 + ret = gkfs_client.write(file, buf2, len(buf2)) + assert ret.retval == len(buf2) + + # Verify size + ret = gkfs_client.stat(file) + assert ret.retval == 0 + assert ret.statbuf.st_size == 4200 + + # Verify content + ret = gkfs_client.open(file, + os.O_RDONLY, + stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) + assert ret.retval != -1 + + ret = gkfs_client.read(file, 4200) + assert ret.retval == 4200 + assert ret.buf == buf1 + buf2 + +def test_inline_overflow_pwrite(gkfs_daemon, gkfs_client): + """Test pwrite that overflows inline limit (migration to chunks)""" + file = gkfs_daemon.mountdir / "file_inline_overflow_pwrite" + + ret = gkfs_client.open(file, + os.O_CREAT | os.O_WRONLY, + stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) + assert ret.retval != -1 + + # Write small inline data + buf1 = b'A' * 100 + ret = gkfs_client.write(file, buf1, len(buf1)) + assert ret.retval == len(buf1) + + # Pwrite far beyond inline limit (creating hole) + buf2 = b'B' * 100 + offset = 5000 + ret = gkfs_client.pwrite(file, buf2, len(buf2), offset) + assert ret.retval == len(buf2) + + # Verify size + ret = gkfs_client.stat(file) + assert ret.retval == 0 + assert ret.statbuf.st_size == offset + len(buf2) + + # Verify content + ret = gkfs_client.open(file, + os.O_RDONLY, + 
stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) + assert ret.retval != -1 + + # Read hole + data + # We expect A*100 + zeros + B*100 + # Total size = 5100 + + ret = gkfs_client.read(file, 5100) + assert ret.retval == 5100 + + read_buf = ret.buf + assert read_buf[0:100] == buf1 + assert read_buf[100:offset] == b'\x00' * (offset - 100) + assert read_buf[offset:offset+100] == buf2 -- GitLab From a04e807c0b6f296b7f3629fbd85229db8a134e67 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Mon, 24 Nov 2025 16:59:53 +0100 Subject: [PATCH 05/18] removing close --- tests/integration/data/test_inline_data.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/tests/integration/data/test_inline_data.py b/tests/integration/data/test_inline_data.py index 4b681bb53..275725fe9 100644 --- a/tests/integration/data/test_inline_data.py +++ b/tests/integration/data/test_inline_data.py @@ -17,11 +17,6 @@ def test_inline_append(gkfs_daemon, gkfs_client): buf1 = b'A' * 100 ret = gkfs_client.write(file, buf1, len(buf1)) assert ret.retval == len(buf1) - - # Close and reopen for append - gkfs_client.close(file) # Assuming close exists, or just open new fd - # Actually gkfs_client.open returns an fd but the harness wrapper might abstract it. - # Looking at test_write_operations.py, it just calls open again. ret = gkfs_client.open(file, os.O_WRONLY | os.O_APPEND, -- GitLab From adfd7e058b46343e22533b7c0218834b9d77a5fe Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Mon, 24 Nov 2025 17:56:00 +0100 Subject: [PATCH 06/18] clean inline --- include/daemon/backend/metadata/db.hpp | 2 +- include/daemon/backend/metadata/merge.hpp | 10 ++++-- .../backend/metadata/metadata_backend.hpp | 6 ++-- .../backend/metadata/parallax_backend.hpp | 2 +- .../backend/metadata/rocksdb_backend.hpp | 2 +- include/daemon/ops/metadentry.hpp | 3 +- src/daemon/backend/metadata/merge.cpp | 34 ++++++++++++++----- .../backend/metadata/parallax_backend.cpp | 3 +- .../backend/metadata/rocksdb_backend.cpp | 7 ++-- src/daemon/handler/srv_metadata.cpp | 22 +++++++++++- src/daemon/ops/metadentry.cpp | 6 ++-- 11 files changed, 73 insertions(+), 24 deletions(-) diff --git a/include/daemon/backend/metadata/db.hpp b/include/daemon/backend/metadata/db.hpp index 72fe2f9b2..2a320f58d 100644 --- a/include/daemon/backend/metadata/db.hpp +++ b/include/daemon/backend/metadata/db.hpp @@ -136,7 +136,7 @@ public: */ off_t increase_size(const std::string& key, size_t io_size, off_t offset, - bool append); + bool append, bool clear_inline = false); /** * @brief Decreases only the size part of the metadata entry via a RocksDB diff --git a/include/daemon/backend/metadata/merge.hpp b/include/daemon/backend/metadata/merge.hpp index 1d2b66b82..10749814e 100644 --- a/include/daemon/backend/metadata/merge.hpp +++ b/include/daemon/backend/metadata/merge.hpp @@ -104,11 +104,12 @@ private: */ uint16_t merge_id_; bool append_; + bool clear_inline_; public: - IncreaseSizeOperand(size_t size); + IncreaseSizeOperand(size_t size, bool clear_inline = false); - IncreaseSizeOperand(size_t size, uint16_t merge_id, bool append); + IncreaseSizeOperand(size_t size, uint16_t merge_id, bool append, bool clear_inline = false); explicit IncreaseSizeOperand(const rdb::Slice& serialized_op); @@ -132,6 +133,11 @@ public: append() const { return append_; } + + bool + clear_inline() const { + return clear_inline_; + } }; /** * @brief Decrease size operand diff --git a/include/daemon/backend/metadata/metadata_backend.hpp b/include/daemon/backend/metadata/metadata_backend.hpp index bdafa5d7c..972a26b74 100644 
--- a/include/daemon/backend/metadata/metadata_backend.hpp +++ b/include/daemon/backend/metadata/metadata_backend.hpp @@ -72,7 +72,7 @@ public: virtual off_t increase_size(const std::string& key, size_t size, off_t offset, - bool append) = 0; + bool append, bool clear_inline = false) = 0; virtual void decrease_size(const std::string& key, size_t size) = 0; @@ -133,9 +133,9 @@ public: off_t increase_size(const std::string& key, size_t size, off_t offset, - bool append) { + bool append, bool clear_inline = false) { return static_cast(*this).increase_size_impl(key, size, offset, - append); + append, clear_inline); } void diff --git a/include/daemon/backend/metadata/parallax_backend.hpp b/include/daemon/backend/metadata/parallax_backend.hpp index ced3522ea..d80403837 100644 --- a/include/daemon/backend/metadata/parallax_backend.hpp +++ b/include/daemon/backend/metadata/parallax_backend.hpp @@ -164,7 +164,7 @@ public: */ off_t increase_size_impl(const std::string& key, size_t io_size, off_t offset, - bool append); + bool append, bool clear_inline = false); /** * Decreases the size on the metadata diff --git a/include/daemon/backend/metadata/rocksdb_backend.hpp b/include/daemon/backend/metadata/rocksdb_backend.hpp index 1b0a1fd00..5d01d5782 100644 --- a/include/daemon/backend/metadata/rocksdb_backend.hpp +++ b/include/daemon/backend/metadata/rocksdb_backend.hpp @@ -148,7 +148,7 @@ public: */ off_t increase_size_impl(const std::string& key, size_t io_size, off_t offset, - bool append); + bool append, bool clear_inline = false); /** * Decreases the size on the metadata diff --git a/include/daemon/ops/metadentry.hpp b/include/daemon/ops/metadentry.hpp index 22868ea6b..cf9358d38 100644 --- a/include/daemon/ops/metadentry.hpp +++ b/include/daemon/ops/metadentry.hpp @@ -125,7 +125,8 @@ update(const std::string& path, Metadata& md); * @return starting offset for I/O operation */ off_t -update_size(const std::string& path, size_t io_size, off_t offset, bool append); +update_size(const std::string& path, size_t io_size, off_t offset, bool append, + bool clear_inline = false); /** * @brief Remove metadentry if exists diff --git a/src/daemon/backend/metadata/merge.cpp b/src/daemon/backend/metadata/merge.cpp index 051a1988e..ca3ba32f0 100644 --- a/src/daemon/backend/metadata/merge.cpp +++ b/src/daemon/backend/metadata/merge.cpp @@ -70,13 +70,14 @@ MergeOperand::get_params(const rdb::Slice& serialized_op) { return {serialized_op.data() + 2, serialized_op.size() - 2}; } -IncreaseSizeOperand::IncreaseSizeOperand(const size_t size) - : size_(size), merge_id_(0), append_(false) {} +IncreaseSizeOperand::IncreaseSizeOperand(const size_t size, const bool clear_inline) + : size_(size), merge_id_(0), append_(false), clear_inline_(clear_inline) {} IncreaseSizeOperand::IncreaseSizeOperand(const size_t size, const uint16_t merge_id, - const bool append) - : size_(size), merge_id_(merge_id), append_(append) {} + const bool append, + const bool clear_inline) + : size_(size), merge_id_(merge_id), append_(append), clear_inline_(clear_inline) {} IncreaseSizeOperand::IncreaseSizeOperand(const rdb::Slice& serialized_op) { size_t read = 0; @@ -86,13 +87,24 @@ IncreaseSizeOperand::IncreaseSizeOperand(const rdb::Slice& serialized_op) { serialized_op[read] == serialize_end) { merge_id_ = 0; append_ = false; + clear_inline_ = false; return; } assert(serialized_op[read] == serialize_sep); // Parse merge id + size_t read_merge_id = 0; merge_id_ = static_cast( - std::stoul(serialized_op.data() + read + 1, nullptr)); + 
std::stoul(serialized_op.data() + read + 1, &read_merge_id)); append_ = true; + + // Check for clear_inline + size_t next_pos = read + 1 + read_merge_id; + if (next_pos < serialized_op.size() && serialized_op[next_pos] == serialize_sep) { + // we have clear_inline + clear_inline_ = (serialized_op[next_pos + 1] == '1'); + } else { + clear_inline_ = false; + } } OperandID @@ -103,9 +115,9 @@ IncreaseSizeOperand::id() const { string IncreaseSizeOperand::serialize_params() const { // serialize_end avoids rogue characters in the serialized string - if(append_) - return fmt::format("{}{}{}{}", size_, serialize_sep, merge_id_, - serialize_end); + if(append_ || clear_inline_) + return fmt::format("{}{}{}{}{}{}", size_, serialize_sep, merge_id_, + serialize_sep, clear_inline_ ? 1 : 0, serialize_end); else { return fmt::format("{}{}", size_, serialize_end); } @@ -261,6 +273,12 @@ MetadataMergeOperator::FullMergeV2(const MergeOperationInput& merge_in, } else { fsize = ::max(op.size(), fsize); } + + // Handle clear_inline + if(op.clear_inline() && gkfs::config::metadata::use_inline_data) { + md.inline_data(""); + } + } else if(operand_id == OperandID::decrease_size) { auto op = DecreaseSizeOperand(parameters); assert(op.size() <= fsize); // we assume no concurrency here diff --git a/src/daemon/backend/metadata/parallax_backend.cpp b/src/daemon/backend/metadata/parallax_backend.cpp index a21c2ddab..215781989 100644 --- a/src/daemon/backend/metadata/parallax_backend.cpp +++ b/src/daemon/backend/metadata/parallax_backend.cpp @@ -343,7 +343,8 @@ ParallaxBackend::update_impl(const std::string& old_key, */ off_t ParallaxBackend::increase_size_impl(const std::string& key, size_t io_size, - off_t offset, bool append) { + off_t offset, bool append, + bool clear_inline) { lock_guard lock_guard(parallax_mutex_); off_t out_offset = -1; auto value = get(key); diff --git a/src/daemon/backend/metadata/rocksdb_backend.cpp b/src/daemon/backend/metadata/rocksdb_backend.cpp index 3d84fefbe..423e5f641 100644 --- a/src/daemon/backend/metadata/rocksdb_backend.cpp +++ b/src/daemon/backend/metadata/rocksdb_backend.cpp @@ -223,12 +223,13 @@ RocksDBBackend::update_impl(const std::string& old_key, */ off_t RocksDBBackend::increase_size_impl(const std::string& key, size_t io_size, - off_t offset, bool append) { + off_t offset, bool append, + bool clear_inline) { off_t out_offset = -1; if(append) { auto merge_id = gkfs::metadata::gen_unique_id(key); // no offset needed because new size is current file size + io_size - auto uop = IncreaseSizeOperand(io_size, merge_id, append); + auto uop = IncreaseSizeOperand(io_size, merge_id, append, clear_inline); auto s = db_->Merge(write_opts_, key, uop.serialize()); if(!s.ok()) { throw_status_excpt(s); @@ -249,7 +250,7 @@ RocksDBBackend::increase_size_impl(const std::string& key, size_t io_size, } else { // In the standard case we simply add the I/O request size to the // offset. 
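        // Sketch of the operand encoding produced by serialize_params()
        // (the actual serialize_sep/serialize_end characters are internal
        // constants and assumed here):
        //   append or clear_inline set: "<size><sep><merge_id><sep><0|1><end>"
        //   plain size update:          "<size><end>"
        // In this branch the encoded value is the absolute new size
        // (offset + io_size), not a delta.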
- auto uop = IncreaseSizeOperand(offset + io_size); + auto uop = IncreaseSizeOperand(offset + io_size, clear_inline); auto s = db_->Merge(write_opts_, key, uop.serialize()); if(!s.ok()) { throw_status_excpt(s); diff --git a/src/daemon/handler/srv_metadata.cpp b/src/daemon/handler/srv_metadata.cpp index b7ee208df..7733ec046 100644 --- a/src/daemon/handler/srv_metadata.cpp +++ b/src/daemon/handler/srv_metadata.cpp @@ -444,8 +444,28 @@ rpc_srv_update_metadentry_size(hg_handle_t handle) { in.path, in.size, in.offset, in.append); try { + bool clear_inline = false; + if(in.append == HG_TRUE) { + auto md = gkfs::metadata::get(in.path); + size_t current_size = md.size(); + size_t new_size = current_size + in.size; + if(new_size > gkfs::config::metadata::inline_data_size && + current_size <= gkfs::config::metadata::inline_data_size) { + // Migration needed + std::string inline_data = md.inline_data(); + if(!inline_data.empty()) { + // Write to chunk 0 + GKFS_DATA->storage()->write_chunk(in.path, 0, + inline_data.c_str(), + inline_data.size(), 0); + clear_inline = true; + } + } + } + out.ret_offset = gkfs::metadata::update_size( - in.path, in.size, in.offset, (in.append == HG_TRUE)); + in.path, in.size, in.offset, (in.append == HG_TRUE), + clear_inline); out.err = 0; } catch(const gkfs::metadata::NotFoundException& e) { GKFS_DATA->spdlogger()->debug("{}() Entry not found: '{}'", __func__, diff --git a/src/daemon/ops/metadentry.cpp b/src/daemon/ops/metadentry.cpp index 1c53a6da3..0f1de7aef 100644 --- a/src/daemon/ops/metadentry.cpp +++ b/src/daemon/ops/metadentry.cpp @@ -114,8 +114,10 @@ update(const string& path, Metadata& md) { * @endinternal */ off_t -update_size(const string& path, size_t io_size, off64_t offset, bool append) { - return GKFS_DATA->mdb()->increase_size(path, io_size, offset, append); +update_size(const string& path, size_t io_size, off64_t offset, bool append, + bool clear_inline) { + return GKFS_DATA->mdb()->increase_size(path, io_size, offset, append, + clear_inline); } void -- GitLab From c1896fd35d6e0a23faf6fe5605096f49ad94582e Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Mon, 24 Nov 2025 17:57:12 +0100 Subject: [PATCH 07/18] test modification --- tests/integration/data/test_inline_data.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/integration/data/test_inline_data.py b/tests/integration/data/test_inline_data.py index 275725fe9..8db429dde 100644 --- a/tests/integration/data/test_inline_data.py +++ b/tests/integration/data/test_inline_data.py @@ -25,7 +25,7 @@ def test_inline_append(gkfs_daemon, gkfs_client): # Append data (inline) buf2 = b'B' * 100 - ret = gkfs_client.write(file, buf2, len(buf2)) # write with O_APPEND implies append + ret = gkfs_client.write(file, buf2, len(buf2), 1) # write with O_APPEND assert ret.retval == len(buf2) # Verify size @@ -101,7 +101,7 @@ def test_inline_overflow_append(gkfs_daemon, gkfs_client): # Append enough to overflow 4096 buf2 = b'B' * 200 - ret = gkfs_client.write(file, buf2, len(buf2)) + ret = gkfs_client.write(file, buf2, len(buf2), 1) # Pass append flag assert ret.retval == len(buf2) # Verify size -- GitLab From 2139106b93d5904fcb620b6d125b80fbb1bcb316 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Mon, 24 Nov 2025 17:58:48 +0100 Subject: [PATCH 08/18] db --- src/daemon/backend/metadata/db.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/daemon/backend/metadata/db.cpp b/src/daemon/backend/metadata/db.cpp index 6483f74a5..cee663bc3 100644 --- 
a/src/daemon/backend/metadata/db.cpp +++ b/src/daemon/backend/metadata/db.cpp @@ -148,8 +148,8 @@ MetadataDB::update(const std::string& old_key, const std::string& new_key, off_t MetadataDB::increase_size(const std::string& key, size_t io_size, off_t offset, - bool append) { - return backend_->increase_size(key, io_size, offset, append); + bool append, bool clear_inline) { + return backend_->increase_size(key, io_size, offset, append, clear_inline); } void -- GitLab From 52fdb99c5358c215d80e8910ef7cea57d6ebb4ed Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Mon, 24 Nov 2025 18:45:19 +0100 Subject: [PATCH 09/18] correcting clean inline --- include/client/preload_util.hpp | 2 +- include/client/rpc/forward_metadata.hpp | 2 +- include/client/rpc/rpc_types.hpp | 13 +++++++--- include/common/rpc/rpc_types.hpp | 3 ++- include/daemon/backend/metadata/merge.hpp | 5 ++-- src/client/gkfs_functions.cpp | 29 +++++++++++++++++++---- src/client/preload_util.cpp | 10 ++++---- src/client/rpc/forward_metadata.cpp | 4 +++- src/daemon/backend/metadata/merge.cpp | 21 +++++++++------- src/daemon/handler/srv_metadata.cpp | 4 ++++ 10 files changed, 66 insertions(+), 27 deletions(-) diff --git a/include/client/preload_util.hpp b/include/client/preload_util.hpp index fbdab48e9..bdff4678c 100644 --- a/include/client/preload_util.hpp +++ b/include/client/preload_util.hpp @@ -98,7 +98,7 @@ metadata_to_stat(const std::string& path, const gkfs::metadata::Metadata& md, */ std::pair update_file_size(const std::string& path, size_t count, off64_t offset, - bool is_append); + bool is_append, bool clear_inline_flag = false); void load_hosts(); diff --git a/include/client/rpc/forward_metadata.hpp b/include/client/rpc/forward_metadata.hpp index a457f7f48..e1761b40a 100644 --- a/include/client/rpc/forward_metadata.hpp +++ b/include/client/rpc/forward_metadata.hpp @@ -86,7 +86,7 @@ forward_update_metadentry(const std::string& path, std::pair forward_update_metadentry_size(const std::string& path, size_t size, off64_t offset, bool append_flag, - const int num_copies); + bool clear_inline_flag, const int num_copies); std::pair forward_get_metadentry_size(const std::string& path, const int copy); diff --git a/include/client/rpc/rpc_types.hpp b/include/client/rpc/rpc_types.hpp index 5055e2899..b5bf35991 100644 --- a/include/client/rpc/rpc_types.hpp +++ b/include/client/rpc/rpc_types.hpp @@ -1130,8 +1130,9 @@ struct update_metadentry_size { public: input(const std::string& path, uint64_t size, int64_t offset, - bool append) - : m_path(path), m_size(size), m_offset(offset), m_append(append) {} + bool append, bool clear_inline = false) + : m_path(path), m_size(size), m_offset(offset), m_append(append), + m_clear_inline(clear_inline) {} input(input&& rhs) = default; @@ -1163,9 +1164,14 @@ struct update_metadentry_size { return m_append; } + bool + clear_inline() const { + return m_clear_inline; + } + explicit input(const rpc_update_metadentry_size_in_t& other) : m_path(other.path), m_size(other.size), m_offset(other.offset), - m_append(other.append) {} + m_append(other.append), m_clear_inline(other.clear_inline) {} explicit operator rpc_update_metadentry_size_in_t() { @@ -1177,6 +1183,7 @@ struct update_metadentry_size { uint64_t m_size; int64_t m_offset; bool m_append; + bool m_clear_inline; }; class output { diff --git a/include/common/rpc/rpc_types.hpp b/include/common/rpc/rpc_types.hpp index 649b80cdc..066855e3c 100644 --- a/include/common/rpc/rpc_types.hpp +++ b/include/common/rpc/rpc_types.hpp @@ -81,7 +81,8 @@ MERCURY_GEN_PROC( 
MERCURY_GEN_PROC(rpc_update_metadentry_size_in_t, ((hg_const_string_t) (path))((hg_uint64_t) (size))( - (hg_int64_t) (offset))((hg_bool_t) (append))) + (hg_int64_t) (offset))((hg_bool_t) (append))( + (hg_bool_t) (clear_inline))) MERCURY_GEN_PROC(rpc_update_metadentry_size_out_t, ((hg_int32_t) (err))((hg_int64_t) (ret_offset))) diff --git a/include/daemon/backend/metadata/merge.hpp b/include/daemon/backend/metadata/merge.hpp index 10749814e..e48325618 100644 --- a/include/daemon/backend/metadata/merge.hpp +++ b/include/daemon/backend/metadata/merge.hpp @@ -109,7 +109,8 @@ private: public: IncreaseSizeOperand(size_t size, bool clear_inline = false); - IncreaseSizeOperand(size_t size, uint16_t merge_id, bool append, bool clear_inline = false); + IncreaseSizeOperand(size_t size, uint16_t merge_id, bool append, + bool clear_inline = false); explicit IncreaseSizeOperand(const rdb::Slice& serialized_op); @@ -133,7 +134,7 @@ public: append() const { return append_; } - + bool clear_inline() const { return clear_inline_; diff --git a/src/client/gkfs_functions.cpp b/src/client/gkfs_functions.cpp index 8b3850109..da9fff853 100644 --- a/src/client/gkfs_functions.cpp +++ b/src/client/gkfs_functions.cpp @@ -141,7 +141,8 @@ generate_lock_file(const std::string& path, bool increase) { if(!md) { gkfs::rpc::forward_create(lock_path, 0777 | S_IFREG, 0); } - gkfs::rpc::forward_update_metadentry_size(lock_path, 1, 0, false, 0); + gkfs::rpc::forward_update_metadentry_size(lock_path, 1, 0, false, false, + 0); } else { auto md = gkfs::utils::get_metadata(lock_path); if(md) { @@ -1085,6 +1086,7 @@ gkfs_do_write(gkfs::filemap::OpenFile& file, const char* buf, size_t count, // Check if we need to migrate existing inline data to chunks. // This is necessary if the file has inline data but we are now writing // beyond the inline limit (or appending). + bool migrated = false; if(gkfs::config::metadata::use_inline_data && (is_append || offset > 0)) { auto md = gkfs::utils::get_metadata(*path); if(md && !md->inline_data().empty()) { @@ -1099,12 +1101,13 @@ gkfs_do_write(gkfs::filemap::OpenFile& file, const char* buf, size_t count, errno = err_migration.first; return -1; } + migrated = true; } } if(is_append) { - auto ret_offset = - gkfs::utils::update_file_size(*path, count, offset, is_append); + auto ret_offset = gkfs::utils::update_file_size(*path, count, offset, + is_append, migrated); err = ret_offset.first; if(err) { LOG(ERROR, "update_metadentry_size() failed with err '{}'", err); @@ -1139,9 +1142,25 @@ gkfs_do_write(gkfs::filemap::OpenFile& file, const char* buf, size_t count, return -1; } } + // If we migrated, we must ensure the server knows to clear inline data + // even if we don't flush the size cache yet. + // However, the current cache implementation doesn't support passing + // extra flags. For now, if migrated is true, we force an update. 
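+        // Net effect: the inline payload was already rewritten through the
+        // chunk path above, and this forced size update carries clear_inline
+        // so the server drops the now-stale inline copy.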
+ if(migrated) { + auto ret_offset = gkfs::utils::update_file_size( + *path, count, offset, is_append, migrated); + err = ret_offset.first; + if(err) { + LOG(ERROR, "update_metadentry_size() failed with err '{}'", + err); + errno = err; + return -1; + } + } + } else if(!is_append) { - auto ret_offset = - gkfs::utils::update_file_size(*path, count, offset, is_append); + auto ret_offset = gkfs::utils::update_file_size(*path, count, offset, + is_append, migrated); err = ret_offset.first; if(err) { LOG(ERROR, "update_metadentry_size() failed with err '{}'", err); diff --git a/src/client/preload_util.cpp b/src/client/preload_util.cpp index 39b4eac08..a5a883ffa 100644 --- a/src/client/preload_util.cpp +++ b/src/client/preload_util.cpp @@ -357,9 +357,10 @@ metadata_to_stat(const std::string& path, const gkfs::metadata::Metadata& md, pair update_file_size(const std::string& path, size_t count, off64_t offset, - bool is_append) { - LOG(DEBUG, "{}() path: '{}', count: '{}', offset: '{}', is_append: '{}'", - __func__, path, count, offset, is_append); + bool is_append, bool clear_inline_flag) { + LOG(DEBUG, + "{}() path: '{}', count: '{}', offset: '{}', is_append: '{}', clear_inline_flag: '{}'", + __func__, path, count, offset, is_append, clear_inline_flag); pair ret_offset; auto num_replicas = CTX->get_replicas(); if(gkfs::config::proxy::fwd_update_size && CTX->use_proxy()) { @@ -367,7 +368,8 @@ update_file_size(const std::string& path, size_t count, off64_t offset, path, count, offset, is_append); } else { ret_offset = gkfs::rpc::forward_update_metadentry_size( - path, count, offset, is_append, num_replicas); + path, count, offset, is_append, clear_inline_flag, + num_replicas); } return ret_offset; } diff --git a/src/client/rpc/forward_metadata.cpp b/src/client/rpc/forward_metadata.cpp index 5f7b80a6b..f16cd69ab 100644 --- a/src/client/rpc/forward_metadata.cpp +++ b/src/client/rpc/forward_metadata.cpp @@ -563,6 +563,7 @@ forward_rename(const string& oldpath, const string& newpath, pair forward_update_metadentry_size(const string& path, const size_t size, const off64_t offset, const bool append_flag, + const bool clear_inline_flag, const int num_copies) { if(gkfs::config::proxy::fwd_update_size && CTX->use_proxy()) { LOG(WARNING, "{} was called even though proxy should be used!", @@ -583,7 +584,8 @@ forward_update_metadentry_size(const string& path, const size_t size, handles.emplace_back( ld_network_service->post( endp, path, size, offset, - bool_to_merc_bool(append_flag))); + bool_to_merc_bool(append_flag), + bool_to_merc_bool(clear_inline_flag))); } catch(const std::exception& ex) { LOG(ERROR, "while getting rpc output"); return make_pair(EBUSY, 0); diff --git a/src/daemon/backend/metadata/merge.cpp b/src/daemon/backend/metadata/merge.cpp index ca3ba32f0..74e013de1 100644 --- a/src/daemon/backend/metadata/merge.cpp +++ b/src/daemon/backend/metadata/merge.cpp @@ -70,14 +70,16 @@ MergeOperand::get_params(const rdb::Slice& serialized_op) { return {serialized_op.data() + 2, serialized_op.size() - 2}; } -IncreaseSizeOperand::IncreaseSizeOperand(const size_t size, const bool clear_inline) +IncreaseSizeOperand::IncreaseSizeOperand(const size_t size, + const bool clear_inline) : size_(size), merge_id_(0), append_(false), clear_inline_(clear_inline) {} IncreaseSizeOperand::IncreaseSizeOperand(const size_t size, const uint16_t merge_id, const bool append, const bool clear_inline) - : size_(size), merge_id_(merge_id), append_(append), clear_inline_(clear_inline) {} + : size_(size), merge_id_(merge_id), 
append_(append), + clear_inline_(clear_inline) {} IncreaseSizeOperand::IncreaseSizeOperand(const rdb::Slice& serialized_op) { size_t read = 0; @@ -96,14 +98,15 @@ IncreaseSizeOperand::IncreaseSizeOperand(const rdb::Slice& serialized_op) { merge_id_ = static_cast( std::stoul(serialized_op.data() + read + 1, &read_merge_id)); append_ = true; - + // Check for clear_inline size_t next_pos = read + 1 + read_merge_id; - if (next_pos < serialized_op.size() && serialized_op[next_pos] == serialize_sep) { - // we have clear_inline - clear_inline_ = (serialized_op[next_pos + 1] == '1'); + if(next_pos < serialized_op.size() && + serialized_op[next_pos] == serialize_sep) { + // we have clear_inline + clear_inline_ = (serialized_op[next_pos + 1] == '1'); } else { - clear_inline_ = false; + clear_inline_ = false; } } @@ -273,12 +276,12 @@ MetadataMergeOperator::FullMergeV2(const MergeOperationInput& merge_in, } else { fsize = ::max(op.size(), fsize); } - + // Handle clear_inline if(op.clear_inline() && gkfs::config::metadata::use_inline_data) { md.inline_data(""); } - + } else if(operand_id == OperandID::decrease_size) { auto op = DecreaseSizeOperand(parameters); assert(op.size() <= fsize); // we assume no concurrency here diff --git a/src/daemon/handler/srv_metadata.cpp b/src/daemon/handler/srv_metadata.cpp index 7733ec046..e98262ee1 100644 --- a/src/daemon/handler/srv_metadata.cpp +++ b/src/daemon/handler/srv_metadata.cpp @@ -463,6 +463,10 @@ rpc_srv_update_metadentry_size(hg_handle_t handle) { } } + if(in.clear_inline) { + clear_inline = true; + } + out.ret_offset = gkfs::metadata::update_size( in.path, in.size, in.offset, (in.append == HG_TRUE), clear_inline); -- GitLab From 6ae28fe7dfec80cd728f1d87d364cca3403ae4fd Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Tue, 25 Nov 2025 06:59:37 +0100 Subject: [PATCH 10/18] fix s3d --- src/daemon/handler/srv_metadata.cpp | 6 ++++-- tests/integration/data/test_inline_data.py | 2 ++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/src/daemon/handler/srv_metadata.cpp b/src/daemon/handler/srv_metadata.cpp index e98262ee1..d56548b0e 100644 --- a/src/daemon/handler/srv_metadata.cpp +++ b/src/daemon/handler/srv_metadata.cpp @@ -1083,8 +1083,10 @@ rpc_srv_write_data_inline(hg_handle_t handle) { } // 1. Check limits - if(in.offset + in.count > gkfs::config::metadata::inline_data_size) { - out.err = EFBIG; // File too large for inline + if(in.offset + in.count > gkfs::config::metadata::inline_data_size || + md.size() > gkfs::config::metadata::inline_data_size || + (md.size() > 0 && md.inline_data().empty())) { + out.err = EFBIG; // File too large for inline or already in chunks } else { // 2. 
Prepare data string from input buffer // Note: in.data is a hg_const_string_t (char*) diff --git a/tests/integration/data/test_inline_data.py b/tests/integration/data/test_inline_data.py index 8db429dde..9e7286a05 100644 --- a/tests/integration/data/test_inline_data.py +++ b/tests/integration/data/test_inline_data.py @@ -161,3 +161,5 @@ def test_inline_overflow_pwrite(gkfs_daemon, gkfs_client): assert read_buf[0:100] == buf1 assert read_buf[100:offset] == b'\x00' * (offset - 100) assert read_buf[offset:offset+100] == buf2 + + -- GitLab From 71b1f2a18f1503595f40052b3de14f54fde9ba4d Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Tue, 25 Nov 2025 14:39:25 +0100 Subject: [PATCH 11/18] fix s3d header behaviour --- src/client/gkfs_functions.cpp | 5 ++-- tests/integration/data/test_inline_data.py | 35 ++++++++++++++++++++++ 2 files changed, 38 insertions(+), 2 deletions(-) diff --git a/src/client/gkfs_functions.cpp b/src/client/gkfs_functions.cpp index da9fff853..aaeeca8e3 100644 --- a/src/client/gkfs_functions.cpp +++ b/src/client/gkfs_functions.cpp @@ -1087,9 +1087,10 @@ gkfs_do_write(gkfs::filemap::OpenFile& file, const char* buf, size_t count, // This is necessary if the file has inline data but we are now writing // beyond the inline limit (or appending). bool migrated = false; - if(gkfs::config::metadata::use_inline_data && (is_append || offset > 0)) { + if(gkfs::config::metadata::use_inline_data) { auto md = gkfs::utils::get_metadata(*path); - if(md && !md->inline_data().empty()) { + if(md && md->size() > 0 && + md->size() <= gkfs::config::metadata::inline_data_size) { LOG(DEBUG, "{}() Migrating inline data to chunks. Size: {}", __func__, md->size()); // Write inline data to chunks diff --git a/tests/integration/data/test_inline_data.py b/tests/integration/data/test_inline_data.py index 9e7286a05..99d2846d9 100644 --- a/tests/integration/data/test_inline_data.py +++ b/tests/integration/data/test_inline_data.py @@ -162,4 +162,39 @@ def test_inline_overflow_pwrite(gkfs_daemon, gkfs_client): assert read_buf[100:offset] == b'\x00' * (offset - 100) assert read_buf[offset:offset+100] == buf2 +def test_inline_overwrite_pwrite(gkfs_daemon, gkfs_client): + """Test pwrite at offset 0 that overflows inline limit (migration/clearing)""" + file = gkfs_daemon.mountdir / "file_inline_overwrite_pwrite" + + ret = gkfs_client.open(file, + os.O_CREAT | os.O_WRONLY, + stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) + assert ret.retval != -1 + + # Write small inline data + buf1 = b'A' * 100 + ret = gkfs_client.write(file, buf1, len(buf1)) + assert ret.retval == len(buf1) + + # Overwrite with large data at offset 0 + # This should force chunk write and clear inline data + buf2 = b'B' * 5000 + ret = gkfs_client.pwrite(file, buf2, len(buf2), 0) + assert ret.retval == len(buf2) + + # Verify size + ret = gkfs_client.stat(file) + assert ret.retval == 0 + assert ret.statbuf.st_size == 5000 + + # Verify content + ret = gkfs_client.open(file, + os.O_RDONLY, + stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) + assert ret.retval != -1 + + ret = gkfs_client.read(file, 5000) + assert ret.retval == 5000 + assert ret.buf == buf2 + -- GitLab From 4056b33d2a3c4e8eb2ccd6ba911847b187572cb5 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Tue, 25 Nov 2025 14:52:40 +0100 Subject: [PATCH 12/18] Temp remove inline --- include/config.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/config.hpp b/include/config.hpp index 3ca38a0c5..383e59c40 100644 --- a/include/config.hpp +++ b/include/config.hpp @@ -134,7 +134,7 @@ 
constexpr auto implicit_data_removal = true; // Check for existence of file metadata before create. This done on RocksDB // level constexpr auto create_exist_check = true; -constexpr auto use_inline_data = true; +constexpr auto use_inline_data = false; constexpr auto inline_data_size = 4096; // in bytes } // namespace metadata namespace data { -- GitLab From 48c80f20771817096f3445ef0402a34f2ad26eff Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Wed, 26 Nov 2025 09:06:01 +0100 Subject: [PATCH 13/18] solved lockfile with inline --- include/config.hpp | 2 +- src/client/gkfs_functions.cpp | 11 +++++++---- src/daemon/handler/srv_metadata.cpp | 2 ++ 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/include/config.hpp b/include/config.hpp index 383e59c40..3ca38a0c5 100644 --- a/include/config.hpp +++ b/include/config.hpp @@ -134,7 +134,7 @@ constexpr auto implicit_data_removal = true; // Check for existence of file metadata before create. This done on RocksDB // level constexpr auto create_exist_check = true; -constexpr auto use_inline_data = false; +constexpr auto use_inline_data = true; constexpr auto inline_data_size = 4096; // in bytes } // namespace metadata namespace data { diff --git a/src/client/gkfs_functions.cpp b/src/client/gkfs_functions.cpp index aaeeca8e3..7fc587e82 100644 --- a/src/client/gkfs_functions.cpp +++ b/src/client/gkfs_functions.cpp @@ -138,15 +138,18 @@ generate_lock_file(const std::string& path, bool increase) { auto lock_path = path + ".lockgekko"; if(increase) { auto md = gkfs::utils::get_metadata(lock_path); + size_t new_size = 1; if(!md) { gkfs::rpc::forward_create(lock_path, 0777 | S_IFREG, 0); + } else { + new_size = md->size() + 1; } - gkfs::rpc::forward_update_metadentry_size(lock_path, 1, 0, false, false, - 0); + gkfs::rpc::forward_update_metadentry_size(lock_path, new_size, 0, false, + false, 0); } else { auto md = gkfs::utils::get_metadata(lock_path); if(md) { - if(md.value().size() == 1) { + if(md->size() == 1 || md->size() == 0) { LOG(DEBUG, "Deleting Lock file {}", lock_path); gkfs::rpc::forward_remove(lock_path, false, 0); } else { @@ -1361,7 +1364,7 @@ gkfs_do_read(const gkfs::filemap::OpenFile& file, char* buf, size_t count, } if(gkfs::config::metadata::use_inline_data && - (offset + count) <= gkfs::config::metadata::inline_data_size) { + offset < gkfs::config::metadata::inline_data_size) { // Forward the read request to the Metadata Server instead of Data // Server diff --git a/src/daemon/handler/srv_metadata.cpp b/src/daemon/handler/srv_metadata.cpp index d56548b0e..719a300e3 100644 --- a/src/daemon/handler/srv_metadata.cpp +++ b/src/daemon/handler/srv_metadata.cpp @@ -1164,6 +1164,8 @@ rpc_srv_read_data_inline(hg_handle_t handle) { // inline and the client should have checked, but we double check. 
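        // Two cases send the reader to the chunk path: the file outgrew the
        // inline limit, or it has a non-zero size but an empty inline payload
        // (its data was migrated to chunks by an earlier write).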
if(md.size() > gkfs::config::metadata::inline_data_size) { out.err = EAGAIN; // Signal client to use Chunk path + } else if(md.size() > 0 && md.inline_data().empty()) { + out.err = EAGAIN; // Signal client to use Chunk path } else { const std::string& stored_data = md.inline_data(); -- GitLab From 9c7b991312eacb7bfefd6ca01b4ec1ffefcc32e9 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Wed, 26 Nov 2025 09:37:25 +0100 Subject: [PATCH 14/18] malloc -> calloc mmap --- src/client/gkfs_functions.cpp | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/client/gkfs_functions.cpp b/src/client/gkfs_functions.cpp index 7fc587e82..670df239c 100644 --- a/src/client/gkfs_functions.cpp +++ b/src/client/gkfs_functions.cpp @@ -2039,13 +2039,18 @@ gkfs_get_file_list(const std::string& path) { void* gkfs_mmap(void* addr, size_t length, int prot, int flags, int fd, off_t offset) { - void* ptr = malloc(length); + void* ptr = calloc(1, length); if(ptr == nullptr) { return MAP_FAILED; } // store info on mmap_set mmap_set.insert(std::make_tuple(ptr, fd, length, offset)); - gkfs::syscall::gkfs_pread(fd, ptr, length, offset); + auto ret = gkfs::syscall::gkfs_pread(fd, ptr, length, offset); + if(ret == -1) { + free(ptr); + mmap_set.erase(std::make_tuple(ptr, fd, length, offset)); + return MAP_FAILED; + } return ptr; } -- GitLab From c6b53bdb4e753e7a91132ed8467fe8f90576aa97 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Wed, 26 Nov 2025 09:52:14 +0100 Subject: [PATCH 15/18] mmap prot --- src/client/gkfs_functions.cpp | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/src/client/gkfs_functions.cpp b/src/client/gkfs_functions.cpp index 670df239c..1da914ecc 100644 --- a/src/client/gkfs_functions.cpp +++ b/src/client/gkfs_functions.cpp @@ -50,6 +50,8 @@ #include #include #include +#include +#include #include #ifdef GKFS_ENABLE_CLIENT_METRICS @@ -63,6 +65,7 @@ extern "C" { #include #include #include +#include } using namespace std; @@ -86,7 +89,8 @@ struct dirent_extended { namespace { // set to store void * addr, fd, length and offset -std::set> mmap_set; +// set to store void * addr, fd, length, offset, prot +std::set> mmap_set; /** * Checks if metadata for parent directory exists (can be disabled with * GKFS_CREATE_CHECK_PARENTS). 
errno may be set @@ -2044,11 +2048,11 @@ gkfs_mmap(void* addr, size_t length, int prot, int flags, int fd, return MAP_FAILED; } // store info on mmap_set - mmap_set.insert(std::make_tuple(ptr, fd, length, offset)); + mmap_set.insert(std::make_tuple(ptr, fd, length, offset, prot)); auto ret = gkfs::syscall::gkfs_pread(fd, ptr, length, offset); if(ret == -1) { free(ptr); - mmap_set.erase(std::make_tuple(ptr, fd, length, offset)); + mmap_set.erase(std::make_tuple(ptr, fd, length, offset, prot)); return MAP_FAILED; } return ptr; @@ -2064,7 +2068,10 @@ gkfs_msync(void* addr, size_t length, int flags) { if(std::get<0>(tuple) == addr) { int fd = std::get<1>(tuple); off_t offset = std::get<3>(tuple); - gkfs::syscall::gkfs_pwrite(fd, addr, length, offset); + int prot = std::get<4>(tuple); + if(prot & PROT_WRITE) { + gkfs::syscall::gkfs_pwrite(fd, addr, length, offset); + } return 0; } } @@ -2085,7 +2092,7 @@ gkfs_munmap(void* addr, size_t length) { auto it = std::find_if( mmap_set.begin(), mmap_set.end(), - [&addr](const std::tuple& t) { + [&addr](const std::tuple& t) { return std::get<0>(t) == addr; }); if(it != mmap_set.end()) { -- GitLab From f6f3b7315336f1a7c474a3509c56025bae2cedd8 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Thu, 27 Nov 2025 14:29:43 +0100 Subject: [PATCH 16/18] base64 in inline --- include/common/rpc/rpc_util.hpp | 6 ++ src/client/gkfs_functions.cpp | 1 + src/client/rpc/forward_metadata.cpp | 14 +++- src/common/CMakeLists.txt | 1 + src/common/metadata.cpp | 6 +- src/common/rpc/rpc_util.cpp | 17 ++++- src/daemon/handler/srv_metadata.cpp | 27 ++++++-- tests/integration/data/test_inline_null.py | 74 ++++++++++++++++++++++ 8 files changed, 132 insertions(+), 14 deletions(-) create mode 100644 tests/integration/data/test_inline_null.py diff --git a/include/common/rpc/rpc_util.hpp b/include/common/rpc/rpc_util.hpp index 7b7d32535..c4eb1b06d 100644 --- a/include/common/rpc/rpc_util.hpp +++ b/include/common/rpc/rpc_util.hpp @@ -74,6 +74,12 @@ compress_bitset(const std::vector& bytes); std::vector decompress_bitset(const std::string& compressedString); +std::string +base64_encode(const std::string& data); + +std::string +base64_decode_to_string(const std::string& encoded); + } // namespace gkfs::rpc #endif // GEKKOFS_COMMON_RPC_UTILS_HPP diff --git a/src/client/gkfs_functions.cpp b/src/client/gkfs_functions.cpp index 1da914ecc..0ab61c221 100644 --- a/src/client/gkfs_functions.cpp +++ b/src/client/gkfs_functions.cpp @@ -1100,6 +1100,7 @@ gkfs_do_write(gkfs::filemap::OpenFile& file, const char* buf, size_t count, md->size() <= gkfs::config::metadata::inline_data_size) { LOG(DEBUG, "{}() Migrating inline data to chunks. 
Size: {}", __func__, md->size()); + // Write inline data to chunks auto err_migration = gkfs::rpc::forward_write( *path, md->inline_data().c_str(), 0, md->size(), 0); diff --git a/src/client/rpc/forward_metadata.cpp b/src/client/rpc/forward_metadata.cpp index f16cd69ab..8f7249631 100644 --- a/src/client/rpc/forward_metadata.cpp +++ b/src/client/rpc/forward_metadata.cpp @@ -1169,10 +1169,14 @@ forward_write_inline(const std::string& path, const void* buf, off64_t offset, // Construct std::string from buffer for serialization // Note: This copies data, which is acceptable for small inline writes std::string data(static_cast(buf), write_size); + // Base64 encode the data to ensure it is safe for RPC transmission (no + // null bytes) + std::string encoded_data = gkfs::rpc::base64_encode(data); auto out = ld_network_service ->post( - endp, path, offset, data, write_size, + endp, path, offset, encoded_data, + encoded_data.size(), bool_to_merc_bool(append_flag)) .get() .at(0); @@ -1214,10 +1218,14 @@ forward_read_inline(const std::string& path, void* buf, off64_t offset, // Copy data from string response to user buffer if(out.count() > 0 && !out.data().empty()) { - std::memcpy(buf, out.data().c_str(), out.count()); + // Decode Base64 string back to binary + std::string decoded_data = + gkfs::rpc::base64_decode_to_string(out.data()); + std::memcpy(buf, decoded_data.c_str(), decoded_data.size()); + return {0, decoded_data.size()}; } - return {0, out.count()}; + return {0, 0}; } catch(const std::exception& ex) { LOG(ERROR, "{}() Exception: '{}'", __func__, ex.what()); diff --git a/src/common/CMakeLists.txt b/src/common/CMakeLists.txt index 6f746d6eb..b6d4702c7 100644 --- a/src/common/CMakeLists.txt +++ b/src/common/CMakeLists.txt @@ -102,6 +102,7 @@ target_sources(metadata target_link_libraries(metadata PRIVATE fmt::fmt + rpc_utils ) add_library(path_util STATIC) diff --git a/src/common/metadata.cpp b/src/common/metadata.cpp index 699e27e19..ad372493b 100644 --- a/src/common/metadata.cpp +++ b/src/common/metadata.cpp @@ -37,6 +37,7 @@ */ #include +#include #include #include @@ -205,7 +206,8 @@ Metadata::Metadata(const std::string& binary_str) { if(gkfs::config::metadata::use_inline_data && ptr < end) { if(*ptr == MSP) ++ptr; // optional separator before payload - inline_data_ = std::string(ptr, end - ptr); + inline_data_ = + gkfs::rpc::base64_decode_to_string(std::string(ptr, end - ptr)); } } @@ -248,7 +250,7 @@ Metadata::serialize() const { if(gkfs::config::metadata::use_inline_data && !inline_data_.empty()) { s += MSP; // Separator - s += inline_data_; + s += gkfs::rpc::base64_encode(inline_data_); } return s; } diff --git a/src/common/rpc/rpc_util.cpp b/src/common/rpc/rpc_util.cpp index 30ed2fc56..ee11bf10f 100644 --- a/src/common/rpc/rpc_util.cpp +++ b/src/common/rpc/rpc_util.cpp @@ -201,8 +201,9 @@ base64_decode(const std::string& encoded) { // Remove the padding bits buffer >>= padding; padding = 0; - data.push_back(static_cast((buffer >> 8) & 0xFF)); - count++; + // The previous code incorrectly extracted a byte here. + // Since padding < 8, we don't have a full byte left, so we just discard + // the padding. 
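+    // Worked example: a single input byte encodes to two base64 symbols plus
+    // "==" padding, i.e. 12 payload bits for 8 data bits; the 4 leftover
+    // bits are exactly the `padding` discarded above.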
} if(count == 0 || padding % 8 != 0) @@ -222,4 +223,16 @@ decompress_bitset(const std::string& compressedString) { } +std::string +base64_encode(const std::string& data) { + std::vector vec(data.begin(), data.end()); + return base64_encode(vec); +} + +std::string +base64_decode_to_string(const std::string& encoded) { + auto vec = base64_decode(encoded); + return std::string(vec.begin(), vec.end()); +} + } // namespace gkfs::rpc \ No newline at end of file diff --git a/src/daemon/handler/srv_metadata.cpp b/src/daemon/handler/srv_metadata.cpp index 719a300e3..63c98c8f3 100644 --- a/src/daemon/handler/srv_metadata.cpp +++ b/src/daemon/handler/srv_metadata.cpp @@ -51,6 +51,7 @@ #include #include +#include #include #include using namespace std; @@ -1074,6 +1075,8 @@ rpc_srv_write_data_inline(hg_handle_t handle) { GKFS_DATA->spdlogger()->debug( "{}() path: '{}', size: '{}', offset: '{}', append: '{}'", __func__, in.path, in.count, in.offset, in.append); + // Decode the data + auto decoded_data = gkfs::rpc::base64_decode_to_string(in.data); try { auto md = gkfs::metadata::get(in.path); @@ -1083,25 +1086,26 @@ rpc_srv_write_data_inline(hg_handle_t handle) { } // 1. Check limits - if(in.offset + in.count > gkfs::config::metadata::inline_data_size || + if(in.offset + decoded_data.size() > + gkfs::config::metadata::inline_data_size || md.size() > gkfs::config::metadata::inline_data_size || (md.size() > 0 && md.inline_data().empty())) { out.err = EFBIG; // File too large for inline or already in chunks } else { // 2. Prepare data string from input buffer // Note: in.data is a hg_const_string_t (char*) - std::string data_buffer(in.data, in.count); + // decoded_data already contains the binary data // 3. Persist to RocksDB std::string current_data = md.inline_data(); // Extend if necessary (Sparse write support) - if(current_data.size() < (in.offset + in.count)) { - current_data.resize(in.offset + in.count, '\0'); + if(current_data.size() < (in.offset + decoded_data.size())) { + current_data.resize(in.offset + decoded_data.size(), '\0'); } // Apply write - current_data.replace(in.offset, in.count, data_buffer); + current_data.replace(in.offset, decoded_data.size(), decoded_data); md.inline_data(current_data); // Update size if file grew @@ -1112,7 +1116,7 @@ rpc_srv_write_data_inline(hg_handle_t handle) { gkfs::metadata::update(in.path, md); out.err = 0; out.ret_offset = in.offset; - out.io_size = in.count; + out.io_size = decoded_data.size(); } } catch(const gkfs::metadata::NotFoundException& e) { out.err = ENOENT; @@ -1181,8 +1185,17 @@ rpc_srv_read_data_inline(hg_handle_t handle) { // Substring to return data_buffer = stored_data.substr(in.offset, read_amt); + // Encode to Base64 to ensure it is safe for RPC transmission + // (no null bytes) + std::string encoded_data = + gkfs::rpc::base64_encode(data_buffer); + + // We reuse data_buffer to hold the encoded string to ensure it + // outlives the response + data_buffer = encoded_data; + out.data = data_buffer.c_str(); - out.count = read_amt; + out.count = data_buffer.size(); out.err = 0; } } diff --git a/tests/integration/data/test_inline_null.py b/tests/integration/data/test_inline_null.py new file mode 100644 index 000000000..ba4331ea2 --- /dev/null +++ b/tests/integration/data/test_inline_null.py @@ -0,0 +1,74 @@ +import pytest +import logging +from harness.logger import logger + +def test_inline_null_chars(gkfs_daemon, gkfs_shell, tmp_path): + print("DEBUG: Entered test_inline_null_chars") + """Test inline data with null characters to verify base64 
encoding""" + file = gkfs_daemon.mountdir / "file_inline_null" + + # Create a python script file in the temporary directory + script_file = tmp_path / "write_nulls.py" + script_content = f""" +import os +with open('{file}', 'wb') as f: + buf = b'Start\\x00Middle\\x00End' + f.write(buf) +""" + script_file.write_text(script_content) + + # Execute the script using gkfs_shell (which uses LD_PRELOAD) + ret = gkfs_shell.script(f"python3 {script_file}") + assert ret.exit_code == 0 + + # Read back the data to verify + read_script_file = tmp_path / "read_nulls.py" + read_script_content = f""" +import os +with open('{file}', 'rb') as f: + data = f.read() + expected = b'Start\\x00Middle\\x00End' + if data != expected: + print(f"Mismatch: expected {{expected}}, got {{data}}") + exit(1) +""" + read_script_file.write_text(read_script_content) + + ret = gkfs_shell.script(f"python3 {read_script_file}") + assert ret.exit_code == 0 + + +def test_inline_null_chars_large(gkfs_daemon, gkfs_shell, tmp_path): + """Test larger inline data with null characters""" + file = gkfs_daemon.mountdir / "file_inline_null_large" + + # Create a python script file + script_file = tmp_path / "write_nulls_large.py" + script_content = f""" +import os +with open('{file}', 'wb') as f: + # 2000 bytes, mixed nulls and data + buf = b'\\x00' * 100 + b'Data' * 100 + b'\\x00' * 100 + f.write(buf) +""" + script_file.write_text(script_content) + + # Execute the script using gkfs_shell + ret = gkfs_shell.script(f"python3 {script_file}") + assert ret.exit_code == 0 + + # Read back the data to verify + read_script_file = tmp_path / "read_nulls_large.py" + read_script_content = f""" +import os +with open('{file}', 'rb') as f: + data = f.read() + expected = b'\\x00' * 100 + b'Data' * 100 + b'\\x00' * 100 + if data != expected: + print(f"Mismatch: expected len {{len(expected)}}, got {{len(data)}}") + exit(1) +""" + read_script_file.write_text(read_script_content) + + ret = gkfs_shell.script(f"python3 {read_script_file}") + assert ret.exit_code == 0 -- GitLab From 605b8999e47c589254440971270193b87469b268 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Thu, 27 Nov 2025 15:54:37 +0100 Subject: [PATCH 17/18] Changelog --- CHANGELOG.md | 3 +++ README.md | 4 ++++ 2 files changed, 7 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 047b05db7..4f467fef9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,9 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). - Compress directory data with zstd. - Make a new config.hpp option for controlling the compression - If directory buffer is not enough it will reattempt with the exact size + - Metadata server can store small data ([!271](https://storage.bsc.es/gitlab/hpc/gekkofs/-/merge_requests/271)) + - Using config.hpp use_inline_data = true; and inline_data_size = 4096; + - Data is stored in base64, as we use string to send the small data content (not bulk transfer) ### Changed diff --git a/README.md b/README.md index d4232a88a..711336170 100644 --- a/README.md +++ b/README.md @@ -510,6 +510,10 @@ Note, that a chunk/host configuration is inherited to all children files automat In this example, `/mdt-hard/file1` is therefore also using the same distribution as the `/mdt-hard` directory. If no prefix is used, the Simple Hash distributor is used. 
+## Small Data Store
+Small files can be stored directly on the metadata server. This is controlled
+with the `config.hpp` options `use_inline_data = true` and `inline_data_size`.
+
 #### Guided configuration file

Creating a guided configuration file is based on an I/O trace file of a previous execution of the application.
--
GitLab


From de9371e044816081bd2ca7435121e833530cdd1f Mon Sep 17 00:00:00 2001
From: Ramon Nou
Date: Fri, 5 Dec 2025 10:38:09 +0100
Subject: [PATCH 18/18] 2 RPC to 1 RPC metadata write

---
 include/client/open_file_map.hpp            |   9 ++
 include/client/rpc/forward_metadata.hpp     |   6 +
 include/client/rpc/rpc_types.hpp            | 137 ++++++++++++++++++++
 include/common/common_defs.hpp              |   1 +
 include/common/rpc/rpc_types.hpp            |   7 +
 include/config.hpp                          |   1 +
 include/daemon/handler/rpc_defs.hpp         |   6 +
 src/client/gkfs_functions.cpp               |  51 ++++++++
 src/client/open_file_map.cpp                |  12 +-
 src/client/rpc/forward_metadata.cpp         |  41 ++++++
 src/daemon/handler/srv_metadata.cpp         |  61 +++++++++
 tests/integration/data/test_inline_1rpc.py  | 116 +++++++++++++++++
 tests/integration/data/test_inline_data.py  |  30 ++---
 tests/integration/harness/gkfs.io/open.cpp  |   3 +
 tests/integration/harness/gkfs.io/write.cpp |  20 ++-
 tests/integration/harness/gkfs.py           |  24 ++--
 16 files changed, 495 insertions(+), 30 deletions(-)
 create mode 100644 tests/integration/data/test_inline_1rpc.py

diff --git a/include/client/open_file_map.hpp b/include/client/open_file_map.hpp
index da5298f0d..508755656 100644
--- a/include/client/open_file_map.hpp
+++ b/include/client/open_file_map.hpp
@@ -61,6 +61,8 @@ enum class OpenFile_flags {
    wronly,
    rdwr,
    cloexec,
+    created,          // indicates if the file was created during open
+    creation_pending, // indicates if the file creation is delayed
    flag_count // this is purely used as a size variable of this enum class
};
@@ -75,6 +77,7 @@ protected:
    unsigned long pos_;
    std::mutex pos_mutex_;
    std::mutex flag_mutex_;
+    mode_t mode_;

public:
    // multiple threads may want to update the file position if fd has been
@@ -106,6 +109,12 @@ public:

    FileType
    type() const;
+
+    mode_t
+    mode() const;
+
+    void
+    mode(mode_t mode_);
};

diff --git a/include/client/rpc/forward_metadata.hpp b/include/client/rpc/forward_metadata.hpp
index e1761b40a..9ea2afc38 100644
--- a/include/client/rpc/forward_metadata.hpp
+++ b/include/client/rpc/forward_metadata.hpp
@@ -43,6 +43,7 @@
#include
#include
#include
+#include

/* Forward declaration */
namespace gkfs {
namespace filemap {
@@ -62,6 +63,11 @@ namespace rpc {
int
forward_create(const std::string& path, mode_t mode, const int copy);

+int
+forward_create_write_inline(const std::string& path, mode_t mode,
+                            const std::string& data, uint64_t count,
+                            const int copy);
+
int
forward_stat(const std::string& path, std::string& attr, const int copy);

diff --git a/include/client/rpc/rpc_types.hpp b/include/client/rpc/rpc_types.hpp
index b5bf35991..cf9161db8 100644
--- a/include/client/rpc/rpc_types.hpp
+++ b/include/client/rpc/rpc_types.hpp
@@ -369,6 +369,143 @@ struct create {
};
};
+//==============================================================================
+// definitions for create_write_inline
+struct create_write_inline {
+
+    // forward declarations of public input/output types for this RPC
+    class input;
+
+    class output;
+
+    // traits used so that the engine knows what to do with the RPC
+    using self_type = create_write_inline;
+    using handle_type = hermes::rpc_handle;
+    using input_type = input;
+    using output_type = output;
+    using mercury_input_type = rpc_create_write_inline_in_t;
+
using mercury_output_type = rpc_create_write_inline_out_t; + + // RPC public identifier + constexpr static const uint64_t public_id = 7; // Custom ID + + // RPC internal Mercury identifier + constexpr static const hg_id_t mercury_id = 0; + + // RPC name + constexpr static const auto name = gkfs::rpc::tag::create_write_inline; + + // requires response? + constexpr static const auto requires_response = true; + + // Mercury callback to serialize input arguments + constexpr static const auto mercury_in_proc_cb = + HG_GEN_PROC_NAME(rpc_create_write_inline_in_t); + + // Mercury callback to serialize output arguments + constexpr static const auto mercury_out_proc_cb = + HG_GEN_PROC_NAME(rpc_create_write_inline_out_t); + + class input { + + template + friend hg_return_t + hermes::detail::post_to_mercury(ExecutionContext*); + + public: + input(const std::string& path, uint32_t mode, const std::string& data, + uint64_t count) + : m_path(path), m_mode(mode), m_data(data), m_count(count) {} + + input(input&& rhs) = default; + + input(const input& other) = default; + + input& + operator=(input&& rhs) = default; + + input& + operator=(const input& other) = default; + + std::string + path() const { + return m_path; + } + + uint32_t + mode() const { + return m_mode; + } + + std::string + data() const { + return m_data; + } + + uint64_t + count() const { + return m_count; + } + + explicit input(const rpc_create_write_inline_in_t& other) + : m_path(other.path), m_mode(other.mode), m_data(other.data), + m_count(other.count) {} + + explicit + operator rpc_create_write_inline_in_t() { + return {m_path.c_str(), m_mode, m_data.c_str(), m_count}; + } + + private: + std::string m_path; + uint32_t m_mode; + std::string m_data; + uint64_t m_count; + }; + + class output { + + template + friend hg_return_t + hermes::detail::post_to_mercury(ExecutionContext*); + + public: + output() : m_err(), m_io_size() {} + + output(int32_t err, uint64_t io_size) + : m_err(err), m_io_size(io_size) {} + + output(output&& rhs) = default; + + output(const output& other) = default; + + output& + operator=(output&& rhs) = default; + + output& + operator=(const output& other) = default; + + explicit output(const rpc_create_write_inline_out_t& out) { + m_err = out.err; + m_io_size = out.io_size; + } + + int32_t + err() const { + return m_err; + } + + uint64_t + io_size() const { + return m_io_size; + } + + private: + int32_t m_err; + uint64_t m_io_size; + }; +}; + //============================================================================== // definitions for stat struct stat { diff --git a/include/common/common_defs.hpp b/include/common/common_defs.hpp index 619492f91..ce57cdb56 100644 --- a/include/common/common_defs.hpp +++ b/include/common/common_defs.hpp @@ -59,6 +59,7 @@ namespace tag { constexpr auto fs_config = "rpc_srv_fs_config"; constexpr auto create = "rpc_srv_mk_node"; +constexpr auto create_write_inline = "rpc_srv_create_write_inline"; constexpr auto stat = "rpc_srv_stat"; constexpr auto remove_metadata = "rpc_srv_rm_metadata"; constexpr auto remove_data = "rpc_srv_rm_data"; diff --git a/include/common/rpc/rpc_types.hpp b/include/common/rpc/rpc_types.hpp index 066855e3c..262b56792 100644 --- a/include/common/rpc/rpc_types.hpp +++ b/include/common/rpc/rpc_types.hpp @@ -202,6 +202,13 @@ MERCURY_GEN_PROC( rpc_write_inline_out_t, ((hg_int32_t) (err))((hg_int64_t) (ret_offset))((hg_size_t) (io_size))) +MERCURY_GEN_PROC(rpc_create_write_inline_in_t, + ((hg_const_string_t) (path))((uint32_t) (mode))( + (hg_const_string_t) 
(data))((hg_uint64_t) (count))) + +MERCURY_GEN_PROC(rpc_create_write_inline_out_t, + ((hg_int32_t) (err))((hg_uint64_t) (io_size))) + // Input: path, offset, read length MERCURY_GEN_PROC(rpc_read_inline_in_t, ((hg_const_string_t) (path))((hg_uint64_t) (offset))( diff --git a/include/config.hpp b/include/config.hpp index 3ca38a0c5..a36f6b97f 100644 --- a/include/config.hpp +++ b/include/config.hpp @@ -136,6 +136,7 @@ constexpr auto implicit_data_removal = true; constexpr auto create_exist_check = true; constexpr auto use_inline_data = true; constexpr auto inline_data_size = 4096; // in bytes +constexpr auto create_write_optimization = true; } // namespace metadata namespace data { // directory name below rootdir where chunks are placed diff --git a/include/daemon/handler/rpc_defs.hpp b/include/daemon/handler/rpc_defs.hpp index de6f84f27..3ac9ab24f 100644 --- a/include/daemon/handler/rpc_defs.hpp +++ b/include/daemon/handler/rpc_defs.hpp @@ -79,6 +79,12 @@ DECLARE_MARGO_RPC_HANDLER(rpc_srv_rename) // data DECLARE_MARGO_RPC_HANDLER(rpc_srv_remove_data) +DECLARE_MARGO_RPC_HANDLER(rpc_srv_measure) + +DECLARE_MARGO_RPC_HANDLER(rpc_srv_create_write_inline) + +DECLARE_MARGO_RPC_HANDLER(rpc_srv_create) + DECLARE_MARGO_RPC_HANDLER(rpc_srv_read) DECLARE_MARGO_RPC_HANDLER(rpc_srv_write) diff --git a/src/client/gkfs_functions.cpp b/src/client/gkfs_functions.cpp index 0ab61c221..a98675d2b 100644 --- a/src/client/gkfs_functions.cpp +++ b/src/client/gkfs_functions.cpp @@ -207,6 +207,21 @@ gkfs_open(const std::string& path, mode_t mode, int flags) { } // no access check required here. If one is using our FS they have the // permissions. + if(gkfs::config::metadata::create_write_optimization && + gkfs::config::metadata::use_inline_data && !(flags & O_EXCL)) { + // OPTIMIZATION: fwd_create is delayed until write or close + auto fd = CTX->file_map()->add( + std::make_shared(path, flags)); + auto file = CTX->file_map()->get(fd); + file->mode(mode); + file->set_flag(gkfs::filemap::OpenFile_flags::creation_pending, + true); + if(CTX->protect_files_generator()) { + generate_lock_file(path, true); + } + return fd; + } + auto err = gkfs_create(path, mode | S_IFREG); if(err) { if(errno == EEXIST) { @@ -1072,6 +1087,36 @@ gkfs_do_write(gkfs::filemap::OpenFile& file, const char* buf, size_t count, LOG(DEBUG, "{}() path: '{}', count: '{}', offset: '{}', is_append: '{}'", __func__, *path, count, offset, is_append); + // OPTIMIZATION: Delayed creation + if(file.get_flag(gkfs::filemap::OpenFile_flags::creation_pending)) { + // Optimization: Create and write in one RPC + if(gkfs::config::metadata::use_inline_data && + (offset + count) <= gkfs::config::metadata::inline_data_size) { + auto ret_inline = gkfs::rpc::forward_create_write_inline( + *path, file.mode(), std::string(buf, count), count, + 0); // TODO: handle replicas + // if success, we are done + if(ret_inline == 0) { + file.set_flag(gkfs::filemap::OpenFile_flags::creation_pending, + false); + if(update_pos) + file.pos(offset + count); + return count; + } + // fallthrough if failed (e.g., EEXIST if semantics required it, but + // we handled that in open) + } + + // Fallback: Create first + auto err = gkfs_create(*path, file.mode()); + if(err && errno != EEXIST) { + // If error and NOT EEXIST, fail. 
+ // If EEXIST, we proceed to write (as open without O_EXCL allows it) + return -1; + } + file.set_flag(gkfs::filemap::OpenFile_flags::creation_pending, false); + } + // OPTIMIZATION: Inline Write if(gkfs::config::metadata::use_inline_data && @@ -1879,6 +1924,12 @@ int gkfs_close(unsigned int fd) { auto file = CTX->file_map()->get(fd); if(file) { + if(file->get_flag(gkfs::filemap::OpenFile_flags::creation_pending)) { + gkfs_create(file->path(), file->mode()); + file->set_flag(gkfs::filemap::OpenFile_flags::creation_pending, + false); + } + // flush write size cache to be server consistent if(CTX->use_write_size_cache()) { auto err = CTX->write_size_cache()->flush(file->path(), true).first; diff --git a/src/client/open_file_map.cpp b/src/client/open_file_map.cpp index 8ccec5f03..d9af716c5 100644 --- a/src/client/open_file_map.cpp +++ b/src/client/open_file_map.cpp @@ -52,7 +52,7 @@ using namespace std; namespace gkfs::filemap { OpenFile::OpenFile(const string& path, const int flags, FileType type) - : type_(type), path_(path) { + : type_(type), path_(path), mode_(0) { // set flags to OpenFile if(flags & O_CREAT) flags_[gkfs::utils::to_underlying(OpenFile_flags::creat)] = true; @@ -111,6 +111,16 @@ OpenFile::type() const { return type_; } +mode_t +OpenFile::mode() const { + return mode_; +} + +void +OpenFile::mode(mode_t mode_) { + OpenFile::mode_ = mode_; +} + // OpenFileMap starts here shared_ptr diff --git a/src/client/rpc/forward_metadata.cpp b/src/client/rpc/forward_metadata.cpp index 8f7249631..5f4be34ba 100644 --- a/src/client/rpc/forward_metadata.cpp +++ b/src/client/rpc/forward_metadata.cpp @@ -93,6 +93,47 @@ forward_create(const std::string& path, const mode_t mode, const int copy) { } } +/** + * Send an RPC for a create request and write data + * @param path + * @param mode + * @param data + * @param count + * @param copy Number of replica to create + * @return error code + */ +int +forward_create_write_inline(const std::string& path, const mode_t mode, + const std::string& data, const uint64_t count, + const int copy) { + if(gkfs::config::proxy::fwd_create && CTX->use_proxy()) { + LOG(WARNING, "{} was called even though proxy should be used!", + __func__); + } + auto endp = CTX->hosts().at( + CTX->distributor()->locate_file_metadata(path, copy)); + + try { + LOG(DEBUG, "Sending RPC ..."); + // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we + // can retry for RPC_TRIES (see old commits with margo) + // TODO(amiranda): hermes will eventually provide a post(endpoint) + // returning one result and a broadcast(endpoint_set) returning a + // result_set. When that happens we can remove the .at(0) :/ + auto out = ld_network_service + ->post( + endp, path, mode, data, count) + .get() + .at(0); + LOG(DEBUG, "Got response success: {}", out.err()); + + return out.err() ? out.err() : 0; + } catch(const std::exception& ex) { + LOG(ERROR, "while getting rpc output"); + return EBUSY; + } +} + /** * Send an RPC for a stat request * @param path diff --git a/src/daemon/handler/srv_metadata.cpp b/src/daemon/handler/srv_metadata.cpp index 63c98c8f3..91fb62007 100644 --- a/src/daemon/handler/srv_metadata.cpp +++ b/src/daemon/handler/srv_metadata.cpp @@ -1136,6 +1136,65 @@ rpc_srv_write_data_inline(hg_handle_t handle) { return HG_SUCCESS; } +/** + * @brief Serves a request to create a file and write inline data in a single + * RPC. 
+ * @return Mercury error code
+ */
+hg_return_t
+rpc_srv_create_write_inline(hg_handle_t handle) {
+    rpc_create_write_inline_in_t in{};
+    rpc_create_write_inline_out_t out{};
+    out.err = EIO;
+    out.io_size = 0;
+
+    auto ret = margo_get_input(handle, &in);
+    if(ret != HG_SUCCESS) {
+        GKFS_DATA->spdlogger()->error(
+                "{}() Failed to retrieve input from handle", __func__);
+        return gkfs::rpc::cleanup_respond(&handle, &in, &out);
+    }
+
+    GKFS_DATA->spdlogger()->debug("{}() path: '{}', mode: '{}', size: '{}'",
+                                  __func__, in.path, in.mode, in.count);
+
+    try {
+        // 1. Create Metadentry
+        gkfs::metadata::Metadata md(in.mode);
+        gkfs::metadata::create(in.path, md);
+
+        // 2. Decode Data
+        auto decoded_data = gkfs::rpc::base64_decode_to_string(in.data);
+
+        // 3. Write Inline Data
+        if(decoded_data.size() > gkfs::config::metadata::inline_data_size) {
+            out.err = EFBIG;
+        } else {
+            md.inline_data(decoded_data);
+            md.size(decoded_data.size());
+            gkfs::metadata::update(in.path, md);
+            out.err = 0;
+            out.io_size = decoded_data.size();
+        }
+
+    } catch(const gkfs::metadata::ExistsException& e) {
+        out.err = EEXIST;
+    } catch(const std::exception& e) {
+        GKFS_DATA->spdlogger()->error(
+                "{}() Failed to create/write inline: '{}'", __func__,
+                e.what());
+        out.err = EIO; // report a valid errno rather than -1
+    }
+
+    auto hret = margo_respond(handle, &out);
+    if(hret != HG_SUCCESS) {
+        GKFS_DATA->spdlogger()->error("{}() Failed to respond", __func__);
+    }
+
+    margo_free_input(handle, &in);
+    margo_destroy(handle);
+    return HG_SUCCESS;
+}
+
/**
 * @brief Serves a read request for inline data (stored in RocksDB).
 * @return Mercury error code
@@ -1250,4 +1309,6 @@
DEFINE_MARGO_RPC_HANDLER(rpc_srv_rename)

DEFINE_MARGO_RPC_HANDLER(rpc_srv_write_data_inline)

+DEFINE_MARGO_RPC_HANDLER(rpc_srv_create_write_inline)
+
DEFINE_MARGO_RPC_HANDLER(rpc_srv_read_data_inline)
\ No newline at end of file
diff --git a/tests/integration/data/test_inline_1rpc.py b/tests/integration/data/test_inline_1rpc.py
new file mode 100644
index 000000000..53db1dcd0
--- /dev/null
+++ b/tests/integration/data/test_inline_1rpc.py
@@ -0,0 +1,116 @@
+import pytest
+import os
+import stat
+from harness.logger import logger
+
+def test_inline_1rpc_optimized(gkfs_daemon, gkfs_client):
+    """Test the 1-RPC create+write optimization"""
+    file = gkfs_daemon.mountdir / "file_1rpc_opt"
+    # Open (O_CREAT) and write a small buffer in a single gkfs.io command so
+    # that one process performs both steps and triggers the optimization
+    buf = 'A' * 100
+    ret = gkfs_client.run('write', file, buf, len(buf), '--creat')
+    assert ret.retval == len(buf)
+
+    # No explicit close here: the harness does not expose one, and files are
+    # closed when the gkfs.io helper process exits. Since the write already
+    # materialized the file, close has nothing left to do in this test; the
+    # close-side fallback is covered by test_inline_1rpc_fallback_close.
diff --git a/tests/integration/data/test_inline_1rpc.py b/tests/integration/data/test_inline_1rpc.py
new file mode 100644
index 000000000..53db1dcd0
--- /dev/null
+++ b/tests/integration/data/test_inline_1rpc.py
@@ -0,0 +1,116 @@
+import pytest
+import os
+import stat
+
+from harness.logger import logger
+
+
+def test_inline_1rpc_optimized(gkfs_daemon, gkfs_client):
+    """Test the 1-RPC create+write optimization"""
+    file = gkfs_daemon.mountdir / "file_1rpc_opt"
+
+    # Open (O_CREAT) and write a small buffer within a single gkfs.io
+    # command, so both happen in one process and trigger the optimization
+    buf = 'A' * 100
+    ret = gkfs_client.run('write', file, buf, len(buf), '--creat')
+    assert ret.retval == len(buf)
+
+    # Verify size
+    ret = gkfs_client.stat(file)
+    assert ret.retval == 0
+    assert ret.statbuf.st_size == len(buf)
+
+    # Verify content
+    ret = gkfs_client.open(file,
+                           os.O_RDONLY,
+                           stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
+    assert ret.retval != -1
+
+    ret = gkfs_client.read(file, len(buf))
+    assert ret.retval == len(buf)
+    assert ret.buf == buf.encode()
+
+
+def test_inline_1rpc_fallback_close(gkfs_daemon, gkfs_client):
+    """Test fallback: open(O_CREAT) -> close() creates an empty file"""
+    file = gkfs_daemon.mountdir / "file_1rpc_empty"
+
+    ret = gkfs_client.open(file, os.O_CREAT | os.O_WRONLY, 0o644)
+    assert ret.retval != -1
+
+    # The deferred create must be flushed on close: the file exists and
+    # is empty even though nothing was ever written
+    ret = gkfs_client.stat(file)
+    assert ret.retval == 0
+    assert ret.statbuf.st_size == 0
+
+
+def test_inline_1rpc_fallback_large_write(gkfs_daemon, gkfs_client):
+    """Test fallback: open(O_CREAT) -> large write forces an explicit create"""
+    file = gkfs_daemon.mountdir / "file_1rpc_large"
+
+    # Write more than the inline limit (4096 bytes by default)
+    size = 10000
+    buf = 'B' * size
+    ret = gkfs_client.run('write', file, buf, size, '--creat')
+    assert ret.retval == size
+
+    # Verify size
+    ret = gkfs_client.stat(file)
+    assert ret.retval == 0
+    assert ret.statbuf.st_size == size
+
+    # Verify content
+    ret = gkfs_client.open(file,
+                           os.O_RDONLY,
+                           stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
+    assert ret.retval != -1
+
+    ret = gkfs_client.read(file, size)
+    assert ret.retval == size
+    assert ret.buf == buf.encode()
+
+
+def test_inline_1rpc_no_opt_o_excl(gkfs_daemon, gkfs_client):
+    """Test that O_CREAT | O_EXCL disables the optimization"""
+    file = gkfs_daemon.mountdir / "file_no_opt_excl"
+
+    # With O_EXCL the file must be created eagerly, since its existence has
+    # to be checked at open() time. Only the observable behavior can be
+    # verified from the client side; the RPC count is not visible here.
+    ret = gkfs_client.open(file,
+                           os.O_CREAT | os.O_WRONLY | os.O_EXCL,
+                           stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
+    assert ret.retval != -1
+
+    buf = 'A' * 100
+    ret = gkfs_client.write(file, buf, len(buf))
+    assert ret.retval == len(buf)
+
+    ret = gkfs_client.stat(file)
+    assert ret.retval == 0
+    assert ret.statbuf.st_size == 100
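The O_EXCL case exercised by the last test maps to a client-side guard along these lines (a hypothetical helper; the real condition sits in gkfs_open, which is not shown in this patch, and additionally depends on gkfs::config::metadata::use_inline_data):

    #include <fcntl.h>

    // Sketch: the create RPC may only be deferred when the caller cannot
    // observe the difference. O_EXCL demands an immediate existence check,
    // so it forces the eager create path.
    bool
    may_defer_create(int flags) {
        if(!(flags & O_CREAT))
            return false; // nothing to create
        if(flags & O_EXCL)
            return false; // existence must be reported at open() time
        return true;      // defer until the first write or close()
    }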
diff --git a/tests/integration/data/test_inline_data.py b/tests/integration/data/test_inline_data.py
index 99d2846d9..dacc0d358 100644
--- a/tests/integration/data/test_inline_data.py
+++ b/tests/integration/data/test_inline_data.py
@@ -14,7 +14,7 @@ def test_inline_append(gkfs_daemon, gkfs_client):
     assert ret.retval != -1
 
     # Write initial data (inline)
-    buf1 = b'A' * 100
+    buf1 = 'A' * 100
     ret = gkfs_client.write(file, buf1, len(buf1))
     assert ret.retval == len(buf1)
 
@@ -24,7 +24,7 @@ def test_inline_append(gkfs_daemon, gkfs_client):
     assert ret.retval != -1
 
     # Append data (inline)
-    buf2 = b'B' * 100
+    buf2 = 'B' * 100
     ret = gkfs_client.write(file, buf2, len(buf2), 1) # write with O_APPEND
     assert ret.retval == len(buf2)
 
@@ -41,7 +41,7 @@ def test_inline_append(gkfs_daemon, gkfs_client):
 
     ret = gkfs_client.read(file, 200)
     assert ret.retval == 200
-    assert ret.buf == buf1 + buf2
+    assert ret.buf == (buf1 + buf2).encode()
 
 def test_inline_pwrite(gkfs_daemon, gkfs_client):
     """Test inline data overwrite using pwrite"""
@@ -53,12 +53,12 @@ def test_inline_pwrite(gkfs_daemon, gkfs_client):
     assert ret.retval != -1
 
     # Write initial data
-    buf1 = b'A' * 100
+    buf1 = 'A' * 100
     ret = gkfs_client.write(file, buf1, len(buf1))
     assert ret.retval == len(buf1)
 
     # Overwrite middle part
-    buf2 = b'B' * 50
+    buf2 = 'B' * 50
     ret = gkfs_client.pwrite(file, buf2, len(buf2), 25)
     assert ret.retval == len(buf2)
 
@@ -89,7 +89,7 @@ def test_inline_overflow_append(gkfs_daemon, gkfs_client):
     assert ret.retval != -1
 
     # Write almost full inline data
-    buf1 = b'A' * 4000
+    buf1 = 'A' * 4000
     ret = gkfs_client.write(file, buf1, len(buf1))
     assert ret.retval == len(buf1)
 
@@ -100,7 +100,7 @@ def test_inline_overflow_append(gkfs_daemon, gkfs_client):
     assert ret.retval != -1
 
     # Append enough to overflow 4096
-    buf2 = b'B' * 200
+    buf2 = 'B' * 200
     ret = gkfs_client.write(file, buf2, len(buf2), 1) # Pass append flag
     assert ret.retval == len(buf2)
 
@@ -117,7 +117,7 @@ def test_inline_overflow_append(gkfs_daemon, gkfs_client):
 
     ret = gkfs_client.read(file, 4200)
     assert ret.retval == 4200
-    assert ret.buf == buf1 + buf2
+    assert ret.buf == (buf1 + buf2).encode()
 
 def test_inline_overflow_pwrite(gkfs_daemon, gkfs_client):
     """Test pwrite that overflows inline limit (migration to chunks)"""
@@ -129,12 +129,12 @@ def test_inline_overflow_pwrite(gkfs_daemon, gkfs_client):
     assert ret.retval != -1
 
     # Write small inline data
-    buf1 = b'A' * 100
+    buf1 = 'A' * 100
     ret = gkfs_client.write(file, buf1, len(buf1))
     assert ret.retval == len(buf1)
 
     # Pwrite far beyond inline limit (creating hole)
-    buf2 = b'B' * 100
+    buf2 = 'B' * 100
     offset = 5000
     ret = gkfs_client.pwrite(file, buf2, len(buf2), offset)
     assert ret.retval == len(buf2)
@@ -158,9 +158,9 @@ def test_inline_overflow_pwrite(gkfs_daemon, gkfs_client):
     assert ret.retval == 5100
 
     read_buf = ret.buf
-    assert read_buf[0:100] == buf1
+    assert read_buf[0:100] == buf1.encode()
     assert read_buf[100:offset] == b'\x00' * (offset - 100)
-    assert read_buf[offset:offset+100] == buf2
+    assert read_buf[offset:offset+100] == buf2.encode()
 
 def test_inline_overwrite_pwrite(gkfs_daemon, gkfs_client):
     """Test pwrite at offset 0 that overflows inline limit (migration/clearing)"""
@@ -172,13 +172,13 @@ def test_inline_overwrite_pwrite(gkfs_daemon, gkfs_client):
     assert ret.retval != -1
 
     # Write small inline data
-    buf1 = b'A' * 100
+    buf1 = 'A' * 100
     ret = gkfs_client.write(file, buf1, len(buf1))
     assert ret.retval == len(buf1)
 
     # Overwrite with large data at offset 0
     # This should force chunk write and clear inline data
-    buf2 = b'B' * 5000
+    buf2 = 'B' * 5000
     ret = gkfs_client.pwrite(file, buf2, len(buf2), 0)
     assert ret.retval == len(buf2)
 
@@ -195,6 +195,6 @@ def test_inline_overwrite_pwrite(gkfs_daemon, gkfs_client):
 
     ret = gkfs_client.read(file, 5000)
     assert ret.retval == 5000
-    assert ret.buf == buf2
+    assert ret.buf == buf2.encode()
diff --git a/tests/integration/harness/gkfs.io/open.cpp b/tests/integration/harness/gkfs.io/open.cpp
index 599d0ae4c..ef6a76f9c 100644
--- a/tests/integration/harness/gkfs.io/open.cpp
+++ b/tests/integration/harness/gkfs.io/open.cpp
@@ -95,6 +95,9 @@ open_exec(const open_options& opts) {
 
     json out = open_output{fd, errno};
     fmt::print("{}\n", out.dump(2));
+    if(fd >= 0) {
+        ::close(fd);
+    }
     return;
 }
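The hunk above fixes a descriptor leak in the gkfs.io open command: the fd was printed as JSON but never closed. A small RAII guard (a sketch, not part of this patch) would rule out such leaks across all gkfs.io commands that open files:

    #include <unistd.h>

    // Sketch: close-on-scope-exit guard for raw file descriptors, so early
    // returns in the command implementations cannot leak them.
    class fd_guard {
        int fd_;
    public:
        explicit fd_guard(int fd) noexcept : fd_(fd) {}
        fd_guard(const fd_guard&) = delete;
        fd_guard& operator=(const fd_guard&) = delete;
        ~fd_guard() {
            if(fd_ >= 0)
                ::close(fd_);
        }
        int get() const noexcept { return fd_; }
    };

    // Usage: fd_guard guard{::open(path, flags)};
    //        if(guard.get() == -1) { ... }  // fd closed on every exit path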
diff --git a/tests/integration/harness/gkfs.io/write.cpp b/tests/integration/harness/gkfs.io/write.cpp
index c74880235..fd1ff60f5 100644
--- a/tests/integration/harness/gkfs.io/write.cpp
+++ b/tests/integration/harness/gkfs.io/write.cpp
@@ -60,11 +60,16 @@ struct write_options {
     std::string data;
     ::size_t count;
     bool append{false};
+    bool creat{false};
+    ::mode_t mode{0644};
 
     REFL_DECL_STRUCT(write_options, REFL_DECL_MEMBER(bool, verbose),
                      REFL_DECL_MEMBER(std::string, pathname),
                      REFL_DECL_MEMBER(std::string, data),
-                     REFL_DECL_MEMBER(::size_t, count));
+                     REFL_DECL_MEMBER(::size_t, count),
+                     REFL_DECL_MEMBER(bool, append),
+                     REFL_DECL_MEMBER(bool, creat),
+                     REFL_DECL_MEMBER(::mode_t, mode));
 };
 
 struct write_output {
@@ -85,7 +90,9 @@ write_exec(const write_options& opts) {
     auto flags = O_WRONLY;
     if(opts.append)
         flags |= O_APPEND;
-    auto fd = ::open(opts.pathname.c_str(), flags);
+    if(opts.creat)
+        flags |= O_CREAT;
+    auto fd = ::open(opts.pathname.c_str(), flags, opts.mode);
 
     if(fd == -1) {
         if(opts.verbose) {
@@ -142,5 +149,14 @@ write_init(CLI::App& app) {
             ->default_val(false)
             ->type_name("");
 
+    cmd->add_flag("-c,--creat", opts->creat,
+                  "Create file if it does not exist");
+
+    cmd->add_option("-m,--mode", opts->mode,
+                    "Octal mode for the new file (e.g. 0664)")
+            ->default_val(0644)
+            ->type_name("")
+            ->check(CLI::NonNegativeNumber);
+
     cmd->callback([opts]() { write_exec(*opts); });
 }
diff --git a/tests/integration/harness/gkfs.py b/tests/integration/harness/gkfs.py
index e9722b5ed..277d7eda6 100644
--- a/tests/integration/harness/gkfs.py
+++ b/tests/integration/harness/gkfs.py
@@ -250,8 +250,8 @@ class Daemon:
         self._proxy = proxy
 
         libdirs = ':'.join(
-                filter(None, [os.environ.get('LD_LIBRARY_PATH', '')] +
-                             [str(p) for p in self._workspace.libdirs]))
+                filter(None, [str(p) for p in self._workspace.libdirs] +
+                             [os.environ.get('LD_LIBRARY_PATH', '')]))
 
         self._patched_env = {
             'LD_LIBRARY_PATH'      : libdirs,
@@ -268,7 +268,7 @@ class Daemon:
                 '-l', self._address,
                 '--metadir', self._metadir.as_posix(),
                 '--dbbackend', self._database,
-                '--output-stats', self.logdir / 'stats.log',
+                '--output-stats', (self.logdir / 'stats.log').as_posix(),
                 '--enable-collection',
                 '--enable-chunkstats']
 
         if self._database == "parallaxdb" :
@@ -394,8 +394,8 @@ class Proxy:
         self._env = os.environ.copy()
 
         libdirs = ':'.join(
-                filter(None, [os.environ.get('LD_LIBRARY_PATH', '')] +
-                             [str(p) for p in self._workspace.libdirs]))
+                filter(None, [str(p) for p in self._workspace.libdirs] +
+                             [os.environ.get('LD_LIBRARY_PATH', '')]))
 
         self._patched_env = {
             'LD_LIBRARY_PATH'      : libdirs,
@@ -540,8 +540,8 @@ class Client:
         self._proxy = proxy
 
         libdirs = ':'.join(
-                filter(None, [os.environ.get('LD_LIBRARY_PATH', '')] +
-                             [str(p) for p in self._workspace.libdirs]))
+                filter(None, [str(p) for p in self._workspace.libdirs] +
+                             [os.environ.get('LD_LIBRARY_PATH', '')]))
 
         # ensure the client interception library is available:
         # to avoid running code with potentially installed libraries,
@@ -594,14 +594,14 @@ class Client:
         logger.debug(f"patched env: {pformat(self._patched_env)}")
 
         out = self._cmd(
-            [ cmd ] + list(args),
+            [ str(cmd) ] + [ str(a) for a in args ],
             _env = self._env,
 #           _out=sys.stdout,
 #           _err=sys.stderr,
             )
 
-        logger.debug(f"command output: {out.stdout}")
-        return self._parser.parse(cmd, out.stdout)
+        logger.debug(f"command output: {out}")
+        return self._parser.parse(cmd, out)
 
     def __getattr__(self, name):
         return _proxy_exec(self, name)
@@ -624,8 +624,8 @@ class ClientLibc:
         self._env = os.environ.copy()
 
         libdirs = ':'.join(
-                filter(None, [os.environ.get('LD_LIBRARY_PATH', '')] +
-                             [str(p) for p in self._workspace.libdirs]))
+                filter(None, [str(p) for p in self._workspace.libdirs] +
+                             [os.environ.get('LD_LIBRARY_PATH', '')]))
 
         # ensure the client interception library is available:
         # to avoid running code with potentially installed libraries,
-- 
GitLab