Commit a8dc9c3c authored by Ramon Nou's avatar Ramon Nou Committed by Ramon Nou

Moved distributor instantiation to RPC_DATA and daemon.cpp on the server side. Added interval_map to GuidedDistributor for better performance.

Moved the extended reads log into the normal log module.

Update logging.hpp

Added GKFS prefix to CMake options

Catch2 Guided Distributor test

Added creation of /tmp/guided.txt in the test

Updated README.md and protected the Distributor against duplicated inserts

Added metadata + data optimization with #
parent 2eeb84a8
......@@ -5,7 +5,12 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]
- Added dirents_extended function and a find substitution proposal.
- Created a Guided Distributor using a mapping file to map chunks to specific
nodes.
## [0.8.0] - 2020-09-15
## New
- Both client library and daemon have been extended to support the ofi+verbs
......
......@@ -180,6 +180,14 @@ mark_as_advanced(CLIENT_LOG_MESSAGE_SIZE)
configure_file(include/common/cmake_configure.hpp.in include/common/cmake_configure.hpp)
option(GKFS_USE_GUIDED_DISTRIBUTION "Use guided data distributor" OFF)
message(STATUS "[gekkofs] Guided data distributor: ${GKFS_USE_GUIDED_DISTRIBUTION}")
set(GKFS_USE_GUIDED_DISTRIBUTION_PATH "/tmp/guided.txt" CACHE STRING "File Path for guided distributor")
set_property(CACHE GKFS_USE_GUIDED_DISTRIBUTION_PATH PROPERTY STRINGS)
message(STATUS "[gekkofs] Guided data distributor input file path: ${GKFS_USE_GUIDED_DISTRIBUTION_PATH}")
# Imported target
add_library(RocksDB INTERFACE IMPORTED GLOBAL)
target_link_libraries(RocksDB
......
......@@ -6,7 +6,7 @@
GekkoFS is a file system capable of aggregating the local I/O capacity and performance of each compute node
in a HPC cluster to produce a high-performance storage space that can be accessed in a distributed manner.
This storage space allows HPC applications and simulations to run in isolation from each other with regards
to I/O, which reduces interferences and improves performance.
# Dependencies
......@@ -30,7 +30,7 @@ to I/O, which reduces interferences and improves performance.
- snappy: `sudo yum install snappy snappy-devel`
- zlib: `sudo yum install zlib zlib-devel`
- bzip2: `sudo yum install bzip2 bzip2-devel`
- zstandard:
```bash
wget https://github.com/facebook/zstd/archive/v1.1.3.tar.gz
mv v1.1.3.tar.gz zstd-1.1.3.tar.gz
......@@ -162,28 +162,28 @@ Further options:
```bash
Allowed options:
-h [ --help ] Help message
-m [ --mountdir ] arg Virtual mounting directory where GekkoFS is
available.
-r [ --rootdir ] arg Local data directory where GekkoFS data for this
daemon is stored.
-i [ --metadir ] arg Metadata directory where GekkoFS RocksDB data
directory is located. If not set, rootdir is used.
-l [ --listen ] arg Address or interface to bind the daemon to.
Default: local hostname.
When used with ofi+verbs the FI_VERBS_IFACE
environment variable is set accordingly which
associates the verbs device with the network
interface. In case FI_VERBS_IFACE is already
defined, the argument is ignored. Default 'ib'.
-H [ --hosts-file ] arg Shared file used by daemons to register their
endpoints. (default './gkfs_hosts.txt')
-P [ --rpc-protocol ] arg Used RPC protocol for inter-node communication.
Available: {ofi+sockets, ofi+verbs, ofi+psm2} for
TCP, Infiniband, and Omni-Path, respectively.
(Default ofi+sockets)
Libfabric must have enabled support verbs or psm2.
--auto-sm Enables intra-node communication (IPCs) via the
`na+sm` (shared memory) protocol, instead of using
the RPC protocol. (Default off)
--version Print version and exit.
```
......@@ -194,15 +194,15 @@ Shut it down by gracefully killing the process (SIGTERM).
Metadata and actual data will be stored at the `<fs_data_path>`. The path where the application works on is set with
`<pseudo_mount_dir_path>`.
Run the application with the preload library: `LD_PRELOAD=<path>/build/lib/libgkfs_intercept.so ./application`. In the case of
an MPI application, use the `{mpirun, mpiexec} -x` argument.
### Logging
The following environment variables can be used to enable logging in the client
library: `LIBGKFS_LOG=<module>` and `LIBGKFS_LOG_OUTPUT=<path/to/file>` to
configure the output module and set the path to the log file of the client
library. If no path is specified in `LIBGKFS_LOG_OUTPUT`, the client library
will send log messages to `/tmp/gkfs_client.log`.
The following modules are available:
......@@ -228,6 +228,7 @@ The following modules are available:
module will only be available if the client library is built in `Debug`
mode.
- `all`: All previous options combined.
- `trace_reads`: Generates log lines with extra information on read operations for the guided distributor.
- `help`: Print a help message and exit.
When tracing system calls, specific syscalls can be removed from log messages by
......@@ -236,20 +237,57 @@ setting it to `LIBGKFS_LOG_SYSCALL_FILTER=epoll_wait,epoll_create` will filter
out any log entries from the `epoll_wait()` and `epoll_create()` system calls.
Additionally, setting the `LIBGKFS_LOG_OUTPUT_TRUNC` environment variable with
a value different from `0` will instruct the logging subsystem to truncate
the file used for logging, rather than append to it.
For the daemon, the `GKFS_DAEMON_LOG_PATH=<path/to/file>` environment variable
can be provided to set the path to the log file, and the log module can be
selected with the `GKFS_LOG_LEVEL={off,critical,err,warn,info,debug,trace}`
environment variable.
### External functions
GekkoFS allows the use of external functions in your client code via LD_PRELOAD.
The source code needs to be compiled with `-fPIC`. We include an IO500 pfind substitution,
`examples/gfind/gfind.cpp`, and a non-MPI version, `examples/gfind/sfind.cpp`.
### Data distributors
The data distributor can be selected at compilation time; two distributors are available:

#### Simple Hash (Default)
Chunks are distributed randomly to the different GekkoFS servers.

#### Guided Distributor
The guided distributor distributes chunks using a shared file with the following format:

`<path> <chunk_number> <host>`

Moreover, if you prepend a path with `#`, all the data from that path will go to the same place as its metadata. Explicitly defined paths (without `#`) take priority. For example, given the line

`#/mdt-hard 0 0`

GekkoFS will store data and metadata for that path on the same server. The server will still be random (the `0 0` has no meaning, yet).
Chunks that are not specified are distributed using the Simple Hash distributor.

To generate such a file, do a first run using the `trace_reads` log option.
This enables a `TRACE_READS` level log at the clients, producing lines that can be used to generate the input file.
In this stage, each node should generate a separate file; in SLURM this can be done with the following line:

`srun -N 10 -n 320 --export="ALL" /bin/bash -c "export LIBGKFS_LOG_OUTPUT=${HOME}/test/GLOBAL.txt;LD_PRELOAD=${GKFS_PRLD} <app>"`

Then, use `examples/distributors/guided/generate.py` to create the output file:
* `python examples/distributors/guided/generate.py ~/test/GLOBAL.txt >> guided.txt`

This works if the nodes are sorted in alphabetical order, which is the usual scenario. Users should take care with multi-server configurations.

Finally, enable the distributor using the following compilation flags:
* `GKFS_USE_GUIDED_DISTRIBUTION` ON
* `GKFS_USE_GUIDED_DISTRIBUTION_PATH` `<full path to guided.txt>`
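The priority rules above can be sketched in Python. This is an illustration, not GekkoFS source: the function names are made up, and Python's `hash` stands in for the Simple Hash fallback.

```python
# Illustration only: how a guided mapping file could be interpreted.

def parse_guided_file(lines):
    """Parse "<path> <chunk> <host>" lines; '#'-prefixed paths follow metadata."""
    exact = {}           # (path, chunk) -> host; explicit entries take priority
    follow_meta = set()  # paths whose data is co-located with their metadata
    for line in lines:
        path, chunk, host = line.split()
        if path.startswith("#"):
            follow_meta.add(path[1:])
        else:
            exact[(path, int(chunk))] = int(host)
    return exact, follow_meta

def locate_data(path, chunk, hosts_size, exact, follow_meta):
    if (path, chunk) in exact:                   # 1. explicit entry wins
        return exact[(path, chunk)]
    if path in follow_meta:                      # 2. same host as metadata
        return hash(path) % hosts_size
    return hash(path + str(chunk)) % hosts_size  # 3. Simple Hash fallback

exact, meta = parse_guided_file(["/scratch/a.out 0 3", "#/mdt-hard 0 0"])
```

With this mapping, every chunk of `/mdt-hard` lands on the same host as its metadata, while chunk 0 of `/scratch/a.out` is pinned to host 3.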
### Acknowledgment
This software was partially supported by the EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu).
......
###
# Copyright 2018-2020, Barcelona Supercomputing Center (BSC), Spain
# Copyright 2015-2020, Johannes Gutenberg Universitaet Mainz, Germany
# This software was partially supported by the
# EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu).
# This software was partially supported by the
# ADA-FS project under the SPPEXA project funded by the DFG.
# SPDX-License-Identifier: MIT
###
#!/usr/bin/python3
import re
import sys
import collections

file = sys.argv[1]
# Matches the TRACE_READS client log lines:
# "... read <hostname> host: <target>, path: <path>, ... chunk_start: <n> ... chunk_end: <n>"
pattern = re.compile(r".+(read\ )(.*)( host: )(\d+).+(path: )(.+),.+(chunk_start: )(\d+).+(chunk_end: )(\d+)")

# First pass: collect the client hostnames that appear in the log.
d = collections.OrderedDict()
with open(file) as f:
    for line in f:
        result = pattern.match(line)
        if result:
            d[result.group(2)] = 1

# Number the hostnames in alphabetical order.
keys = sorted(d.keys())
i = 0
for key in keys:
    d[key] = i
    i = i + 1

# Second pass: emit one "<path> <chunk> <host index>" line per chunk read.
with open(file) as f:
    for line in f:
        result = pattern.match(line)
        if result:
            for i in range(int(result.group(8)), int(result.group(10)) + 1):
                print(result.group(6), i, d[result.group(2)])
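As a sanity check, the regular expression above can be exercised against a synthetic log line. The line below is made up for illustration, but follows the `read ... host: ... path: ... chunk_start: ... chunk_end: ...` layout the client emits.

```python
import re

# The same pattern used by generate.py.
pattern = re.compile(r".+(read\ )(.*)( host: )(\d+).+(path: )(.+),.+(chunk_start: )(\d+).+(chunk_end: )(\d+)")

# Synthetic TRACE_READS line (made up for illustration).
line = "[trace] read node01 host: 2, path: /data/file1, chunk_start: 3, chunk_end: 5"
m = pattern.match(line)
# group(2) is the client hostname, group(6) the path,
# groups 8 and 10 the inclusive chunk range.
```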
......@@ -47,7 +47,7 @@
namespace gkfs::log {
enum class log_level : unsigned int {
print_syscalls = 1 << 0,
print_syscalls_entry = 1 << 1,
print_info = 1 << 2,
......@@ -57,6 +57,7 @@ enum class log_level : short {
print_hermes = 1 << 6,
print_mercury = 1 << 7,
print_debug = 1 << 8,
print_trace_reads = 1 << 9,
// for internal use
print_none = 0,
......@@ -117,6 +118,7 @@ static const auto constexpr warning = log_level::print_warnings;
static const auto constexpr hermes = log_level::print_hermes;
static const auto constexpr mercury = log_level::print_mercury;
static const auto constexpr debug = log_level::print_debug;
static const auto constexpr trace_reads = log_level::print_trace_reads;
static const auto constexpr none = log_level::print_none;
static const auto constexpr most = log_level::print_most;
static const auto constexpr all = log_level::print_all;
......@@ -125,7 +127,8 @@ static const auto constexpr help = log_level::print_help;
static const auto constexpr level_names = utils::make_array(
"syscall",
"syscall", // sycall_entry uses the same name as syscall
"info", "critical", "error", "warning", "hermes", "mercury", "debug");
"info", "critical", "error", "warning", "hermes", "mercury", "debug",
"trace_reads");
inline constexpr auto
lookup_level_name(log_level l) {
......@@ -533,6 +536,14 @@ static_buffer::grow(std::size_t size) {
} \
} while(0);
#define LOG_TRACE_READS(...) \
do { \
if(gkfs::log::get_global_logger()) { \
gkfs::log::get_global_logger()->log( \
gkfs::log::trace_reads, __func__, __LINE__, __VA_ARGS__); \
} \
} while(0);
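The `log_level` enum above is a bitmask: each module owns one bit, and a log mask is the bitwise OR of the enabled modules. A small Python sketch of the scheme, using only the bits visible in this diff:

```python
# Each log module owns one bit of the mask (mirrors log_level above).
PRINT_SYSCALLS       = 1 << 0
PRINT_SYSCALLS_ENTRY = 1 << 1
PRINT_INFO           = 1 << 2
PRINT_HERMES         = 1 << 6
PRINT_MERCURY        = 1 << 7
PRINT_DEBUG          = 1 << 8
PRINT_TRACE_READS    = 1 << 9  # the bit added by this commit
PRINT_NONE           = 0

def enabled(mask, module):
    # A message is emitted only if its module's bit is set in the mask.
    return (mask & module) != 0

# e.g. a mask equivalent to LIBGKFS_LOG=info,trace_reads:
mask = PRINT_INFO | PRINT_TRACE_READS
```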
#ifdef GKFS_DEBUG_BUILD
#define LOG_SYSCALL(...) \
......
......@@ -36,6 +36,7 @@
#include <memory>
#include <vector>
#include <string>
#include <config.hpp>
#include <bitset>
......@@ -102,6 +103,7 @@ private:
mutable std::mutex internal_fds_mutex_;
bool internal_fds_must_relocate_;
std::bitset<MAX_USER_FDS> protected_fds_;
std::string hostname;
public:
static PreloadContext*
......@@ -210,6 +212,9 @@ public:
void
unprotect_user_fds();
std::string
get_hostname();
};
} // namespace preload
......
......@@ -16,5 +16,8 @@
#cmakedefine01 CREATE_CHECK_PARENTS
#cmakedefine01 LOG_SYSCALLS
#cmakedefine GKFS_USE_GUIDED_DISTRIBUTION
#define GKFS_USE_GUIDED_DISTRIBUTION_PATH "@GKFS_USE_GUIDED_DISTRIBUTION_PATH@"
#endif // FS_CMAKE_CONFIGURE_H
// clang-format on
......@@ -29,11 +29,17 @@
#ifndef GEKKOFS_RPC_DISTRIBUTOR_HPP
#define GEKKOFS_RPC_DISTRIBUTOR_HPP
#include "../include/config.hpp"
#include <vector>
#include <string>
#include <numeric>
#include <unordered_map>
#include <fstream>
#include <boost/icl/interval_map.hpp>
namespace gkfs {
namespace rpc {
using chunkid_t = unsigned int;
using host_t = unsigned int;
......@@ -45,6 +51,11 @@ public:
virtual host_t
locate_data(const std::string& path, const chunkid_t& chnk_id) const = 0;
// TODO: We need to pass hosts_size on the server side, because the number
// of servers is not defined at startup
virtual host_t
locate_data(const std::string& path, const chunkid_t& chnk_id,
unsigned int hosts_size) = 0;
virtual host_t
locate_file_metadata(const std::string& path) const = 0;
......@@ -57,11 +68,13 @@ public:
class SimpleHashDistributor : public Distributor {
private:
host_t localhost_;
unsigned int hosts_size_{0};
std::vector<host_t> all_hosts_;
std::hash<std::string> str_hash;
public:
SimpleHashDistributor();
SimpleHashDistributor(host_t localhost, unsigned int hosts_size);
host_t
......@@ -71,6 +84,10 @@ public:
locate_data(const std::string& path,
const chunkid_t& chnk_id) const override;
host_t
locate_data(const std::string& path, const chunkid_t& chnk_id,
unsigned int host_size);
host_t
locate_file_metadata(const std::string& path) const override;
......@@ -123,6 +140,43 @@ public:
locate_directory_metadata(const std::string& path) const override;
};
class GuidedDistributor : public Distributor {
private:
host_t localhost_;
unsigned int hosts_size_{0};
std::vector<host_t> all_hosts_;
std::hash<std::string> str_hash;
std::unordered_map<std::string,
boost::icl::interval_map<chunkid_t, unsigned int>>
map_interval;
std::vector<std::string> prefix_list; // Should not be very long
bool
init_guided();
public:
GuidedDistributor();
GuidedDistributor(host_t localhost, unsigned int hosts_size);
host_t
localhost() const override;
host_t
locate_data(const std::string& path,
const chunkid_t& chnk_id) const override;
host_t
locate_data(const std::string& path, const chunkid_t& chnk_id,
unsigned int host_size);
host_t
locate_file_metadata(const std::string& path) const override;
std::vector<host_t>
locate_directory_metadata(const std::string& path) const override;
};
} // namespace rpc
} // namespace gkfs
#endif // GEKKOFS_RPC_LOCATOR_HPP
......@@ -32,6 +32,13 @@
#include <daemon/daemon.hpp>
namespace gkfs {
/* Forward declarations */
namespace rpc {
class Distributor;
}
namespace daemon {
class RPCData {
......@@ -47,6 +54,8 @@ private:
ABT_pool io_pool_;
std::vector<ABT_xstream> io_streams_;
std::string self_addr_str_;
// Distributor
std::shared_ptr<gkfs::rpc::Distributor> distributor_;
public:
static RPCData*
......@@ -84,7 +93,13 @@ public:
self_addr_str() const;
void
self_addr_str(const std::string& addr_str);
const std::shared_ptr<gkfs::rpc::Distributor>&
distributor() const;
void
distributor(const std::shared_ptr<gkfs::rpc::Distributor>& distributor);
};
} // namespace daemon
......
......@@ -44,6 +44,7 @@ extern "C" {
#include <daemon/classes/fs_data.hpp>
#include <daemon/classes/rpc_data.hpp>
#include <global/rpc/distributor.hpp>
#define GKFS_DATA \
(static_cast<gkfs::daemon::FsData*>(gkfs::daemon::FsData::getInstance()))
......
......@@ -105,6 +105,10 @@ static const auto constexpr debug_opts = utils::make_array(
"[ default: on ]"},
log::mercury},
opt_info{STR_AND_LEN("trace_reads"),
{"Print extended read information", "[ default: off ]"},
log::trace_reads},
#ifdef GKFS_DEBUG_BUILD
opt_info{STR_AND_LEN("debug"),
......
......@@ -149,10 +149,16 @@ init_ld_environment_() {
CTX->fwd_host_id(), CTX->hosts().size());
CTX->distributor(forwarder_dist);
#else
auto simple_hash_dist = std::make_shared<gkfs::rpc::SimpleHashDistributor>(
#ifdef GKFS_USE_GUIDED_DISTRIBUTION
auto distributor = std::make_shared<gkfs::rpc::GuidedDistributor>(
CTX->local_host_id(), CTX->hosts().size());
#else
auto distributor = std::make_shared<gkfs::rpc::SimpleHashDistributor>(
CTX->local_host_id(), CTX->hosts().size());
CTX->distributor(simple_hash_dist);
#endif
CTX->distributor(distributor);
#endif
LOG(INFO, "Retrieving file system configuration...");
......
......@@ -47,7 +47,9 @@ extern "C" {
#include <syscall.h>
}
namespace gkfs {
namespace preload {
decltype(PreloadContext::MIN_INTERNAL_FD) constexpr PreloadContext::
MIN_INTERNAL_FD;
......@@ -59,6 +61,10 @@ PreloadContext::PreloadContext()
internal_fds_.set();
internal_fds_must_relocate_ = true;
char host[255];
gethostname(host, 255);
hostname = host;
}
void
......@@ -426,4 +432,11 @@ PreloadContext::unprotect_user_fds() {
internal_fds_must_relocate_ = true;
}
std::string
PreloadContext::get_hostname() {
return hostname;
}
} // namespace preload
} // namespace gkfs
......@@ -362,7 +362,7 @@ read_hosts_file() {
}
LOG(INFO, "Hosts pool size: {}", hosts.size());
sort(hosts.begin(), hosts.end()); // Sort hosts by alphanumerical value.
return hosts;
}
......@@ -416,4 +416,4 @@ connect_to_hosts(const vector<pair<string, string>>& hosts) {
CTX->hosts(addrs);
}
} // namespace gkfs::utils
......@@ -262,7 +262,6 @@ forward_read(const string& path, void* buf, const off64_t offset,
// contains the recipient ids, used to access the target_chnks map.
// First idx is chunk with potential offset
std::vector<uint64_t> targets{};
// targets for the first and last chunk as they need special treatment
uint64_t chnk_start_target = 0;
uint64_t chnk_end_target = 0;
......@@ -361,8 +360,15 @@ forward_read(const string& path, void* buf, const off64_t offset,
handles.emplace_back(
ld_network_service->post<gkfs::rpc::read_data>(endp, in));
LOG(DEBUG, "host: {}, path: {}, chunks: {}, size: {}, offset: {}",
target, path, in.chunk_n(), total_chunk_size, in.offset());
LOG(DEBUG,
"host: {}, path: {}, chunk_start: {}, chunk_end: {}, chunks: {}, size: {}, offset: {}",
target, path, chnk_start, chnk_end, in.chunk_n(),
total_chunk_size, in.offset());
LOG(TRACE_READS,
"read {} host: {}, path: {}, chunk_start: {}, chunk_end: {}",
CTX->get_hostname(), target, path, chnk_start, chnk_end);
} catch(const std::exception& ex) {
LOG(ERROR,
......@@ -559,4 +565,4 @@ forward_get_chunk_stat() {
return make_pair(0, ChunkStat{chunk_size, chunk_total, chunk_free});
}
} // namespace gkfs::rpc
......@@ -29,6 +29,7 @@
add_subdirectory(arithmetic)
add_library(distributor STATIC)
include_directories(${Boost_INCLUDE_DIRS})
set_property(TARGET distributor PROPERTY POSITION_INDEPENDENT_CODE ON)
target_sources(distributor
PUBLIC
......
......@@ -30,7 +30,9 @@
using namespace std;
namespace gkfs {
namespace rpc {
SimpleHashDistributor::SimpleHashDistributor(host_t localhost,
unsigned int hosts_size)
......@@ -38,6 +40,8 @@ SimpleHashDistributor::SimpleHashDistributor(host_t localhost,
::iota(all_hosts_.begin(), all_hosts_.end(), 0);
}
SimpleHashDistributor::SimpleHashDistributor() {}
host_t
SimpleHashDistributor::localhost() const {
return localhost_;
......@@ -49,6 +53,18 @@ SimpleHashDistributor::locate_data(const string& path,
return str_hash(path + ::to_string(chnk_id)) % hosts_size_;
}
host_t
SimpleHashDistributor::locate_data(const string& path, const chunkid_t& chnk_id,
unsigned int hosts_size) {
if(hosts_size_ != hosts_size) {
hosts_size_ = hosts_size;
all_hosts_ = std::vector<unsigned int>(hosts_size);
::iota(all_hosts_.begin(), all_hosts_.end(), 0);
}
return str_hash(path + ::to_string(chnk_id)) % hosts_size_;
}
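The placement rule implemented above can be sketched in Python. This is an illustration: `md5` stands in for `std::hash` purely so the sketch is deterministic across runs.

```python
import hashlib

def locate_data(path, chnk_id, hosts_size):
    # Hash "<path><chunk id>" and take it modulo the number of hosts,
    # mirroring str_hash(path + to_string(chnk_id)) % hosts_size_ above.
    digest = hashlib.md5((path + str(chnk_id)).encode()).hexdigest()
    return int(digest, 16) % hosts_size

# Every client computes the same target without coordination:
host = locate_data("/data/file1", 7, 4)
```

Because placement is a pure function of path, chunk id, and host count, clients need no shared state; the flip side is that every chunk moves whenever `hosts_size` changes.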
host_t
SimpleHashDistributor::locate_file_metadata(const string& path) const {
return str_hash(path) % hosts_size_;
......@@ -109,4 +125,108 @@ std::vector<host_t>
ForwarderDistributor::locate_directory_metadata(const std::string& path) const {
return all_hosts_;
}
bool
GuidedDistributor::init_guided() {
unsigned int destination_host;
chunkid_t chunk_id;
string path;
std::ifstream mapfile;
mapfile.open(GKFS_USE_GUIDED_DISTRIBUTION_PATH);
    if((mapfile.rdstate() & std::ifstream::failbit) != 0)
        return false; // file missing/unreadable: fall back to SimpleHash behavior
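The commit message mentions adding an `interval_map` to the GuidedDistributor for performance. A minimal Python stand-in for the `boost::icl::interval_map` idea, assuming non-overlapping insertions (the real Boost container also coalesces and aggregates overlapping ranges):

```python
import bisect

class IntervalMap:
    """Minimal interval map: half-open [start, end) ranges -> host.

    Consecutive chunks mapping to the same host are stored as one range,
    so lookups and memory scale with the number of ranges, not chunks.
    """
    def __init__(self):
        self.starts = []   # sorted interval start points
        self.entries = []  # (start, end, host), parallel to starts

    def insert(self, start, end, host):
        i = bisect.bisect_right(self.starts, start)
        self.starts.insert(i, start)
        self.entries.insert(i, (start, end, host))

    def find(self, chunk):
        i = bisect.bisect_right(self.starts, chunk) - 1
        if i >= 0:
            start, end, host = self.entries[i]
            if start <= chunk < end:
                return host
        return None  # not guided; caller falls back to Simple Hash

m = IntervalMap()
m.insert(0, 10, 3)   # chunks 0..9  -> host 3
m.insert(10, 20, 1)  # chunks 10..19 -> host 1
```

A per-path lookup is then a binary search over a handful of ranges instead of a per-chunk hash-table probe over thousands of entries.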