Loading include/common/rpc/rpc_types_thallium.hpp +3 −7 Original line number Diff line number Diff line Loading @@ -242,18 +242,14 @@ struct rpc_read_data_in_t { uint64_t chunk_start; uint64_t chunk_end; uint64_t total_chunk_size; tl::bulk bulk_handle; // SERIALIZATION OF BULK HANDLE? // Thallium bulk handles generally need to be exposed. // But here we are defining the input struct. // Thallium handles bulk separately or as part of // args? serialize function for bulk handle exists in // Thallium. tl::bulk bulk_handle; bool fuzzy_relo = false; template <class Archive> void serialize(Archive& ar) { ar(path, offset, host_id, host_size, wbitset, chunk_n, chunk_start, chunk_end, total_chunk_size, bulk_handle); chunk_end, total_chunk_size, bulk_handle, fuzzy_relo); } }; Loading include/config.hpp +7 −0 Original line number Diff line number Diff line Loading @@ -164,6 +164,10 @@ constexpr auto fwd_io_count_threshold = 0; } // namespace proxy namespace daemon { inline int fuzzy_relocation_targets = 0; } // namespace daemon namespace rpc { constexpr auto chunksize = 524288; // in bytes (e.g., 524288 == 512KB) // size of preallocated buffer to hold directory entries in rpc call Loading @@ -183,6 +187,9 @@ constexpr auto proxy_handler_xstreams = 3; inline bool use_dirents_compression = false; // Enable data compression inline bool use_data_compression = false; // Enable fuzzy relocation (optimistic local read) inline bool fuzzy_relocation = false; } // namespace rpc namespace rocksdb { Loading include/daemon/backend/data/chunk_storage.hpp +10 −0 Original line number Diff line number Diff line Loading @@ -138,6 +138,16 @@ public: void destroy_chunk_space(const std::string& file_path) const; /** * @brief Check if a chunk file exists * @param file_path Chunk file path, e.g., /foo/bar * @param chunk_id Number of chunk id * @return true if exists, false otherwise */ bool chunk_exists(const std::string& file_path, gkfs::rpc::chnk_id_t chunk_id) const; /** * @brief Writes a single chunk file and is usually called by an Argobots * tasklet. Loading src/client/preload.cpp +5 −0 Original line number Diff line number Diff line Loading @@ -474,6 +474,11 @@ init_preload() { gkfs::config::rpc::use_dirents_compression ? "ON" : "OFF") == "ON"; gkfs::config::rpc::fuzzy_relocation = gkfs::env::get_var( "LIBGKFS_FUZZY_RELOCATION", gkfs::config::rpc::fuzzy_relocation ? "ON" : "OFF") == "ON"; #ifndef BYPASS_SYSCALL gkfs::preload::start_interception(); Loading src/client/rpc/forward_data.cpp +56 −0 Original line number Diff line number Diff line Loading @@ -419,6 +419,62 @@ forward_read(const string& path, void* buf, const off64_t offset, return make_pair(EBUSY, 0); } // Fuzzy Relocation Logic if(gkfs::config::rpc::fuzzy_relocation && num_copies == 0 && failed.empty()) { auto local_host_id = CTX->local_host_id(); if(local_host_id != static_cast<unsigned int>(-1)) { auto read_rpc = CTX->rpc_engine()->define(gkfs::rpc::tag::read); // Construct RPC for LOCAL gkfs::rpc::rpc_read_data_in_t in; in.path = path; in.offset = block_overrun(offset, gkfs::config::rpc::chunksize); in.host_id = local_host_id; in.host_size = CTX->hosts().size(); // Bitset for ALL chunks in range std::vector<uint8_t> chnk_bitset( ((chnk_end - chnk_start) + 1 + 7) / 8, 0); for(uint64_t i = 0; i <= (chnk_end - chnk_start); i++) { gkfs::rpc::set_bitset(chnk_bitset, i); } in.wbitset = gkfs::rpc::compress_bitset(chnk_bitset); in.chunk_n = (chnk_end - chnk_start) + 1; in.chunk_start = chnk_start; in.chunk_end = chnk_end; in.total_chunk_size = target_chnks[local_host_id].size() * gkfs::config::rpc::chunksize; // WAIT. target_chnks is not fully populated for standard // distribution yet? We need to calculate size manually. auto total_chunk_size = in.chunk_n * gkfs::config::rpc::chunksize; // Adjust first/last total_chunk_size -= block_overrun(offset, gkfs::config::rpc::chunksize); if(!is_aligned(offset + read_size, gkfs::config::rpc::chunksize)) { total_chunk_size -= block_underrun( offset + read_size, gkfs::config::rpc::chunksize); } in.total_chunk_size = total_chunk_size; in.bulk_handle = bulk_handle; in.fuzzy_relo = true; try { gkfs::rpc::rpc_data_out_t out = read_rpc.on(CTX->hosts().at(local_host_id))(in); if(out.err == 0) { return make_pair(0, out.io_size); } // If out.err is ENOENT (or whatever we set), fall through. } catch(const std::exception& ex) { LOG(ERROR, "Fuzzy local read failed: {}", ex.what()); // Fall through } } } std::vector<thallium::async_response> waiters; waiters.reserve(targets.size()); std::vector<uint64_t> waiter_targets; // track targets for error reporting Loading Loading
include/common/rpc/rpc_types_thallium.hpp +3 −7 Original line number Diff line number Diff line Loading @@ -242,18 +242,14 @@ struct rpc_read_data_in_t { uint64_t chunk_start; uint64_t chunk_end; uint64_t total_chunk_size; tl::bulk bulk_handle; // SERIALIZATION OF BULK HANDLE? // Thallium bulk handles generally need to be exposed. // But here we are defining the input struct. // Thallium handles bulk separately or as part of // args? serialize function for bulk handle exists in // Thallium. tl::bulk bulk_handle; bool fuzzy_relo = false; template <class Archive> void serialize(Archive& ar) { ar(path, offset, host_id, host_size, wbitset, chunk_n, chunk_start, chunk_end, total_chunk_size, bulk_handle); chunk_end, total_chunk_size, bulk_handle, fuzzy_relo); } }; Loading
include/config.hpp +7 −0 Original line number Diff line number Diff line Loading @@ -164,6 +164,10 @@ constexpr auto fwd_io_count_threshold = 0; } // namespace proxy namespace daemon { inline int fuzzy_relocation_targets = 0; } // namespace daemon namespace rpc { constexpr auto chunksize = 524288; // in bytes (e.g., 524288 == 512KB) // size of preallocated buffer to hold directory entries in rpc call Loading @@ -183,6 +187,9 @@ constexpr auto proxy_handler_xstreams = 3; inline bool use_dirents_compression = false; // Enable data compression inline bool use_data_compression = false; // Enable fuzzy relocation (optimistic local read) inline bool fuzzy_relocation = false; } // namespace rpc namespace rocksdb { Loading
include/daemon/backend/data/chunk_storage.hpp +10 −0 Original line number Diff line number Diff line Loading @@ -138,6 +138,16 @@ public: void destroy_chunk_space(const std::string& file_path) const; /** * @brief Check if a chunk file exists * @param file_path Chunk file path, e.g., /foo/bar * @param chunk_id Number of chunk id * @return true if exists, false otherwise */ bool chunk_exists(const std::string& file_path, gkfs::rpc::chnk_id_t chunk_id) const; /** * @brief Writes a single chunk file and is usually called by an Argobots * tasklet. Loading
src/client/preload.cpp +5 −0 Original line number Diff line number Diff line Loading @@ -474,6 +474,11 @@ init_preload() { gkfs::config::rpc::use_dirents_compression ? "ON" : "OFF") == "ON"; gkfs::config::rpc::fuzzy_relocation = gkfs::env::get_var( "LIBGKFS_FUZZY_RELOCATION", gkfs::config::rpc::fuzzy_relocation ? "ON" : "OFF") == "ON"; #ifndef BYPASS_SYSCALL gkfs::preload::start_interception(); Loading
src/client/rpc/forward_data.cpp +56 −0 Original line number Diff line number Diff line Loading @@ -419,6 +419,62 @@ forward_read(const string& path, void* buf, const off64_t offset, return make_pair(EBUSY, 0); } // Fuzzy Relocation Logic if(gkfs::config::rpc::fuzzy_relocation && num_copies == 0 && failed.empty()) { auto local_host_id = CTX->local_host_id(); if(local_host_id != static_cast<unsigned int>(-1)) { auto read_rpc = CTX->rpc_engine()->define(gkfs::rpc::tag::read); // Construct RPC for LOCAL gkfs::rpc::rpc_read_data_in_t in; in.path = path; in.offset = block_overrun(offset, gkfs::config::rpc::chunksize); in.host_id = local_host_id; in.host_size = CTX->hosts().size(); // Bitset for ALL chunks in range std::vector<uint8_t> chnk_bitset( ((chnk_end - chnk_start) + 1 + 7) / 8, 0); for(uint64_t i = 0; i <= (chnk_end - chnk_start); i++) { gkfs::rpc::set_bitset(chnk_bitset, i); } in.wbitset = gkfs::rpc::compress_bitset(chnk_bitset); in.chunk_n = (chnk_end - chnk_start) + 1; in.chunk_start = chnk_start; in.chunk_end = chnk_end; in.total_chunk_size = target_chnks[local_host_id].size() * gkfs::config::rpc::chunksize; // WAIT. target_chnks is not fully populated for standard // distribution yet? We need to calculate size manually. auto total_chunk_size = in.chunk_n * gkfs::config::rpc::chunksize; // Adjust first/last total_chunk_size -= block_overrun(offset, gkfs::config::rpc::chunksize); if(!is_aligned(offset + read_size, gkfs::config::rpc::chunksize)) { total_chunk_size -= block_underrun( offset + read_size, gkfs::config::rpc::chunksize); } in.total_chunk_size = total_chunk_size; in.bulk_handle = bulk_handle; in.fuzzy_relo = true; try { gkfs::rpc::rpc_data_out_t out = read_rpc.on(CTX->hosts().at(local_host_id))(in); if(out.err == 0) { return make_pair(0, out.io_size); } // If out.err is ENOENT (or whatever we set), fall through. } catch(const std::exception& ex) { LOG(ERROR, "Fuzzy local read failed: {}", ex.what()); // Fall through } } } std::vector<thallium::async_response> waiters; waiters.reserve(targets.size()); std::vector<uint64_t> waiter_targets; // track targets for error reporting Loading