Loading include/client/rpc/forward_metadata.hpp +4 −4 Original line number Diff line number Diff line Loading @@ -112,12 +112,12 @@ forward_mk_symlink(const std::string& path, const std::string& target_path); * @param path The file path. * @param buf Pointer to the data buffer. * @param offset The file offset. * @param write_size The number of bytes to write. * @return int Error code (0 on success). * @param append_flag Whether to append to the file. * @return std::pair<int, off64_t> Error code and offset written. */ int std::pair<int, off64_t> forward_write_inline(const std::string& path, const void* buf, off64_t offset, size_t write_size); size_t write_size, bool append_flag); /** * @brief Send an RPC request to read a small amount of data directly Loading include/client/rpc/rpc_types.hpp +23 −8 Original line number Diff line number Diff line Loading @@ -2461,7 +2461,7 @@ struct write_data_inline { using input_type = input; using output_type = output; using mercury_input_type = rpc_write_inline_in_t; using mercury_output_type = rpc_err_out_t; using mercury_output_type = rpc_write_inline_out_t; constexpr static const uint64_t public_id = 60; // Unique ID constexpr static const hg_id_t mercury_id = 0; Loading @@ -2470,7 +2470,7 @@ struct write_data_inline { constexpr static const auto mercury_in_proc_cb = HG_GEN_PROC_NAME(rpc_write_inline_in_t); constexpr static const auto mercury_out_proc_cb = HG_GEN_PROC_NAME(rpc_err_out_t); HG_GEN_PROC_NAME(rpc_write_inline_out_t); class input { template <typename ExecutionContext> Loading @@ -2479,16 +2479,18 @@ struct write_data_inline { public: input(const std::string& path, uint64_t offset, const std::string& data, uint64_t count) : m_path(path), m_offset(offset), m_data(data), m_count(count) {} uint64_t count, bool append) : m_path(path), m_offset(offset), m_data(data), m_count(count), m_append(append) {} explicit input(const rpc_write_inline_in_t& other) : m_path(other.path), m_offset(other.offset), m_data(other.data), m_count(other.count) {} m_count(other.count), m_append(other.append) {} explicit operator rpc_write_inline_in_t() { return {m_path.c_str(), m_offset, m_data.c_str(), m_count}; return {m_path.c_str(), m_offset, m_data.c_str(), m_count, static_cast<hg_bool_t>(m_append)}; } private: Loading @@ -2496,6 +2498,7 @@ struct write_data_inline { uint64_t m_offset; std::string m_data; uint64_t m_count; bool m_append; }; class output { Loading @@ -2504,15 +2507,27 @@ struct write_data_inline { hermes::detail::post_to_mercury(ExecutionContext*); public: output() : m_err(0) {} explicit output(const rpc_err_out_t& out) : m_err(out.err) {} output() : m_err(0), m_ret_offset(0), m_io_size(0) {} explicit output(const rpc_write_inline_out_t& out) : m_err(out.err), m_ret_offset(out.ret_offset), m_io_size(out.io_size) {} int32_t err() const { return m_err; } int64_t ret_offset() const { return m_ret_offset; } size_t io_size() const { return m_io_size; } private: int32_t m_err; int64_t m_ret_offset; size_t m_io_size; }; }; Loading include/common/metadata.hpp +5 −0 Original line number Diff line number Diff line Loading @@ -85,6 +85,11 @@ public: Metadata(mode_t mode, const std::string& target_path); #ifdef HAS_RENAME Metadata(mode_t mode, const std::string& target_path, const std::string& rename_path); #endif #endif // Construct from a binary representation of the object Loading include/common/rpc/rpc_types.hpp +7 −1 Original line number Diff line number Diff line Loading @@ -193,7 +193,13 @@ MERCURY_GEN_PROC(rpc_migrate_metadata_in_t, // Inline write operations MERCURY_GEN_PROC(rpc_write_inline_in_t, ((hg_const_string_t) (path))((hg_uint64_t) (offset))( (hg_const_string_t) (data))((hg_uint64_t) (count))) (hg_const_string_t) (data))((hg_uint64_t) (count))( (hg_bool_t) (append))) // Output: err, offset (for append), bytes written MERCURY_GEN_PROC( rpc_write_inline_out_t, ((hg_int32_t) (err))((hg_int64_t) (ret_offset))((hg_size_t) (io_size))) // Input: path, offset, read length MERCURY_GEN_PROC(rpc_read_inline_in_t, Loading src/client/gkfs_functions.cpp +35 −22 Original line number Diff line number Diff line Loading @@ -968,6 +968,9 @@ gkfs_truncate(const std::string& path, off_t length) { while(!md.value().target_path().empty() and md.value().blocks() != -1) { new_path = md.value().target_path(); md = gkfs::utils::get_metadata(md.value().target_path()); if(!md) { return -1; } } // This could be optimized auto size = md->size(); Loading Loading @@ -1063,34 +1066,22 @@ gkfs_do_write(gkfs::filemap::OpenFile& file, const char* buf, size_t count, // OPTIMIZATION: Inline Write if(gkfs::config::metadata::use_inline_data && offset == 0 && count <= gkfs::config::metadata::inline_data_size) { if(gkfs::config::metadata::use_inline_data && (is_append || (offset + count) <= gkfs::config::metadata::inline_data_size)) { // Attempt inline write via Metadata RPC auto err = gkfs::rpc::forward_write_inline(file.path(), buf, offset, count); auto ret_inline = gkfs::rpc::forward_write_inline( file.path(), buf, offset, count, is_append); auto err = ret_inline.first; if(err == 0) { if(update_pos) file.pos(offset + count); file.pos(ret_inline.second + count); return count; } } if(CTX->use_write_size_cache() && !is_append) { auto [size_update_cnt, cached_size] = CTX->write_size_cache()->record(*path, offset + count); if(size_update_cnt > CTX->write_size_cache()->flush_threshold()) { err = CTX->write_size_cache()->flush(*path, false).first; if(err) { LOG(ERROR, "update_metadentry_size() during cache flush failed with err '{}'", err); errno = err; return -1; } } } else { if(is_append) { auto ret_offset = gkfs::utils::update_file_size(*path, count, offset, is_append); err = ret_offset.first; Loading @@ -1099,7 +1090,6 @@ gkfs_do_write(gkfs::filemap::OpenFile& file, const char* buf, size_t count, errno = err; return -1; } if(is_append) { // When append is set the EOF is set to the offset // forward_update_metadentry_size returns. This is because // it is an atomic operation on the server and reserves the Loading @@ -1114,6 +1104,29 @@ gkfs_do_write(gkfs::filemap::OpenFile& file, const char* buf, size_t count, } offset = ret_offset.second; } if(CTX->use_write_size_cache() && !is_append) { auto [size_update_cnt, cached_size] = CTX->write_size_cache()->record(*path, offset + count); if(size_update_cnt > CTX->write_size_cache()->flush_threshold()) { err = CTX->write_size_cache()->flush(*path, false).first; if(err) { LOG(ERROR, "update_metadentry_size() during cache flush failed with err '{}'", err); errno = err; return -1; } } } else if(!is_append) { auto ret_offset = gkfs::utils::update_file_size(*path, count, offset, is_append); err = ret_offset.first; if(err) { LOG(ERROR, "update_metadentry_size() failed with err '{}'", err); errno = err; return -1; } } pair<int, long> ret_write; Loading Loading
include/client/rpc/forward_metadata.hpp +4 −4 Original line number Diff line number Diff line Loading @@ -112,12 +112,12 @@ forward_mk_symlink(const std::string& path, const std::string& target_path); * @param path The file path. * @param buf Pointer to the data buffer. * @param offset The file offset. * @param write_size The number of bytes to write. * @return int Error code (0 on success). * @param append_flag Whether to append to the file. * @return std::pair<int, off64_t> Error code and offset written. */ int std::pair<int, off64_t> forward_write_inline(const std::string& path, const void* buf, off64_t offset, size_t write_size); size_t write_size, bool append_flag); /** * @brief Send an RPC request to read a small amount of data directly Loading
include/client/rpc/rpc_types.hpp +23 −8 Original line number Diff line number Diff line Loading @@ -2461,7 +2461,7 @@ struct write_data_inline { using input_type = input; using output_type = output; using mercury_input_type = rpc_write_inline_in_t; using mercury_output_type = rpc_err_out_t; using mercury_output_type = rpc_write_inline_out_t; constexpr static const uint64_t public_id = 60; // Unique ID constexpr static const hg_id_t mercury_id = 0; Loading @@ -2470,7 +2470,7 @@ struct write_data_inline { constexpr static const auto mercury_in_proc_cb = HG_GEN_PROC_NAME(rpc_write_inline_in_t); constexpr static const auto mercury_out_proc_cb = HG_GEN_PROC_NAME(rpc_err_out_t); HG_GEN_PROC_NAME(rpc_write_inline_out_t); class input { template <typename ExecutionContext> Loading @@ -2479,16 +2479,18 @@ struct write_data_inline { public: input(const std::string& path, uint64_t offset, const std::string& data, uint64_t count) : m_path(path), m_offset(offset), m_data(data), m_count(count) {} uint64_t count, bool append) : m_path(path), m_offset(offset), m_data(data), m_count(count), m_append(append) {} explicit input(const rpc_write_inline_in_t& other) : m_path(other.path), m_offset(other.offset), m_data(other.data), m_count(other.count) {} m_count(other.count), m_append(other.append) {} explicit operator rpc_write_inline_in_t() { return {m_path.c_str(), m_offset, m_data.c_str(), m_count}; return {m_path.c_str(), m_offset, m_data.c_str(), m_count, static_cast<hg_bool_t>(m_append)}; } private: Loading @@ -2496,6 +2498,7 @@ struct write_data_inline { uint64_t m_offset; std::string m_data; uint64_t m_count; bool m_append; }; class output { Loading @@ -2504,15 +2507,27 @@ struct write_data_inline { hermes::detail::post_to_mercury(ExecutionContext*); public: output() : m_err(0) {} explicit output(const rpc_err_out_t& out) : m_err(out.err) {} output() : m_err(0), m_ret_offset(0), m_io_size(0) {} explicit output(const rpc_write_inline_out_t& out) : m_err(out.err), m_ret_offset(out.ret_offset), m_io_size(out.io_size) {} int32_t err() const { return m_err; } int64_t ret_offset() const { return m_ret_offset; } size_t io_size() const { return m_io_size; } private: int32_t m_err; int64_t m_ret_offset; size_t m_io_size; }; }; Loading
include/common/metadata.hpp +5 −0 Original line number Diff line number Diff line Loading @@ -85,6 +85,11 @@ public: Metadata(mode_t mode, const std::string& target_path); #ifdef HAS_RENAME Metadata(mode_t mode, const std::string& target_path, const std::string& rename_path); #endif #endif // Construct from a binary representation of the object Loading
include/common/rpc/rpc_types.hpp +7 −1 Original line number Diff line number Diff line Loading @@ -193,7 +193,13 @@ MERCURY_GEN_PROC(rpc_migrate_metadata_in_t, // Inline write operations MERCURY_GEN_PROC(rpc_write_inline_in_t, ((hg_const_string_t) (path))((hg_uint64_t) (offset))( (hg_const_string_t) (data))((hg_uint64_t) (count))) (hg_const_string_t) (data))((hg_uint64_t) (count))( (hg_bool_t) (append))) // Output: err, offset (for append), bytes written MERCURY_GEN_PROC( rpc_write_inline_out_t, ((hg_int32_t) (err))((hg_int64_t) (ret_offset))((hg_size_t) (io_size))) // Input: path, offset, read length MERCURY_GEN_PROC(rpc_read_inline_in_t, Loading
src/client/gkfs_functions.cpp +35 −22 Original line number Diff line number Diff line Loading @@ -968,6 +968,9 @@ gkfs_truncate(const std::string& path, off_t length) { while(!md.value().target_path().empty() and md.value().blocks() != -1) { new_path = md.value().target_path(); md = gkfs::utils::get_metadata(md.value().target_path()); if(!md) { return -1; } } // This could be optimized auto size = md->size(); Loading Loading @@ -1063,34 +1066,22 @@ gkfs_do_write(gkfs::filemap::OpenFile& file, const char* buf, size_t count, // OPTIMIZATION: Inline Write if(gkfs::config::metadata::use_inline_data && offset == 0 && count <= gkfs::config::metadata::inline_data_size) { if(gkfs::config::metadata::use_inline_data && (is_append || (offset + count) <= gkfs::config::metadata::inline_data_size)) { // Attempt inline write via Metadata RPC auto err = gkfs::rpc::forward_write_inline(file.path(), buf, offset, count); auto ret_inline = gkfs::rpc::forward_write_inline( file.path(), buf, offset, count, is_append); auto err = ret_inline.first; if(err == 0) { if(update_pos) file.pos(offset + count); file.pos(ret_inline.second + count); return count; } } if(CTX->use_write_size_cache() && !is_append) { auto [size_update_cnt, cached_size] = CTX->write_size_cache()->record(*path, offset + count); if(size_update_cnt > CTX->write_size_cache()->flush_threshold()) { err = CTX->write_size_cache()->flush(*path, false).first; if(err) { LOG(ERROR, "update_metadentry_size() during cache flush failed with err '{}'", err); errno = err; return -1; } } } else { if(is_append) { auto ret_offset = gkfs::utils::update_file_size(*path, count, offset, is_append); err = ret_offset.first; Loading @@ -1099,7 +1090,6 @@ gkfs_do_write(gkfs::filemap::OpenFile& file, const char* buf, size_t count, errno = err; return -1; } if(is_append) { // When append is set the EOF is set to the offset // forward_update_metadentry_size returns. This is because // it is an atomic operation on the server and reserves the Loading @@ -1114,6 +1104,29 @@ gkfs_do_write(gkfs::filemap::OpenFile& file, const char* buf, size_t count, } offset = ret_offset.second; } if(CTX->use_write_size_cache() && !is_append) { auto [size_update_cnt, cached_size] = CTX->write_size_cache()->record(*path, offset + count); if(size_update_cnt > CTX->write_size_cache()->flush_threshold()) { err = CTX->write_size_cache()->flush(*path, false).first; if(err) { LOG(ERROR, "update_metadentry_size() during cache flush failed with err '{}'", err); errno = err; return -1; } } } else if(!is_append) { auto ret_offset = gkfs::utils::update_file_size(*path, count, offset, is_append); err = ret_offset.first; if(err) { LOG(ERROR, "update_metadentry_size() failed with err '{}'", err); errno = err; return -1; } } pair<int, long> ret_write; Loading