diff --git a/CHANGELOG.md b/CHANGELOG.md index d08f0d4597884dde1ca6b0d34093f77f55f91a2e..14267ee3c36bcebebe6b54cc38a1faf5377eaa89 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,7 +5,12 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] + - Added dirents_extended function and a find substitution proposal. + +- Created a Guided Distributor using a mapping file to map chunks to specific + nodes. + ## [0.8.0] - 2020-09-15 ## New - Both client library and daemon have been extended to support the ofi+verbs diff --git a/CMakeLists.txt b/CMakeLists.txt index 7fe5aa9ec3b38c91ec44fd136f3f4343fbe6a333..805e2fb8fc91ae15fde6f8a7993ba4119cc0f162 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -178,6 +178,13 @@ add_definitions(-DLIBGKFS_LOG_MESSAGE_SIZE=${CLIENT_LOG_MESSAGE_SIZE}) message(STATUS "[gekkofs] Maximum log message size in the client library: ${CLIENT_LOG_MESSAGE_SIZE}") mark_as_advanced(CLIENT_LOG_MESSAGE_SIZE) +option(GKFS_USE_GUIDED_DISTRIBUTION "Use guided data distributor " OFF) +message(STATUS "[gekkofs] Guided data distributor: ${GKFS_USE_GUIDED_DISTRIBUTION}") + +set(GKFS_USE_GUIDED_DISTRIBUTION_PATH "/tmp/guided.txt" CACHE STRING "File Path for guided distributor") +set_property(CACHE GKFS_USE_GUIDED_DISTRIBUTION_PATH PROPERTY STRINGS) +message(STATUS "[gekkofs] Guided data distributor input file path: ${GKFS_USE_GUIDED_DISTRIBUTION_PATH}") + configure_file(include/common/cmake_configure.hpp.in include/common/cmake_configure.hpp) # Imported target diff --git a/README.md b/README.md index 027814bdd7b8e0ac45594d8f0c028f3983b24bbd..fafb2924e2acf32be9953c9c6a59f6a4c4228aed 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,7 @@ GekkoFS is a file system capable of aggregating the local I/O capacity and performance of each compute node in a HPC cluster to produce a high-performance storage space that can be accessed in a 
distributed manner. -This storage space allows HPC applications and simulations to run in isolation from each other with regards +This storage space allows HPC applications and simulations to run in isolation from each other with regards to I/O, which reduces interferences and improves performance. # Dependencies @@ -30,7 +30,7 @@ to I/O, which reduces interferences and improves performance. - snappy: `sudo yum install snappy snappy-devel` - zlib: `sudo yum install zlib zlib-devel` - bzip2: `sudo yum install bzip2 bzip2-devel` -- zstandard: +- zstandard: ```bash wget https://github.com/facebook/zstd/archive/v1.1.3.tar.gz mv v1.1.3.tar.gz zstd-1.1.3.tar.gz @@ -162,28 +162,28 @@ Further options: ```bash Allowed options: -h [ --help ] Help message - -m [ --mountdir ] arg Virtual mounting directory where GekkoFS is + -m [ --mountdir ] arg Virtual mounting directory where GekkoFS is available. - -r [ --rootdir ] arg Local data directory where GekkoFS data for this + -r [ --rootdir ] arg Local data directory where GekkoFS data for this daemon is stored. -i [ --metadir ] arg Metadata directory where GekkoFS RocksDB data directory is located. If not set, rootdir is used. - -l [ --listen ] arg Address or interface to bind the daemon to. + -l [ --listen ] arg Address or interface to bind the daemon to. Default: local hostname. - When used with ofi+verbs the FI_VERBS_IFACE - environment variable is set accordingly which - associates the verbs device with the network - interface. In case FI_VERBS_IFACE is already + When used with ofi+verbs the FI_VERBS_IFACE + environment variable is set accordingly which + associates the verbs device with the network + interface. In case FI_VERBS_IFACE is already defined, the argument is ignored. Default 'ib'. - -H [ --hosts-file ] arg Shared file used by deamons to register their + -H [ --hosts-file ] arg Shared file used by deamons to register their endpoints. 
(default './gkfs_hosts.txt') -P [ --rpc-protocol ] arg Used RPC protocol for inter-node communication. - Available: {ofi+sockets, ofi+verbs, ofi+psm2} for - TCP, Infiniband, and Omni-Path, respectively. + Available: {ofi+sockets, ofi+verbs, ofi+psm2} for + TCP, Infiniband, and Omni-Path, respectively. (Default ofi+sockets) Libfabric must have enabled support verbs or psm2. - --auto-sm Enables intra-node communication (IPCs) via the - `na+sm` (shared memory) protocol, instead of using + --auto-sm Enables intra-node communication (IPCs) via the + `na+sm` (shared memory) protocol, instead of using the RPC protocol. (Default off) --version Print version and exit. ``` @@ -194,15 +194,15 @@ Shut it down by gracefully killing the process (SIGTERM). Metadata and actual data will be stored at the ``. The path where the application works on is set with ``. - + Run the application with the preload library: `LD_PRELOAD=/build/lib/libgkfs_intercept.so ./application`. In the case of an MPI application use the `{mpirun, mpiexec} -x` argument. - + ### Logging The following environment variables can be used to enable logging in the client library: `LIBGKFS_LOG=` and `LIBGKFS_LOG_OUTPUT=` to configure the output module and set the path to the log file of the client -library. If not path is specified in `LIBGKFS_LOG_OUTPUT`, the client library +library. If not path is specified in `LIBGKFS_LOG_OUTPUT`, the client library will send log messages to `/tmp/gkfs_client.log`. The following modules are available: @@ -228,6 +228,7 @@ The following modules are available: module will only be available if the client library is built in `Debug` mode. - `all`: All previous options combined. + - `trace_reads`: Generate log line with extra information in read operations for guided distributor - `help`: Print a help message and exit. 
When tracing sytem calls, specific syscalls can be removed from log messages by @@ -236,20 +237,56 @@ setting it to `LIBGKFS_LOG_SYSCALL_FILTER=epoll_wait,epoll_create` will filter out any log entries from the `epoll_wait()` and `epoll_create()` system calls. Additionally, setting the `LIBGKFS_LOG_OUTPUT_TRUNC` environment variable with -a value different from `0` will instruct the logging subsystem to truncate +a value different from `0` will instruct the logging subsystem to truncate the file used for logging, rather than append to it. -For the daemon, the `GKFS_DAEMON_LOG_PATH=` environment variable -can be provided to set the path to the log file, and the log module can be +For the daemon, the `GKFS_DAEMON_LOG_PATH=` environment variable +can be provided to set the path to the log file, and the log module can be selected with the `GKFS_LOG_LEVEL={off,critical,err,warn,info,debug,trace}` environment variable. + ### External functions GekkoFS allows to use external functions on your client code, via LD_PRELOAD. Source code needs to be compiled with -fPIC. We include a pfind io500 substitution, `examples/gfind/gfind.cpp` and a non-mpi version `examples/gfind/sfind.cpp` +### Data distributors +The data distribution can be selected at compilation time; we have 2 distributors available: + +## Simple Hash (Default) +Chunks are distributed randomly to the different GekkoFS servers. + +## Guided Distributor +Guided distributor distributes chunks using a shared file with the following format: +` ` + +Moreover, if you prepend a path with #, all the data from that path will go to the same place as the metadata. +Specifically defined paths (without #) will take priority. + +i.e., +#/mdt-hard 0 0 + +GekkoFS will store data and metadata to the same server. The server will still be random (0 0 has no meaning, yet). + +Chunks not specified are distributed using the Simple Hash distributor.
+ +To generate such file we need to follow a first execution, using the trace_reads log option + +This will enable a `TRACE_READS` level log at the clients offering several lines that can be used to generate the input file. +In this stage, each node should generate a separated file this can be done in SLURM using the next line : +`srun -N 10 -n 320 --export="ALL" /bin/bash -c "export LIBGKFS_LOG_OUTPUT=${HOME}/test/GLOBAL.txt;LD_PRELOAD=${GKFS_PRLD} "` + +Then, use the `examples/distributors/guided/generate.py` to create the output file. +* `python examples/distributors/guided/generate.py ~/test/GLOBAL.txt >> guided.txt` + +This should work if the nodes are sorted in alphabetical order, which is the usual scenario. Users should take care of multi-server configurations. + +Finally, enable the distributor using the next compilation flags: +* `GKFS_USE_GUIDED_DISTRIBUTION` ON +* `GKFS_USE_GUIDED_DISTRIBUTION_PATH` `` + ### Acknowledgment This software was partially supported by the EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu). diff --git a/examples/distributors/guided/generate.py b/examples/distributors/guided/generate.py new file mode 100644 index 0000000000000000000000000000000000000000..e4e21d963fef0b9e36ab4056208e6ac911e5ce5e --- /dev/null +++ b/examples/distributors/guided/generate.py @@ -0,0 +1,40 @@ +### +# Copyright 2018-2020, Barcelona Supercomputing Center (BSC), Spain +# Copyright 2015-2020, Johannes Gutenberg Universitaet Mainz, Germany + +# This software was partially supported by the +# EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu). + +# This software was partially supported by the +# ADA-FS project under the SPPEXA project funded by the DFG. 
+ +# SPDX-License-Identifier: MIT +### +#!/usr/bin/python3 +import re +import sys +import collections + +file = sys.argv[1] + +pattern = re.compile(r".+(read\ )(.*)( host: )(\d+).+(path: )(.+),.+(chunk_start: )(\d+).+(chunk_end: )(\d+)") + +d = collections.OrderedDict() + +with open(file) as f: + for line in f: + result = pattern.match(line) + if result: + d[result.group(2)] = 1 +keys = sorted(d.keys()) +i = 0 +for key in keys: + d[key] = i + i = i + 1 + +with open(file) as f: + for line in f: + result = pattern.match(line) + if result: + for i in range(int(result.group(8)), int(result.group(10))+1): + print (result.group(6), i, d[result.group(2)]) diff --git a/include/client/logging.hpp b/include/client/logging.hpp index e793106c0209761e86db96f58f18f54aca33e90b..e87aa0c059e99503f26db064c321811483c6f2bc 100644 --- a/include/client/logging.hpp +++ b/include/client/logging.hpp @@ -47,7 +47,7 @@ namespace gkfs::log { -enum class log_level : short { +enum class log_level : unsigned int { print_syscalls = 1 << 0, print_syscalls_entry = 1 << 1, print_info = 1 << 2, @@ -57,6 +57,7 @@ enum class log_level : short { print_hermes = 1 << 6, print_mercury = 1 << 7, print_debug = 1 << 8, + print_trace_reads = 1 << 9, // for internal use print_none = 0, @@ -117,6 +118,7 @@ static const auto constexpr warning = log_level::print_warnings; static const auto constexpr hermes = log_level::print_hermes; static const auto constexpr mercury = log_level::print_mercury; static const auto constexpr debug = log_level::print_debug; +static const auto constexpr trace_reads = log_level::print_trace_reads; static const auto constexpr none = log_level::print_none; static const auto constexpr most = log_level::print_most; static const auto constexpr all = log_level::print_all; @@ -125,7 +127,8 @@ static const auto constexpr help = log_level::print_help; static const auto constexpr level_names = utils::make_array( "syscall", "syscall", // sycall_entry uses the same name as syscall - "info", 
"critical", "error", "warning", "hermes", "mercury", "debug"); + "info", "critical", "error", "warning", "hermes", "mercury", "debug", + "trace_reads"); inline constexpr auto lookup_level_name(log_level l) { @@ -533,6 +536,14 @@ static_buffer::grow(std::size_t size) { } \ } while(0); +#define LOG_TRACE_READS(...) \ + do { \ + if(gkfs::log::get_global_logger()) { \ + gkfs::log::get_global_logger()->log( \ + gkfs::log::trace_reads, __func__, __LINE__, __VA_ARGS__); \ + } \ + } while(0); + #ifdef GKFS_DEBUG_BUILD #define LOG_SYSCALL(...) \ diff --git a/include/client/preload_context.hpp b/include/client/preload_context.hpp index 83a44ad7ddc258b2085d7d088335386e7678c6a0..183f7960b4336e2d7a169a9452121a3938bd6fa9 100644 --- a/include/client/preload_context.hpp +++ b/include/client/preload_context.hpp @@ -36,6 +36,7 @@ #include #include #include +#include #include @@ -102,6 +103,7 @@ private: mutable std::mutex internal_fds_mutex_; bool internal_fds_must_relocate_; std::bitset protected_fds_; + std::string hostname; public: static PreloadContext* @@ -210,6 +212,9 @@ public: void unprotect_user_fds(); + + std::string + get_hostname(); }; } // namespace preload diff --git a/include/common/cmake_configure.hpp.in b/include/common/cmake_configure.hpp.in index 811f643bedfe48721f52d7bb10ef6d85d5555518..8b3ee852be9138d1e839dc4c2aecbcb5b2bef7c6 100644 --- a/include/common/cmake_configure.hpp.in +++ b/include/common/cmake_configure.hpp.in @@ -16,5 +16,8 @@ #cmakedefine01 CREATE_CHECK_PARENTS #cmakedefine01 LOG_SYSCALLS -#endif // FS_CMAKE_CONFIGURE_H +#cmakedefine GKFS_USE_GUIDED_DISTRIBUTION +#define GKFS_USE_GUIDED_DISTRIBUTION_PATH "@GKFS_USE_GUIDED_DISTRIBUTION_PATH@" + +#endif //FS_CMAKE_CONFIGURE_H // clang-format on diff --git a/include/common/rpc/distributor.hpp b/include/common/rpc/distributor.hpp index 4d76eda1237dfbbfffb14932676bd0a359e48d50..7c2b84ee6556f4eb622ccac25b00c80cde3027da 100644 --- a/include/common/rpc/distributor.hpp +++ b/include/common/rpc/distributor.hpp 
@@ -29,11 +29,17 @@ #ifndef GEKKOFS_RPC_DISTRIBUTOR_HPP #define GEKKOFS_RPC_DISTRIBUTOR_HPP +#include "../include/config.hpp" #include #include #include +#include +#include +#include -namespace gkfs::rpc { +namespace gkfs { + +namespace rpc { using chunkid_t = unsigned int; using host_t = unsigned int; @@ -45,6 +51,11 @@ public: virtual host_t locate_data(const std::string& path, const chunkid_t& chnk_id) const = 0; + // TODO: We need to pass hosts_size in the server side, because the number + // of servers are not defined (in startup) + virtual host_t + locate_data(const std::string& path, const chunkid_t& chnk_id, + unsigned int hosts_size) = 0; virtual host_t locate_file_metadata(const std::string& path) const = 0; @@ -57,11 +68,13 @@ public: class SimpleHashDistributor : public Distributor { private: host_t localhost_; - unsigned int hosts_size_; + unsigned int hosts_size_{0}; std::vector all_hosts_; std::hash str_hash; public: + SimpleHashDistributor(); + SimpleHashDistributor(host_t localhost, unsigned int hosts_size); host_t @@ -71,6 +84,10 @@ public: locate_data(const std::string& path, const chunkid_t& chnk_id) const override; + host_t + locate_data(const std::string& path, const chunkid_t& chnk_id, + unsigned int host_size); + host_t locate_file_metadata(const std::string& path) const override; @@ -123,6 +140,43 @@ public: locate_directory_metadata(const std::string& path) const override; }; -} // namespace gkfs::rpc +class GuidedDistributor : public Distributor { +private: + host_t localhost_; + unsigned int hosts_size_{0}; + std::vector all_hosts_; + std::hash str_hash; + std::unordered_map> + map_interval; + std::vector prefix_list; // Should not be very long + bool + init_guided(); + +public: + GuidedDistributor(); + + GuidedDistributor(host_t localhost, unsigned int hosts_size); + + host_t + localhost() const override; + + host_t + locate_data(const std::string& path, + const chunkid_t& chnk_id) const override; + + host_t + locate_data(const 
std::string& path, const chunkid_t& chnk_id, + unsigned int host_size); + + host_t + locate_file_metadata(const std::string& path) const override; + + std::vector + locate_directory_metadata(const std::string& path) const override; +}; + +} // namespace rpc +} // namespace gkfs #endif // GEKKOFS_RPC_LOCATOR_HPP diff --git a/include/daemon/classes/rpc_data.hpp b/include/daemon/classes/rpc_data.hpp index 6a8125eee2b4360747b96078d74ae40120f5ea26..35f0458d1ffa37561952a4fd120f35568ca27364 100644 --- a/include/daemon/classes/rpc_data.hpp +++ b/include/daemon/classes/rpc_data.hpp @@ -32,6 +32,13 @@ #include namespace gkfs { + +/* Forward declarations */ +namespace rpc { +class Distributor; +} + + namespace daemon { class RPCData { @@ -47,6 +54,8 @@ private: ABT_pool io_pool_; std::vector io_streams_; std::string self_addr_str_; + // Distributor + std::shared_ptr distributor_; public: static RPCData* @@ -84,7 +93,13 @@ public: self_addr_str() const; void - self_addr_str(const std::string& addr_str); + self_addr_str(const std::string& addr_stra); + + const std::shared_ptr& + distributor() const; + + void + distributor(const std::shared_ptr& distributor); }; } // namespace daemon diff --git a/include/daemon/daemon.hpp b/include/daemon/daemon.hpp index 612712b63bcad416fcffbde8129dbee216112fbb..bfeea79763a1bd0dbff06f5676f376c922c56cfa 100644 --- a/include/daemon/daemon.hpp +++ b/include/daemon/daemon.hpp @@ -44,6 +44,7 @@ extern "C" { #include #include +#include #define GKFS_DATA \ (static_cast(gkfs::daemon::FsData::getInstance())) diff --git a/src/client/logging.cpp b/src/client/logging.cpp index c21da6d9b995f3ffbc4d494a6d6c64f7afb25f7d..0b45cb45853c639ace4d612e961dfe70be5b3853 100644 --- a/src/client/logging.cpp +++ b/src/client/logging.cpp @@ -105,6 +105,10 @@ static const auto constexpr debug_opts = utils::make_array( "[ default: on ]"}, log::mercury}, + opt_info{STR_AND_LEN("trace_reads"), + {"Print extended read information", "[ default: off ]"}, + log::trace_reads}, + 
#ifdef GKFS_DEBUG_BUILD opt_info{STR_AND_LEN("debug"), diff --git a/src/client/preload.cpp b/src/client/preload.cpp index e5b0eff9dceae67728eb65c888f0574991083bc9..47a354551718b575e44dc5d0547576c99a5470f4 100644 --- a/src/client/preload.cpp +++ b/src/client/preload.cpp @@ -149,10 +149,16 @@ init_ld_environment_() { CTX->fwd_host_id(), CTX->hosts().size()); CTX->distributor(forwarder_dist); #else - auto simple_hash_dist = std::make_shared( +#ifdef GKFS_USE_GUIDED_DISTRIBUTION + auto distributor = std::make_shared( + CTX->local_host_id(), CTX->hosts().size()); +#else + auto distributor = std::make_shared( CTX->local_host_id(), CTX->hosts().size()); - CTX->distributor(simple_hash_dist); #endif + CTX->distributor(distributor); +#endif + LOG(INFO, "Retrieving file system configuration..."); diff --git a/src/client/preload_context.cpp b/src/client/preload_context.cpp index 8762e7aebf319bb13d1dcc6e3c8f7f6e5fa2392a..5236a1c4cfb2051f59355cc02fdc484789eafde5 100644 --- a/src/client/preload_context.cpp +++ b/src/client/preload_context.cpp @@ -47,7 +47,9 @@ extern "C" { #include } -namespace gkfs::preload { +namespace gkfs { + +namespace preload { decltype(PreloadContext::MIN_INTERNAL_FD) constexpr PreloadContext:: MIN_INTERNAL_FD; @@ -59,6 +61,10 @@ PreloadContext::PreloadContext() internal_fds_.set(); internal_fds_must_relocate_ = true; + + char host[255]; + gethostname(host, 255); + hostname = host; } void @@ -426,4 +432,11 @@ PreloadContext::unprotect_user_fds() { internal_fds_must_relocate_ = true; } -} // namespace gkfs::preload \ No newline at end of file + +std::string +PreloadContext::get_hostname() { + return hostname; +} + +} // namespace preload +} // namespace gkfs diff --git a/src/client/preload_util.cpp b/src/client/preload_util.cpp index 5ca518038460841fd094e791999532c65d65a008..b4e4206b29915e0c590d8b9bbcb5b733df7783d1 100644 --- a/src/client/preload_util.cpp +++ b/src/client/preload_util.cpp @@ -362,7 +362,7 @@ read_hosts_file() { } LOG(INFO, "Hosts pool size: 
{}", hosts.size()); - + sort(hosts.begin(), hosts.end()); // Sort hosts by alphanumerical value. return hosts; } @@ -416,4 +416,4 @@ connect_to_hosts(const vector>& hosts) { CTX->hosts(addrs); } -} // namespace gkfs::utils \ No newline at end of file +} // namespace gkfs::utils diff --git a/src/client/rpc/forward_data.cpp b/src/client/rpc/forward_data.cpp index 151c4f70b9f72bfead5cf9ad687f6edeed59231b..45e4f368ef48ce36d38ae20276471cace6d522b4 100644 --- a/src/client/rpc/forward_data.cpp +++ b/src/client/rpc/forward_data.cpp @@ -262,7 +262,6 @@ forward_read(const string& path, void* buf, const off64_t offset, // contains the recipient ids, used to access the target_chnks map. // First idx is chunk with potential offset std::vector targets{}; - // targets for the first and last chunk as they need special treatment uint64_t chnk_start_target = 0; uint64_t chnk_end_target = 0; @@ -361,8 +360,15 @@ forward_read(const string& path, void* buf, const off64_t offset, handles.emplace_back( ld_network_service->post(endp, in)); - LOG(DEBUG, "host: {}, path: {}, chunks: {}, size: {}, offset: {}", - target, path, in.chunk_n(), total_chunk_size, in.offset()); + LOG(DEBUG, + "host: {}, path: {}, chunk_start: {}, chunk_end: {}, chunks: {}, size: {}, offset: {}", + target, path, chnk_start, chnk_end, in.chunk_n(), + total_chunk_size, in.offset()); + + LOG(TRACE_READS, + "read {} host: {}, path: {}, chunk_start: {}, chunk_end: {}", + CTX->get_hostname(), target, path, chnk_start, chnk_end); + } catch(const std::exception& ex) { LOG(ERROR, @@ -559,4 +565,4 @@ forward_get_chunk_stat() { return make_pair(0, ChunkStat{chunk_size, chunk_total, chunk_free}); } -} // namespace gkfs::rpc \ No newline at end of file +} // namespace gkfs::rpc diff --git a/src/common/CMakeLists.txt b/src/common/CMakeLists.txt index 4c6698ed67e1e55e7f09fe46cb2ddfc35a38a627..f85e8e3e5c81a48d7b31ad8bee8264721be2fb6c 100644 --- a/src/common/CMakeLists.txt +++ b/src/common/CMakeLists.txt @@ -29,6 +29,7 @@ 
add_subdirectory(arithmetic) add_library(distributor STATIC) +include_directories(${Boost_INCLUDE_DIRS}) set_property(TARGET distributor PROPERTY POSITION_INDEPENDENT_CODE ON) target_sources(distributor PUBLIC diff --git a/src/common/rpc/distributor.cpp b/src/common/rpc/distributor.cpp index 3b1c4011f3fc015f2cc6ccb05489e9e7b10d99d4..dbbe5efbd3d608e588c326d2944b36b27c372964 100644 --- a/src/common/rpc/distributor.cpp +++ b/src/common/rpc/distributor.cpp @@ -30,7 +30,9 @@ using namespace std; -namespace gkfs::rpc { +namespace gkfs { + +namespace rpc { SimpleHashDistributor::SimpleHashDistributor(host_t localhost, unsigned int hosts_size) @@ -38,6 +40,8 @@ SimpleHashDistributor::SimpleHashDistributor(host_t localhost, ::iota(all_hosts_.begin(), all_hosts_.end(), 0); } +SimpleHashDistributor::SimpleHashDistributor() {} + host_t SimpleHashDistributor::localhost() const { return localhost_; @@ -49,6 +53,18 @@ SimpleHashDistributor::locate_data(const string& path, return str_hash(path + ::to_string(chnk_id)) % hosts_size_; } +host_t +SimpleHashDistributor::locate_data(const string& path, const chunkid_t& chnk_id, + unsigned int hosts_size) { + if(hosts_size_ != hosts_size) { + hosts_size_ = hosts_size; + all_hosts_ = std::vector(hosts_size); + ::iota(all_hosts_.begin(), all_hosts_.end(), 0); + } + + return str_hash(path + ::to_string(chnk_id)) % hosts_size_; +} + host_t SimpleHashDistributor::locate_file_metadata(const string& path) const { return str_hash(path) % hosts_size_; @@ -109,4 +125,108 @@ std::vector ForwarderDistributor::locate_directory_metadata(const std::string& path) const { return all_hosts_; } -} // namespace gkfs::rpc + +bool +GuidedDistributor::init_guided() { + unsigned int destination_host; + chunkid_t chunk_id; + string path; + std::ifstream mapfile; + mapfile.open(GKFS_USE_GUIDED_DISTRIBUTION_PATH); + if((mapfile.rdstate() & std::ifstream::failbit) != 0) + return false; // If it fails, the mapping will be as the SimpleHash + + while(mapfile >> path 
>> chunk_id >> destination_host) { + // We need destination+1, as 0 has an special meaning in the interval + // map. + if(path.size() > 0 and path[0] == '#') { + // Path that has this prefix will have metadata and data in the same + // place i.e. #/mdtest-hard/ 10 10 chunk_id and destination_host + // are not used + prefix_list.emplace_back(path.substr(1, path.size())); + continue; + } + + auto I = map_interval.find(path); + if(I == map_interval.end()) + map_interval[path] += make_pair( + boost::icl::discrete_interval::right_open( + chunk_id, chunk_id + 1), + destination_host + 1); + else if(I->second.find(chunk_id) == I->second.end()) + I->second.insert(make_pair( + boost::icl::discrete_interval::right_open( + chunk_id, chunk_id + 1), + destination_host + 1)); + } + mapfile.close(); + return true; +} + +GuidedDistributor::GuidedDistributor() { + init_guided(); +} + +GuidedDistributor::GuidedDistributor(host_t localhost, + unsigned int hosts_size) { + if(hosts_size_ != hosts_size) { + hosts_size_ = hosts_size; + localhost_ = localhost; + all_hosts_ = std::vector(hosts_size); + ::iota(all_hosts_.begin(), all_hosts_.end(), 0); + } + init_guided(); +} + +host_t +GuidedDistributor::localhost() const { + return localhost_; +} + +host_t +GuidedDistributor::locate_data(const string& path, const chunkid_t& chnk_id, + unsigned int hosts_size) { + if(hosts_size_ != hosts_size) { + hosts_size_ = hosts_size; + all_hosts_ = std::vector(hosts_size); + ::iota(all_hosts_.begin(), all_hosts_.end(), 0); + } + + return (locate_data(path, chnk_id)); +} + +host_t +GuidedDistributor::locate_data(const string& path, + const chunkid_t& chnk_id) const { + auto it = map_interval.find(path); + if(it != map_interval.end()) { + auto it_f = it->second.find(chnk_id); + if(it_f != it->second.end()) { + return (it_f->second - + 1); // Decrement destination host from the interval_map + } + } + + for(auto const& it : prefix_list) { + if(0 == path.compare(0, min(it.length(), path.length()), it, 0, + 
min(it.length(), path.length()))) + return str_hash(path) % hosts_size_; + + } + + auto locate = path + ::to_string(chnk_id); + return str_hash(locate) % hosts_size_; +} + +host_t +GuidedDistributor::locate_file_metadata(const string& path) const { + return str_hash(path) % hosts_size_; +} + +::vector +GuidedDistributor::locate_directory_metadata(const string& path) const { + return all_hosts_; +} + +} // namespace rpc +} // namespace gkfs diff --git a/src/daemon/classes/rpc_data.cpp b/src/daemon/classes/rpc_data.cpp index 182c19617a6e7d2c8bafa6feb9cdab5537bb4ec6..ae970ee11b0c85bbf8575ccaedfcb298ea222c1a 100644 --- a/src/daemon/classes/rpc_data.cpp +++ b/src/daemon/classes/rpc_data.cpp @@ -30,7 +30,9 @@ using namespace std; -namespace gkfs::daemon { +namespace gkfs { + +namespace daemon { // Getter/Setter @@ -74,4 +76,17 @@ RPCData::self_addr_str(const std::string& addr_str) { self_addr_str_ = addr_str; } -} // namespace gkfs::daemon \ No newline at end of file +const std::shared_ptr& +RPCData::distributor() const { + return distributor_; +} + +void +RPCData::distributor( + const std::shared_ptr& distributor) { + distributor_ = distributor; +} + + +} // namespace daemon +} // namespace gkfs diff --git a/src/daemon/daemon.cpp b/src/daemon/daemon.cpp index 452f76ef3cd60191dc40e61aa8f7c86ae669e5e6..319c6547d78a23280dc9609d2626f49dcff53f13 100644 --- a/src/daemon/daemon.cpp +++ b/src/daemon/daemon.cpp @@ -32,6 +32,7 @@ #include #include #include + #include #include #include @@ -57,6 +58,7 @@ extern "C" { } using namespace std; + namespace po = boost::program_options; namespace fs = std::filesystem; @@ -203,6 +205,21 @@ init_environment() { throw; } + GKFS_DATA->spdlogger()->debug("{}() Initializing Distributor ", __func__); + try { +#ifdef GKFS_USE_GUIDED_DISTRIBUTION + auto distributor = std::make_shared(); +#else + auto distributor = std::make_shared(); +#endif + RPC_DATA->distributor(distributor); + } catch(const std::exception& e) { + 
GKFS_DATA->spdlogger()->error( + "{}() Failed to initialize Distributor: {}", __func__, + e.what()); + throw; + } + #ifdef GKFS_ENABLE_FORWARDING GKFS_DATA->spdlogger()->debug("{}() Enable I/O forwarding mode", __func__); #endif @@ -220,6 +237,7 @@ init_environment() { throw; } #endif + // Initialize data backend std::string chunk_storage_path = GKFS_DATA->rootdir() + "/data/chunks"s; GKFS_DATA->spdlogger()->debug("{}() Initializing storage backend: '{}'", diff --git a/src/daemon/handler/srv_data.cpp b/src/daemon/handler/srv_data.cpp index 697ca86e451337d9c4260762f2504b227b0310c0..59abace2e12a11b1203eb07da63129af7d96b55e 100644 --- a/src/daemon/handler/srv_data.cpp +++ b/src/daemon/handler/srv_data.cpp @@ -45,7 +45,6 @@ #endif using namespace std; - /* * This file contains all Margo RPC handlers that are concerning management * operations @@ -152,8 +151,8 @@ rpc_srv_write(hg_handle_t handle) { } auto const host_id = in.host_id; auto const host_size = in.host_size; - gkfs::rpc::SimpleHashDistributor distributor(host_id, host_size); + auto path = make_shared(in.path); // chnk_ids used by this host vector chnk_ids_host(in.chunk_n); // counter to track how many chunks have been assigned @@ -196,7 +195,8 @@ rpc_srv_write(hg_handle_t handle) { chnk_id_file++) { // Continue if chunk does not hash to this host #ifndef GKFS_ENABLE_FORWARDING - if(distributor.locate_data(in.path, chnk_id_file) != host_id) { + if(RPC_DATA->distributor()->locate_data(in.path, chnk_id_file, + host_size) != host_id) { GKFS_DATA->spdlogger()->trace( "{}() chunkid '{}' ignored as it does not match to this host with id '{}'. 
chnk_id_curr '{}'", __func__, chnk_id_file, host_id, chnk_id_curr); @@ -415,9 +415,8 @@ rpc_srv_read(hg_handle_t handle) { #ifndef GKFS_ENABLE_FORWARDING auto const host_id = in.host_id; auto const host_size = in.host_size; - gkfs::rpc::SimpleHashDistributor distributor(host_id, host_size); #endif - + auto path = make_shared(in.path); // chnk_ids used by this host vector chnk_ids_host(in.chunk_n); // counter to track how many chunks have been assigned @@ -448,7 +447,8 @@ rpc_srv_read(hg_handle_t handle) { chnk_id_file++) { // Continue if chunk does not hash to this host #ifndef GKFS_ENABLE_FORWARDING - if(distributor.locate_data(in.path, chnk_id_file) != host_id) { + if(RPC_DATA->distributor()->locate_data(in.path, chnk_id_file, + host_size) != host_id) { GKFS_DATA->spdlogger()->trace( "{}() chunkid '{}' ignored as it does not match to this host with id '{}'. chnk_id_curr '{}'", __func__, chnk_id_file, host_id, chnk_id_curr); diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt index 0c9de75b9636a1e9b962082bab4e4801ce1b5252..4c3697a75aa422c5cb68184ec256909a2c56ed03 100644 --- a/tests/unit/CMakeLists.txt +++ b/tests/unit/CMakeLists.txt @@ -47,7 +47,7 @@ if(NOT catch2_POPULATED) add_subdirectory(${catch2_SOURCE_DIR} ${catch2_BINARY_DIR}) endif() -# create a convenience library with Catch2's main +# create a convenience library with Catch2's main # to speed up test compilation add_library(catch2_main STATIC catch_main.cpp @@ -57,18 +57,20 @@ target_link_libraries(catch2_main Catch2::Catch2 ) -# define executables for tests and make them depend on the convenience +# define executables for tests and make them depend on the convenience # library (and Catch2 transitively) and fmt add_executable(tests test_example_00.cpp test_example_01.cpp - test_utils_arithmetic.cpp + test_utils_arithmetic.cpp + test_guided_distributor.cpp ) target_link_libraries(tests catch2_main fmt::fmt - arithmetic + arithmetic + distributor ) # Catch2's contrib folder includes some 
helper functions diff --git a/tests/unit/test_guided_distributor.cpp b/tests/unit/test_guided_distributor.cpp new file mode 100644 index 0000000000000000000000000000000000000000..eb36647afb729f195a92183bf53cf3fbf69fb8ff --- /dev/null +++ b/tests/unit/test_guided_distributor.cpp @@ -0,0 +1,61 @@ +/* + Copyright 2018-2020, Barcelona Supercomputing Center (BSC), Spain + Copyright 2015-2020, Johannes Gutenberg Universitaet Mainz, Germany + + This software was partially supported by the + EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu). + + This software was partially supported by the + ADA-FS project under the SPPEXA project funded by the DFG. + + SPDX-License-Identifier: MIT +*/ + +#include +#include +#include +#include + +TEST_CASE( "Guided distributor Testing", "[Distributor]" ) { + + GIVEN( "A distributor" ) { + // Generate a guided.txt that will put some files in + // the server num 4 + std::ofstream o; + o.open("/tmp/guided.txt"); + o << "/t.c01 0 3" << std::endl; + o << "/t.c02 0 3" << std::endl; + o << "/t.c01 1 3" << std::endl; + o << "/t.c02 1 3" << std::endl; + o << "/t.c01 2 3" << std::endl; + o << "/t.c02 2 3" << std::endl; + o << "/t.c03 1 3" << std::endl; + o << "/t.c04 1 3" << std::endl; + o << "/t.c05 1 3" << std::endl; + o << "/t.c06 1 3" << std::endl; + o << "/t.c07 1 3" << std::endl; + o.close(); + + // The distributor should return 3 for all the tested files + auto d = gkfs::rpc::GuidedDistributor(); + + REQUIRE( d.locate_data("/t.c01",1,10) == 3 ); + REQUIRE( d.locate_data("/t.c02",1,10) == 3 ); + REQUIRE( d.locate_data("/t.c03",1,10) == 3 ); + REQUIRE( d.locate_data("/t.c04",1,10) == 3 ); + REQUIRE( d.locate_data("/t.c05",1,10) == 3 ); + REQUIRE( d.locate_data("/t.c06",1,10) == 3 ); + REQUIRE( d.locate_data("/t.c07",1,10) == 3 ); + + // Next result is random, but with the same seed is consistent + // We ask for chunk 5 that is distributed randomly between the + // 10 servers. 
+ REQUIRE ( (d.locate_data("/t.c01",5,10) + + d.locate_data("/t.c02",5,10) + + d.locate_data("/t.c03",5,10) + + d.locate_data("/t.c04",5,10) + + d.locate_data("/t.c05",5,10) + + d.locate_data("/t.c06",5,10) + + d.locate_data("/t.c07",5,10) ) == 42); + } +}