diff --git a/CHANGELOG.md b/CHANGELOG.md
index 047b05db7b157a4e05a01c4ac413aa09c4139d61..4f467fef9a859b4a2c71fffca1a28048742e6d1e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -13,6 +13,9 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
   - Compress directory data with zstd.
   - Make a new config.hpp option for controlling the compression
   - If directory buffer is not enough it will reattempt with the exact size
+- Metadata server can store small data inline ([!271](https://storage.bsc.es/gitlab/hpc/gekkofs/-/merge_requests/271))
+  - Enabled via the `config.hpp` options `use_inline_data = true` and `inline_data_size = 4096`
+  - Data is stored base64-encoded, since it is sent as a string RPC argument rather than via bulk transfer
 
 ### Changed
 
diff --git a/README.md b/README.md
index d4232a88a70b7343914c16fe0a04b33d39212c6a..711336170b85371822aec814bb603fa74d9cd5a6 100644
--- a/README.md
+++ b/README.md
@@ -510,6 +510,10 @@ Note, that a chunk/host configuration is inherited to all children files automat
 In this example, `/mdt-hard/file1` is therefore also using the same distribution as the `/mdt-hard` directory.
 If no prefix is used, the Simple Hash distributor is used.
 
+## Small Data Store
+Small files can be stored directly on the metadata server. This is controlled with the `config.hpp` options
+`use_inline_data = true` and `inline_data_size`.
+
 #### Guided configuration file
 
 Creating a guided configuration file is based on an I/O trace file of a previous execution of the application.
diff --git a/include/client/open_file_map.hpp b/include/client/open_file_map.hpp
index da5298f0da1746590f420c390f17c245a309993a..508755656db6a9f93c08287b512bb7a17257876a 100644
--- a/include/client/open_file_map.hpp
+++ b/include/client/open_file_map.hpp
@@ -61,6 +61,8 @@ enum class OpenFile_flags {
     wronly,
     rdwr,
     cloexec,
+    created,          // indicates if the file was created during open
+    creation_pending, // indicates if the file creation is delayed
     flag_count // this is purely used as a size variable of this enum class
 };
 
@@ -75,6 +77,7 @@ protected:
     unsigned long pos_;
     std::mutex pos_mutex_;
     std::mutex flag_mutex_;
+    mode_t mode_;
 
 public:
     // multiple threads may want to update the file position if fd has been
@@ -106,6 +109,12 @@ public:
 
     FileType
     type() const;
+
+    mode_t
+    mode() const;
+
+    void
+    mode(mode_t mode_);
 };
 
diff --git a/include/client/preload_util.hpp b/include/client/preload_util.hpp
index fbdab48e97fde3fd44a122df46bc71fbad200ec6..bdff4678c0b0b45e0d06242f9697fd3cceb1d41c 100644
--- a/include/client/preload_util.hpp
+++ b/include/client/preload_util.hpp
@@ -98,7 +98,7 @@ metadata_to_stat(const std::string& path, const gkfs::metadata::Metadata& md,
  */
 std::pair
 update_file_size(const std::string& path, size_t count, off64_t offset,
-                 bool is_append);
+                 bool is_append, bool clear_inline_flag = false);
 
 void
 load_hosts();
diff --git a/include/client/rpc/forward_metadata.hpp b/include/client/rpc/forward_metadata.hpp
index 7ab81e62b2b6191a9a23e6abc6812dc570098407..9ea2afc3826b3c6f75cb30c9e036f6a3011ccd17 100644
--- a/include/client/rpc/forward_metadata.hpp
+++ b/include/client/rpc/forward_metadata.hpp
@@ -43,6 +43,7 @@
 #include
 #include
 #include
+#include
 
 /* Forward declaration */
 namespace gkfs {
 namespace filemap {
@@ -62,6 +63,11 @@ namespace rpc {
 int
 forward_create(const std::string& path, mode_t mode, const int copy);
 
+int
+forward_create_write_inline(const std::string& path, mode_t mode,
+                            const std::string& data, uint64_t count,
+                            const int copy);
+
 int
 forward_stat(const std::string& path, std::string& attr, const int copy);
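As context for the declarations above, here is a minimal standalone sketch of the client-side eligibility test that decides between the inline path and the regular chunk path. The config values mirror the `include/config.hpp` defaults added by this patch; `fits_inline()` itself is a hypothetical helper, not part of the patch:

    // Hypothetical helper, not in the patch: mirrors the check
    // `(offset + count) <= gkfs::config::metadata::inline_data_size`
    // performed in gkfs_do_write before attempting an inline RPC.
    #include <cstddef>
    #include <cstdio>

    namespace gkfs::config::metadata {
    constexpr auto use_inline_data = true;
    constexpr auto inline_data_size = 4096; // in bytes
    } // namespace gkfs::config::metadata

    constexpr bool
    fits_inline(std::size_t offset, std::size_t count) {
        return gkfs::config::metadata::use_inline_data &&
               offset + count <= static_cast<std::size_t>(
                                         gkfs::config::metadata::inline_data_size);
    }

    int
    main() {
        std::printf("write(0, 100):    %s\n", fits_inline(0, 100) ? "inline" : "chunks");
        std::printf("write(4000, 200): %s\n", fits_inline(4000, 200) ? "inline" : "chunks");
    }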
@@ -86,7 +92,7 @@ forward_update_metadentry(const std::string& path,
 std::pair
 forward_update_metadentry_size(const std::string& path, size_t size,
                                off64_t offset, bool append_flag,
-                               const int num_copies);
+                               bool clear_inline_flag, const int num_copies);
 
 std::pair
 forward_get_metadentry_size(const std::string& path, const int copy);
@@ -105,6 +111,35 @@ forward_mk_symlink(const std::string& path, const std::string& target_path);
 
 #endif
 
+/**
+ * @brief Send an RPC request to write a small amount of data directly
+ * to the metadata server (inline).
+ *
+ * @param path The file path.
+ * @param buf Pointer to the data buffer.
+ * @param offset The file offset.
+ * @param write_size The number of bytes to write.
+ * @param append_flag Whether to append to the file.
+ * @return std::pair of error code and offset written.
+ */
+std::pair
+forward_write_inline(const std::string& path, const void* buf, off64_t offset,
+                     size_t write_size, bool append_flag);
+
+/**
+ * @brief Send an RPC request to read a small amount of data directly
+ * from the metadata server (inline).
+ *
+ * @param path The file path.
+ * @param buf Pointer to the destination buffer.
+ * @param offset The file offset.
+ * @param read_size The number of bytes to read.
+ * @return std::pair of error code and bytes read.
+ */
+std::pair
+forward_read_inline(const std::string& path, void* buf, off64_t offset,
+                    size_t read_size);
+
 } // namespace rpc
 } // namespace gkfs
 
diff --git a/include/client/rpc/rpc_types.hpp b/include/client/rpc/rpc_types.hpp
index d093f4455aabb8e6f10d4a12a1de4b5fc09cf098..cf9161db8df26db5287d26d65ed5d8714b92a8ec 100644
--- a/include/client/rpc/rpc_types.hpp
+++ b/include/client/rpc/rpc_types.hpp
@@ -369,6 +369,143 @@ struct create {
     };
 };
 
+//==============================================================================
+// definitions for create_write_inline
+struct create_write_inline {
+
+    // forward declarations of public input/output types for this RPC
+    class input;
+
+    class output;
+
+    // traits used so that the engine knows what to do with the RPC
+    using self_type = create_write_inline;
+    using handle_type = hermes::rpc_handle;
+    using input_type = input;
+    using output_type = output;
+    using mercury_input_type = rpc_create_write_inline_in_t;
+    using mercury_output_type = rpc_create_write_inline_out_t;
+
+    // RPC public identifier
+    constexpr static const uint64_t public_id = 7; // Custom ID
+
+    // RPC internal Mercury identifier
+    constexpr static const hg_id_t mercury_id = 0;
+
+    // RPC name
+    constexpr static const auto name = gkfs::rpc::tag::create_write_inline;
+
+    // requires response?
+ constexpr static const auto requires_response = true; + + // Mercury callback to serialize input arguments + constexpr static const auto mercury_in_proc_cb = + HG_GEN_PROC_NAME(rpc_create_write_inline_in_t); + + // Mercury callback to serialize output arguments + constexpr static const auto mercury_out_proc_cb = + HG_GEN_PROC_NAME(rpc_create_write_inline_out_t); + + class input { + + template + friend hg_return_t + hermes::detail::post_to_mercury(ExecutionContext*); + + public: + input(const std::string& path, uint32_t mode, const std::string& data, + uint64_t count) + : m_path(path), m_mode(mode), m_data(data), m_count(count) {} + + input(input&& rhs) = default; + + input(const input& other) = default; + + input& + operator=(input&& rhs) = default; + + input& + operator=(const input& other) = default; + + std::string + path() const { + return m_path; + } + + uint32_t + mode() const { + return m_mode; + } + + std::string + data() const { + return m_data; + } + + uint64_t + count() const { + return m_count; + } + + explicit input(const rpc_create_write_inline_in_t& other) + : m_path(other.path), m_mode(other.mode), m_data(other.data), + m_count(other.count) {} + + explicit + operator rpc_create_write_inline_in_t() { + return {m_path.c_str(), m_mode, m_data.c_str(), m_count}; + } + + private: + std::string m_path; + uint32_t m_mode; + std::string m_data; + uint64_t m_count; + }; + + class output { + + template + friend hg_return_t + hermes::detail::post_to_mercury(ExecutionContext*); + + public: + output() : m_err(), m_io_size() {} + + output(int32_t err, uint64_t io_size) + : m_err(err), m_io_size(io_size) {} + + output(output&& rhs) = default; + + output(const output& other) = default; + + output& + operator=(output&& rhs) = default; + + output& + operator=(const output& other) = default; + + explicit output(const rpc_create_write_inline_out_t& out) { + m_err = out.err; + m_io_size = out.io_size; + } + + int32_t + err() const { + return m_err; + } + + uint64_t + io_size() const { + return m_io_size; + } + + private: + int32_t m_err; + uint64_t m_io_size; + }; +}; + //============================================================================== // definitions for stat struct stat { @@ -1130,8 +1267,9 @@ struct update_metadentry_size { public: input(const std::string& path, uint64_t size, int64_t offset, - bool append) - : m_path(path), m_size(size), m_offset(offset), m_append(append) {} + bool append, bool clear_inline = false) + : m_path(path), m_size(size), m_offset(offset), m_append(append), + m_clear_inline(clear_inline) {} input(input&& rhs) = default; @@ -1163,9 +1301,14 @@ struct update_metadentry_size { return m_append; } + bool + clear_inline() const { + return m_clear_inline; + } + explicit input(const rpc_update_metadentry_size_in_t& other) : m_path(other.path), m_size(other.size), m_offset(other.offset), - m_append(other.append) {} + m_append(other.append), m_clear_inline(other.clear_inline) {} explicit operator rpc_update_metadentry_size_in_t() { @@ -1177,6 +1320,7 @@ struct update_metadentry_size { uint64_t m_size; int64_t m_offset; bool m_append; + bool m_clear_inline; }; class output { @@ -2452,6 +2596,161 @@ struct chunk_stat { }; }; +struct write_data_inline { + class input; + class output; + + using self_type = write_data_inline; + using handle_type = hermes::rpc_handle; + using input_type = input; + using output_type = output; + using mercury_input_type = rpc_write_inline_in_t; + using mercury_output_type = rpc_write_inline_out_t; + + constexpr static const uint64_t 
public_id = 60; // Unique ID + constexpr static const hg_id_t mercury_id = 0; + constexpr static const auto name = gkfs::rpc::tag::write_data_inline; + constexpr static const auto requires_response = true; + constexpr static const auto mercury_in_proc_cb = + HG_GEN_PROC_NAME(rpc_write_inline_in_t); + constexpr static const auto mercury_out_proc_cb = + HG_GEN_PROC_NAME(rpc_write_inline_out_t); + + class input { + template + friend hg_return_t + hermes::detail::post_to_mercury(ExecutionContext*); + + public: + input(const std::string& path, uint64_t offset, const std::string& data, + uint64_t count, bool append) + : m_path(path), m_offset(offset), m_data(data), m_count(count), + m_append(append) {} + + explicit input(const rpc_write_inline_in_t& other) + : m_path(other.path), m_offset(other.offset), m_data(other.data), + m_count(other.count), m_append(other.append) {} + + explicit + operator rpc_write_inline_in_t() { + return {m_path.c_str(), m_offset, m_data.c_str(), m_count, + static_cast(m_append)}; + } + + private: + std::string m_path; + uint64_t m_offset; + std::string m_data; + uint64_t m_count; + bool m_append; + }; + + class output { + template + friend hg_return_t + hermes::detail::post_to_mercury(ExecutionContext*); + + public: + output() : m_err(0), m_ret_offset(0), m_io_size(0) {} + explicit output(const rpc_write_inline_out_t& out) + : m_err(out.err), m_ret_offset(out.ret_offset), + m_io_size(out.io_size) {} + int32_t + err() const { + return m_err; + } + int64_t + ret_offset() const { + return m_ret_offset; + } + size_t + io_size() const { + return m_io_size; + } + + private: + int32_t m_err; + int64_t m_ret_offset; + size_t m_io_size; + }; +}; + +//============================================================================== +// definitions for read_data_inline +struct read_data_inline { + class input; + class output; + + using self_type = read_data_inline; + using handle_type = hermes::rpc_handle; + using input_type = input; + using output_type = output; + using mercury_input_type = rpc_read_inline_in_t; + using mercury_output_type = rpc_read_inline_out_t; + + constexpr static const uint64_t public_id = 61; // Unique ID + constexpr static const hg_id_t mercury_id = 0; + constexpr static const auto name = gkfs::rpc::tag::read_data_inline; + constexpr static const auto requires_response = true; + constexpr static const auto mercury_in_proc_cb = + HG_GEN_PROC_NAME(rpc_read_inline_in_t); + constexpr static const auto mercury_out_proc_cb = + HG_GEN_PROC_NAME(rpc_read_inline_out_t); + + class input { + template + friend hg_return_t + hermes::detail::post_to_mercury(ExecutionContext*); + + public: + input(const std::string& path, uint64_t offset, uint64_t count) + : m_path(path), m_offset(offset), m_count(count) {} + + explicit input(const rpc_read_inline_in_t& other) + : m_path(other.path), m_offset(other.offset), m_count(other.count) { + } + + explicit + operator rpc_read_inline_in_t() { + return {m_path.c_str(), m_offset, m_count}; + } + + private: + std::string m_path; + uint64_t m_offset; + uint64_t m_count; + }; + + class output { + template + friend hg_return_t + hermes::detail::post_to_mercury(ExecutionContext*); + + public: + output() : m_err(0), m_count(0) {} + explicit output(const rpc_read_inline_out_t& out) + : m_err(out.err), m_data(out.data ? 
out.data : ""), + m_count(out.count) {} + + int32_t + err() const { + return m_err; + } + const std::string& + data() const { + return m_data; + } + uint64_t + count() const { + return m_count; + } + + private: + int32_t m_err; + std::string m_data; + uint64_t m_count; + }; +}; //============================================================================== // definitions for write_data struct write_data_proxy { @@ -4180,6 +4479,8 @@ struct expand_finalize { }; } // namespace malleable::rpc + + } // namespace gkfs diff --git a/include/common/common_defs.hpp b/include/common/common_defs.hpp index 70e50ca3adebcb908717403c23532c58c791469b..ce57cdb56ad3263d599d9e40a9ef5925ce5d7897 100644 --- a/include/common/common_defs.hpp +++ b/include/common/common_defs.hpp @@ -59,6 +59,7 @@ namespace tag { constexpr auto fs_config = "rpc_srv_fs_config"; constexpr auto create = "rpc_srv_mk_node"; +constexpr auto create_write_inline = "rpc_srv_create_write_inline"; constexpr auto stat = "rpc_srv_stat"; constexpr auto remove_metadata = "rpc_srv_rm_metadata"; constexpr auto remove_data = "rpc_srv_rm_data"; @@ -97,6 +98,10 @@ constexpr auto client_proxy_get_dirents_extended = constexpr auto proxy_daemon_write = "proxy_daemon_rpc_srv_write_data"; constexpr auto proxy_daemon_read = "proxy_daemon_rpc_srv_read_data"; +// inline data operations +constexpr auto write_data_inline = "rpc_srv_write_data_inline"; +constexpr auto read_data_inline = "rpc_srv_read_data_inline"; + } // namespace tag namespace protocol { diff --git a/include/common/metadata.hpp b/include/common/metadata.hpp index 71459220870fd526861ffab98f3df1f5f76220a0..22d599736d8d4c21f35a40c7afe9894422486c69 100644 --- a/include/common/metadata.hpp +++ b/include/common/metadata.hpp @@ -72,7 +72,7 @@ private: // renamed path #endif #endif - + std::string inline_data_; void init_time(); @@ -85,6 +85,11 @@ public: Metadata(mode_t mode, const std::string& target_path); +#ifdef HAS_RENAME + Metadata(mode_t mode, const std::string& target_path, + const std::string& rename_path); +#endif + #endif // Construct from a binary representation of the object @@ -163,6 +168,11 @@ public: #endif // HAS_RENAME #endif // HAS_SYMLINKS + + std::string + inline_data() const; + void + inline_data(const std::string& data); }; } // namespace gkfs::metadata diff --git a/include/common/rpc/rpc_types.hpp b/include/common/rpc/rpc_types.hpp index dd1f6f091ddb2a0d7c91631035fabab02043b432..262b56792bba7fb7e2cc84db962b8dffd5ea22ec 100644 --- a/include/common/rpc/rpc_types.hpp +++ b/include/common/rpc/rpc_types.hpp @@ -81,7 +81,8 @@ MERCURY_GEN_PROC( MERCURY_GEN_PROC(rpc_update_metadentry_size_in_t, ((hg_const_string_t) (path))((hg_uint64_t) (size))( - (hg_int64_t) (offset))((hg_bool_t) (append))) + (hg_int64_t) (offset))((hg_bool_t) (append))( + (hg_bool_t) (clear_inline))) MERCURY_GEN_PROC(rpc_update_metadentry_size_out_t, ((hg_int32_t) (err))((hg_int64_t) (ret_offset))) @@ -190,4 +191,31 @@ MERCURY_GEN_PROC(rpc_expand_start_in_t, MERCURY_GEN_PROC(rpc_migrate_metadata_in_t, ((hg_const_string_t) (key))((hg_const_string_t) (value))) +// Inline write operations +MERCURY_GEN_PROC(rpc_write_inline_in_t, + ((hg_const_string_t) (path))((hg_uint64_t) (offset))( + (hg_const_string_t) (data))((hg_uint64_t) (count))( + (hg_bool_t) (append))) + +// Output: err, offset (for append), bytes written +MERCURY_GEN_PROC( + rpc_write_inline_out_t, + ((hg_int32_t) (err))((hg_int64_t) (ret_offset))((hg_size_t) (io_size))) + +MERCURY_GEN_PROC(rpc_create_write_inline_in_t, + ((hg_const_string_t) 
(path))((uint32_t) (mode))( + (hg_const_string_t) (data))((hg_uint64_t) (count))) + +MERCURY_GEN_PROC(rpc_create_write_inline_out_t, + ((hg_int32_t) (err))((hg_uint64_t) (io_size))) + +// Input: path, offset, read length +MERCURY_GEN_PROC(rpc_read_inline_in_t, + ((hg_const_string_t) (path))((hg_uint64_t) (offset))( + (hg_uint64_t) (count))) + +// Output: err, data buffer, bytes read +MERCURY_GEN_PROC( + rpc_read_inline_out_t, + ((hg_int32_t) (err))((hg_const_string_t) (data))((hg_uint64_t) (count))) #endif // LFS_RPC_TYPES_HPP diff --git a/include/common/rpc/rpc_util.hpp b/include/common/rpc/rpc_util.hpp index 7b7d325355724de94ce599c7e975191d7c24e9fe..c4eb1b06d596867ec3ef846d00108908d00283a5 100644 --- a/include/common/rpc/rpc_util.hpp +++ b/include/common/rpc/rpc_util.hpp @@ -74,6 +74,12 @@ compress_bitset(const std::vector& bytes); std::vector decompress_bitset(const std::string& compressedString); +std::string +base64_encode(const std::string& data); + +std::string +base64_decode_to_string(const std::string& encoded); + } // namespace gkfs::rpc #endif // GEKKOFS_COMMON_RPC_UTILS_HPP diff --git a/include/config.hpp b/include/config.hpp index 6f2f7cfb5672c3067f5de3c21e752481fcee8ad1..a36f6b97f5a646c9a212716e27a3cfd17a2298e5 100644 --- a/include/config.hpp +++ b/include/config.hpp @@ -134,6 +134,9 @@ constexpr auto implicit_data_removal = true; // Check for existence of file metadata before create. This done on RocksDB // level constexpr auto create_exist_check = true; +constexpr auto use_inline_data = true; +constexpr auto inline_data_size = 4096; // in bytes +constexpr auto create_write_optimization = true; } // namespace metadata namespace data { // directory name below rootdir where chunks are placed diff --git a/include/daemon/backend/metadata/db.hpp b/include/daemon/backend/metadata/db.hpp index 72fe2f9b2291cd194d7dfa87aa5f71392eee7391..2a320f58d706c795d2360a68c00da37730286670 100644 --- a/include/daemon/backend/metadata/db.hpp +++ b/include/daemon/backend/metadata/db.hpp @@ -136,7 +136,7 @@ public: */ off_t increase_size(const std::string& key, size_t io_size, off_t offset, - bool append); + bool append, bool clear_inline = false); /** * @brief Decreases only the size part of the metadata entry via a RocksDB diff --git a/include/daemon/backend/metadata/merge.hpp b/include/daemon/backend/metadata/merge.hpp index 73d0f71b93a04ffcba47c129a65fe9d1e93dafaf..e483256180814061276179aab842a97cd7d7be31 100644 --- a/include/daemon/backend/metadata/merge.hpp +++ b/include/daemon/backend/metadata/merge.hpp @@ -56,7 +56,8 @@ enum class OperandID : char { increase_size = 'i', decrease_size = 'd', create = 'c', - update_time = 't' + update_time = 't', + write_inline = 'w' }; @@ -103,11 +104,13 @@ private: */ uint16_t merge_id_; bool append_; + bool clear_inline_; public: - IncreaseSizeOperand(size_t size); + IncreaseSizeOperand(size_t size, bool clear_inline = false); - IncreaseSizeOperand(size_t size, uint16_t merge_id, bool append); + IncreaseSizeOperand(size_t size, uint16_t merge_id, bool append, + bool clear_inline = false); explicit IncreaseSizeOperand(const rdb::Slice& serialized_op); @@ -131,6 +134,11 @@ public: append() const { return append_; } + + bool + clear_inline() const { + return clear_inline_; + } }; /** * @brief Decrease size operand @@ -244,6 +252,35 @@ public: AllowSingleOperand() const override; }; +class WriteInlineOperand : public MergeOperand { +private: + constexpr const static char serialize_sep = ':'; + + size_t offset_; + std::string data_; + +public: + 
WriteInlineOperand(size_t offset, const std::string& data); + + explicit WriteInlineOperand(const rdb::Slice& serialized_op); + + OperandID + id() const override; + + std::string + serialize_params() const override; + + size_t + offset() const { + return offset_; + } + + const std::string& + data() const { + return data_; + } +}; + } // namespace gkfs::metadata #endif // DB_MERGE_HPP diff --git a/include/daemon/backend/metadata/metadata_backend.hpp b/include/daemon/backend/metadata/metadata_backend.hpp index bdafa5d7c4e85d02e891a945aefe961c08dfac73..972a26b744d802272b1a02af9ab2f72770737c4c 100644 --- a/include/daemon/backend/metadata/metadata_backend.hpp +++ b/include/daemon/backend/metadata/metadata_backend.hpp @@ -72,7 +72,7 @@ public: virtual off_t increase_size(const std::string& key, size_t size, off_t offset, - bool append) = 0; + bool append, bool clear_inline = false) = 0; virtual void decrease_size(const std::string& key, size_t size) = 0; @@ -133,9 +133,9 @@ public: off_t increase_size(const std::string& key, size_t size, off_t offset, - bool append) { + bool append, bool clear_inline = false) { return static_cast(*this).increase_size_impl(key, size, offset, - append); + append, clear_inline); } void diff --git a/include/daemon/backend/metadata/parallax_backend.hpp b/include/daemon/backend/metadata/parallax_backend.hpp index ced3522eabb0a138f5c90f6784b51dec80a3d0f8..d804038373adbd200a86bf17b346061499eeb17a 100644 --- a/include/daemon/backend/metadata/parallax_backend.hpp +++ b/include/daemon/backend/metadata/parallax_backend.hpp @@ -164,7 +164,7 @@ public: */ off_t increase_size_impl(const std::string& key, size_t io_size, off_t offset, - bool append); + bool append, bool clear_inline = false); /** * Decreases the size on the metadata diff --git a/include/daemon/backend/metadata/rocksdb_backend.hpp b/include/daemon/backend/metadata/rocksdb_backend.hpp index 1b0a1fd006276343948eca288546dad000ff294c..5d01d5782fcd54356e06db5a631339224464253e 100644 --- a/include/daemon/backend/metadata/rocksdb_backend.hpp +++ b/include/daemon/backend/metadata/rocksdb_backend.hpp @@ -148,7 +148,7 @@ public: */ off_t increase_size_impl(const std::string& key, size_t io_size, off_t offset, - bool append); + bool append, bool clear_inline = false); /** * Decreases the size on the metadata diff --git a/include/daemon/handler/rpc_defs.hpp b/include/daemon/handler/rpc_defs.hpp index c79734d21b70293f05838af6755f61e24845ac20..3ac9ab24f52db65d0fd151fede6a8b0ad085968a 100644 --- a/include/daemon/handler/rpc_defs.hpp +++ b/include/daemon/handler/rpc_defs.hpp @@ -79,6 +79,12 @@ DECLARE_MARGO_RPC_HANDLER(rpc_srv_rename) // data DECLARE_MARGO_RPC_HANDLER(rpc_srv_remove_data) +DECLARE_MARGO_RPC_HANDLER(rpc_srv_measure) + +DECLARE_MARGO_RPC_HANDLER(rpc_srv_create_write_inline) + +DECLARE_MARGO_RPC_HANDLER(rpc_srv_create) + DECLARE_MARGO_RPC_HANDLER(rpc_srv_read) DECLARE_MARGO_RPC_HANDLER(rpc_srv_write) @@ -107,5 +113,9 @@ DECLARE_MARGO_RPC_HANDLER(rpc_srv_expand_finalize) DECLARE_MARGO_RPC_HANDLER(rpc_srv_migrate_metadata) +// inline data operations +DECLARE_MARGO_RPC_HANDLER(rpc_srv_write_data_inline) + +DECLARE_MARGO_RPC_HANDLER(rpc_srv_read_data_inline) #endif // GKFS_DAEMON_RPC_DEFS_HPP diff --git a/include/daemon/ops/metadentry.hpp b/include/daemon/ops/metadentry.hpp index 22868ea6b94b05058f456ab311b49aefbcb30fe0..cf9358d388e14ec18c4eed39cdd536806fe7c7f5 100644 --- a/include/daemon/ops/metadentry.hpp +++ b/include/daemon/ops/metadentry.hpp @@ -125,7 +125,8 @@ update(const std::string& path, Metadata& md); * 
@return starting offset for I/O operation */ off_t -update_size(const std::string& path, size_t io_size, off_t offset, bool append); +update_size(const std::string& path, size_t io_size, off_t offset, bool append, + bool clear_inline = false); /** * @brief Remove metadentry if exists diff --git a/src/client/gkfs_functions.cpp b/src/client/gkfs_functions.cpp index 962dee8ec856c0481b2f14015868a37ce9ecfdf9..a98675d2be10d53a66dd3d4ada1c01fe4e0cee62 100644 --- a/src/client/gkfs_functions.cpp +++ b/src/client/gkfs_functions.cpp @@ -50,6 +50,8 @@ #include #include #include +#include +#include #include #ifdef GKFS_ENABLE_CLIENT_METRICS @@ -63,6 +65,7 @@ extern "C" { #include #include #include +#include } using namespace std; @@ -86,7 +89,8 @@ struct dirent_extended { namespace { // set to store void * addr, fd, length and offset -std::set> mmap_set; +// set to store void * addr, fd, length, offset, prot +std::set> mmap_set; /** * Checks if metadata for parent directory exists (can be disabled with * GKFS_CREATE_CHECK_PARENTS). errno may be set @@ -138,14 +142,18 @@ generate_lock_file(const std::string& path, bool increase) { auto lock_path = path + ".lockgekko"; if(increase) { auto md = gkfs::utils::get_metadata(lock_path); + size_t new_size = 1; if(!md) { gkfs::rpc::forward_create(lock_path, 0777 | S_IFREG, 0); + } else { + new_size = md->size() + 1; } - gkfs::rpc::forward_update_metadentry_size(lock_path, 1, 0, false, 0); + gkfs::rpc::forward_update_metadentry_size(lock_path, new_size, 0, false, + false, 0); } else { auto md = gkfs::utils::get_metadata(lock_path); if(md) { - if(md.value().size() == 1) { + if(md->size() == 1 || md->size() == 0) { LOG(DEBUG, "Deleting Lock file {}", lock_path); gkfs::rpc::forward_remove(lock_path, false, 0); } else { @@ -199,6 +207,21 @@ gkfs_open(const std::string& path, mode_t mode, int flags) { } // no access check required here. If one is using our FS they have the // permissions. 
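+    // Summary of the delayed-creation flow introduced below (derived from
+    // this function, gkfs_do_write, and gkfs_close):
+    //   open(O_CREAT)        -> fd handed out client-side, creation_pending set
+    //   first small write    -> single create_write_inline RPC, flag cleared
+    //   first larger write   -> fallback gkfs_create() + chunk write, flag cleared
+    //   close with flag set  -> plain gkfs_create() so the file exists server-side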
+ if(gkfs::config::metadata::create_write_optimization && + gkfs::config::metadata::use_inline_data && !(flags & O_EXCL)) { + // OPTIMIZATION: fwd_create is delayed until write or close + auto fd = CTX->file_map()->add( + std::make_shared(path, flags)); + auto file = CTX->file_map()->get(fd); + file->mode(mode); + file->set_flag(gkfs::filemap::OpenFile_flags::creation_pending, + true); + if(CTX->protect_files_generator()) { + generate_lock_file(path, true); + } + return fd; + } + auto err = gkfs_create(path, mode | S_IFREG); if(err) { if(errno == EEXIST) { @@ -968,6 +991,9 @@ gkfs_truncate(const std::string& path, off_t length) { while(!md.value().target_path().empty() and md.value().blocks() != -1) { new_path = md.value().target_path(); md = gkfs::utils::get_metadata(md.value().target_path()); + if(!md) { + return -1; + } } // This could be optimized auto size = md->size(); @@ -1060,6 +1086,103 @@ gkfs_do_write(gkfs::filemap::OpenFile& file, const char* buf, size_t count, auto num_replicas = CTX->get_replicas(); LOG(DEBUG, "{}() path: '{}', count: '{}', offset: '{}', is_append: '{}'", __func__, *path, count, offset, is_append); + + // OPTIMIZATION: Delayed creation + if(file.get_flag(gkfs::filemap::OpenFile_flags::creation_pending)) { + // Optimization: Create and write in one RPC + if(gkfs::config::metadata::use_inline_data && + (offset + count) <= gkfs::config::metadata::inline_data_size) { + auto ret_inline = gkfs::rpc::forward_create_write_inline( + *path, file.mode(), std::string(buf, count), count, + 0); // TODO: handle replicas + // if success, we are done + if(ret_inline == 0) { + file.set_flag(gkfs::filemap::OpenFile_flags::creation_pending, + false); + if(update_pos) + file.pos(offset + count); + return count; + } + // fallthrough if failed (e.g., EEXIST if semantics required it, but + // we handled that in open) + } + + // Fallback: Create first + auto err = gkfs_create(*path, file.mode()); + if(err && errno != EEXIST) { + // If error and NOT EEXIST, fail. + // If EEXIST, we proceed to write (as open without O_EXCL allows it) + return -1; + } + file.set_flag(gkfs::filemap::OpenFile_flags::creation_pending, false); + } + + + // OPTIMIZATION: Inline Write + if(gkfs::config::metadata::use_inline_data && + (is_append || + (offset + count) <= gkfs::config::metadata::inline_data_size)) { + + // Attempt inline write via Metadata RPC + auto ret_inline = gkfs::rpc::forward_write_inline( + file.path(), buf, offset, count, is_append); + auto err = ret_inline.first; + if(err == 0) { + if(update_pos) + file.pos(ret_inline.second + count); + return count; + } + } + + // If we are here, we are writing to chunks. + // Check if we need to migrate existing inline data to chunks. + // This is necessary if the file has inline data but we are now writing + // beyond the inline limit (or appending). + bool migrated = false; + if(gkfs::config::metadata::use_inline_data) { + auto md = gkfs::utils::get_metadata(*path); + if(md && md->size() > 0 && + md->size() <= gkfs::config::metadata::inline_data_size) { + LOG(DEBUG, "{}() Migrating inline data to chunks. 
Size: {}", + __func__, md->size()); + + // Write inline data to chunks + auto err_migration = gkfs::rpc::forward_write( + *path, md->inline_data().c_str(), 0, md->size(), 0); + if(err_migration.first) { + LOG(ERROR, "{}() Failed to migrate inline data to chunks", + __func__); + errno = err_migration.first; + return -1; + } + migrated = true; + } + } + + if(is_append) { + auto ret_offset = gkfs::utils::update_file_size(*path, count, offset, + is_append, migrated); + err = ret_offset.first; + if(err) { + LOG(ERROR, "update_metadentry_size() failed with err '{}'", err); + errno = err; + return -1; + } + // When append is set the EOF is set to the offset + // forward_update_metadentry_size returns. This is because + // it is an atomic operation on the server and reserves the + // space for this append + if(ret_offset.second == -1) { + LOG(ERROR, + "update_metadentry_size() received -1 as starting offset. " + "This occurs when the staring offset could not be extracted " + "from RocksDB's merge operations. Inform GekkoFS devs."); + errno = EIO; + return -1; + } + offset = ret_offset.second; + } + if(CTX->use_write_size_cache() && !is_append) { auto [size_update_cnt, cached_size] = CTX->write_size_cache()->record(*path, offset + count); @@ -1073,30 +1196,31 @@ gkfs_do_write(gkfs::filemap::OpenFile& file, const char* buf, size_t count, return -1; } } - } else { - auto ret_offset = - gkfs::utils::update_file_size(*path, count, offset, is_append); + // If we migrated, we must ensure the server knows to clear inline data + // even if we don't flush the size cache yet. + // However, the current cache implementation doesn't support passing + // extra flags. For now, if migrated is true, we force an update. + if(migrated) { + auto ret_offset = gkfs::utils::update_file_size( + *path, count, offset, is_append, migrated); + err = ret_offset.first; + if(err) { + LOG(ERROR, "update_metadentry_size() failed with err '{}'", + err); + errno = err; + return -1; + } + } + + } else if(!is_append) { + auto ret_offset = gkfs::utils::update_file_size(*path, count, offset, + is_append, migrated); err = ret_offset.first; if(err) { LOG(ERROR, "update_metadentry_size() failed with err '{}'", err); errno = err; return -1; } - if(is_append) { - // When append is set the EOF is set to the offset - // forward_update_metadentry_size returns. This is because - // it is an atomic operation on the server and reserves the - // space for this append - if(ret_offset.second == -1) { - LOG(ERROR, - "update_metadentry_size() received -1 as starting offset. " - "This occurs when the staring offset could not be extracted " - "from RocksDB's merge operations. Inform GekkoFS devs."); - errno = EIO; - return -1; - } - offset = ret_offset.second; - } } pair ret_write; @@ -1289,6 +1413,31 @@ gkfs_do_read(const gkfs::filemap::OpenFile& file, char* buf, size_t count, memset(buf, 0, sizeof(char) * count); } + if(gkfs::config::metadata::use_inline_data && + offset < gkfs::config::metadata::inline_data_size) { + + // Forward the read request to the Metadata Server instead of Data + // Server + auto ret = + gkfs::rpc::forward_read_inline(file.path(), buf, offset, count); + auto err = ret.first; + + if(err == 0) { + // Success, return the number of bytes read + return ret.second; + } + + if(err != EAGAIN) { + errno = err; + return -1; + } + // If err == EAGAIN, it means the file exists but data is not inline + // (it migrated to chunks), so proceed to standard read below. 
+ LOG(DEBUG, + "{}() Inline read missed (EAGAIN), falling back to chunk read", + __func__); + } + pair ret; if(gkfs::config::proxy::fwd_io && CTX->use_proxy() && count > gkfs::config::proxy::fwd_io_count_threshold) { @@ -1775,6 +1924,12 @@ int gkfs_close(unsigned int fd) { auto file = CTX->file_map()->get(fd); if(file) { + if(file->get_flag(gkfs::filemap::OpenFile_flags::creation_pending)) { + gkfs_create(file->path(), file->mode()); + file->set_flag(gkfs::filemap::OpenFile_flags::creation_pending, + false); + } + // flush write size cache to be server consistent if(CTX->use_write_size_cache()) { auto err = CTX->write_size_cache()->flush(file->path(), true).first; @@ -1940,13 +2095,18 @@ gkfs_get_file_list(const std::string& path) { void* gkfs_mmap(void* addr, size_t length, int prot, int flags, int fd, off_t offset) { - void* ptr = malloc(length); + void* ptr = calloc(1, length); if(ptr == nullptr) { return MAP_FAILED; } // store info on mmap_set - mmap_set.insert(std::make_tuple(ptr, fd, length, offset)); - gkfs::syscall::gkfs_pread(fd, ptr, length, offset); + mmap_set.insert(std::make_tuple(ptr, fd, length, offset, prot)); + auto ret = gkfs::syscall::gkfs_pread(fd, ptr, length, offset); + if(ret == -1) { + free(ptr); + mmap_set.erase(std::make_tuple(ptr, fd, length, offset, prot)); + return MAP_FAILED; + } return ptr; } @@ -1960,7 +2120,10 @@ gkfs_msync(void* addr, size_t length, int flags) { if(std::get<0>(tuple) == addr) { int fd = std::get<1>(tuple); off_t offset = std::get<3>(tuple); - gkfs::syscall::gkfs_pwrite(fd, addr, length, offset); + int prot = std::get<4>(tuple); + if(prot & PROT_WRITE) { + gkfs::syscall::gkfs_pwrite(fd, addr, length, offset); + } return 0; } } @@ -1981,7 +2144,7 @@ gkfs_munmap(void* addr, size_t length) { auto it = std::find_if( mmap_set.begin(), mmap_set.end(), - [&addr](const std::tuple& t) { + [&addr](const std::tuple& t) { return std::get<0>(t) == addr; }); if(it != mmap_set.end()) { diff --git a/src/client/open_file_map.cpp b/src/client/open_file_map.cpp index 8ccec5f03a73dc93525cdb5bed83647643b505cd..d9af716c5560c4f70ac0a3d683a06d9596a20ba2 100644 --- a/src/client/open_file_map.cpp +++ b/src/client/open_file_map.cpp @@ -52,7 +52,7 @@ using namespace std; namespace gkfs::filemap { OpenFile::OpenFile(const string& path, const int flags, FileType type) - : type_(type), path_(path) { + : type_(type), path_(path), mode_(0) { // set flags to OpenFile if(flags & O_CREAT) flags_[gkfs::utils::to_underlying(OpenFile_flags::creat)] = true; @@ -111,6 +111,16 @@ OpenFile::type() const { return type_; } +mode_t +OpenFile::mode() const { + return mode_; +} + +void +OpenFile::mode(mode_t mode_) { + OpenFile::mode_ = mode_; +} + // OpenFileMap starts here shared_ptr diff --git a/src/client/preload_util.cpp b/src/client/preload_util.cpp index 39b4eac0806cc328d4eb13cb482ddb336a81e2be..a5a883ffa0e0b259a45c92fd455b7056348390e7 100644 --- a/src/client/preload_util.cpp +++ b/src/client/preload_util.cpp @@ -357,9 +357,10 @@ metadata_to_stat(const std::string& path, const gkfs::metadata::Metadata& md, pair update_file_size(const std::string& path, size_t count, off64_t offset, - bool is_append) { - LOG(DEBUG, "{}() path: '{}', count: '{}', offset: '{}', is_append: '{}'", - __func__, path, count, offset, is_append); + bool is_append, bool clear_inline_flag) { + LOG(DEBUG, + "{}() path: '{}', count: '{}', offset: '{}', is_append: '{}', clear_inline_flag: '{}'", + __func__, path, count, offset, is_append, clear_inline_flag); pair ret_offset; auto num_replicas = 
CTX->get_replicas(); if(gkfs::config::proxy::fwd_update_size && CTX->use_proxy()) { @@ -367,7 +368,8 @@ update_file_size(const std::string& path, size_t count, off64_t offset, path, count, offset, is_append); } else { ret_offset = gkfs::rpc::forward_update_metadentry_size( - path, count, offset, is_append, num_replicas); + path, count, offset, is_append, clear_inline_flag, + num_replicas); } return ret_offset; } diff --git a/src/client/rpc/forward_metadata.cpp b/src/client/rpc/forward_metadata.cpp index 6bd9203ea82d7ec4b06110ff424c3441ab828446..5f4be34badd4bc1ee8bf48a5dc5e270d3848d713 100644 --- a/src/client/rpc/forward_metadata.cpp +++ b/src/client/rpc/forward_metadata.cpp @@ -93,6 +93,47 @@ forward_create(const std::string& path, const mode_t mode, const int copy) { } } +/** + * Send an RPC for a create request and write data + * @param path + * @param mode + * @param data + * @param count + * @param copy Number of replica to create + * @return error code + */ +int +forward_create_write_inline(const std::string& path, const mode_t mode, + const std::string& data, const uint64_t count, + const int copy) { + if(gkfs::config::proxy::fwd_create && CTX->use_proxy()) { + LOG(WARNING, "{} was called even though proxy should be used!", + __func__); + } + auto endp = CTX->hosts().at( + CTX->distributor()->locate_file_metadata(path, copy)); + + try { + LOG(DEBUG, "Sending RPC ..."); + // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we + // can retry for RPC_TRIES (see old commits with margo) + // TODO(amiranda): hermes will eventually provide a post(endpoint) + // returning one result and a broadcast(endpoint_set) returning a + // result_set. When that happens we can remove the .at(0) :/ + auto out = ld_network_service + ->post( + endp, path, mode, data, count) + .get() + .at(0); + LOG(DEBUG, "Got response success: {}", out.err()); + + return out.err() ? 
out.err() : 0; + } catch(const std::exception& ex) { + LOG(ERROR, "while getting rpc output"); + return EBUSY; + } +} + /** * Send an RPC for a stat request * @param path @@ -563,6 +604,7 @@ forward_rename(const string& oldpath, const string& newpath, pair forward_update_metadentry_size(const string& path, const size_t size, const off64_t offset, const bool append_flag, + const bool clear_inline_flag, const int num_copies) { if(gkfs::config::proxy::fwd_update_size && CTX->use_proxy()) { LOG(WARNING, "{} was called even though proxy should be used!", @@ -583,7 +625,8 @@ forward_update_metadentry_size(const string& path, const size_t size, handles.emplace_back( ld_network_service->post( endp, path, size, offset, - bool_to_merc_bool(append_flag))); + bool_to_merc_bool(append_flag), + bool_to_merc_bool(clear_inline_flag))); } catch(const std::exception& ex) { LOG(ERROR, "while getting rpc output"); return make_pair(EBUSY, 0); @@ -1152,4 +1195,83 @@ forward_mk_symlink(const std::string& path, const std::string& target_path) { #endif + +std::pair +forward_write_inline(const std::string& path, const void* buf, off64_t offset, + size_t write_size, bool append_flag) { + + auto endp = + CTX->hosts().at(CTX->distributor()->locate_file_metadata(path, 0)); + + try { + LOG(DEBUG, "Sending RPC (Inline Write) path: '{}' size: '{}' ...", path, + write_size); + + // Construct std::string from buffer for serialization + // Note: This copies data, which is acceptable for small inline writes + std::string data(static_cast(buf), write_size); + // Base64 encode the data to ensure it is safe for RPC transmission (no + // null bytes) + std::string encoded_data = gkfs::rpc::base64_encode(data); + + auto out = ld_network_service + ->post( + endp, path, offset, encoded_data, + encoded_data.size(), + bool_to_merc_bool(append_flag)) + .get() + .at(0); + + if(out.err() != 0) { + LOG(ERROR, "{}() Daemon reported error: {}", __func__, out.err()); + return {out.err(), 0}; + } + + return {0, out.ret_offset()}; + + } catch(const std::exception& ex) { + LOG(ERROR, "{}() Exception: '{}'", __func__, ex.what()); + return {EBUSY, 0}; + } +} + +std::pair +forward_read_inline(const std::string& path, void* buf, off64_t offset, + size_t read_size) { + + auto endp = + CTX->hosts().at(CTX->distributor()->locate_file_metadata(path, 0)); + + try { + LOG(DEBUG, "Sending RPC (Inline Read) path: '{}' size: '{}' ...", path, + read_size); + + auto out = ld_network_service + ->post( + endp, path, offset, read_size) + .get() + .at(0); + + if(out.err() != 0) { + LOG(DEBUG, "{}() Daemon reported error: {}", __func__, out.err()); + return {out.err(), 0}; + } + + // Copy data from string response to user buffer + if(out.count() > 0 && !out.data().empty()) { + // Decode Base64 string back to binary + std::string decoded_data = + gkfs::rpc::base64_decode_to_string(out.data()); + std::memcpy(buf, decoded_data.c_str(), decoded_data.size()); + return {0, decoded_data.size()}; + } + + return {0, 0}; + + } catch(const std::exception& ex) { + LOG(ERROR, "{}() Exception: '{}'", __func__, ex.what()); + return {EBUSY, 0}; + } +} + } // namespace gkfs::rpc diff --git a/src/client/rpc/rpc_types.cpp b/src/client/rpc/rpc_types.cpp index 1f34d0d5ab098551a7ef90cbad92d68b0cc38b70..5b5f89193545890fef93be03fd4bf1cb9946d481 100644 --- a/src/client/rpc/rpc_types.cpp +++ b/src/client/rpc/rpc_types.cpp @@ -79,6 +79,10 @@ hermes::detail::register_user_request_types(uint32_t provider_id) { provider_id); (void) registered_requests().add( provider_id); + (void) 
registered_requests().add( + provider_id); + (void) registered_requests().add( + provider_id); } else { (void) registered_requests().add( provider_id); diff --git a/src/common/CMakeLists.txt b/src/common/CMakeLists.txt index 6f746d6eb1fbd08114c3049b31021a0da736e077..b6d4702c7da80c49a2051c50941759cd70c4f4b2 100644 --- a/src/common/CMakeLists.txt +++ b/src/common/CMakeLists.txt @@ -102,6 +102,7 @@ target_sources(metadata target_link_libraries(metadata PRIVATE fmt::fmt + rpc_utils ) add_library(path_util STATIC) diff --git a/src/common/metadata.cpp b/src/common/metadata.cpp index e4f751e48fd93c4b46702d4a4974a5541b86b1fd..ad372493b69542c29dba065fd14c7788485c778a 100644 --- a/src/common/metadata.cpp +++ b/src/common/metadata.cpp @@ -37,9 +37,11 @@ */ #include +#include #include #include +#include extern "C" { #include @@ -110,82 +112,103 @@ Metadata::Metadata(const mode_t mode, const std::string& target_path) assert(target_path_.empty() || target_path_[0] == '/'); init_time(); } +#endif +#ifdef HAS_RENAME +#ifdef HAS_SYMLINKS +Metadata::Metadata(const mode_t mode, const std::string& target_path, + const std::string& rename_path) + : mode_(mode), link_count_(0), size_(0), blocks_(0), + target_path_(target_path), rename_path_(rename_path) { + assert(S_ISLNK(mode_) || S_ISDIR(mode_) || S_ISREG(mode_)); + assert(target_path_.empty() || S_ISLNK(mode_)); + assert(target_path_.empty() || target_path_[0] == '/'); + init_time(); +} +#endif #endif Metadata::Metadata(const std::string& binary_str) { size_t read = 0; + const char* ptr = binary_str.data(); + const char* end = ptr + binary_str.size(); - auto ptr = binary_str.data(); + // mode mode_ = static_cast(std::stoul(ptr, &read)); - // we read something assert(read > 0); ptr += read; + assert(ptr < end && *ptr == MSP); + ++ptr; // skip separator - // last parsed char is the separator char - assert(*ptr == MSP); - // yet we have some character to parse - - size_ = std::stol(++ptr, &read); + // size + size_ = std::stol(ptr, &read); assert(read > 0); ptr += read; - // The order is important. don't change. 
+ // optional fields in fixed order if constexpr(gkfs::config::metadata::use_atime) { - assert(*ptr == MSP); - atime_ = static_cast(std::stol(++ptr, &read)); + assert(ptr < end && *ptr == MSP); + ++ptr; + atime_ = static_cast(std::stol(ptr, &read)); assert(read > 0); ptr += read; } if constexpr(gkfs::config::metadata::use_mtime) { - assert(*ptr == MSP); - mtime_ = static_cast(std::stol(++ptr, &read)); + assert(ptr < end && *ptr == MSP); + ++ptr; + mtime_ = static_cast(std::stol(ptr, &read)); assert(read > 0); ptr += read; } if constexpr(gkfs::config::metadata::use_ctime) { - assert(*ptr == MSP); - ctime_ = static_cast(std::stol(++ptr, &read)); + assert(ptr < end && *ptr == MSP); + ++ptr; + ctime_ = static_cast(std::stol(ptr, &read)); assert(read > 0); ptr += read; } if constexpr(gkfs::config::metadata::use_link_cnt) { - assert(*ptr == MSP); - link_count_ = static_cast(std::stoul(++ptr, &read)); + assert(ptr < end && *ptr == MSP); + ++ptr; + link_count_ = static_cast(std::stoul(ptr, &read)); assert(read > 0); ptr += read; } - if constexpr(gkfs::config::metadata::use_blocks) { // last one will not - // encounter a - // delimiter anymore - assert(*ptr == MSP); - blocks_ = static_cast(std::stol(++ptr, &read)); + if constexpr(gkfs::config::metadata::use_blocks) { + assert(ptr < end && *ptr == MSP); + ++ptr; + blocks_ = static_cast(std::stol(ptr, &read)); assert(read > 0); ptr += read; } + // symlink target #ifdef HAS_SYMLINKS - // Read target_path - assert(*ptr == MSP); - target_path_ = ++ptr; - // target_path should be there only if this is a link - ptr += target_path_.size(); + if(ptr < end && *ptr == MSP) { + ++ptr; // skip separator + const char* start = ptr; + const char* sep = std::find(ptr, end, MSP); + target_path_.assign(start, sep - start); + ptr = sep; + } #ifdef HAS_RENAME - // Read rename target, we had captured '|' so we need to recover it - if(!target_path_.empty()) { - auto index = target_path_.find_last_of(MSP); - auto size = target_path_.size(); - target_path_ = target_path_.substr(0, index); - ptr -= (size - index); + if(ptr < end && *ptr == MSP) { + ++ptr; // skip separator + const char* start = ptr; + const char* sep = std::find(ptr, end, MSP); + rename_path_.assign(start, sep - start); + ptr = sep; } - assert(*ptr == MSP); - rename_path_ = ++ptr; - ptr += rename_path_.size(); #endif // HAS_RENAME #endif // HAS_SYMLINKS - // we consumed all the binary string - assert(*ptr == '\0'); + // inline data + if(gkfs::config::metadata::use_inline_data && ptr < end) { + if(*ptr == MSP) + ++ptr; // optional separator before payload + inline_data_ = + gkfs::rpc::base64_decode_to_string(std::string(ptr, end - ptr)); + } } std::string @@ -225,23 +248,24 @@ Metadata::serialize() const { #endif // HAS_RENAME #endif // HAS_SYMLINKS + if(gkfs::config::metadata::use_inline_data && !inline_data_.empty()) { + s += MSP; // Separator + s += gkfs::rpc::base64_encode(inline_data_); + } return s; } +// Getter/Setter implementations (unchanged) ... 
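+// Illustrative record layout after this change (MSP is the '|' separator;
+// the optional time/link/blocks fields follow the use_* config flags):
+//   mode|size[|atime][|mtime][|ctime][|link_count][|blocks][|target[|rename]]|base64(inline_data)
+// e.g. a 5-byte regular file holding "hello" ends in ...|aGVsbG8=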
+ void Metadata::update_atime_now() { - auto time = std::time(nullptr); - atime_ = time; + atime_ = std::time(nullptr); } - void Metadata::update_mtime_now() { - auto time = std::time(nullptr); - mtime_ = time; + mtime_ = std::time(nullptr); } -//-------------------------------------------- GETTER/SETTER - time_t Metadata::atime() const { return atime_; @@ -249,7 +273,7 @@ Metadata::atime() const { void Metadata::atime(time_t atime) { - Metadata::atime_ = atime; + atime_ = atime; } time_t @@ -259,7 +283,7 @@ Metadata::mtime() const { void Metadata::mtime(time_t mtime) { - Metadata::mtime_ = mtime; + mtime_ = mtime; } time_t @@ -269,7 +293,7 @@ Metadata::ctime() const { void Metadata::ctime(time_t ctime) { - Metadata::ctime_ = ctime; + ctime_ = ctime; } mode_t @@ -279,7 +303,7 @@ Metadata::mode() const { void Metadata::mode(mode_t mode) { - Metadata::mode_ = mode; + mode_ = mode; } nlink_t @@ -299,7 +323,7 @@ Metadata::size() const { void Metadata::size(size_t size) { - Metadata::size_ = size; + size_ = size; } blkcnt_t @@ -309,7 +333,7 @@ Metadata::blocks() const { void Metadata::blocks(blkcnt_t blocks) { - Metadata::blocks_ = blocks; + blocks_ = blocks; } #ifdef HAS_SYMLINKS @@ -343,4 +367,13 @@ Metadata::rename_path(const std::string& rename_path) { #endif // HAS_RENAME #endif // HAS_SYMLINKS +std::string +Metadata::inline_data() const { + return inline_data_; +} + +void +Metadata::inline_data(const std::string& data) { + inline_data_ = data; +} } // namespace gkfs::metadata diff --git a/src/common/rpc/rpc_util.cpp b/src/common/rpc/rpc_util.cpp index 30ed2fc56d593ffee0b21dbe582f04bef5074277..ee11bf10f69ab64cbfa7565c70103efe3ff8cba3 100644 --- a/src/common/rpc/rpc_util.cpp +++ b/src/common/rpc/rpc_util.cpp @@ -201,8 +201,9 @@ base64_decode(const std::string& encoded) { // Remove the padding bits buffer >>= padding; padding = 0; - data.push_back(static_cast((buffer >> 8) & 0xFF)); - count++; + // The previous code incorrectly extracted a byte here. + // Since padding < 8, we don't have a full byte left, so we just discard + // the padding. } if(count == 0 || padding % 8 != 0) @@ -222,4 +223,16 @@ decompress_bitset(const std::string& compressedString) { } +std::string +base64_encode(const std::string& data) { + std::vector vec(data.begin(), data.end()); + return base64_encode(vec); +} + +std::string +base64_decode_to_string(const std::string& encoded) { + auto vec = base64_decode(encoded); + return std::string(vec.begin(), vec.end()); +} + } // namespace gkfs::rpc \ No newline at end of file diff --git a/src/daemon/backend/data/chunk_storage.cpp b/src/daemon/backend/data/chunk_storage.cpp index b03e07bad2c5fb5e93116510951894cb2732f253..6ef511de2c32c504aa3d5db6cced9478a8a38d51 100644 --- a/src/daemon/backend/data/chunk_storage.cpp +++ b/src/daemon/backend/data/chunk_storage.cpp @@ -287,6 +287,8 @@ ChunkStorage::trim_chunk_space(const string& file_path, gkfs::rpc::chnk_id_t chunk_start) { auto chunk_dir = absolute(get_chunks_dir(file_path)); + if(!fs::exists(chunk_dir)) + return; const fs::directory_iterator end; auto err_flag = false; for(fs::directory_iterator chunk_file(chunk_dir); chunk_file != end; @@ -319,6 +321,8 @@ ChunkStorage::truncate_chunk_file(const string& file_path, static_cast(length) <= chunksize_); auto ret = truncate(chunk_path.c_str(), length); if(ret == -1) { + if(errno == ENOENT) + return; auto err_str = fmt::format( "Failed to truncate chunk file. 
File: '{}', Error: '{}'", chunk_path, ::strerror(errno)); diff --git a/src/daemon/backend/metadata/db.cpp b/src/daemon/backend/metadata/db.cpp index 6483f74a5942d1bb5ed47a71f49f824e0bc81807..cee663bc3ade1c7dc149af5859ed8676db079831 100644 --- a/src/daemon/backend/metadata/db.cpp +++ b/src/daemon/backend/metadata/db.cpp @@ -148,8 +148,8 @@ MetadataDB::update(const std::string& old_key, const std::string& new_key, off_t MetadataDB::increase_size(const std::string& key, size_t io_size, off_t offset, - bool append) { - return backend_->increase_size(key, io_size, offset, append); + bool append, bool clear_inline) { + return backend_->increase_size(key, io_size, offset, append, clear_inline); } void diff --git a/src/daemon/backend/metadata/merge.cpp b/src/daemon/backend/metadata/merge.cpp index bf8d586923863436ce9f5df6425931888d3f23b9..74e013de1a024a418dea46d9e53ec42f7853b76b 100644 --- a/src/daemon/backend/metadata/merge.cpp +++ b/src/daemon/backend/metadata/merge.cpp @@ -70,13 +70,16 @@ MergeOperand::get_params(const rdb::Slice& serialized_op) { return {serialized_op.data() + 2, serialized_op.size() - 2}; } -IncreaseSizeOperand::IncreaseSizeOperand(const size_t size) - : size_(size), merge_id_(0), append_(false) {} +IncreaseSizeOperand::IncreaseSizeOperand(const size_t size, + const bool clear_inline) + : size_(size), merge_id_(0), append_(false), clear_inline_(clear_inline) {} IncreaseSizeOperand::IncreaseSizeOperand(const size_t size, const uint16_t merge_id, - const bool append) - : size_(size), merge_id_(merge_id), append_(append) {} + const bool append, + const bool clear_inline) + : size_(size), merge_id_(merge_id), append_(append), + clear_inline_(clear_inline) {} IncreaseSizeOperand::IncreaseSizeOperand(const rdb::Slice& serialized_op) { size_t read = 0; @@ -86,13 +89,25 @@ IncreaseSizeOperand::IncreaseSizeOperand(const rdb::Slice& serialized_op) { serialized_op[read] == serialize_end) { merge_id_ = 0; append_ = false; + clear_inline_ = false; return; } assert(serialized_op[read] == serialize_sep); // Parse merge id + size_t read_merge_id = 0; merge_id_ = static_cast( - std::stoul(serialized_op.data() + read + 1, nullptr)); + std::stoul(serialized_op.data() + read + 1, &read_merge_id)); append_ = true; + + // Check for clear_inline + size_t next_pos = read + 1 + read_merge_id; + if(next_pos < serialized_op.size() && + serialized_op[next_pos] == serialize_sep) { + // we have clear_inline + clear_inline_ = (serialized_op[next_pos + 1] == '1'); + } else { + clear_inline_ = false; + } } OperandID @@ -103,9 +118,9 @@ IncreaseSizeOperand::id() const { string IncreaseSizeOperand::serialize_params() const { // serialize_end avoids rogue characters in the serialized string - if(append_) - return fmt::format("{}{}{}{}", size_, serialize_sep, merge_id_, - serialize_end); + if(append_ || clear_inline_) + return fmt::format("{}{}{}{}{}{}", size_, serialize_sep, merge_id_, + serialize_sep, clear_inline_ ? 1 : 0, serialize_end); else { return fmt::format("{}{}", size_, serialize_end); } @@ -167,6 +182,43 @@ UpdateTimeOperand::serialize_params() const { return ::to_string(mtime_); } + +WriteInlineOperand::WriteInlineOperand(const size_t offset, + const std::string& data) + : offset_(offset), data_(data) {} + +WriteInlineOperand::WriteInlineOperand(const rdb::Slice& serialized_op) { + // We expect format: "offset:data" + // Since data is binary, we cannot rely on simple string conversion if nulls + // are present, but we can find the first separator because offset is a + // number. 
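+    // e.g. WriteInlineOperand(12, "abc").serialize_params() yields "12:abc";
+    // only the first ':' is structural, so payload bytes after it (including
+    // further ':' characters) survive the round trip.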
+ + // Convert to string view or string to find the separator + std::string_view s(serialized_op.data(), serialized_op.size()); + auto pos = s.find(serialize_sep); + + if(pos == std::string::npos) { + // Fallback/Error case + offset_ = 0; + } else { + // Parse offset + offset_ = std::stoul(std::string(s.substr(0, pos))); + // Everything after separator is data + data_ = std::string(s.substr(pos + 1)); + } +} + +OperandID +WriteInlineOperand::id() const { + return OperandID::write_inline; +} + +std::string +WriteInlineOperand::serialize_params() const { + return fmt::format("{}{}{}", offset_, serialize_sep, data_); +} + + /** * @internal * Merges all operands in chronological order for the same key. @@ -224,15 +276,58 @@ MetadataMergeOperator::FullMergeV2(const MergeOperationInput& merge_in, } else { fsize = ::max(op.size(), fsize); } + + // Handle clear_inline + if(op.clear_inline() && gkfs::config::metadata::use_inline_data) { + md.inline_data(""); + } + } else if(operand_id == OperandID::decrease_size) { auto op = DecreaseSizeOperand(parameters); assert(op.size() <= fsize); // we assume no concurrency here fsize = op.size(); + + // We need to handle inline_data + if(gkfs::config::metadata::use_inline_data) { + auto inline_data = md.inline_data(); + if(inline_data.size() > fsize) { + inline_data.resize(fsize); + md.inline_data(inline_data); + } + } } else if(operand_id == OperandID::create) { continue; } else if(operand_id == OperandID::update_time) { auto op = UpdateTimeOperand(parameters); md.mtime(op.mtime()); + } else if(operand_id == OperandID::write_inline) { + + auto op = WriteInlineOperand(parameters); + + // 1. Get a copy of the string (not a reference, as md.inline_data() + // returns by value) + std::string current_data = md.inline_data(); + + // 2. Use the variables from the operand class + size_t offset = op.offset(); + const std::string& data_buffer = op.data(); + size_t data_len = data_buffer.size(); + + // 3. Resize if writing beyond current inline data size + if(offset + data_len > current_data.size()) { + current_data.resize(offset + data_len, '\0'); + } + + // 4. Overwrite data + current_data.replace(offset, data_len, data_buffer); + + // 5. Store updated string back into Metadata object + md.inline_data(current_data); + + // 6. 
Update file size if we wrote beyond previous EOF + if(current_data.size() > fsize) { + fsize = current_data.size(); + } } else { throw ::runtime_error("Unrecognized merge operand ID: " + (char) operand_id); diff --git a/src/daemon/backend/metadata/parallax_backend.cpp b/src/daemon/backend/metadata/parallax_backend.cpp index a21c2ddabc2ca8f50f073cdf2964f8cc1da564ea..215781989571be71dfa5b42de6dc4ed0f5306786 100644 --- a/src/daemon/backend/metadata/parallax_backend.cpp +++ b/src/daemon/backend/metadata/parallax_backend.cpp @@ -343,7 +343,8 @@ ParallaxBackend::update_impl(const std::string& old_key, */ off_t ParallaxBackend::increase_size_impl(const std::string& key, size_t io_size, - off_t offset, bool append) { + off_t offset, bool append, + bool clear_inline) { lock_guard lock_guard(parallax_mutex_); off_t out_offset = -1; auto value = get(key); diff --git a/src/daemon/backend/metadata/rocksdb_backend.cpp b/src/daemon/backend/metadata/rocksdb_backend.cpp index 3d84fefbeb08a032cfcc45b5367fcbaecce243e5..423e5f641e16f59780501bb2f819a2db510e4fbf 100644 --- a/src/daemon/backend/metadata/rocksdb_backend.cpp +++ b/src/daemon/backend/metadata/rocksdb_backend.cpp @@ -223,12 +223,13 @@ RocksDBBackend::update_impl(const std::string& old_key, */ off_t RocksDBBackend::increase_size_impl(const std::string& key, size_t io_size, - off_t offset, bool append) { + off_t offset, bool append, + bool clear_inline) { off_t out_offset = -1; if(append) { auto merge_id = gkfs::metadata::gen_unique_id(key); // no offset needed because new size is current file size + io_size - auto uop = IncreaseSizeOperand(io_size, merge_id, append); + auto uop = IncreaseSizeOperand(io_size, merge_id, append, clear_inline); auto s = db_->Merge(write_opts_, key, uop.serialize()); if(!s.ok()) { throw_status_excpt(s); @@ -249,7 +250,7 @@ RocksDBBackend::increase_size_impl(const std::string& key, size_t io_size, } else { // In the standard case we simply add the I/O request size to the // offset. 
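+        // Encoding sketch (serialize_params() in merge.cpp): non-append
+        // operands serialize as "<new_size>", or "<new_size><sep>0<sep>1"
+        // when clear_inline is set; the append branch above emits
+        // "<io_size><sep><merge_id><sep><0|1>", <sep> being serialize_sep.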
diff --git a/src/daemon/backend/metadata/parallax_backend.cpp b/src/daemon/backend/metadata/parallax_backend.cpp
index a21c2ddabc2ca8f50f073cdf2964f8cc1da564ea..215781989571be71dfa5b42de6dc4ed0f5306786 100644
--- a/src/daemon/backend/metadata/parallax_backend.cpp
+++ b/src/daemon/backend/metadata/parallax_backend.cpp
@@ -343,7 +343,8 @@ ParallaxBackend::update_impl(const std::string& old_key,
  */
 off_t
 ParallaxBackend::increase_size_impl(const std::string& key, size_t io_size,
-                                    off_t offset, bool append) {
+                                    off_t offset, bool append,
+                                    bool clear_inline) {
     lock_guard lock_guard(parallax_mutex_);
     off_t out_offset = -1;
     auto value = get(key);
diff --git a/src/daemon/backend/metadata/rocksdb_backend.cpp b/src/daemon/backend/metadata/rocksdb_backend.cpp
index 3d84fefbeb08a032cfcc45b5367fcbaecce243e5..423e5f641e16f59780501bb2f819a2db510e4fbf 100644
--- a/src/daemon/backend/metadata/rocksdb_backend.cpp
+++ b/src/daemon/backend/metadata/rocksdb_backend.cpp
@@ -223,12 +223,13 @@ RocksDBBackend::update_impl(const std::string& old_key,
  */
 off_t
 RocksDBBackend::increase_size_impl(const std::string& key, size_t io_size,
-                                   off_t offset, bool append) {
+                                   off_t offset, bool append,
+                                   bool clear_inline) {
     off_t out_offset = -1;
     if(append) {
         auto merge_id = gkfs::metadata::gen_unique_id(key);
         // no offset needed because new size is current file size + io_size
-        auto uop = IncreaseSizeOperand(io_size, merge_id, append);
+        auto uop = IncreaseSizeOperand(io_size, merge_id, append, clear_inline);
         auto s = db_->Merge(write_opts_, key, uop.serialize());
         if(!s.ok()) {
             throw_status_excpt(s);
@@ -249,7 +250,7 @@ RocksDBBackend::increase_size_impl(const std::string& key, size_t io_size,
     } else {
         // In the standard case we simply add the I/O request size to the
         // offset.
-        auto uop = IncreaseSizeOperand(offset + io_size);
+        auto uop = IncreaseSizeOperand(offset + io_size, clear_inline);
         auto s = db_->Merge(write_opts_, key, uop.serialize());
         if(!s.ok()) {
             throw_status_excpt(s);
diff --git a/src/daemon/daemon.cpp b/src/daemon/daemon.cpp
index 25b970b586e62bbc29d13f86fb032ae66c2714fc..9872376dc0df8eea8327f24939e71385faf89f56 100644
--- a/src/daemon/daemon.cpp
+++ b/src/daemon/daemon.cpp
@@ -201,6 +201,11 @@ register_server_rpcs(margo_instance_id mid) {
     MARGO_REGISTER(mid, gkfs::malleable::rpc::tag::migrate_metadata,
                    rpc_migrate_metadata_in_t, rpc_err_out_t,
                    rpc_srv_migrate_metadata);
+    MARGO_REGISTER(mid, gkfs::rpc::tag::write_data_inline,
+                   rpc_write_inline_in_t, rpc_write_inline_out_t,
+                   rpc_srv_write_data_inline);
+    MARGO_REGISTER(mid, gkfs::rpc::tag::read_data_inline, rpc_read_inline_in_t,
+                   rpc_read_inline_out_t, rpc_srv_read_data_inline);
 }
 
 /**
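The `srv_metadata.cpp` change below migrates inline data to chunk 0 when an append pushes a file across `inline_data_size`. The crossing predicate is the subtle part (it must fire exactly once, on the transition), so here is a self-contained sketch of just that check, assuming the 4096-byte limit from the config example; `crosses_inline_limit` is a hypothetical name, not project code:

```cpp
#include <cassert>
#include <cstddef>

// Assumed limit; gkfs takes this from config.hpp (inline_data_size).
constexpr size_t kInlineLimit = 4096;

// True when an append of io_size moves a file that currently fits inline
// past the limit, i.e. exactly when its inline data must move to chunk 0.
static bool
crosses_inline_limit(size_t current_size, size_t io_size) {
    return current_size <= kInlineLimit &&
           current_size + io_size > kInlineLimit;
}

int
main() {
    assert(crosses_inline_limit(4000, 200));  // 4000 -> 4200: migrate once
    assert(!crosses_inline_limit(100, 100));  // stays inline
    assert(!crosses_inline_limit(5000, 100)); // already lives in chunks
    return 0;
}
```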
diff --git a/src/daemon/handler/srv_metadata.cpp b/src/daemon/handler/srv_metadata.cpp
index 42b66114f69de073617f7931e377730e31730171..91fb6200723a00f0d0e63cab2be69b57bebc3c61 100644
--- a/src/daemon/handler/srv_metadata.cpp
+++ b/src/daemon/handler/srv_metadata.cpp
@@ -51,6 +51,7 @@
 #include
 #include
+#include
 #include
 #include
 using namespace std;
@@ -444,8 +445,32 @@ rpc_srv_update_metadentry_size(hg_handle_t handle) {
             in.path, in.size, in.offset, in.append);
 
     try {
+        bool clear_inline = false;
+        if(in.append == HG_TRUE) {
+            auto md = gkfs::metadata::get(in.path);
+            size_t current_size = md.size();
+            size_t new_size = current_size + in.size;
+            if(new_size > gkfs::config::metadata::inline_data_size &&
+               current_size <= gkfs::config::metadata::inline_data_size) {
+                // Migration needed
+                std::string inline_data = md.inline_data();
+                if(!inline_data.empty()) {
+                    // Write to chunk 0
+                    GKFS_DATA->storage()->write_chunk(in.path, 0,
+                                                      inline_data.c_str(),
+                                                      inline_data.size(), 0);
+                    clear_inline = true;
+                }
+            }
+        }
+
+        if(in.clear_inline) {
+            clear_inline = true;
+        }
+
         out.ret_offset = gkfs::metadata::update_size(
-                in.path, in.size, in.offset, (in.append == HG_TRUE));
+                in.path, in.size, in.offset, (in.append == HG_TRUE),
+                clear_inline);
         out.err = 0;
     } catch(const gkfs::metadata::NotFoundException& e) {
         GKFS_DATA->spdlogger()->debug("{}() Entry not found: '{}'", __func__,
@@ -1028,6 +1053,230 @@ rpc_srv_rename(hg_handle_t handle) {
 #endif // HAS_RENAME
 
+/**
+ * @brief Serves a write request for inline data (stored in RocksDB).
+ * @return Mercury error code
+ */
+hg_return_t
+rpc_srv_write_data_inline(hg_handle_t handle) {
+    rpc_write_inline_in_t in{};
+    rpc_write_inline_out_t out{};
+    out.err = EIO;
+    out.ret_offset = 0;
+    out.io_size = 0;
+
+    auto ret = margo_get_input(handle, &in);
+    if(ret != HG_SUCCESS) {
+        GKFS_DATA->spdlogger()->error(
+                "{}() Failed to retrieve input from handle", __func__);
+        return gkfs::rpc::cleanup_respond(&handle, &in, &out);
+    }
+
+    GKFS_DATA->spdlogger()->debug(
+            "{}() path: '{}', size: '{}', offset: '{}', append: '{}'",
+            __func__, in.path, in.count, in.offset, in.append);
+    // Decode the data
+    auto decoded_data = gkfs::rpc::base64_decode_to_string(in.data);
+
+    try {
+        auto md = gkfs::metadata::get(in.path);
+        auto current_size = md.size();
+        if(in.append) {
+            in.offset = current_size;
+        }
+
+        // 1. Check limits
+        if(in.offset + decoded_data.size() >
+                   gkfs::config::metadata::inline_data_size ||
+           md.size() > gkfs::config::metadata::inline_data_size ||
+           (md.size() > 0 && md.inline_data().empty())) {
+            out.err = EFBIG; // File too large for inline or already in chunks
+        } else {
+            // 2. Prepare the data string. in.data is an hg_const_string_t
+            // (char*); decoded_data already contains the binary payload.
+
+            // 3. Persist to RocksDB
+            std::string current_data = md.inline_data();
+
+            // Extend if necessary (sparse write support)
+            if(current_data.size() < (in.offset + decoded_data.size())) {
+                current_data.resize(in.offset + decoded_data.size(), '\0');
+            }
+
+            // Apply write
+            current_data.replace(in.offset, decoded_data.size(),
+                                 decoded_data);
+            md.inline_data(current_data);
+
+            // Update size if file grew
+            if(current_data.size() > md.size()) {
+                md.size(current_data.size());
+            }
+
+            gkfs::metadata::update(in.path, md);
+            out.err = 0;
+            out.ret_offset = in.offset;
+            out.io_size = decoded_data.size();
+        }
+    } catch(const gkfs::metadata::NotFoundException& e) {
+        out.err = ENOENT;
+    } catch(const std::exception& e) {
+        GKFS_DATA->spdlogger()->error("{}() Failed to write inline data: '{}'",
+                                      __func__, e.what());
+        out.err = EIO;
+    }
+
+    auto hret = margo_respond(handle, &out);
+    if(hret != HG_SUCCESS) {
+        GKFS_DATA->spdlogger()->error("{}() Failed to respond", __func__);
+    }
+
+    margo_free_input(handle, &in);
+    margo_destroy(handle);
+    return HG_SUCCESS;
+}
+
+/**
+ * @brief Serves a request to create a file and write inline data in a single
+ * RPC.
+ * @return Mercury error code
+ */
+hg_return_t
+rpc_srv_create_write_inline(hg_handle_t handle) {
+    rpc_create_write_inline_in_t in{};
+    rpc_create_write_inline_out_t out{};
+    out.err = EIO;
+    out.io_size = 0;
+
+    auto ret = margo_get_input(handle, &in);
+    if(ret != HG_SUCCESS) {
+        GKFS_DATA->spdlogger()->error(
+                "{}() Failed to retrieve input from handle", __func__);
+        return gkfs::rpc::cleanup_respond(&handle, &in, &out);
+    }
+
+    GKFS_DATA->spdlogger()->debug("{}() path: '{}', mode: '{}', size: '{}'",
+                                  __func__, in.path, in.mode, in.count);
+
+    try {
+        // 1. Create metadentry
+        gkfs::metadata::Metadata md(in.mode);
+        gkfs::metadata::create(in.path, md);
+
+        // 2. Decode data
+        auto decoded_data = gkfs::rpc::base64_decode_to_string(in.data);
+
+        // 3. Write inline data
+        if(decoded_data.size() > gkfs::config::metadata::inline_data_size) {
+            out.err = EFBIG;
+        } else {
+            md.inline_data(decoded_data);
+            md.size(decoded_data.size());
+            gkfs::metadata::update(in.path, md);
+            out.err = 0;
+            out.io_size = decoded_data.size();
+        }
+
+    } catch(const gkfs::metadata::ExistsException& e) {
+        out.err = EEXIST;
+    } catch(const std::exception& e) {
+        GKFS_DATA->spdlogger()->error(
+                "{}() Failed to create/write inline: '{}'", __func__,
+                e.what());
+        out.err = EIO;
+    }
+
+    auto hret = margo_respond(handle, &out);
+    if(hret != HG_SUCCESS) {
+        GKFS_DATA->spdlogger()->error("{}() Failed to respond", __func__);
+    }
+
+    margo_free_input(handle, &in);
+    margo_destroy(handle);
+    return HG_SUCCESS;
+}
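Both handlers above move the payload through a Mercury string field, which cannot carry embedded NUL bytes; hence the base64 round trip via `gkfs::rpc::base64_decode_to_string` / `base64_encode`. Those helpers are not part of this diff, so the stand-in codec below only demonstrates the property the handlers rely on: arbitrary binary data, NULs included, survives the string transport.

```cpp
#include <cassert>
#include <string>
#include <vector>

static const char tbl[] =
        "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

// Minimal base64 codec; gkfs's own helpers are not shown in this diff,
// so this is an illustrative stand-in, not the project implementation.
static std::string
b64_encode(const std::string& in) {
    std::string out;
    int val = 0, bits = -6;
    for(unsigned char c : in) {
        val = (val << 8) + c;
        bits += 8;
        while(bits >= 0) {
            out.push_back(tbl[(val >> bits) & 0x3F]);
            bits -= 6;
        }
    }
    if(bits > -6)
        out.push_back(tbl[((val << 8) >> (bits + 8)) & 0x3F]);
    while(out.size() % 4)
        out.push_back('='); // pad to a multiple of 4
    return out;
}

static std::string
b64_decode(const std::string& in) {
    std::vector<int> T(256, -1);
    for(int i = 0; i < 64; i++)
        T[(unsigned char) tbl[i]] = i;
    std::string out;
    int val = 0, bits = -8;
    for(unsigned char c : in) {
        if(T[c] == -1)
            break; // '=' padding (or invalid char) ends the payload
        val = (val << 6) + T[c];
        bits += 6;
        if(bits >= 0) {
            out.push_back(char((val >> bits) & 0xFF));
            bits -= 8;
        }
    }
    return out;
}

int
main() {
    // Embedded NUL bytes survive the round trip, unlike a raw C string.
    std::string payload("Start\0Middle\0End", 16);
    assert(b64_decode(b64_encode(payload)) == payload);
    return 0;
}
```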
+
+/**
+ * @brief Serves a read request for inline data (stored in RocksDB).
+ * @return Mercury error code
+ */
+hg_return_t
+rpc_srv_read_data_inline(hg_handle_t handle) {
+    rpc_read_inline_in_t in{};
+    rpc_read_inline_out_t out{};
+    out.err = EIO;
+    out.count = 0;
+    out.data = nullptr; // Important initialization
+
+    auto ret = margo_get_input(handle, &in);
+    if(ret != HG_SUCCESS) {
+        GKFS_DATA->spdlogger()->error(
+                "{}() Failed to retrieve input from handle", __func__);
+        return gkfs::rpc::cleanup_respond(&handle, &in, &out);
+    }
+
+    GKFS_DATA->spdlogger()->debug("{}() path: '{}', size: '{}', offset: '{}'",
+                                  __func__, in.path, in.count, in.offset);
+
+    std::string data_buffer; // Keep scope alive until respond
+
+    try {
+        auto md = gkfs::metadata::get(in.path);
+
+        // Check if the file actually has inline data or if it's stored on
+        // disk. We assume that if the size is larger than the limit it is NOT
+        // inline and the client should have checked, but we double-check.
+        if(md.size() > gkfs::config::metadata::inline_data_size) {
+            out.err = EAGAIN; // Signal client to use the chunk path
+        } else if(md.size() > 0 && md.inline_data().empty()) {
+            out.err = EAGAIN; // Signal client to use the chunk path
+        } else {
+            const std::string& stored_data = md.inline_data();
+
+            if(in.offset >= stored_data.size()) {
+                // EOF
+                out.count = 0;
+                out.err = 0;
+            } else {
+                size_t available = stored_data.size() - in.offset;
+                size_t read_amt =
+                        std::min(static_cast<size_t>(in.count), available);
+
+                // Substring to return
+                data_buffer = stored_data.substr(in.offset, read_amt);
+
+                // Encode to base64 to ensure it is safe for RPC transmission
+                // (no null bytes)
+                std::string encoded_data =
+                        gkfs::rpc::base64_encode(data_buffer);
+
+                // We reuse data_buffer to hold the encoded string to ensure it
+                // outlives the response
+                data_buffer = encoded_data;
+
+                out.data = data_buffer.c_str();
+                out.count = data_buffer.size();
+                out.err = 0;
+            }
+        }
+    } catch(const gkfs::metadata::NotFoundException& e) {
+        out.err = ENOENT;
+    } catch(const std::exception& e) {
+        GKFS_DATA->spdlogger()->error("{}() Failed to read inline data: '{}'",
+                                      __func__, e.what());
+        out.err = EIO;
+    }
+
+    auto hret = margo_respond(handle, &out);
+    if(hret != HG_SUCCESS) {
+        GKFS_DATA->spdlogger()->error("{}() Failed to respond", __func__);
+    }
+
+    margo_free_input(handle, &in);
+    margo_destroy(handle);
+    return HG_SUCCESS;
+}
+
+
 } // namespace
 
 DEFINE_MARGO_RPC_HANDLER(rpc_srv_create)
@@ -1056,4 +1305,10 @@ DEFINE_MARGO_RPC_HANDLER(rpc_srv_mk_symlink)
 #endif
 #ifdef HAS_RENAME
 DEFINE_MARGO_RPC_HANDLER(rpc_srv_rename)
-#endif
\ No newline at end of file
+#endif
+
+DEFINE_MARGO_RPC_HANDLER(rpc_srv_write_data_inline)
+
+DEFINE_MARGO_RPC_HANDLER(rpc_srv_create_write_inline)
+
+DEFINE_MARGO_RPC_HANDLER(rpc_srv_read_data_inline)
\ No newline at end of file
diff --git a/src/daemon/ops/metadentry.cpp b/src/daemon/ops/metadentry.cpp
index 1c53a6da3f3e11e822afff62e4d12ef191aefa88..0f1de7aef65ead10cdc8e3f91880b4abc7f9741f 100644
--- a/src/daemon/ops/metadentry.cpp
+++ b/src/daemon/ops/metadentry.cpp
@@ -114,8 +114,10 @@ update(const string& path, Metadata& md) {
  * @endinternal
  */
 off_t
-update_size(const string& path, size_t io_size, off64_t offset, bool append) {
-    return GKFS_DATA->mdb()->increase_size(path, io_size, offset, append);
+update_size(const string& path, size_t io_size, off64_t offset, bool append,
+            bool clear_inline) {
+    return GKFS_DATA->mdb()->increase_size(path, io_size, offset, append,
+                                           clear_inline);
 }
 
 void
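The read handler's EOF handling reduces to a small clamping rule: an offset at or past the stored size yields zero bytes, anything else is truncated to what is actually available. A standalone check of that arithmetic; `read_window` is a hypothetical helper, not code from this changeset:

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>

// Returns how many bytes a read of `count` at `offset` yields from an
// inline buffer of `stored` bytes (0 means EOF), mirroring the handler.
static size_t
read_window(size_t stored, size_t offset, size_t count) {
    if(offset >= stored)
        return 0;                            // EOF
    return std::min(count, stored - offset); // clamp to available bytes
}

int
main() {
    assert(read_window(100, 100, 10) == 0);  // read at EOF
    assert(read_window(100, 90, 50) == 10);  // clamped
    assert(read_window(100, 0, 100) == 100); // exact fit
    return 0;
}
```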
diff --git a/tests/integration/data/test_inline_1rpc.py b/tests/integration/data/test_inline_1rpc.py
new file mode 100644
index 0000000000000000000000000000000000000000..53db1dcd00e21ec7c08a29b38edd0f3b692cc707
--- /dev/null
+++ b/tests/integration/data/test_inline_1rpc.py
@@ -0,0 +1,116 @@
+import pytest
+import os
+import stat
+from harness.logger import logger
+
+def test_inline_1rpc_optimized(gkfs_daemon, gkfs_client):
+    """Test the 1-RPC create+write optimization"""
+    file = gkfs_daemon.mountdir / "file_1rpc_opt"
+    # Open (O_CREAT) and write a small buffer in a single gkfs.io invocation
+    # so that one process triggers the optimization
+    buf = 'A' * 100
+    # gkfs.io write --creat
+    ret = gkfs_client.run('write', file, buf, len(buf), '--creat')
+    assert ret.retval == len(buf)
+
+    # The fd is closed when the gkfs.io process exits; since a write already
+    # happened, close performs no extra work in this case.
+
+    # Verify size
+    ret = gkfs_client.stat(file)
+    assert ret.retval == 0
+    assert ret.statbuf.st_size == len(buf)
+
+    # Verify content
+    ret = gkfs_client.open(file,
+                           os.O_RDONLY,
+                           stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
+    assert ret.retval != -1
+
+    ret = gkfs_client.read(file, len(buf))
+    assert ret.retval == len(buf)
+    assert ret.buf == buf.encode()
+
+def test_inline_1rpc_fallback_close(gkfs_daemon, gkfs_client):
+    """Test 1-RPC optimization fallback: open(O_CREAT) -> close (create empty file)"""
+    file = gkfs_daemon.mountdir / "file_1rpc_empty"
+
+    # gkfs.io open with O_CREAT
+    ret = gkfs_client.open(file, os.O_CREAT | os.O_WRONLY, 0o644)
+    assert ret.retval != -1
+
+    # Verify the file exists and is empty
+    ret = gkfs_client.stat(file)
+    assert ret.retval == 0
+    assert ret.statbuf.st_size == 0
+def test_inline_1rpc_fallback_large_write(gkfs_daemon, gkfs_client):
+    """Test 1-RPC optimization fallback: open(O_CREAT) -> write(large) (explicit create)"""
+    file = gkfs_daemon.mountdir / "file_1rpc_large"
+
+    # Write more than the inline size (assuming the 4096 default); the write
+    # command returns the written bytes in 'retval'
+    # gkfs.io write --creat
+    size = 10000
+    buf = 'B' * size
+    ret = gkfs_client.run('write', file, buf, size, '--creat')
+    assert ret.retval == size
+
+    # Verify size
+    ret = gkfs_client.stat(file)
+    assert ret.retval == 0
+    assert ret.statbuf.st_size == size
+
+    # Verify content
+    ret = gkfs_client.open(file,
+                           os.O_RDONLY,
+                           stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
+    assert ret.retval != -1
+
+    ret = gkfs_client.read(file, size)
+    assert ret.retval == size
+    assert ret.buf == buf.encode()
+
+def test_inline_1rpc_no_opt_o_excl(gkfs_daemon, gkfs_client):
+    """Test that O_EXCL disables the optimization"""
+    file = gkfs_daemon.mountdir / "file_no_opt_excl"
+
+    # Open O_CREAT | O_EXCL (optimization should be disabled)
+    ret = gkfs_client.open(file,
+                           os.O_CREAT | os.O_WRONLY | os.O_EXCL,
+                           stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
+    assert ret.retval != -1
+
+    # If the optimization is disabled, the file exists immediately. Verifying
+    # that from the client side would require out-of-band checks, so we
+    # mainly verify that the sequence behaves correctly.
+    buf = b'A' * 100
+    ret = gkfs_client.write(file, buf, len(buf))
+    assert ret.retval == len(buf)
+
+    ret = gkfs_client.stat(file)
+    assert ret.retval == 0
+    assert ret.statbuf.st_size == 100
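These tests exercise a client-side policy this changeset only shows from the outside: a plain O_CREAT open defers the create RPC so a first small write can be folded into `forward_create_write_inline`, while O_EXCL (which must report EEXIST immediately) and an empty close fall back to an explicit create. A sketch of that predicate under those assumptions; `should_delay_creation` is a hypothetical helper, not code from this diff:

```cpp
#include <cassert>
#include <fcntl.h>

// Hypothetical client-side predicate: delay the create RPC only for plain
// O_CREAT opens; O_EXCL needs the create (and a possible EEXIST) to happen
// immediately, so the 1-RPC optimization must be disabled for it.
static bool
should_delay_creation(int flags) {
    return (flags & O_CREAT) && !(flags & O_EXCL);
}

int
main() {
    assert(should_delay_creation(O_CREAT | O_WRONLY));
    assert(!should_delay_creation(O_CREAT | O_WRONLY | O_EXCL));
    assert(!should_delay_creation(O_WRONLY)); // nothing to create
    return 0;
}
```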
diff --git a/tests/integration/data/test_inline_data.py b/tests/integration/data/test_inline_data.py
new file mode 100644
index 0000000000000000000000000000000000000000..dacc0d35820858f600b50a95b05cb81f3d238de3
--- /dev/null
+++ b/tests/integration/data/test_inline_data.py
@@ -0,0 +1,200 @@
+import pytest
+import os
+import stat
+from harness.logger import logger
+
+def test_inline_append(gkfs_daemon, gkfs_client):
+    """Test inline data append operations"""
+    file = gkfs_daemon.mountdir / "file_inline_append"
+
+    # Open file
+    ret = gkfs_client.open(file,
+                           os.O_CREAT | os.O_WRONLY,
+                           stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
+    assert ret.retval != -1
+
+    # Write initial data (inline)
+    buf1 = 'A' * 100
+    ret = gkfs_client.write(file, buf1, len(buf1))
+    assert ret.retval == len(buf1)
+
+    ret = gkfs_client.open(file,
+                           os.O_WRONLY | os.O_APPEND,
+                           stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
+    assert ret.retval != -1
+
+    # Append data (inline)
+    buf2 = 'B' * 100
+    ret = gkfs_client.write(file, buf2, len(buf2), 1) # write with O_APPEND
+    assert ret.retval == len(buf2)
+
+    # Verify size
+    ret = gkfs_client.stat(file)
+    assert ret.retval == 0
+    assert ret.statbuf.st_size == 200
+
+    # Verify content
+    ret = gkfs_client.open(file,
+                           os.O_RDONLY,
+                           stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
+    assert ret.retval != -1
+
+    ret = gkfs_client.read(file, 200)
+    assert ret.retval == 200
+    assert ret.buf == (buf1 + buf2).encode()
+
+def test_inline_pwrite(gkfs_daemon, gkfs_client):
+    """Test inline data overwrite using pwrite"""
+    file = gkfs_daemon.mountdir / "file_inline_pwrite"
+
+    ret = gkfs_client.open(file,
+                           os.O_CREAT | os.O_WRONLY,
+                           stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
+    assert ret.retval != -1
+
+    # Write initial data
+    buf1 = 'A' * 100
+    ret = gkfs_client.write(file, buf1, len(buf1))
+    assert ret.retval == len(buf1)
+
+    # Overwrite middle part
+    buf2 = 'B' * 50
+    ret = gkfs_client.pwrite(file, buf2, len(buf2), 25)
+    assert ret.retval == len(buf2)
+
+    # Verify size (should be the same)
+    ret = gkfs_client.stat(file)
+    assert ret.retval == 0
+    assert ret.statbuf.st_size == 100
+
+    # Verify content
+    expected = b'A' * 25 + b'B' * 50 + b'A' * 25
+
+    ret = gkfs_client.open(file,
+                           os.O_RDONLY,
+                           stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
+    assert ret.retval != -1
+
+    ret = gkfs_client.read(file, 100)
+    assert ret.retval == 100
+    assert ret.buf == expected
+
+def test_inline_overflow_append(gkfs_daemon, gkfs_client):
+    """Test appending data that overflows the inline limit (migration to chunks)"""
+    file = gkfs_daemon.mountdir / "file_inline_overflow"
+
+    ret = gkfs_client.open(file,
+                           os.O_CREAT | os.O_WRONLY,
+                           stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
+    assert ret.retval != -1
+
+    # Write almost full inline data
+    buf1 = 'A' * 4000
+    ret = gkfs_client.write(file, buf1, len(buf1))
+    assert ret.retval == len(buf1)
+
+    # Reopen for append
+    ret = gkfs_client.open(file,
+                           os.O_WRONLY | os.O_APPEND,
+                           stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
+    assert ret.retval != -1
+
+    # Append enough to overflow 4096
+    buf2 = 'B' * 200
+    ret = gkfs_client.write(file, buf2, len(buf2), 1) # Pass append flag
+    assert ret.retval == len(buf2)
+
+    # Verify size
+    ret = gkfs_client.stat(file)
+    assert ret.retval == 0
+    assert ret.statbuf.st_size == 4200
+
+    # Verify content
+    ret = gkfs_client.open(file,
+                           os.O_RDONLY,
+                           stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
+    assert ret.retval != -1
+
+    ret = gkfs_client.read(file, 4200)
+    assert ret.retval == 4200
+    assert ret.buf == (buf1 + buf2).encode()
+
+def test_inline_overflow_pwrite(gkfs_daemon, gkfs_client):
+    """Test a pwrite that overflows the inline limit (migration to chunks)"""
+    file = gkfs_daemon.mountdir / "file_inline_overflow_pwrite"
+
+    ret = gkfs_client.open(file,
+                           os.O_CREAT | os.O_WRONLY,
+                           stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
+    assert ret.retval != -1
+
+    # Write small inline data
+    buf1 = 'A' * 100
+    ret = gkfs_client.write(file, buf1, len(buf1))
+    assert ret.retval == len(buf1)
+
+    # Pwrite far beyond the inline limit (creating a hole)
+    buf2 = 'B' * 100
+    offset = 5000
+    ret = gkfs_client.pwrite(file, buf2, len(buf2), offset)
+    assert ret.retval == len(buf2)
+
+    # Verify size
+    ret = gkfs_client.stat(file)
+    assert ret.retval == 0
+    assert ret.statbuf.st_size == offset + len(buf2)
+
+    # Verify content
+    ret = gkfs_client.open(file,
+                           os.O_RDONLY,
+                           stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
+    assert ret.retval != -1
+
+    # Read hole + data: we expect A*100 + zeros + B*100, total size 5100
+    ret = gkfs_client.read(file, 5100)
+    assert ret.retval == 5100
+
+    read_buf = ret.buf
+    assert read_buf[0:100] == buf1.encode()
+    assert read_buf[100:offset] == b'\x00' * (offset - 100)
+    assert read_buf[offset:offset+100] == buf2.encode()
+
+def test_inline_overwrite_pwrite(gkfs_daemon, gkfs_client):
+    """Test a pwrite at offset 0 that overflows the inline limit (migration/clearing)"""
+    file = gkfs_daemon.mountdir / "file_inline_overwrite_pwrite"
+
+    ret = gkfs_client.open(file,
+                           os.O_CREAT | os.O_WRONLY,
+                           stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
+    assert ret.retval != -1
+
+    # Write small inline data
+    buf1 = 'A' * 100
+    ret = gkfs_client.write(file, buf1, len(buf1))
+    assert ret.retval == len(buf1)
+
+    # Overwrite with large data at offset 0.
+    # This should force a chunk write and clear the inline data.
+    buf2 = 'B' * 5000
+    ret = gkfs_client.pwrite(file, buf2, len(buf2), 0)
+    assert ret.retval == len(buf2)
+
+    # Verify size
+    ret = gkfs_client.stat(file)
+    assert ret.retval == 0
+    assert ret.statbuf.st_size == 5000
+
+    # Verify content
+    ret = gkfs_client.open(file,
+                           os.O_RDONLY,
+                           stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
+    assert ret.retval != -1
+
+    ret = gkfs_client.read(file, 5000)
+    assert ret.retval == 5000
+    assert ret.buf == buf2.encode()
+
+
diff --git a/tests/integration/data/test_inline_null.py b/tests/integration/data/test_inline_null.py
new file mode 100644
index 0000000000000000000000000000000000000000..ba4331ea238f85327dd0b1884221a0d452bbe55e
--- /dev/null
+++ b/tests/integration/data/test_inline_null.py
@@ -0,0 +1,74 @@
+import pytest
+import logging
+from harness.logger import logger
+
+def test_inline_null_chars(gkfs_daemon, gkfs_shell, tmp_path):
+    """Test inline data with null characters to verify base64 encoding"""
+    file = gkfs_daemon.mountdir / "file_inline_null"
+
+    # Create a python script file in the temporary directory
+    script_file = tmp_path / "write_nulls.py"
+    script_content = f"""
+import os
+with open('{file}', 'wb') as f:
+    buf = b'Start\\x00Middle\\x00End'
+    f.write(buf)
+"""
+    script_file.write_text(script_content)
+
+    # Execute the script using gkfs_shell (which uses LD_PRELOAD)
+    ret = gkfs_shell.script(f"python3 {script_file}")
+    assert ret.exit_code == 0
+
+    # Read back the data to verify
+    read_script_file = tmp_path / "read_nulls.py"
+    read_script_content = f"""
+import os
+with open('{file}', 'rb') as f:
+    data = f.read()
+    expected = b'Start\\x00Middle\\x00End'
+    if data != expected:
+        print(f"Mismatch: expected {{expected}}, got {{data}}")
+        exit(1)
+"""
+    read_script_file.write_text(read_script_content)
+
+    ret = gkfs_shell.script(f"python3 {read_script_file}")
+    assert ret.exit_code == 0
+
+
+def test_inline_null_chars_large(gkfs_daemon, gkfs_shell, tmp_path):
+    """Test larger inline data with null characters"""
+    file = gkfs_daemon.mountdir / "file_inline_null_large"
+
+    # Create a python script file
+    script_file = tmp_path / "write_nulls_large.py"
+    script_content = f"""
+import os
+with open('{file}', 'wb') as f:
+    # 600 bytes: mixed nulls and data
+    buf = b'\\x00' * 100 + b'Data' * 100 + b'\\x00' * 100
+    f.write(buf)
+"""
+    script_file.write_text(script_content)
+
+    # Execute the script using gkfs_shell
+    ret = gkfs_shell.script(f"python3 {script_file}")
+    assert ret.exit_code == 0
+
+    # Read back the data to verify
+    read_script_file = tmp_path / "read_nulls_large.py"
+    read_script_content = f"""
+import os
+with open('{file}', 'rb') as f:
+    data = f.read()
+    expected = b'\\x00' * 100 + b'Data' * 100 + b'\\x00' * 100
+    if data != expected:
+        print(f"Mismatch: expected len {{len(expected)}}, got {{len(data)}}")
+        exit(1)
+"""
+    read_script_file.write_text(read_script_content)
+
+    ret = gkfs_shell.script(f"python3 {read_script_file}")
+    assert ret.exit_code == 0
diff --git a/tests/integration/harness/gkfs.io/open.cpp b/tests/integration/harness/gkfs.io/open.cpp
index 599d0ae4cc82350f2703b1afa47a7f16e25976c2..ef6a76f9c2aaab2a3859740e6288d36405c0fc8c 100644
--- a/tests/integration/harness/gkfs.io/open.cpp
+++ b/tests/integration/harness/gkfs.io/open.cpp
@@ -95,6 +95,9 @@ open_exec(const open_options& opts) {
 
     json out = open_output{fd, errno};
     fmt::print("{}\n", out.dump(2));
+    if(fd >= 0) {
+        ::close(fd);
+    }
     return;
 }
diff --git a/tests/integration/harness/gkfs.io/write.cpp b/tests/integration/harness/gkfs.io/write.cpp
index c74880235cc2f71b8820422faf8077924b96b7d0..fd1ff60f52a931ab5384bc07d6df8fa7d0851a7c 100644
--- a/tests/integration/harness/gkfs.io/write.cpp
+++ b/tests/integration/harness/gkfs.io/write.cpp
@@ -60,11 +60,16 @@ struct write_options {
     std::string data;
     ::size_t count;
     bool append{false};
+    bool creat{false};
+    ::mode_t mode;
 
     REFL_DECL_STRUCT(write_options, REFL_DECL_MEMBER(bool, verbose),
                      REFL_DECL_MEMBER(std::string, pathname),
                      REFL_DECL_MEMBER(std::string, data),
-                     REFL_DECL_MEMBER(::size_t, count));
+                     REFL_DECL_MEMBER(::size_t, count),
+                     REFL_DECL_MEMBER(bool, append),
+                     REFL_DECL_MEMBER(bool, creat),
+                     REFL_DECL_MEMBER(::mode_t, mode));
 };
 
 struct write_output {
@@ -85,7 +90,9 @@ write_exec(const write_options& opts) {
     auto flags = O_WRONLY;
     if(opts.append)
         flags |= O_APPEND;
-    auto fd = ::open(opts.pathname.c_str(), flags);
+    if(opts.creat)
+        flags |= O_CREAT;
+    auto fd = ::open(opts.pathname.c_str(), flags, opts.mode);
 
     if(fd == -1) {
         if(opts.verbose) {
@@ -142,5 +149,14 @@ write_init(CLI::App& app) {
             ->default_val(false)
             ->type_name("");
 
+    cmd->add_flag("-c,--creat", opts->creat,
+                  "Create file if it does not exist");
+
+    cmd->add_option("-m,--mode", opts->mode,
+                    "Octal mode specified for the new file (e.g. 0664)")
+            ->default_val(0644)
+            ->type_name("")
+            ->check(CLI::NonNegativeNumber);
+
     cmd->callback([opts]() { write_exec(*opts); });
 }
diff --git a/tests/integration/harness/gkfs.py b/tests/integration/harness/gkfs.py
index e9722b5edfdf385f5cda2b74d6f6bee17287f743..277d7eda63b23a2836bd688e0a5346e6a00d5491 100644
--- a/tests/integration/harness/gkfs.py
+++ b/tests/integration/harness/gkfs.py
@@ -250,8 +250,8 @@ class Daemon:
         self._proxy = proxy
 
         libdirs = ':'.join(
-            filter(None, [os.environ.get('LD_LIBRARY_PATH', '')] +
-                         [str(p) for p in self._workspace.libdirs]))
+            filter(None, [str(p) for p in self._workspace.libdirs] +
+                         [os.environ.get('LD_LIBRARY_PATH', '')]))
 
         self._patched_env = {
             'LD_LIBRARY_PATH' : libdirs,
@@ -268,7 +268,7 @@ class Daemon:
                 '-l', self._address,
                 '--metadir', self._metadir.as_posix(),
                 '--dbbackend', self._database,
-                '--output-stats', self.logdir / 'stats.log',
+                '--output-stats', (self.logdir / 'stats.log').as_posix(),
                 '--enable-collection',
                 '--enable-chunkstats']
 
         if self._database == "parallaxdb" :
@@ -394,8 +394,8 @@ class Proxy:
         self._env = os.environ.copy()
 
         libdirs = ':'.join(
-            filter(None, [os.environ.get('LD_LIBRARY_PATH', '')] +
-                         [str(p) for p in self._workspace.libdirs]))
+            filter(None, [str(p) for p in self._workspace.libdirs] +
+                         [os.environ.get('LD_LIBRARY_PATH', '')]))
 
         self._patched_env = {
             'LD_LIBRARY_PATH' : libdirs,
@@ -540,8 +540,8 @@ class Client:
         self._proxy = proxy
 
         libdirs = ':'.join(
-            filter(None, [os.environ.get('LD_LIBRARY_PATH', '')] +
-                         [str(p) for p in self._workspace.libdirs]))
+            filter(None, [str(p) for p in self._workspace.libdirs] +
+                         [os.environ.get('LD_LIBRARY_PATH', '')]))
 
         # ensure the client interception library is available:
         # to avoid running code with potentially installed libraries,
@@ -594,14 +594,14 @@ class Client:
         logger.debug(f"patched env: {pformat(self._patched_env)}")
 
         out = self._cmd(
-            [ cmd ] + list(args),
+            [ str(cmd) ] + [ str(a) for a in args ],
             _env = self._env,
             # _out=sys.stdout,
             # _err=sys.stderr,
             )
 
-        logger.debug(f"command output: {out.stdout}")
-        return self._parser.parse(cmd, out.stdout)
+        logger.debug(f"command output: {out}")
+        return self._parser.parse(cmd, out)
 
     def __getattr__(self, name):
         return _proxy_exec(self, name)
@@ -624,8 +624,8 @@ class ClientLibc:
         self._env = os.environ.copy()
 
         libdirs = ':'.join(
-            filter(None, [os.environ.get('LD_LIBRARY_PATH', '')] +
-                         [str(p) for p in self._workspace.libdirs]))
+            filter(None, [str(p) for p in self._workspace.libdirs] +
+                         [os.environ.get('LD_LIBRARY_PATH', '')]))
 
         # ensure the client interception library is available:
         # to avoid running code with potentially installed libraries,
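Taken together, the daemon-side inline write path, whether applied through the `write_inline` merge operand or directly in `rpc_srv_write_data_inline`, reduces to one pure transformation: zero-fill any hole, overwrite in place, and let the file size track the buffer. A compact model of that behavior, mirroring the append, pwrite, and sparse-write tests above (illustrative only, not project code):

```cpp
#include <cassert>
#include <string>

// Pure-logic model of the daemon-side inline write: extend the buffer
// with zeros for sparse writes, overwrite in place, and let the file
// size follow the buffer.
static std::string
apply_inline_write(std::string buf, size_t offset, const std::string& data) {
    if(offset + data.size() > buf.size())
        buf.resize(offset + data.size(), '\0'); // zero-fill the hole
    buf.replace(offset, data.size(), data);
    return buf;
}

int
main() {
    auto buf = apply_inline_write("", 0, std::string(100, 'A'));
    buf = apply_inline_write(buf, 25, std::string(50, 'B')); // overwrite middle
    assert(buf.size() == 100);
    assert(buf.substr(0, 25) == std::string(25, 'A'));
    assert(buf.substr(25, 50) == std::string(50, 'B'));

    buf = apply_inline_write(buf, 150, "X"); // sparse write past EOF
    assert(buf.size() == 151 && buf[120] == '\0');
    return 0;
}
```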