Loading include/common/rpc/distributor.hpp +19 −10 Original line number Diff line number Diff line /* Copyright 2018-2021, Barcelona Supercomputing Center (BSC), Spain Copyright 2015-2021, Johannes Gutenberg Universitaet Mainz, Germany Copyright 2018-2022, Barcelona Supercomputing Center (BSC), Spain Copyright 2015-2022, Johannes Gutenberg Universitaet Mainz, Germany This software was partially supported by the EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu). Loading Loading @@ -35,10 +35,7 @@ #include <numeric> #include <unordered_map> #include <fstream> #ifdef GKFS_USE_GUIDED_DISTRIBUTION #include <boost/icl/interval_map.hpp> #endif #include <map> namespace gkfs::rpc { Loading Loading @@ -140,15 +137,28 @@ public: std::vector<host_t> locate_directory_metadata(const std::string& path) const override; }; #ifdef GKFS_USE_GUIDED_DISTRIBUTION /* * Class IntervalSet * FROM *https://stackoverflow.com/questions/55646605/is-there-a-collection-for-storing-discrete-intervals **/ class IntervalSet { std::map<chunkid_t, chunkid_t> _intervals; public: void Add(chunkid_t, chunkid_t); bool IsInsideInterval(unsigned int) const; }; class GuidedDistributor : public Distributor { private: host_t localhost_; unsigned int hosts_size_{0}; std::vector<host_t> all_hosts_; std::hash<std::string> str_hash; std::unordered_map<std::string, boost::icl::interval_map<chunkid_t, unsigned int>> std::unordered_map<std::string, std::pair<IntervalSet, unsigned int>> map_interval; std::vector<std::string> prefix_list; // Should not be very long bool Loading Loading @@ -176,7 +186,6 @@ public: std::vector<host_t> locate_directory_metadata(const std::string& path) const override; }; #endif } // namespace gkfs::rpc Loading src/common/CMakeLists.txt +0 −4 Original line number Diff line number Diff line Loading @@ -38,10 +38,6 @@ target_sources(distributor PRIVATE ${CMAKE_CURRENT_LIST_DIR}/rpc/distributor.cpp ) if(GKFS_USE_GUIDED_DISTRIBUTION) find_package(Boost 1.53 REQUIRED) target_link_libraries(distributor PRIVATE Boost::boost) endif() if(GKFS_ENABLE_CODE_COVERAGE) target_code_coverage(distributor AUTO) Loading src/common/rpc/distributor.cpp +40 −17 Original line number Diff line number Diff line /* Copyright 2018-2021, Barcelona Supercomputing Center (BSC), Spain Copyright 2015-2021, Johannes Gutenberg Universitaet Mainz, Germany Copyright 2018-2022, Barcelona Supercomputing Center (BSC), Spain Copyright 2015-2022, Johannes Gutenberg Universitaet Mainz, Germany This software was partially supported by the EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu). Loading Loading @@ -125,7 +125,31 @@ std::vector<host_t> ForwarderDistributor::locate_directory_metadata(const std::string& path) const { return all_hosts_; } #ifdef GKFS_USE_GUIDED_DISTRIBUTION void IntervalSet::Add(chunkid_t smaller, chunkid_t bigger) { const auto next = _intervals.upper_bound(smaller); if(next != _intervals.cbegin()) { const auto prev = std::prev(next); if(next != _intervals.cend() && next->first <= bigger + 1) { bigger = next->second; _intervals.erase(next); } if(prev->second + 1 >= smaller) { smaller = prev->first; _intervals.erase(prev); } } _intervals[smaller] = bigger; } bool IntervalSet::IsInsideInterval(unsigned int v) const { const auto suspectNext = _intervals.upper_bound(v); const auto suspect = std::prev(suspectNext); return suspect->first <= v && v <= suspect->second; } bool GuidedDistributor::init_guided() { unsigned int destination_host; Loading @@ -148,16 +172,15 @@ GuidedDistributor::init_guided() { } auto I = map_interval.find(path); if(I == map_interval.end()) map_interval[path] += make_pair( boost::icl::discrete_interval<chunkid_t>::right_open( chunk_id, chunk_id + 1), destination_host + 1); else if(I->second.find(chunk_id) == I->second.end()) I->second.insert(make_pair( boost::icl::discrete_interval<chunkid_t>::right_open( chunk_id, chunk_id + 1), destination_host + 1)); if(I == map_interval.end()) { auto tmp = IntervalSet(); tmp.Add(chunk_id, chunk_id + 1); map_interval[path] = make_pair(tmp, destination_host + 1); } else if(I->second.first.IsInsideInterval(chunk_id)) { auto is = I->second.first; is.Add(chunk_id, chunk_id + 1); I->second = (make_pair(is, destination_host + 1)); } } mapfile.close(); return true; Loading Loading @@ -200,9 +223,9 @@ GuidedDistributor::locate_data(const string& path, const chunkid_t& chnk_id) const { auto it = map_interval.find(path); if(it != map_interval.end()) { auto it_f = it->second.find(chnk_id); if(it_f != it->second.end()) { return (it_f->second - auto it_f = it->second.first.IsInsideInterval(chnk_id); if(it_f) { return (it->second.second - 1); // Decrement destination host from the interval_map } } Loading @@ -227,6 +250,6 @@ GuidedDistributor::locate_file_metadata(const string& path) const { GuidedDistributor::locate_directory_metadata(const string& path) const { return all_hosts_; } #endif } // namespace rpc } // namespace gkfs tests/unit/CMakeLists.txt +3 −4 Original line number Diff line number Diff line Loading @@ -66,10 +66,9 @@ target_sources(tests ${CMAKE_CURRENT_LIST_DIR}/test_example_01.cpp ${CMAKE_CURRENT_LIST_DIR}/test_utils_arithmetic.cpp ${CMAKE_CURRENT_LIST_DIR}/test_helpers.cpp ) if(GKFS_TESTS_GUIDED_DISTRIBUTION) target_sources(tests PRIVATE ${CMAKE_CURRENT_LIST_DIR}/test_guided_distributor.cpp) endif() ${CMAKE_CURRENT_LIST_DIR}/test_guided_distributor.cpp) target_link_libraries(tests PRIVATE catch2_main Loading Loading
include/common/rpc/distributor.hpp +19 −10 Original line number Diff line number Diff line /* Copyright 2018-2021, Barcelona Supercomputing Center (BSC), Spain Copyright 2015-2021, Johannes Gutenberg Universitaet Mainz, Germany Copyright 2018-2022, Barcelona Supercomputing Center (BSC), Spain Copyright 2015-2022, Johannes Gutenberg Universitaet Mainz, Germany This software was partially supported by the EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu). Loading Loading @@ -35,10 +35,7 @@ #include <numeric> #include <unordered_map> #include <fstream> #ifdef GKFS_USE_GUIDED_DISTRIBUTION #include <boost/icl/interval_map.hpp> #endif #include <map> namespace gkfs::rpc { Loading Loading @@ -140,15 +137,28 @@ public: std::vector<host_t> locate_directory_metadata(const std::string& path) const override; }; #ifdef GKFS_USE_GUIDED_DISTRIBUTION /* * Class IntervalSet * FROM *https://stackoverflow.com/questions/55646605/is-there-a-collection-for-storing-discrete-intervals **/ class IntervalSet { std::map<chunkid_t, chunkid_t> _intervals; public: void Add(chunkid_t, chunkid_t); bool IsInsideInterval(unsigned int) const; }; class GuidedDistributor : public Distributor { private: host_t localhost_; unsigned int hosts_size_{0}; std::vector<host_t> all_hosts_; std::hash<std::string> str_hash; std::unordered_map<std::string, boost::icl::interval_map<chunkid_t, unsigned int>> std::unordered_map<std::string, std::pair<IntervalSet, unsigned int>> map_interval; std::vector<std::string> prefix_list; // Should not be very long bool Loading Loading @@ -176,7 +186,6 @@ public: std::vector<host_t> locate_directory_metadata(const std::string& path) const override; }; #endif } // namespace gkfs::rpc Loading
src/common/CMakeLists.txt +0 −4 Original line number Diff line number Diff line Loading @@ -38,10 +38,6 @@ target_sources(distributor PRIVATE ${CMAKE_CURRENT_LIST_DIR}/rpc/distributor.cpp ) if(GKFS_USE_GUIDED_DISTRIBUTION) find_package(Boost 1.53 REQUIRED) target_link_libraries(distributor PRIVATE Boost::boost) endif() if(GKFS_ENABLE_CODE_COVERAGE) target_code_coverage(distributor AUTO) Loading
src/common/rpc/distributor.cpp +40 −17 Original line number Diff line number Diff line /* Copyright 2018-2021, Barcelona Supercomputing Center (BSC), Spain Copyright 2015-2021, Johannes Gutenberg Universitaet Mainz, Germany Copyright 2018-2022, Barcelona Supercomputing Center (BSC), Spain Copyright 2015-2022, Johannes Gutenberg Universitaet Mainz, Germany This software was partially supported by the EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu). Loading Loading @@ -125,7 +125,31 @@ std::vector<host_t> ForwarderDistributor::locate_directory_metadata(const std::string& path) const { return all_hosts_; } #ifdef GKFS_USE_GUIDED_DISTRIBUTION void IntervalSet::Add(chunkid_t smaller, chunkid_t bigger) { const auto next = _intervals.upper_bound(smaller); if(next != _intervals.cbegin()) { const auto prev = std::prev(next); if(next != _intervals.cend() && next->first <= bigger + 1) { bigger = next->second; _intervals.erase(next); } if(prev->second + 1 >= smaller) { smaller = prev->first; _intervals.erase(prev); } } _intervals[smaller] = bigger; } bool IntervalSet::IsInsideInterval(unsigned int v) const { const auto suspectNext = _intervals.upper_bound(v); const auto suspect = std::prev(suspectNext); return suspect->first <= v && v <= suspect->second; } bool GuidedDistributor::init_guided() { unsigned int destination_host; Loading @@ -148,16 +172,15 @@ GuidedDistributor::init_guided() { } auto I = map_interval.find(path); if(I == map_interval.end()) map_interval[path] += make_pair( boost::icl::discrete_interval<chunkid_t>::right_open( chunk_id, chunk_id + 1), destination_host + 1); else if(I->second.find(chunk_id) == I->second.end()) I->second.insert(make_pair( boost::icl::discrete_interval<chunkid_t>::right_open( chunk_id, chunk_id + 1), destination_host + 1)); if(I == map_interval.end()) { auto tmp = IntervalSet(); tmp.Add(chunk_id, chunk_id + 1); map_interval[path] = make_pair(tmp, destination_host + 1); } else if(I->second.first.IsInsideInterval(chunk_id)) { auto is = I->second.first; is.Add(chunk_id, chunk_id + 1); I->second = (make_pair(is, destination_host + 1)); } } mapfile.close(); return true; Loading Loading @@ -200,9 +223,9 @@ GuidedDistributor::locate_data(const string& path, const chunkid_t& chnk_id) const { auto it = map_interval.find(path); if(it != map_interval.end()) { auto it_f = it->second.find(chnk_id); if(it_f != it->second.end()) { return (it_f->second - auto it_f = it->second.first.IsInsideInterval(chnk_id); if(it_f) { return (it->second.second - 1); // Decrement destination host from the interval_map } } Loading @@ -227,6 +250,6 @@ GuidedDistributor::locate_file_metadata(const string& path) const { GuidedDistributor::locate_directory_metadata(const string& path) const { return all_hosts_; } #endif } // namespace rpc } // namespace gkfs
tests/unit/CMakeLists.txt +3 −4 Original line number Diff line number Diff line Loading @@ -66,10 +66,9 @@ target_sources(tests ${CMAKE_CURRENT_LIST_DIR}/test_example_01.cpp ${CMAKE_CURRENT_LIST_DIR}/test_utils_arithmetic.cpp ${CMAKE_CURRENT_LIST_DIR}/test_helpers.cpp ) if(GKFS_TESTS_GUIDED_DISTRIBUTION) target_sources(tests PRIVATE ${CMAKE_CURRENT_LIST_DIR}/test_guided_distributor.cpp) endif() ${CMAKE_CURRENT_LIST_DIR}/test_guided_distributor.cpp) target_link_libraries(tests PRIVATE catch2_main Loading