From a8dc9c3cfc150f3a968b76f184056f022bf822a1 Mon Sep 17 00:00:00 2001 From: Ramon Date: Thu, 1 Oct 2020 13:50:21 +0200 Subject: [PATCH 1/3] Moved distributor instantation to RPC_DATA and daemon.cpp on the server side. Added interval_map to GuidedDistributor for better performance Moving reads extended log to normal log module. Update logging.hpp Added GKFS prefix to CMake options Catch2 Guided Distributor test Added creation of /tmp/guided.txt in the test updated README.MD and protected Distributor for duplicated inserts Added metadata + data optimization with # --- CHANGELOG.md | 5 + CMakeLists.txt | 8 ++ README.md | 78 ++++++++++---- examples/distributors/guided/generate.py | 40 ++++++++ include/client/logging.hpp | 15 ++- include/client/preload_context.hpp | 5 + include/common/cmake_configure.hpp.in | 5 +- include/common/rpc/distributor.hpp | 60 ++++++++++- include/daemon/classes/rpc_data.hpp | 17 +++- include/daemon/daemon.hpp | 1 + src/client/logging.cpp | 4 + src/client/preload.cpp | 10 +- src/client/preload_context.cpp | 17 +++- src/client/preload_util.cpp | 4 +- src/client/rpc/forward_data.cpp | 14 ++- src/common/CMakeLists.txt | 1 + src/common/rpc/distributor.cpp | 124 ++++++++++++++++++++++- src/daemon/classes/rpc_data.cpp | 19 +++- src/daemon/daemon.cpp | 18 ++++ src/daemon/handler/srv_data.cpp | 12 +-- tests/unit/CMakeLists.txt | 10 +- tests/unit/test_guided_distributor.cpp | 61 +++++++++++ 22 files changed, 477 insertions(+), 51 deletions(-) create mode 100644 examples/distributors/guided/generate.py create mode 100644 tests/unit/test_guided_distributor.cpp diff --git a/CHANGELOG.md b/CHANGELOG.md index d08f0d459..14267ee3c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,7 +5,12 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] + - Added dirents_extended function and a find substitution proposal. + +- Created a Guided Distributor using a mapping file to map chunks to specific + nodes. + ## [0.8.0] - 2020-09-15 ## New - Both client library and daemon have been extended to support the ofi+verbs diff --git a/CMakeLists.txt b/CMakeLists.txt index 7fe5aa9ec..b07ed80cd 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -180,6 +180,14 @@ mark_as_advanced(CLIENT_LOG_MESSAGE_SIZE) configure_file(include/common/cmake_configure.hpp.in include/common/cmake_configure.hpp) +option(GKFS_USE_GUIDED_DISTRIBUTION "Use guided data distributor " OFF) +message(STATUS "[gekkofs] Guided data distributor: ${GKFS_USE_GUIDED_DISTRIBUTION}") + +set(GKFS_USE_GUIDED_DISTRIBUTION_PATH "/tmp/guided.txt" CACHE STRING "File Path for guided distributor") +set_property(CACHE GKFS_USE_GUIDED_DISTRIBUTION_PATH PROPERTY STRINGS) +message(STATUS "[gekkofs] Guided data distributor input file path: ${GKFS_USE_GUIDED_DISTRIBUTION_PATH}") + + # Imported target add_library(RocksDB INTERFACE IMPORTED GLOBAL) target_link_libraries(RocksDB diff --git a/README.md b/README.md index 027814bdd..dd1ab72bd 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,7 @@ GekkoFS is a file system capable of aggregating the local I/O capacity and performance of each compute node in a HPC cluster to produce a high-performance storage space that can be accessed in a distributed manner. -This storage space allows HPC applications and simulations to run in isolation from each other with regards +This storage space allows HPC applications and simulations to run in isolation from each other with regards to I/O, which reduces interferences and improves performance. # Dependencies @@ -30,7 +30,7 @@ to I/O, which reduces interferences and improves performance. - snappy: `sudo yum install snappy snappy-devel` - zlib: `sudo yum install zlib zlib-devel` - bzip2: `sudo yum install bzip2 bzip2-devel` -- zstandard: +- zstandard: ```bash wget https://github.com/facebook/zstd/archive/v1.1.3.tar.gz mv v1.1.3.tar.gz zstd-1.1.3.tar.gz @@ -162,28 +162,28 @@ Further options: ```bash Allowed options: -h [ --help ] Help message - -m [ --mountdir ] arg Virtual mounting directory where GekkoFS is + -m [ --mountdir ] arg Virtual mounting directory where GekkoFS is available. - -r [ --rootdir ] arg Local data directory where GekkoFS data for this + -r [ --rootdir ] arg Local data directory where GekkoFS data for this daemon is stored. -i [ --metadir ] arg Metadata directory where GekkoFS RocksDB data directory is located. If not set, rootdir is used. - -l [ --listen ] arg Address or interface to bind the daemon to. + -l [ --listen ] arg Address or interface to bind the daemon to. Default: local hostname. - When used with ofi+verbs the FI_VERBS_IFACE - environment variable is set accordingly which - associates the verbs device with the network - interface. In case FI_VERBS_IFACE is already + When used with ofi+verbs the FI_VERBS_IFACE + environment variable is set accordingly which + associates the verbs device with the network + interface. In case FI_VERBS_IFACE is already defined, the argument is ignored. Default 'ib'. - -H [ --hosts-file ] arg Shared file used by deamons to register their + -H [ --hosts-file ] arg Shared file used by deamons to register their endpoints. (default './gkfs_hosts.txt') -P [ --rpc-protocol ] arg Used RPC protocol for inter-node communication. - Available: {ofi+sockets, ofi+verbs, ofi+psm2} for - TCP, Infiniband, and Omni-Path, respectively. + Available: {ofi+sockets, ofi+verbs, ofi+psm2} for + TCP, Infiniband, and Omni-Path, respectively. (Default ofi+sockets) Libfabric must have enabled support verbs or psm2. - --auto-sm Enables intra-node communication (IPCs) via the - `na+sm` (shared memory) protocol, instead of using + --auto-sm Enables intra-node communication (IPCs) via the + `na+sm` (shared memory) protocol, instead of using the RPC protocol. (Default off) --version Print version and exit. ``` @@ -194,15 +194,15 @@ Shut it down by gracefully killing the process (SIGTERM). Metadata and actual data will be stored at the ``. The path where the application works on is set with ``. - + Run the application with the preload library: `LD_PRELOAD=/build/lib/libgkfs_intercept.so ./application`. In the case of an MPI application use the `{mpirun, mpiexec} -x` argument. - + ### Logging The following environment variables can be used to enable logging in the client library: `LIBGKFS_LOG=` and `LIBGKFS_LOG_OUTPUT=` to configure the output module and set the path to the log file of the client -library. If not path is specified in `LIBGKFS_LOG_OUTPUT`, the client library +library. If not path is specified in `LIBGKFS_LOG_OUTPUT`, the client library will send log messages to `/tmp/gkfs_client.log`. The following modules are available: @@ -228,6 +228,7 @@ The following modules are available: module will only be available if the client library is built in `Debug` mode. - `all`: All previous options combined. + - `trace_reads`: Generate log line with extra information in read operations for guided distributor - `help`: Print a help message and exit. When tracing sytem calls, specific syscalls can be removed from log messages by @@ -236,20 +237,57 @@ setting it to `LIBGKFS_LOG_SYSCALL_FILTER=epoll_wait,epoll_create` will filter out any log entries from the `epoll_wait()` and `epoll_create()` system calls. Additionally, setting the `LIBGKFS_LOG_OUTPUT_TRUNC` environment variable with -a value different from `0` will instruct the logging subsystem to truncate +a value different from `0` will instruct the logging subsystem to truncate the file used for logging, rather than append to it. -For the daemon, the `GKFS_DAEMON_LOG_PATH=` environment variable -can be provided to set the path to the log file, and the log module can be +For the daemon, the `GKFS_DAEMON_LOG_PATH=` environment variable +can be provided to set the path to the log file, and the log module can be selected with the `GKFS_LOG_LEVEL={off,critical,err,warn,info,debug,trace}` environment variable. + ### External functions GekkoFS allows to use external functions on your client code, via LD_PRELOAD. Source code needs to be compiled with -fPIC. We include a pfind io500 substitution, `examples/gfind/gfind.cpp` and a non-mpi version `examples/gfind/sfind.cpp` +### Data distributors +The data distribution can be selected at compilation time, we have 2 distributors available: + +## Simple Hash (Default) +Chunks are distributed randomly to the different GekkoFS servers. + +## Guided Distributor +Guided distributor distributes chunks using a shared file with the next format: +` ` + +Moreover if you prepend a path with #, all the data from that path will go to the same place as the metadata. +Specifically defined paths (without#) will be prioritary. + +i.e., +#/mdt-hard 0 0 + +GekkoFS will store data and metadata to the same server. The server will still be random (0 0 has no meaning, yet). + +Chunks not specified, are distributed using the Simple Hash distributor. + +To generate such file we need to follow a first execution, using the trace_reads log option + +This will enable a `TRACE_READS` level log at the clients offering several lines that can be used to generate the input file. +In this stage, each node should generate a separated file this can be done in SLURM using the next line : +`srun -N 10 -n 320 --export="ALL" /bin/bash -c "export LIBGKFS_LOG_OUTPUT=${HOME}/test/GLOBAL.txt;LD_PRELOAD=${GKFS_PRLD} "` + +Then, use the `examples/distributors/guided/generate.py` to create the output file. +* `python examples/distributors/guided/generate.py ~/test/GLOBAL.txt >> guided.txt` + +This should work if the nodes are sorted in alphabetical order, which is the usual scenario. Users should take care of multi-server configurations. + +``` +Finally, enable the distributor using the next compilation flags: +* `GKFS_USE_GUIDED_DISTRIBUTION` ON +* `GKFS_USE_GUIDED_DISTRIBUTION_PATH` `` + ### Acknowledgment This software was partially supported by the EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu). diff --git a/examples/distributors/guided/generate.py b/examples/distributors/guided/generate.py new file mode 100644 index 000000000..e4e21d963 --- /dev/null +++ b/examples/distributors/guided/generate.py @@ -0,0 +1,40 @@ +### +# Copyright 2018-2020, Barcelona Supercomputing Center (BSC), Spain +# Copyright 2015-2020, Johannes Gutenberg Universitaet Mainz, Germany + +# This software was partially supported by the +# EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu). + +# This software was partially supported by the +# ADA-FS project under the SPPEXA project funded by the DFG. + +# SPDX-License-Identifier: MIT +### +#!/usr/bin/python3 +import re +import sys +import collections + +file = sys.argv[1] + +pattern = re.compile(r".+(read\ )(.*)( host: )(\d+).+(path: )(.+),.+(chunk_start: )(\d+).+(chunk_end: )(\d+)") + +d = collections.OrderedDict() + +with open(file) as f: + for line in f: + result = pattern.match(line) + if result: + d[result.group(2)] = 1 +keys = sorted(d.keys()) +i = 0 +for key in keys: + d[key] = i + i = i + 1 + +with open(file) as f: + for line in f: + result = pattern.match(line) + if result: + for i in range(int(result.group(8)), int(result.group(10))+1): + print (result.group(6), i, d[result.group(2)]) diff --git a/include/client/logging.hpp b/include/client/logging.hpp index e793106c0..e87aa0c05 100644 --- a/include/client/logging.hpp +++ b/include/client/logging.hpp @@ -47,7 +47,7 @@ namespace gkfs::log { -enum class log_level : short { +enum class log_level : unsigned int { print_syscalls = 1 << 0, print_syscalls_entry = 1 << 1, print_info = 1 << 2, @@ -57,6 +57,7 @@ enum class log_level : short { print_hermes = 1 << 6, print_mercury = 1 << 7, print_debug = 1 << 8, + print_trace_reads = 1 << 9, // for internal use print_none = 0, @@ -117,6 +118,7 @@ static const auto constexpr warning = log_level::print_warnings; static const auto constexpr hermes = log_level::print_hermes; static const auto constexpr mercury = log_level::print_mercury; static const auto constexpr debug = log_level::print_debug; +static const auto constexpr trace_reads = log_level::print_trace_reads; static const auto constexpr none = log_level::print_none; static const auto constexpr most = log_level::print_most; static const auto constexpr all = log_level::print_all; @@ -125,7 +127,8 @@ static const auto constexpr help = log_level::print_help; static const auto constexpr level_names = utils::make_array( "syscall", "syscall", // sycall_entry uses the same name as syscall - "info", "critical", "error", "warning", "hermes", "mercury", "debug"); + "info", "critical", "error", "warning", "hermes", "mercury", "debug", + "trace_reads"); inline constexpr auto lookup_level_name(log_level l) { @@ -533,6 +536,14 @@ static_buffer::grow(std::size_t size) { } \ } while(0); +#define LOG_TRACE_READS(...) \ + do { \ + if(gkfs::log::get_global_logger()) { \ + gkfs::log::get_global_logger()->log( \ + gkfs::log::trace_reads, __func__, __LINE__, __VA_ARGS__); \ + } \ + } while(0); + #ifdef GKFS_DEBUG_BUILD #define LOG_SYSCALL(...) \ diff --git a/include/client/preload_context.hpp b/include/client/preload_context.hpp index 83a44ad7d..183f7960b 100644 --- a/include/client/preload_context.hpp +++ b/include/client/preload_context.hpp @@ -36,6 +36,7 @@ #include #include #include +#include #include @@ -102,6 +103,7 @@ private: mutable std::mutex internal_fds_mutex_; bool internal_fds_must_relocate_; std::bitset protected_fds_; + std::string hostname; public: static PreloadContext* @@ -210,6 +212,9 @@ public: void unprotect_user_fds(); + + std::string + get_hostname(); }; } // namespace preload diff --git a/include/common/cmake_configure.hpp.in b/include/common/cmake_configure.hpp.in index 811f643be..8b3ee852b 100644 --- a/include/common/cmake_configure.hpp.in +++ b/include/common/cmake_configure.hpp.in @@ -16,5 +16,8 @@ #cmakedefine01 CREATE_CHECK_PARENTS #cmakedefine01 LOG_SYSCALLS -#endif // FS_CMAKE_CONFIGURE_H +#cmakedefine GKFS_USE_GUIDED_DISTRIBUTION +#define GKFS_USE_GUIDED_DISTRIBUTION_PATH "@GKFS_USE_GUIDED_DISTRIBUTION_PATH@" + +#endif //FS_CMAKE_CONFIGURE_H // clang-format on diff --git a/include/common/rpc/distributor.hpp b/include/common/rpc/distributor.hpp index 4d76eda12..7c2b84ee6 100644 --- a/include/common/rpc/distributor.hpp +++ b/include/common/rpc/distributor.hpp @@ -29,11 +29,17 @@ #ifndef GEKKOFS_RPC_DISTRIBUTOR_HPP #define GEKKOFS_RPC_DISTRIBUTOR_HPP +#include "../include/config.hpp" #include #include #include +#include +#include +#include -namespace gkfs::rpc { +namespace gkfs { + +namespace rpc { using chunkid_t = unsigned int; using host_t = unsigned int; @@ -45,6 +51,11 @@ public: virtual host_t locate_data(const std::string& path, const chunkid_t& chnk_id) const = 0; + // TODO: We need to pass hosts_size in the server side, because the number + // of servers are not defined (in startup) + virtual host_t + locate_data(const std::string& path, const chunkid_t& chnk_id, + unsigned int hosts_size) = 0; virtual host_t locate_file_metadata(const std::string& path) const = 0; @@ -57,11 +68,13 @@ public: class SimpleHashDistributor : public Distributor { private: host_t localhost_; - unsigned int hosts_size_; + unsigned int hosts_size_{0}; std::vector all_hosts_; std::hash str_hash; public: + SimpleHashDistributor(); + SimpleHashDistributor(host_t localhost, unsigned int hosts_size); host_t @@ -71,6 +84,10 @@ public: locate_data(const std::string& path, const chunkid_t& chnk_id) const override; + host_t + locate_data(const std::string& path, const chunkid_t& chnk_id, + unsigned int host_size); + host_t locate_file_metadata(const std::string& path) const override; @@ -123,6 +140,43 @@ public: locate_directory_metadata(const std::string& path) const override; }; -} // namespace gkfs::rpc +class GuidedDistributor : public Distributor { +private: + host_t localhost_; + unsigned int hosts_size_{0}; + std::vector all_hosts_; + std::hash str_hash; + std::unordered_map> + map_interval; + std::vector prefix_list; // Should not be very long + bool + init_guided(); + +public: + GuidedDistributor(); + + GuidedDistributor(host_t localhost, unsigned int hosts_size); + + host_t + localhost() const override; + + host_t + locate_data(const std::string& path, + const chunkid_t& chnk_id) const override; + + host_t + locate_data(const std::string& path, const chunkid_t& chnk_id, + unsigned int host_size); + + host_t + locate_file_metadata(const std::string& path) const override; + + std::vector + locate_directory_metadata(const std::string& path) const override; +}; + +} // namespace rpc +} // namespace gkfs #endif // GEKKOFS_RPC_LOCATOR_HPP diff --git a/include/daemon/classes/rpc_data.hpp b/include/daemon/classes/rpc_data.hpp index 6a8125eee..35f0458d1 100644 --- a/include/daemon/classes/rpc_data.hpp +++ b/include/daemon/classes/rpc_data.hpp @@ -32,6 +32,13 @@ #include namespace gkfs { + +/* Forward declarations */ +namespace rpc { +class Distributor; +} + + namespace daemon { class RPCData { @@ -47,6 +54,8 @@ private: ABT_pool io_pool_; std::vector io_streams_; std::string self_addr_str_; + // Distributor + std::shared_ptr distributor_; public: static RPCData* @@ -84,7 +93,13 @@ public: self_addr_str() const; void - self_addr_str(const std::string& addr_str); + self_addr_str(const std::string& addr_stra); + + const std::shared_ptr& + distributor() const; + + void + distributor(const std::shared_ptr& distributor); }; } // namespace daemon diff --git a/include/daemon/daemon.hpp b/include/daemon/daemon.hpp index 612712b63..30008efbb 100644 --- a/include/daemon/daemon.hpp +++ b/include/daemon/daemon.hpp @@ -44,6 +44,7 @@ extern "C" { #include #include +#include #define GKFS_DATA \ (static_cast(gkfs::daemon::FsData::getInstance())) diff --git a/src/client/logging.cpp b/src/client/logging.cpp index c21da6d9b..0b45cb458 100644 --- a/src/client/logging.cpp +++ b/src/client/logging.cpp @@ -105,6 +105,10 @@ static const auto constexpr debug_opts = utils::make_array( "[ default: on ]"}, log::mercury}, + opt_info{STR_AND_LEN("trace_reads"), + {"Print extended read information", "[ default: off ]"}, + log::trace_reads}, + #ifdef GKFS_DEBUG_BUILD opt_info{STR_AND_LEN("debug"), diff --git a/src/client/preload.cpp b/src/client/preload.cpp index e5b0eff9d..47a354551 100644 --- a/src/client/preload.cpp +++ b/src/client/preload.cpp @@ -149,10 +149,16 @@ init_ld_environment_() { CTX->fwd_host_id(), CTX->hosts().size()); CTX->distributor(forwarder_dist); #else - auto simple_hash_dist = std::make_shared( +#ifdef GKFS_USE_GUIDED_DISTRIBUTION + auto distributor = std::make_shared( + CTX->local_host_id(), CTX->hosts().size()); +#else + auto distributor = std::make_shared( CTX->local_host_id(), CTX->hosts().size()); - CTX->distributor(simple_hash_dist); #endif + CTX->distributor(distributor); +#endif + LOG(INFO, "Retrieving file system configuration..."); diff --git a/src/client/preload_context.cpp b/src/client/preload_context.cpp index 8762e7aeb..5236a1c4c 100644 --- a/src/client/preload_context.cpp +++ b/src/client/preload_context.cpp @@ -47,7 +47,9 @@ extern "C" { #include } -namespace gkfs::preload { +namespace gkfs { + +namespace preload { decltype(PreloadContext::MIN_INTERNAL_FD) constexpr PreloadContext:: MIN_INTERNAL_FD; @@ -59,6 +61,10 @@ PreloadContext::PreloadContext() internal_fds_.set(); internal_fds_must_relocate_ = true; + + char host[255]; + gethostname(host, 255); + hostname = host; } void @@ -426,4 +432,11 @@ PreloadContext::unprotect_user_fds() { internal_fds_must_relocate_ = true; } -} // namespace gkfs::preload \ No newline at end of file + +std::string +PreloadContext::get_hostname() { + return hostname; +} + +} // namespace preload +} // namespace gkfs diff --git a/src/client/preload_util.cpp b/src/client/preload_util.cpp index 5ca518038..b4e4206b2 100644 --- a/src/client/preload_util.cpp +++ b/src/client/preload_util.cpp @@ -362,7 +362,7 @@ read_hosts_file() { } LOG(INFO, "Hosts pool size: {}", hosts.size()); - + sort(hosts.begin(), hosts.end()); // Sort hosts by alphanumerical value. return hosts; } @@ -416,4 +416,4 @@ connect_to_hosts(const vector>& hosts) { CTX->hosts(addrs); } -} // namespace gkfs::utils \ No newline at end of file +} // namespace gkfs::utils diff --git a/src/client/rpc/forward_data.cpp b/src/client/rpc/forward_data.cpp index 151c4f70b..45e4f368e 100644 --- a/src/client/rpc/forward_data.cpp +++ b/src/client/rpc/forward_data.cpp @@ -262,7 +262,6 @@ forward_read(const string& path, void* buf, const off64_t offset, // contains the recipient ids, used to access the target_chnks map. // First idx is chunk with potential offset std::vector targets{}; - // targets for the first and last chunk as they need special treatment uint64_t chnk_start_target = 0; uint64_t chnk_end_target = 0; @@ -361,8 +360,15 @@ forward_read(const string& path, void* buf, const off64_t offset, handles.emplace_back( ld_network_service->post(endp, in)); - LOG(DEBUG, "host: {}, path: {}, chunks: {}, size: {}, offset: {}", - target, path, in.chunk_n(), total_chunk_size, in.offset()); + LOG(DEBUG, + "host: {}, path: {}, chunk_start: {}, chunk_end: {}, chunks: {}, size: {}, offset: {}", + target, path, chnk_start, chnk_end, in.chunk_n(), + total_chunk_size, in.offset()); + + LOG(TRACE_READS, + "read {} host: {}, path: {}, chunk_start: {}, chunk_end: {}", + CTX->get_hostname(), target, path, chnk_start, chnk_end); + } catch(const std::exception& ex) { LOG(ERROR, @@ -559,4 +565,4 @@ forward_get_chunk_stat() { return make_pair(0, ChunkStat{chunk_size, chunk_total, chunk_free}); } -} // namespace gkfs::rpc \ No newline at end of file +} // namespace gkfs::rpc diff --git a/src/common/CMakeLists.txt b/src/common/CMakeLists.txt index 4c6698ed6..f85e8e3e5 100644 --- a/src/common/CMakeLists.txt +++ b/src/common/CMakeLists.txt @@ -29,6 +29,7 @@ add_subdirectory(arithmetic) add_library(distributor STATIC) +include_directories(${Boost_INCLUDE_DIRS}) set_property(TARGET distributor PROPERTY POSITION_INDEPENDENT_CODE ON) target_sources(distributor PUBLIC diff --git a/src/common/rpc/distributor.cpp b/src/common/rpc/distributor.cpp index 3b1c4011f..dbbe5efbd 100644 --- a/src/common/rpc/distributor.cpp +++ b/src/common/rpc/distributor.cpp @@ -30,7 +30,9 @@ using namespace std; -namespace gkfs::rpc { +namespace gkfs { + +namespace rpc { SimpleHashDistributor::SimpleHashDistributor(host_t localhost, unsigned int hosts_size) @@ -38,6 +40,8 @@ SimpleHashDistributor::SimpleHashDistributor(host_t localhost, ::iota(all_hosts_.begin(), all_hosts_.end(), 0); } +SimpleHashDistributor::SimpleHashDistributor() {} + host_t SimpleHashDistributor::localhost() const { return localhost_; @@ -49,6 +53,18 @@ SimpleHashDistributor::locate_data(const string& path, return str_hash(path + ::to_string(chnk_id)) % hosts_size_; } +host_t +SimpleHashDistributor::locate_data(const string& path, const chunkid_t& chnk_id, + unsigned int hosts_size) { + if(hosts_size_ != hosts_size) { + hosts_size_ = hosts_size; + all_hosts_ = std::vector(hosts_size); + ::iota(all_hosts_.begin(), all_hosts_.end(), 0); + } + + return str_hash(path + ::to_string(chnk_id)) % hosts_size_; +} + host_t SimpleHashDistributor::locate_file_metadata(const string& path) const { return str_hash(path) % hosts_size_; @@ -109,4 +125,108 @@ std::vector ForwarderDistributor::locate_directory_metadata(const std::string& path) const { return all_hosts_; } -} // namespace gkfs::rpc + +bool +GuidedDistributor::init_guided() { + unsigned int destination_host; + chunkid_t chunk_id; + string path; + std::ifstream mapfile; + mapfile.open(GKFS_USE_GUIDED_DISTRIBUTION_PATH); + if((mapfile.rdstate() & std::ifstream::failbit) != 0) + return false; // If it fails, the mapping will be as the SimpleHash + + while(mapfile >> path >> chunk_id >> destination_host) { + // We need destination+1, as 0 has an special meaning in the interval + // map. + if(path.size() > 0 and path[0] == '#') { + // Path that has this prefix will have metadata and data in the same + // place i.e. #/mdtest-hard/ 10 10 chunk_id and destination_host + // are not used + prefix_list.emplace_back(path.substr(1, path.size())); + continue; + } + + auto I = map_interval.find(path); + if(I == map_interval.end()) + map_interval[path] += make_pair( + boost::icl::discrete_interval::right_open( + chunk_id, chunk_id + 1), + destination_host + 1); + else if(I->second.find(chunk_id) == I->second.end()) + I->second.insert(make_pair( + boost::icl::discrete_interval::right_open( + chunk_id, chunk_id + 1), + destination_host + 1)); + } + mapfile.close(); + return true; +} + +GuidedDistributor::GuidedDistributor() { + init_guided(); +} + +GuidedDistributor::GuidedDistributor(host_t localhost, + unsigned int hosts_size) { + if(hosts_size_ != hosts_size) { + hosts_size_ = hosts_size; + localhost_ = localhost; + all_hosts_ = std::vector(hosts_size); + ::iota(all_hosts_.begin(), all_hosts_.end(), 0); + } + init_guided(); +} + +host_t +GuidedDistributor::localhost() const { + return localhost_; +} + +host_t +GuidedDistributor::locate_data(const string& path, const chunkid_t& chnk_id, + unsigned int hosts_size) { + if(hosts_size_ != hosts_size) { + hosts_size_ = hosts_size; + all_hosts_ = std::vector(hosts_size); + ::iota(all_hosts_.begin(), all_hosts_.end(), 0); + } + + return (locate_data(path, chnk_id)); +} + +host_t +GuidedDistributor::locate_data(const string& path, + const chunkid_t& chnk_id) const { + auto it = map_interval.find(path); + if(it != map_interval.end()) { + auto it_f = it->second.find(chnk_id); + if(it_f != it->second.end()) { + return (it_f->second - + 1); // Decrement destination host from the interval_map + } + } + + for(auto const& it : prefix_list) { + if(0 == path.compare(0, min(it.length(), path.length()), it, 0, + min(it.length(), path.length()))) + ; + return str_hash(path) % hosts_size_; + } + + auto locate = path + ::to_string(chnk_id); + return str_hash(locate) % hosts_size_; +} + +host_t +GuidedDistributor::locate_file_metadata(const string& path) const { + return str_hash(path) % hosts_size_; +} + +::vector +GuidedDistributor::locate_directory_metadata(const string& path) const { + return all_hosts_; +} + +} // namespace rpc +} // namespace gkfs diff --git a/src/daemon/classes/rpc_data.cpp b/src/daemon/classes/rpc_data.cpp index 182c19617..ae970ee11 100644 --- a/src/daemon/classes/rpc_data.cpp +++ b/src/daemon/classes/rpc_data.cpp @@ -30,7 +30,9 @@ using namespace std; -namespace gkfs::daemon { +namespace gkfs { + +namespace daemon { // Getter/Setter @@ -74,4 +76,17 @@ RPCData::self_addr_str(const std::string& addr_str) { self_addr_str_ = addr_str; } -} // namespace gkfs::daemon \ No newline at end of file +const std::shared_ptr& +RPCData::distributor() const { + return distributor_; +} + +void +RPCData::distributor( + const std::shared_ptr& distributor) { + distributor_ = distributor; +} + + +} // namespace daemon +} // namespace gkfs diff --git a/src/daemon/daemon.cpp b/src/daemon/daemon.cpp index 452f76ef3..319c6547d 100644 --- a/src/daemon/daemon.cpp +++ b/src/daemon/daemon.cpp @@ -32,6 +32,7 @@ #include #include #include + #include #include #include @@ -57,6 +58,7 @@ extern "C" { } using namespace std; + namespace po = boost::program_options; namespace fs = std::filesystem; @@ -203,6 +205,21 @@ init_environment() { throw; } + GKFS_DATA->spdlogger()->debug("{}() Initializing Distributor ", __func__); + try { +#ifdef GKFS_USE_GUIDED_DISTRIBUTION + auto distributor = std::make_shared(); +#else + auto distributor = std::make_shared(); +#endif + RPC_DATA->distributor(distributor); + } catch(const std::exception& e) { + GKFS_DATA->spdlogger()->error( + "{}() Failed to initialize Distributor: {}", __func__, + e.what()); + throw; + } + #ifdef GKFS_ENABLE_FORWARDING GKFS_DATA->spdlogger()->debug("{}() Enable I/O forwarding mode", __func__); #endif @@ -220,6 +237,7 @@ init_environment() { throw; } #endif + // Initialize data backend std::string chunk_storage_path = GKFS_DATA->rootdir() + "/data/chunks"s; GKFS_DATA->spdlogger()->debug("{}() Initializing storage backend: '{}'", diff --git a/src/daemon/handler/srv_data.cpp b/src/daemon/handler/srv_data.cpp index 697ca86e4..59abace2e 100644 --- a/src/daemon/handler/srv_data.cpp +++ b/src/daemon/handler/srv_data.cpp @@ -45,7 +45,6 @@ #endif using namespace std; - /* * This file contains all Margo RPC handlers that are concerning management * operations @@ -152,8 +151,8 @@ rpc_srv_write(hg_handle_t handle) { } auto const host_id = in.host_id; auto const host_size = in.host_size; - gkfs::rpc::SimpleHashDistributor distributor(host_id, host_size); + auto path = make_shared(in.path); // chnk_ids used by this host vector chnk_ids_host(in.chunk_n); // counter to track how many chunks have been assigned @@ -196,7 +195,8 @@ rpc_srv_write(hg_handle_t handle) { chnk_id_file++) { // Continue if chunk does not hash to this host #ifndef GKFS_ENABLE_FORWARDING - if(distributor.locate_data(in.path, chnk_id_file) != host_id) { + if(RPC_DATA->distributor()->locate_data(in.path, chnk_id_file, + host_size) != host_id) { GKFS_DATA->spdlogger()->trace( "{}() chunkid '{}' ignored as it does not match to this host with id '{}'. chnk_id_curr '{}'", __func__, chnk_id_file, host_id, chnk_id_curr); @@ -415,9 +415,8 @@ rpc_srv_read(hg_handle_t handle) { #ifndef GKFS_ENABLE_FORWARDING auto const host_id = in.host_id; auto const host_size = in.host_size; - gkfs::rpc::SimpleHashDistributor distributor(host_id, host_size); #endif - + auto path = make_shared(in.path); // chnk_ids used by this host vector chnk_ids_host(in.chunk_n); // counter to track how many chunks have been assigned @@ -448,7 +447,8 @@ rpc_srv_read(hg_handle_t handle) { chnk_id_file++) { // Continue if chunk does not hash to this host #ifndef GKFS_ENABLE_FORWARDING - if(distributor.locate_data(in.path, chnk_id_file) != host_id) { + if(RPC_DATA->distributor()->locate_data(in.path, chnk_id_file, + host_size) != host_id) { GKFS_DATA->spdlogger()->trace( "{}() chunkid '{}' ignored as it does not match to this host with id '{}'. chnk_id_curr '{}'", __func__, chnk_id_file, host_id, chnk_id_curr); diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt index 0c9de75b9..4c3697a75 100644 --- a/tests/unit/CMakeLists.txt +++ b/tests/unit/CMakeLists.txt @@ -47,7 +47,7 @@ if(NOT catch2_POPULATED) add_subdirectory(${catch2_SOURCE_DIR} ${catch2_BINARY_DIR}) endif() -# create a convenience library with Catch2's main +# create a convenience library with Catch2's main # to speed up test compilation add_library(catch2_main STATIC catch_main.cpp @@ -57,18 +57,20 @@ target_link_libraries(catch2_main Catch2::Catch2 ) -# define executables for tests and make them depend on the convenience +# define executables for tests and make them depend on the convenience # library (and Catch2 transitively) and fmt add_executable(tests test_example_00.cpp test_example_01.cpp - test_utils_arithmetic.cpp + test_utils_arithmetic.cpp + test_guided_distributor.cpp ) target_link_libraries(tests catch2_main fmt::fmt - arithmetic + arithmetic + distributor ) # Catch2's contrib folder includes some helper functions diff --git a/tests/unit/test_guided_distributor.cpp b/tests/unit/test_guided_distributor.cpp new file mode 100644 index 000000000..732ff8ead --- /dev/null +++ b/tests/unit/test_guided_distributor.cpp @@ -0,0 +1,61 @@ +/* + Copyright 2018-2020, Barcelona Supercomputing Center (BSC), Spain + Copyright 2015-2020, Johannes Gutenberg Universitaet Mainz, Germany + + This software was partially supported by the + EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu). + + This software was partially supported by the + ADA-FS project under the SPPEXA project funded by the DFG. + + SPDX-License-Identifier: MIT +*/ + +#include +#include +#include +#include + +TEST_CASE( "Guided distributor Testing", "[Distributor]" ) { + + GIVEN( "A distributor" ) { + // Generate a guided.txt that will put some files in + // the server num 4 + std::ofstream o; + o.open("/tmp/guided.txt"); + o << "/t.c01 0 3" << std::endl; + o << "/t.c02 0 3" << std::endl; + o << "/t.c01 1 3" << std::endl; + o << "/t.c02 1 3" << std::endl; + o << "/t.c01 2 3" << std::endl; + o << "/t.c02 2 3" << std::endl; + o << "/t.c03 1 3" << std::endl; + o << "/t.c04 1 3" << std::endl; + o << "/t.c05 1 3" << std::endl; + o << "/t.c06 1 3" << std::endl; + o << "/t.c07 1 3" << std::endl; + o.close(); + + // The distributor should return 3 for all the tested files + auto d = gkfs::rpc::GuidedDistributor(); + + REQUIRE( d.locate_data("/t.c01",1,10) == 3 ); + REQUIRE( d.locate_data("/t.c02",1,10) == 3 ); + REQUIRE( d.locate_data("/t.c03",1,10) == 3 ); + REQUIRE( d.locate_data("/t.c04",1,10) == 3 ); + REQUIRE( d.locate_data("/t.c05",1,10) == 3 ); + REQUIRE( d.locate_data("/t.c06",1,10) == 3 ); + REQUIRE( d.locate_data("/t.c07",1,10) == 3 ); + + // Next result is random, but with the same seed is consistent + // We ask for chunk 5 that is distributed randomly between the + // 10 servers. + REQUIRE ( (d.locate_data("/t.c01",5,10) + + d.locate_data("/t.c02",5,10) + + d.locate_data("/t.c03",5,10) + + d.locate_data("/t.c04",5,10) + + d.locate_data("/t.c05",5,10) + + d.locate_data("/t.c06",5,10) + + d.locate_data("/t.c07",5,10) ) == 42); + } +} -- GitLab From 73bde2f6ddbd8379e42f8ed0de7bbe314134cd52 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Mon, 14 Jun 2021 10:01:00 +0200 Subject: [PATCH 2/3] README update --- README.md | 1 - include/daemon/daemon.hpp | 2 +- tests/unit/test_guided_distributor.cpp | 2 +- 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index dd1ab72bd..fafb2924e 100644 --- a/README.md +++ b/README.md @@ -283,7 +283,6 @@ Then, use the `examples/distributors/guided/generate.py` to create the output fi This should work if the nodes are sorted in alphabetical order, which is the usual scenario. Users should take care of multi-server configurations. -``` Finally, enable the distributor using the next compilation flags: * `GKFS_USE_GUIDED_DISTRIBUTION` ON * `GKFS_USE_GUIDED_DISTRIBUTION_PATH` `` diff --git a/include/daemon/daemon.hpp b/include/daemon/daemon.hpp index 30008efbb..bfeea7976 100644 --- a/include/daemon/daemon.hpp +++ b/include/daemon/daemon.hpp @@ -44,7 +44,7 @@ extern "C" { #include #include -#include +#include #define GKFS_DATA \ (static_cast(gkfs::daemon::FsData::getInstance())) diff --git a/tests/unit/test_guided_distributor.cpp b/tests/unit/test_guided_distributor.cpp index 732ff8ead..eb36647af 100644 --- a/tests/unit/test_guided_distributor.cpp +++ b/tests/unit/test_guided_distributor.cpp @@ -13,7 +13,7 @@ #include #include -#include +#include #include TEST_CASE( "Guided distributor Testing", "[Distributor]" ) { -- GitLab From 320169676ca60726ee6b04d990f15191e75c17d1 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Thu, 17 Jun 2021 10:21:40 +0000 Subject: [PATCH 3/3] Update CMakeLists.txt --- CMakeLists.txt | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index b07ed80cd..805e2fb8f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -178,8 +178,6 @@ add_definitions(-DLIBGKFS_LOG_MESSAGE_SIZE=${CLIENT_LOG_MESSAGE_SIZE}) message(STATUS "[gekkofs] Maximum log message size in the client library: ${CLIENT_LOG_MESSAGE_SIZE}") mark_as_advanced(CLIENT_LOG_MESSAGE_SIZE) -configure_file(include/common/cmake_configure.hpp.in include/common/cmake_configure.hpp) - option(GKFS_USE_GUIDED_DISTRIBUTION "Use guided data distributor " OFF) message(STATUS "[gekkofs] Guided data distributor: ${GKFS_USE_GUIDED_DISTRIBUTION}") @@ -187,6 +185,7 @@ set(GKFS_USE_GUIDED_DISTRIBUTION_PATH "/tmp/guided.txt" CACHE STRING "File Path set_property(CACHE GKFS_USE_GUIDED_DISTRIBUTION_PATH PROPERTY STRINGS) message(STATUS "[gekkofs] Guided data distributor input file path: ${GKFS_USE_GUIDED_DISTRIBUTION_PATH}") +configure_file(include/common/cmake_configure.hpp.in include/common/cmake_configure.hpp) # Imported target add_library(RocksDB INTERFACE IMPORTED GLOBAL) -- GitLab