Loading include/global/rpc/distributor.hpp +24 −0 Original line number Diff line number Diff line Loading @@ -108,6 +108,30 @@ public: locate_directory_metadata(const std::string& path) const override; }; class RandomSlicingDistributor : public Distributor { private: host_t localhost_; unsigned int hosts_size_; std::vector<host_t> all_hosts_; std::hash<std::string> str_hash; public: RandomSlicingDistributor(host_t localhost, unsigned int hosts_size); host_t localhost() const override final; host_t locate_data(const std::string& path, const chunkid_t& chnk_id) const override final; host_t locate_file_metadata(const std::string& path) const override; std::vector<host_t> locate_directory_metadata(const std::string& path) const override; }; } // namespace gkfs::rpc #endif // GEKKOFS_RPC_LOCATOR_HPP src/client/preload.cpp +10 −3 Original line number Diff line number Diff line Loading @@ -133,9 +133,16 @@ init_ld_environment_() { CTX->fwd_host_id(), CTX->hosts().size()); CTX->distributor(forwarder_dist); #else auto simple_hash_dist = std::make_shared<gkfs::rpc::SimpleHashDistributor>( if(gkfs::config::dynamic_placement) { auto dist = std::make_shared<gkfs::rpc::RandomSlicingDistributor>( CTX->local_host_id(), CTX->hosts().size()); CTX->distributor(dist); } else { auto simple_hash_dist = std::make_shared<gkfs::rpc::SimpleHashDistributor>( CTX->local_host_id(), CTX->hosts().size()); CTX->distributor(simple_hash_dist); } #endif LOG(INFO, "Retrieving file system configuration..."); Loading src/daemon/handler/srv_data.cpp +19 −4 Original line number Diff line number Diff line Loading @@ -138,7 +138,14 @@ rpc_srv_write(hg_handle_t handle) { } auto const host_id = in.host_id; auto const host_size = in.host_size; gkfs::rpc::SimpleHashDistributor distributor(host_id, host_size); shared_ptr<gkfs::rpc::Distributor> distributor; if(gkfs::config::dynamic_placement) { distributor = make_shared<gkfs::rpc::RandomSlicingDistributor>( host_id, host_size); } else { distributor = make_shared<gkfs::rpc::SimpleHashDistributor>(host_id, host_size); } // chnk_ids used by this host vector<uint64_t> chnk_ids_host(in.chunk_n); Loading Loading @@ -182,7 +189,7 @@ rpc_srv_write(hg_handle_t handle) { chnk_id_file++) { // Continue if chunk does not hash to this host #ifndef GKFS_ENABLE_FORWARDING if(distributor.locate_data(in.path, chnk_id_file) != host_id) { if(distributor->locate_data(in.path, chnk_id_file) != host_id) { GKFS_DATA->spdlogger()->trace( "{}() chunkid '{}' ignored as it does not match to this host with id '{}'. chnk_id_curr '{}'", __func__, chnk_id_file, host_id, chnk_id_curr); Loading Loading @@ -401,7 +408,15 @@ rpc_srv_read(hg_handle_t handle) { #ifndef GKFS_ENABLE_FORWARDING auto const host_id = in.host_id; auto const host_size = in.host_size; gkfs::rpc::SimpleHashDistributor distributor(host_id, host_size); shared_ptr<gkfs::rpc::Distributor> distributor; if(gkfs::config::dynamic_placement) { distributor = make_shared<gkfs::rpc::RandomSlicingDistributor>( host_id, host_size); } else { distributor = make_shared<gkfs::rpc::SimpleHashDistributor>(host_id, host_size); } #endif // chnk_ids used by this host Loading Loading @@ -434,7 +449,7 @@ rpc_srv_read(hg_handle_t handle) { chnk_id_file++) { // Continue if chunk does not hash to this host #ifndef GKFS_ENABLE_FORWARDING if(distributor.locate_data(in.path, chnk_id_file) != host_id) { if(distributor->locate_data(in.path, chnk_id_file) != host_id) { GKFS_DATA->spdlogger()->trace( "{}() chunkid '{}' ignored as it does not match to this host with id '{}'. chnk_id_curr '{}'", __func__, chnk_id_file, host_id, chnk_id_curr); Loading src/daemon/relocation/transmitter.cpp +1 −1 Original line number Diff line number Diff line Loading @@ -46,7 +46,7 @@ transmit_metadata_and_data(gkfs::rpc::host_t localhost) { auto relocate_metadata_id = gkfs::rpc::get_rpc_id(mid, gkfs::rpc::tag::relocate_metadata); gkfs::rpc::SimpleHashDistributor distributor(localhost, hosts.size()); gkfs::rpc::RandomSlicingDistributor distributor(localhost, hosts.size()); cout << fmt::format("Got host_id = {} and parsed {} hosts.\n", localhost, hosts.size()); Loading src/global/rpc/distributor.cpp +28 −0 Original line number Diff line number Diff line Loading @@ -94,4 +94,32 @@ std::vector<host_t> ForwarderDistributor::locate_directory_metadata(const std::string& path) const { return all_hosts_; } RandomSlicingDistributor::RandomSlicingDistributor(host_t localhost, unsigned int hosts_size) : localhost_(localhost), hosts_size_(hosts_size), all_hosts_(hosts_size) { ::iota(all_hosts_.begin(), all_hosts_.end(), 0); } host_t RandomSlicingDistributor::localhost() const { return localhost_; } host_t RandomSlicingDistributor::locate_data(const std::string& path, const chunkid_t& chnk_id) const { return hosts_size_ - 1; } host_t RandomSlicingDistributor::locate_file_metadata(const std::string& path) const { return hosts_size_ - 1; } std::vector<host_t> RandomSlicingDistributor::locate_directory_metadata( const std::string& path) const { return all_hosts_; } } // namespace gkfs::rpc Loading
include/global/rpc/distributor.hpp +24 −0 Original line number Diff line number Diff line Loading @@ -108,6 +108,30 @@ public: locate_directory_metadata(const std::string& path) const override; }; class RandomSlicingDistributor : public Distributor { private: host_t localhost_; unsigned int hosts_size_; std::vector<host_t> all_hosts_; std::hash<std::string> str_hash; public: RandomSlicingDistributor(host_t localhost, unsigned int hosts_size); host_t localhost() const override final; host_t locate_data(const std::string& path, const chunkid_t& chnk_id) const override final; host_t locate_file_metadata(const std::string& path) const override; std::vector<host_t> locate_directory_metadata(const std::string& path) const override; }; } // namespace gkfs::rpc #endif // GEKKOFS_RPC_LOCATOR_HPP
src/client/preload.cpp +10 −3 Original line number Diff line number Diff line Loading @@ -133,9 +133,16 @@ init_ld_environment_() { CTX->fwd_host_id(), CTX->hosts().size()); CTX->distributor(forwarder_dist); #else auto simple_hash_dist = std::make_shared<gkfs::rpc::SimpleHashDistributor>( if(gkfs::config::dynamic_placement) { auto dist = std::make_shared<gkfs::rpc::RandomSlicingDistributor>( CTX->local_host_id(), CTX->hosts().size()); CTX->distributor(dist); } else { auto simple_hash_dist = std::make_shared<gkfs::rpc::SimpleHashDistributor>( CTX->local_host_id(), CTX->hosts().size()); CTX->distributor(simple_hash_dist); } #endif LOG(INFO, "Retrieving file system configuration..."); Loading
src/daemon/handler/srv_data.cpp +19 −4 Original line number Diff line number Diff line Loading @@ -138,7 +138,14 @@ rpc_srv_write(hg_handle_t handle) { } auto const host_id = in.host_id; auto const host_size = in.host_size; gkfs::rpc::SimpleHashDistributor distributor(host_id, host_size); shared_ptr<gkfs::rpc::Distributor> distributor; if(gkfs::config::dynamic_placement) { distributor = make_shared<gkfs::rpc::RandomSlicingDistributor>( host_id, host_size); } else { distributor = make_shared<gkfs::rpc::SimpleHashDistributor>(host_id, host_size); } // chnk_ids used by this host vector<uint64_t> chnk_ids_host(in.chunk_n); Loading Loading @@ -182,7 +189,7 @@ rpc_srv_write(hg_handle_t handle) { chnk_id_file++) { // Continue if chunk does not hash to this host #ifndef GKFS_ENABLE_FORWARDING if(distributor.locate_data(in.path, chnk_id_file) != host_id) { if(distributor->locate_data(in.path, chnk_id_file) != host_id) { GKFS_DATA->spdlogger()->trace( "{}() chunkid '{}' ignored as it does not match to this host with id '{}'. chnk_id_curr '{}'", __func__, chnk_id_file, host_id, chnk_id_curr); Loading Loading @@ -401,7 +408,15 @@ rpc_srv_read(hg_handle_t handle) { #ifndef GKFS_ENABLE_FORWARDING auto const host_id = in.host_id; auto const host_size = in.host_size; gkfs::rpc::SimpleHashDistributor distributor(host_id, host_size); shared_ptr<gkfs::rpc::Distributor> distributor; if(gkfs::config::dynamic_placement) { distributor = make_shared<gkfs::rpc::RandomSlicingDistributor>( host_id, host_size); } else { distributor = make_shared<gkfs::rpc::SimpleHashDistributor>(host_id, host_size); } #endif // chnk_ids used by this host Loading Loading @@ -434,7 +449,7 @@ rpc_srv_read(hg_handle_t handle) { chnk_id_file++) { // Continue if chunk does not hash to this host #ifndef GKFS_ENABLE_FORWARDING if(distributor.locate_data(in.path, chnk_id_file) != host_id) { if(distributor->locate_data(in.path, chnk_id_file) != host_id) { GKFS_DATA->spdlogger()->trace( "{}() chunkid '{}' ignored as it does not match to this host with id '{}'. chnk_id_curr '{}'", __func__, chnk_id_file, host_id, chnk_id_curr); Loading
src/daemon/relocation/transmitter.cpp +1 −1 Original line number Diff line number Diff line Loading @@ -46,7 +46,7 @@ transmit_metadata_and_data(gkfs::rpc::host_t localhost) { auto relocate_metadata_id = gkfs::rpc::get_rpc_id(mid, gkfs::rpc::tag::relocate_metadata); gkfs::rpc::SimpleHashDistributor distributor(localhost, hosts.size()); gkfs::rpc::RandomSlicingDistributor distributor(localhost, hosts.size()); cout << fmt::format("Got host_id = {} and parsed {} hosts.\n", localhost, hosts.size()); Loading
src/global/rpc/distributor.cpp +28 −0 Original line number Diff line number Diff line Loading @@ -94,4 +94,32 @@ std::vector<host_t> ForwarderDistributor::locate_directory_metadata(const std::string& path) const { return all_hosts_; } RandomSlicingDistributor::RandomSlicingDistributor(host_t localhost, unsigned int hosts_size) : localhost_(localhost), hosts_size_(hosts_size), all_hosts_(hosts_size) { ::iota(all_hosts_.begin(), all_hosts_.end(), 0); } host_t RandomSlicingDistributor::localhost() const { return localhost_; } host_t RandomSlicingDistributor::locate_data(const std::string& path, const chunkid_t& chnk_id) const { return hosts_size_ - 1; } host_t RandomSlicingDistributor::locate_file_metadata(const std::string& path) const { return hosts_size_ - 1; } std::vector<host_t> RandomSlicingDistributor::locate_directory_metadata( const std::string& path) const { return all_hosts_; } } // namespace gkfs::rpc