Loading include/client/rpc/forward_metadata.hpp +3 −1 Original line number Diff line number Diff line Loading @@ -103,7 +103,9 @@ forward_get_dirents(const std::string& path); std::pair<int, std::unique_ptr<std::vector< std::tuple<const std::string, bool, size_t, time_t>>>> forward_get_dirents_single(const std::string& path, int server); forward_get_dirents_single(const std::string& path, int server, const std::string& start_key = "", bool get_all = true); #ifdef HAS_SYMLINKS Loading include/client/rpc/rpc_types.hpp +12 −4 Original line number Diff line number Diff line Loading @@ -4108,15 +4108,16 @@ struct get_dirents_extended_proxy { HG_GEN_PROC_NAME(rpc_get_dirents_out_t); class input { template <typename ExecutionContext> friend hg_return_t hermes::detail::post_to_mercury(ExecutionContext*); public: input(const std::string& path, int32_t server, const std::string& start_key, const hermes::exposed_memory& buffers) : m_path(path), m_server(server), m_buffers(buffers) {} : m_path(path), m_server(server), m_start_key(start_key), m_buffers(buffers) {} input(input&& rhs) = default; Loading @@ -4138,6 +4139,11 @@ struct get_dirents_extended_proxy { return m_server; } std::string start_key() const { return m_start_key; } hermes::exposed_memory buffers() const { return m_buffers; Loading @@ -4145,16 +4151,18 @@ struct get_dirents_extended_proxy { explicit input(const rpc_proxy_get_dirents_in_t& other) : m_path(other.path), m_server(other.server), m_buffers(other.bulk_handle) {} m_start_key(other.start_key), m_buffers(other.bulk_handle) {} explicit operator rpc_proxy_get_dirents_in_t() { return {m_path.c_str(), m_server, hg_bulk_t(m_buffers)}; return {m_path.c_str(), m_server, m_start_key.c_str(), hg_bulk_t(m_buffers)}; } private: std::string m_path; int32_t m_server; std::string m_start_key; hermes::exposed_memory m_buffers; }; Loading include/common/rpc/rpc_types.hpp +4 −3 Original line number Diff line number Diff line Loading @@ -189,9 +189,10 @@ MERCURY_GEN_PROC( MERCURY_GEN_PROC(rpc_proxy_test_in_t, ((hg_const_string_t) (path))) MERCURY_GEN_PROC(rpc_proxy_get_dirents_in_t, ((hg_const_string_t) (path))((int32_t) (server))( (hg_bulk_t) (bulk_handle))) MERCURY_GEN_PROC( rpc_proxy_get_dirents_in_t, ((hg_const_string_t) (path))((hg_int32_t) (server))( (hg_const_string_t) (start_key))((hg_bulk_t) (bulk_handle))) // malleability client <-> daemon Loading include/proxy/rpc/forward_metadata.hpp +2 −1 Original line number Diff line number Diff line Loading @@ -48,7 +48,8 @@ forward_update_metadentry_size(const std::string& path, const size_t size, const off64_t offset, const bool append_flag); std::pair<int, std::vector<char>> forward_get_dirents_single(const std::string& path, int server); forward_get_dirents_single(const std::string& path, int server, const std::string& start_key); } // namespace gkfs::rpc Loading src/client/rpc/forward_metadata.cpp +15 −3 Original line number Diff line number Diff line Loading @@ -1054,14 +1054,17 @@ decompress_and_parse_entries(const gkfs::rpc::get_dirents_extended::output& out, * simplify the code removing the asynchronous part. */ pair<int, unique_ptr<vector<tuple<const std::string, bool, size_t, time_t>>>> forward_get_dirents_single(const string& path, int server) { forward_get_dirents_single(const string& path, int server, const std::string& start_key_arg, bool get_all) { if(gkfs::config::proxy::fwd_get_dirents_single && CTX->use_proxy()) { LOG(WARNING, "{} was called even though proxy should be used!", __func__); } LOG(DEBUG, "{}() enter for path '{}', server '{}'", __func__, path, server); LOG(DEBUG, "{}() enter for path '{}', server '{}' start_key '{}' get_all '{}'", __func__, path, server, start_key_arg, get_all); auto const targets = CTX->distributor()->locate_directory_metadata(); if((unsigned) server >= targets.size()) { Loading @@ -1073,7 +1076,7 @@ forward_get_dirents_single(const string& path, int server) { auto all_entries = make_unique< vector<tuple<const std::string, bool, size_t, time_t>>>(); int err = 0; string start_key = ""; string start_key = start_key_arg; // Chunking loop: keep fetching until no more entries are returned while(true) { Loading Loading @@ -1158,6 +1161,15 @@ forward_get_dirents_single(const string& path, int server) { all_entries->push_back(std::move(e)); } if(!get_all) { // If we only wanted one chunk, we are done. // If we receive less entries than the buffer/chunk size, // it's effectively the end but we return what we got. The // caller (proxy) will handle the continuation logic if // needed (it doesn't need to, it just forwards the chunk). return make_pair(0, std::move(all_entries)); } // Update start_key for next chunk start_key = get<0>(all_entries->back()); Loading Loading
include/client/rpc/forward_metadata.hpp +3 −1 Original line number Diff line number Diff line Loading @@ -103,7 +103,9 @@ forward_get_dirents(const std::string& path); std::pair<int, std::unique_ptr<std::vector< std::tuple<const std::string, bool, size_t, time_t>>>> forward_get_dirents_single(const std::string& path, int server); forward_get_dirents_single(const std::string& path, int server, const std::string& start_key = "", bool get_all = true); #ifdef HAS_SYMLINKS Loading
include/client/rpc/rpc_types.hpp +12 −4 Original line number Diff line number Diff line Loading @@ -4108,15 +4108,16 @@ struct get_dirents_extended_proxy { HG_GEN_PROC_NAME(rpc_get_dirents_out_t); class input { template <typename ExecutionContext> friend hg_return_t hermes::detail::post_to_mercury(ExecutionContext*); public: input(const std::string& path, int32_t server, const std::string& start_key, const hermes::exposed_memory& buffers) : m_path(path), m_server(server), m_buffers(buffers) {} : m_path(path), m_server(server), m_start_key(start_key), m_buffers(buffers) {} input(input&& rhs) = default; Loading @@ -4138,6 +4139,11 @@ struct get_dirents_extended_proxy { return m_server; } std::string start_key() const { return m_start_key; } hermes::exposed_memory buffers() const { return m_buffers; Loading @@ -4145,16 +4151,18 @@ struct get_dirents_extended_proxy { explicit input(const rpc_proxy_get_dirents_in_t& other) : m_path(other.path), m_server(other.server), m_buffers(other.bulk_handle) {} m_start_key(other.start_key), m_buffers(other.bulk_handle) {} explicit operator rpc_proxy_get_dirents_in_t() { return {m_path.c_str(), m_server, hg_bulk_t(m_buffers)}; return {m_path.c_str(), m_server, m_start_key.c_str(), hg_bulk_t(m_buffers)}; } private: std::string m_path; int32_t m_server; std::string m_start_key; hermes::exposed_memory m_buffers; }; Loading
include/common/rpc/rpc_types.hpp +4 −3 Original line number Diff line number Diff line Loading @@ -189,9 +189,10 @@ MERCURY_GEN_PROC( MERCURY_GEN_PROC(rpc_proxy_test_in_t, ((hg_const_string_t) (path))) MERCURY_GEN_PROC(rpc_proxy_get_dirents_in_t, ((hg_const_string_t) (path))((int32_t) (server))( (hg_bulk_t) (bulk_handle))) MERCURY_GEN_PROC( rpc_proxy_get_dirents_in_t, ((hg_const_string_t) (path))((hg_int32_t) (server))( (hg_const_string_t) (start_key))((hg_bulk_t) (bulk_handle))) // malleability client <-> daemon Loading
include/proxy/rpc/forward_metadata.hpp +2 −1 Original line number Diff line number Diff line Loading @@ -48,7 +48,8 @@ forward_update_metadentry_size(const std::string& path, const size_t size, const off64_t offset, const bool append_flag); std::pair<int, std::vector<char>> forward_get_dirents_single(const std::string& path, int server); forward_get_dirents_single(const std::string& path, int server, const std::string& start_key); } // namespace gkfs::rpc Loading
src/client/rpc/forward_metadata.cpp +15 −3 Original line number Diff line number Diff line Loading @@ -1054,14 +1054,17 @@ decompress_and_parse_entries(const gkfs::rpc::get_dirents_extended::output& out, * simplify the code removing the asynchronous part. */ pair<int, unique_ptr<vector<tuple<const std::string, bool, size_t, time_t>>>> forward_get_dirents_single(const string& path, int server) { forward_get_dirents_single(const string& path, int server, const std::string& start_key_arg, bool get_all) { if(gkfs::config::proxy::fwd_get_dirents_single && CTX->use_proxy()) { LOG(WARNING, "{} was called even though proxy should be used!", __func__); } LOG(DEBUG, "{}() enter for path '{}', server '{}'", __func__, path, server); LOG(DEBUG, "{}() enter for path '{}', server '{}' start_key '{}' get_all '{}'", __func__, path, server, start_key_arg, get_all); auto const targets = CTX->distributor()->locate_directory_metadata(); if((unsigned) server >= targets.size()) { Loading @@ -1073,7 +1076,7 @@ forward_get_dirents_single(const string& path, int server) { auto all_entries = make_unique< vector<tuple<const std::string, bool, size_t, time_t>>>(); int err = 0; string start_key = ""; string start_key = start_key_arg; // Chunking loop: keep fetching until no more entries are returned while(true) { Loading Loading @@ -1158,6 +1161,15 @@ forward_get_dirents_single(const string& path, int server) { all_entries->push_back(std::move(e)); } if(!get_all) { // If we only wanted one chunk, we are done. // If we receive less entries than the buffer/chunk size, // it's effectively the end but we return what we got. The // caller (proxy) will handle the continuation logic if // needed (it doesn't need to, it just forwards the chunk). return make_pair(0, std::move(all_entries)); } // Update start_key for next chunk start_key = get<0>(all_entries->back()); Loading