Loading include/client/preload_context.hpp +4 −0 Original line number Diff line number Diff line Loading @@ -63,6 +63,7 @@ class PreloadContext { std::vector<hg_addr_t> hosts_; uint64_t local_host_id_; uint64_t fwd_host_id_; bool interception_enabled_; Loading Loading @@ -91,6 +92,9 @@ class PreloadContext { uint64_t local_host_id() const; void local_host_id(uint64_t id); uint64_t fwd_host_id() const; void fwd_host_id(uint64_t id); RelativizeStatus relativize_fd_path(int dirfd, const char * raw_path, std::string& relative_path, Loading include/client/preload_util.hpp +1 −0 Original line number Diff line number Diff line Loading @@ -71,6 +71,7 @@ hg_addr_t get_local_addr(); void load_hosts(); bool lookup_all_hosts(); uint64_t get_my_forwarder(); void cleanup_addresses(); Loading include/global/rpc/distributor.hpp +13 −0 Original line number Diff line number Diff line Loading @@ -56,4 +56,17 @@ class LocalOnlyDistributor : public Distributor { std::vector<Host> locate_directory_metadata(const std::string& path) const override; }; class ForwarderDistributor : public Distributor { private: Host fwhost_; unsigned int hosts_size_; std::hash<std::string> str_hash; public: ForwarderDistributor(Host fwhost, unsigned int hosts_size); Host localhost() const override; Host locate_data(const std::string& path, const ChunkID& chnk_id) const override; Host locate_file_metadata(const std::string& path) const override; std::vector<Host> locate_directory_metadata(const std::string& path) const override; }; #endif //IFS_RPC_LOCATOR_HPP src/client/preload.cpp +16 −2 Original line number Diff line number Diff line Loading @@ -166,8 +166,22 @@ void init_ld_environment_() { } /* Setup distributor */ auto simple_hash_dist = std::make_shared<SimpleHashDistributor>(CTX->local_host_id(), CTX->hosts().size()); CTX->distributor(simple_hash_dist); //auto simple_hash_dist = std::make_shared<SimpleHashDistributor>(CTX->local_host_id(), CTX->hosts().size()); //CTX->distributor(simple_hash_dist); try { CTX->fwd_host_id(get_my_forwarder()); if (CTX->fwd_host_id() > CTX->hosts().size()) { throw std::runtime_error("Invalid forwarding host"); } CTX->log()->debug("{}() Forward to {}", __func__, CTX->fwd_host_id()); } catch (std::exception& e){ exit_error_msg(EXIT_FAILURE, fmt::format("Unable set the forwarding host '{}'", e.what())); } auto forwarder_dist = std::make_shared<ForwarderDistributor>(CTX->fwd_host_id(), CTX->hosts().size()); CTX->distributor(forwarder_dist); if (!rpc_send::get_fs_config()) { exit_error_msg(EXIT_FAILURE, "Unable to fetch file system configurations from daemon process through RPC."); Loading src/client/preload_context.cpp +8 −0 Original line number Diff line number Diff line Loading @@ -73,6 +73,14 @@ void PreloadContext::local_host_id(uint64_t id) { local_host_id_ = id; } uint64_t PreloadContext::fwd_host_id() const { return fwd_host_id_; } void PreloadContext::fwd_host_id(uint64_t id) { fwd_host_id_ = id; } RelativizeStatus PreloadContext::relativize_fd_path(int dirfd, const char * raw_path, std::string& relative_path, Loading Loading
include/client/preload_context.hpp +4 −0 Original line number Diff line number Diff line Loading @@ -63,6 +63,7 @@ class PreloadContext { std::vector<hg_addr_t> hosts_; uint64_t local_host_id_; uint64_t fwd_host_id_; bool interception_enabled_; Loading Loading @@ -91,6 +92,9 @@ class PreloadContext { uint64_t local_host_id() const; void local_host_id(uint64_t id); uint64_t fwd_host_id() const; void fwd_host_id(uint64_t id); RelativizeStatus relativize_fd_path(int dirfd, const char * raw_path, std::string& relative_path, Loading
include/client/preload_util.hpp +1 −0 Original line number Diff line number Diff line Loading @@ -71,6 +71,7 @@ hg_addr_t get_local_addr(); void load_hosts(); bool lookup_all_hosts(); uint64_t get_my_forwarder(); void cleanup_addresses(); Loading
include/global/rpc/distributor.hpp +13 −0 Original line number Diff line number Diff line Loading @@ -56,4 +56,17 @@ class LocalOnlyDistributor : public Distributor { std::vector<Host> locate_directory_metadata(const std::string& path) const override; }; class ForwarderDistributor : public Distributor { private: Host fwhost_; unsigned int hosts_size_; std::hash<std::string> str_hash; public: ForwarderDistributor(Host fwhost, unsigned int hosts_size); Host localhost() const override; Host locate_data(const std::string& path, const ChunkID& chnk_id) const override; Host locate_file_metadata(const std::string& path) const override; std::vector<Host> locate_directory_metadata(const std::string& path) const override; }; #endif //IFS_RPC_LOCATOR_HPP
src/client/preload.cpp +16 −2 Original line number Diff line number Diff line Loading @@ -166,8 +166,22 @@ void init_ld_environment_() { } /* Setup distributor */ auto simple_hash_dist = std::make_shared<SimpleHashDistributor>(CTX->local_host_id(), CTX->hosts().size()); CTX->distributor(simple_hash_dist); //auto simple_hash_dist = std::make_shared<SimpleHashDistributor>(CTX->local_host_id(), CTX->hosts().size()); //CTX->distributor(simple_hash_dist); try { CTX->fwd_host_id(get_my_forwarder()); if (CTX->fwd_host_id() > CTX->hosts().size()) { throw std::runtime_error("Invalid forwarding host"); } CTX->log()->debug("{}() Forward to {}", __func__, CTX->fwd_host_id()); } catch (std::exception& e){ exit_error_msg(EXIT_FAILURE, fmt::format("Unable set the forwarding host '{}'", e.what())); } auto forwarder_dist = std::make_shared<ForwarderDistributor>(CTX->fwd_host_id(), CTX->hosts().size()); CTX->distributor(forwarder_dist); if (!rpc_send::get_fs_config()) { exit_error_msg(EXIT_FAILURE, "Unable to fetch file system configurations from daemon process through RPC."); Loading
src/client/preload_context.cpp +8 −0 Original line number Diff line number Diff line Loading @@ -73,6 +73,14 @@ void PreloadContext::local_host_id(uint64_t id) { local_host_id_ = id; } uint64_t PreloadContext::fwd_host_id() const { return fwd_host_id_; } void PreloadContext::fwd_host_id(uint64_t id) { fwd_host_id_ = id; } RelativizeStatus PreloadContext::relativize_fd_path(int dirfd, const char * raw_path, std::string& relative_path, Loading