......@@ -6,6 +6,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]
- Client-side replication without involving the server. The `NUM_REPL` (0 < NUM_REPL < num_servers) environment variable defines the number of
replicas ([!166](https://storage.bsc.es/gitlab/hpc/gekkofs/-/merge_requests/141)).
- Modified writes and reads to use a bitset instead of the traditional hash per chunk in the server.
- Added retry support to `get_fs_config`, contacting other servers when the initial server fails.
### New
### Changed
......@@ -16,6 +20,8 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
### New
- Use of GSL to narrow-cast syscall return types; corrected syscall return types
([!182](https://storage.bsc.es/gitlab/hpc/gekkofs/-/merge_requests/182)).
- Support for client-side per-process logging, activated
with `LIBGKFS_LOG_PER_PROCESS` ([!179](https://storage.bsc.es/gitlab/hpc/gekkofs/-/merge_requests/179)).
- Support mtime with option gkfs::config::metadata::
......
......@@ -320,6 +320,13 @@ Support for fstat in renamed files is included.
This is disabled by default.
### Replication
The user can enable the data replication feature by setting the replication environment variable:
`LIBGKFS_NUM_REPL=<num repl>`.
The number of replicas should range from `0` to the `number of servers - 1`. The replication environment variable can be
set for each client independently.
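For example, a client could be started with two replicas as follows (paths and the application are illustrative):
`LIBGKFS_NUM_REPL=2 LD_PRELOAD=<path>/lib/libgkfs_intercept.so <app>`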
## Acknowledgment
This software was partially supported by the EC H2020 funded NEXTGenIO project (Project ID: 671951, www.nextgenio.eu).
......@@ -330,7 +337,8 @@ the DFG.
This software is partially supported by the FIDIUM project funded by the DFG.
This work was partially funded by the European Union’s Horizon 2020 and the German Ministry of Education and Research (
BMBF) under the ``Adaptive multi-tier intelligent data manager for Exascale (ADMIRE)'' project (https://www.admire-eurohpc.eu/); Grant Agreement number:
BMBF) under the ``Adaptive multi-tier intelligent data manager for Exascale (ADMIRE)''
project (https://www.admire-eurohpc.eu/); Grant Agreement number:
956748-ADMIRE-H2020-JTI-EuroHPC-2019-1. Further, this work was partially supported by the Spanish Ministry of Economy
and Competitiveness (MINECO) under grants PID2019-107255GB, and the Generalitat de Catalunya under contract
2021-SGR-00412. This publication is part of the project ADMIRE PCI2021-121952, funded by MCIN/AEI/10.13039/501100011033.
......@@ -218,3 +218,89 @@ the logging subsystem to truncate the file used for logging, rather than append
For the daemon, the `GKFS_DAEMON_LOG_PATH=<path/to/file>` environment variable can be provided to set the path to the
log file, and the log module can be selected with the `GKFS_DAEMON_LOG_LEVEL={off,critical,err,warn,info,debug,trace}`
environment variable, where `trace` produces the most trace records and `info` is the default value.
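For example, the daemon could be launched with debug logging written to a custom file as follows (arguments are illustrative):
`GKFS_DAEMON_LOG_PATH=/tmp/gkfs_daemon.log GKFS_DAEMON_LOG_LEVEL=debug gkfs_daemon -r <rootdir> -m <mountdir>`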
## Miscellaneous
### External functions
GekkoFS allows the use of external functions in your client code via LD_PRELOAD.
The source code needs to be compiled with `-fPIC`. We include a pfind IO500 substitution,
`examples/gfind/gfind.cpp`, and a non-MPI version, `examples/gfind/sfind.cpp`.
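For example, the non-MPI version could be built and run as follows (paths are illustrative):
`g++ -fPIC -o sfind examples/gfind/sfind.cpp`
`LD_PRELOAD=<path>/lib/libgkfs_intercept.so ./sfind <mountdir>`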
### Data distributors
The data distribution can be selected at compile time; two distributors are available:
#### Simple Hash (Default)
Chunks are distributed pseudo-randomly across the different GekkoFS servers, based on a hash of the path and chunk ID.
#### Guided Distributor
The guided distributor allows defining a specific distribution of data on a per-directory or per-file basis.
The distribution configurations are defined within a shared file (called `guided_config.txt` henceforth) with the
following format:
`<path> <chunk_number> <host>`
To enable the distributor, the following CMake compilation flags are required:
* `GKFS_USE_GUIDED_DISTRIBUTION` ON
* `GKFS_USE_GUIDED_DISTRIBUTION_PATH` `<path_guided_config.txt>`
To use a custom distribution, a path needs to have the prefix `#` (e.g., `#/mdt-hard 0 0`), with which all the data of all
files in that directory goes to the same place as the metadata.
Note that a chunk/host configuration is automatically inherited by all child files, even when not using the prefix.
In this example, `/mdt-hard/file1` is therefore also using the same distribution as the `/mdt-hard` directory.
If no prefix is used, the Simple Hash distributor is used.
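For example, a `guided_config.txt` could look as follows (illustrative values), pinning chunks `0` and `1` of `/file1` to hosts `0` and `1`, while all data under `/mdt-hard` follows its metadata:
`#/mdt-hard 0 0`
`/file1 0 0`
`/file1 1 1`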
##### Guided configuration file
Creating a guided configuration file is based on an I/O trace file of a previous execution of the application.
For this, the `trace_reads` tracing module is used (see above).
The `trace_reads` module enables a `TRACE_READS` log level at the clients, writing each client's I/O information,
which is used as the input for a script that creates the guided distributor setting.
Note that capturing the necessary trace records can involve performance degradation.
To capture the I/O of each client within a SLURM environment, i.e., enabling the `trace_reads` module and printing its
output to a user-defined path, the following example can be used:
`srun -N 10 -n 320 --export="ALL" /bin/bash -c "export LIBGKFS_LOG=trace_reads;LIBGKFS_LOG_OUTPUT=${HOME}/test/GLOBAL.txt;LD_PRELOAD=${GKFS_PRLD} <app>"`
Then, the `examples/distributors/guided/generate.py` script is used to create the guided distributor configuration file:
* `python examples/distributors/guided/generate.py ~/test/GLOBAL.txt >> guided_config.txt`
Finally, modify `guided_config.txt` to your distribution requirements.
### Metadata Backends
There are two different metadata backends in GekkoFS. The default one uses `rocksdb`, but an alternative based
on `PARALLAX` from `FORTH` is available.
To enable it, use the `-DGKFS_ENABLE_PARALLAX:BOOL=ON` option; you can also disable `rocksdb`
with `-DGKFS_ENABLE_ROCKSDB:BOOL=OFF`.
Once it is enabled, the `--dbbackend` option will be functional.
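For example (illustrative invocation):
`cmake -DGKFS_ENABLE_PARALLAX:BOOL=ON -DGKFS_ENABLE_ROCKSDB:BOOL=OFF ..`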
### Statistics
GekkoFS daemons are able to output general operation statistics (`--enable-collection`) and data chunk
statistics (`--enable-chunkstats`) to a specified output file via `--output-stats <FILE>`. Prometheus can also be used
instead of or in addition to the output file. It must be enabled at compile time via the CMake
argument `-DGKFS_ENABLE_PROMETHEUS` and the daemon argument `--enable-prometheus`. The corresponding statistics are then
pushed to the Prometheus instance.
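For example, a daemon could be started with both statistics options as follows (arguments are illustrative):
`gkfs_daemon -r <rootdir> -m <mountdir> --enable-collection --enable-chunkstats --output-stats /tmp/gkfs_stats.txt`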
### Advanced experimental features
#### Rename
`-DGKFS_RENAME_SUPPORT` allows the application to rename files.
This is an experimental feature, and some scenarios may not work properly.
Support for fstat in renamed files is included.
This is disabled by default.
#### Replication
The user can enable the data replication feature by setting the replication environment variable:
`LIBGKFS_NUM_REPL=<num repl>`.
The number of replicas should range from `0` to the `number of servers - 1`. The replication environment variable can be
set for each client independently.
\ No newline at end of file
......@@ -52,7 +52,7 @@ static constexpr auto HOSTS_FILE = ADD_PREFIX("HOSTS_FILE");
#ifdef GKFS_ENABLE_FORWARDING
static constexpr auto FORWARDING_MAP_FILE = ADD_PREFIX("FORWARDING_MAP_FILE");
#endif
static constexpr auto NUM_REPL = ADD_PREFIX("NUM_REPL");
} // namespace gkfs::env
#undef ADD_PREFIX
......
......@@ -105,6 +105,7 @@ private:
bool internal_fds_must_relocate_;
std::bitset<MAX_USER_FDS> protected_fds_;
std::string hostname;
int replicas_;
public:
static PreloadContext*
......@@ -216,6 +217,12 @@ public:
std::string
get_hostname();
void
set_replicas(const int repl);
int
get_replicas();
};
} // namespace preload
......
......@@ -30,6 +30,9 @@
#ifndef GEKKOFS_CLIENT_FORWARD_DATA_HPP
#define GEKKOFS_CLIENT_FORWARD_DATA_HPP
#include <string>
#include <memory>
#include <set>
namespace gkfs::rpc {
struct ChunkStat {
......@@ -43,14 +46,16 @@ struct ChunkStat {
std::pair<int, ssize_t>
forward_write(const std::string& path, const void* buf, off64_t offset,
size_t write_size);
size_t write_size, const int8_t num_copy = 0);
std::pair<int, ssize_t>
forward_read(const std::string& path, void* buf, off64_t offset,
size_t read_size);
size_t read_size, const int8_t num_copies,
std::set<int8_t>& failed);
int
forward_truncate(const std::string& path, size_t current_size, size_t new_size);
forward_truncate(const std::string& path, size_t current_size, size_t new_size,
const int8_t num_copies);
std::pair<int, ChunkStat>
forward_get_chunk_stat();
......
......@@ -50,10 +50,10 @@ class Metadata;
namespace rpc {
int
forward_create(const std::string& path, mode_t mode);
forward_create(const std::string& path, mode_t mode, const int copy);
int
forward_stat(const std::string& path, std::string& attr);
forward_stat(const std::string& path, std::string& attr, const int copy);
#ifdef HAS_RENAME
int
......@@ -62,22 +62,24 @@ forward_rename(const std::string& oldpath, const std::string& newpath,
#endif // HAS_RENAME
int
forward_remove(const std::string& path);
forward_remove(const std::string& path, const int8_t num_copies);
int
forward_decr_size(const std::string& path, size_t length);
forward_decr_size(const std::string& path, size_t length, const int copy);
int
forward_update_metadentry(
const std::string& path, const gkfs::metadata::Metadata& md,
const gkfs::metadata::MetadentryUpdateFlags& md_flags);
forward_update_metadentry(const std::string& path,
const gkfs::metadata::Metadata& md,
const gkfs::metadata::MetadentryUpdateFlags& md_flags,
const int copy);
std::pair<int, off64_t>
forward_update_metadentry_size(const std::string& path, size_t size,
off64_t offset, bool append_flag);
off64_t offset, bool append_flag,
const int num_copies);
std::pair<int, off64_t>
forward_get_metadentry_size(const std::string& path);
forward_get_metadentry_size(const std::string& path, const int copy);
std::pair<int, std::shared_ptr<gkfs::filemap::OpenDir>>
forward_get_dirents(const std::string& path);
......
......@@ -1469,11 +1469,11 @@ struct write_data {
public:
input(const std::string& path, int64_t offset, uint64_t host_id,
uint64_t host_size, uint64_t chunk_n, uint64_t chunk_start,
uint64_t chunk_end, uint64_t total_chunk_size,
const hermes::exposed_memory& buffers)
uint64_t host_size, const std::string& wbitset, uint64_t chunk_n,
uint64_t chunk_start, uint64_t chunk_end,
uint64_t total_chunk_size, const hermes::exposed_memory& buffers)
: m_path(path), m_offset(offset), m_host_id(host_id),
m_host_size(host_size), m_chunk_n(chunk_n),
m_host_size(host_size), m_wbitset(wbitset), m_chunk_n(chunk_n),
m_chunk_start(chunk_start), m_chunk_end(chunk_end),
m_total_chunk_size(total_chunk_size), m_buffers(buffers) {}
......@@ -1512,6 +1512,11 @@ struct write_data {
return m_chunk_n;
}
std::string
wbitset() const {
return m_wbitset;
}
uint64_t
chunk_start() const {
return m_chunk_start;
......@@ -1535,15 +1540,16 @@ struct write_data {
explicit input(const rpc_write_data_in_t& other)
: m_path(other.path), m_offset(other.offset),
m_host_id(other.host_id), m_host_size(other.host_size),
m_chunk_n(other.chunk_n), m_chunk_start(other.chunk_start),
m_chunk_end(other.chunk_end),
m_wbitset(other.wbitset), m_chunk_n(other.chunk_n),
m_chunk_start(other.chunk_start), m_chunk_end(other.chunk_end),
m_total_chunk_size(other.total_chunk_size),
m_buffers(other.bulk_handle) {}
explicit operator rpc_write_data_in_t() {
return {m_path.c_str(), m_offset, m_host_id,
m_host_size, m_chunk_n, m_chunk_start,
m_chunk_end, m_total_chunk_size, hg_bulk_t(m_buffers)};
return {m_path.c_str(), m_offset, m_host_id,
m_host_size, m_wbitset.c_str(), m_chunk_n,
m_chunk_start, m_chunk_end, m_total_chunk_size,
hg_bulk_t(m_buffers)};
}
private:
......@@ -1551,6 +1557,7 @@ struct write_data {
int64_t m_offset;
uint64_t m_host_id;
uint64_t m_host_size;
std::string m_wbitset;
uint64_t m_chunk_n;
uint64_t m_chunk_start;
uint64_t m_chunk_end;
......@@ -1647,11 +1654,11 @@ struct read_data {
public:
input(const std::string& path, int64_t offset, uint64_t host_id,
uint64_t host_size, uint64_t chunk_n, uint64_t chunk_start,
uint64_t chunk_end, uint64_t total_chunk_size,
const hermes::exposed_memory& buffers)
uint64_t host_size, const std::string& wbitset, uint64_t chunk_n,
uint64_t chunk_start, uint64_t chunk_end,
uint64_t total_chunk_size, const hermes::exposed_memory& buffers)
: m_path(path), m_offset(offset), m_host_id(host_id),
m_host_size(host_size), m_chunk_n(chunk_n),
m_host_size(host_size), m_wbitset(wbitset), m_chunk_n(chunk_n),
m_chunk_start(chunk_start), m_chunk_end(chunk_end),
m_total_chunk_size(total_chunk_size), m_buffers(buffers) {}
......@@ -1685,6 +1692,11 @@ struct read_data {
return m_host_size;
}
std::string
wbitset() const {
return m_wbitset;
}
uint64_t
chunk_n() const {
return m_chunk_n;
......@@ -1713,15 +1725,16 @@ struct read_data {
explicit input(const rpc_read_data_in_t& other)
: m_path(other.path), m_offset(other.offset),
m_host_id(other.host_id), m_host_size(other.host_size),
m_chunk_n(other.chunk_n), m_chunk_start(other.chunk_start),
m_chunk_end(other.chunk_end),
m_wbitset(other.wbitset), m_chunk_n(other.chunk_n),
m_chunk_start(other.chunk_start), m_chunk_end(other.chunk_end),
m_total_chunk_size(other.total_chunk_size),
m_buffers(other.bulk_handle) {}
explicit operator rpc_read_data_in_t() {
return {m_path.c_str(), m_offset, m_host_id,
m_host_size, m_chunk_n, m_chunk_start,
m_chunk_end, m_total_chunk_size, hg_bulk_t(m_buffers)};
return {m_path.c_str(), m_offset, m_host_id,
m_host_size, m_wbitset.c_str(), m_chunk_n,
m_chunk_start, m_chunk_end, m_total_chunk_size,
hg_bulk_t(m_buffers)};
}
private:
......@@ -1729,6 +1742,7 @@ struct read_data {
int64_t m_offset;
uint64_t m_host_id;
uint64_t m_host_size;
std::string m_wbitset;
uint64_t m_chunk_n;
uint64_t m_chunk_start;
uint64_t m_chunk_end;
......
......@@ -37,7 +37,6 @@
#include <string>
#include <cstdint>
namespace gkfs::metadata {
constexpr mode_t LINK_MODE = ((S_IRWXU | S_IRWXG | S_IRWXO) | S_IFLNK);
......
......@@ -48,15 +48,20 @@ public:
localhost() const = 0;
virtual host_t
locate_data(const std::string& path, const chunkid_t& chnk_id) const = 0;
locate_data(const std::string& path, const chunkid_t& chnk_id,
const int num_copy) const = 0;
// TODO: We need to pass hosts_size on the server side, because the number
// of servers is not defined (at startup)
virtual unsigned int
hosts_size() const = 0;
virtual host_t
locate_data(const std::string& path, const chunkid_t& chnk_id,
unsigned int hosts_size) = 0;
unsigned int hosts_size, const int num_copy) = 0;
virtual host_t
locate_file_metadata(const std::string& path) const = 0;
locate_file_metadata(const std::string& path, const int num_copy) const = 0;
virtual std::vector<host_t>
locate_directory_metadata(const std::string& path) const = 0;
......@@ -75,19 +80,23 @@ public:
SimpleHashDistributor(host_t localhost, unsigned int hosts_size);
unsigned int
hosts_size() const override;
host_t
localhost() const override;
host_t
locate_data(const std::string& path,
const chunkid_t& chnk_id) const override;
locate_data(const std::string& path, const chunkid_t& chnk_id,
const int num_copy) const override;
host_t
locate_data(const std::string& path, const chunkid_t& chnk_id,
unsigned int host_size);
unsigned int host_size, const int num_copy);
host_t
locate_file_metadata(const std::string& path) const override;
locate_file_metadata(const std::string& path,
const int num_copy) const override;
std::vector<host_t>
locate_directory_metadata(const std::string& path) const override;
......@@ -96,6 +105,7 @@ public:
class LocalOnlyDistributor : public Distributor {
private:
host_t localhost_;
unsigned int hosts_size_{0};
public:
explicit LocalOnlyDistributor(host_t localhost);
......@@ -103,12 +113,16 @@ public:
host_t
localhost() const override;
unsigned int
hosts_size() const override;
host_t
locate_data(const std::string& path,
const chunkid_t& chnk_id) const override;
locate_data(const std::string& path, const chunkid_t& chnk_id,
const int num_copy) const override;
host_t
locate_file_metadata(const std::string& path) const override;
locate_file_metadata(const std::string& path,
const int num_copy) const override;
std::vector<host_t>
locate_directory_metadata(const std::string& path) const override;
......@@ -117,7 +131,7 @@ public:
class ForwarderDistributor : public Distributor {
private:
host_t fwd_host_;
unsigned int hosts_size_;
unsigned int hosts_size_{0};
std::vector<host_t> all_hosts_;
std::hash<std::string> str_hash;
......@@ -127,16 +141,20 @@ public:
host_t
localhost() const override final;
unsigned int
hosts_size() const override;
host_t
locate_data(const std::string& path,
const chunkid_t& chnk_id) const override final;
locate_data(const std::string& path, const chunkid_t& chnk_id,
const int num_copy) const override final;
host_t
locate_data(const std::string& path, const chunkid_t& chnk_id,
unsigned int host_size) override final;
unsigned int host_size, const int num_copy) override final;
host_t
locate_file_metadata(const std::string& path) const override;
locate_file_metadata(const std::string& path,
const int num_copy) const override;
std::vector<host_t>
locate_directory_metadata(const std::string& path) const override;
......@@ -176,16 +194,20 @@ public:
host_t
localhost() const override;
unsigned int
hosts_size() const override;
host_t
locate_data(const std::string& path,
const chunkid_t& chnk_id) const override;
locate_data(const std::string& path, const chunkid_t& chnk_id,
const int num_copy) const override;
host_t
locate_data(const std::string& path, const chunkid_t& chnk_id,
unsigned int host_size);
unsigned int host_size, const int num_copy);
host_t
locate_file_metadata(const std::string& path) const override;
locate_file_metadata(const std::string& path,
const int num_copy) const override;
std::vector<host_t>
locate_directory_metadata(const std::string& path) const override;
......
......@@ -89,9 +89,9 @@ MERCURY_GEN_PROC(
rpc_read_data_in_t,
((hg_const_string_t) (path))((int64_t) (offset))(
(hg_uint64_t) (host_id))((hg_uint64_t) (host_size))(
(hg_uint64_t) (chunk_n))((hg_uint64_t) (chunk_start))(
(hg_uint64_t) (chunk_end))((hg_uint64_t) (total_chunk_size))(
(hg_bulk_t) (bulk_handle)))
(hg_const_string_t) (wbitset))((hg_uint64_t) (chunk_n))(
(hg_uint64_t) (chunk_start))((hg_uint64_t) (chunk_end))(
(hg_uint64_t) (total_chunk_size))((hg_bulk_t) (bulk_handle)))
MERCURY_GEN_PROC(rpc_data_out_t, ((int32_t) (err))((hg_size_t) (io_size)))
......@@ -99,9 +99,9 @@ MERCURY_GEN_PROC(
rpc_write_data_in_t,
((hg_const_string_t) (path))((int64_t) (offset))(
(hg_uint64_t) (host_id))((hg_uint64_t) (host_size))(
(hg_uint64_t) (chunk_n))((hg_uint64_t) (chunk_start))(
(hg_uint64_t) (chunk_end))((hg_uint64_t) (total_chunk_size))(
(hg_bulk_t) (bulk_handle)))
(hg_const_string_t) (wbitset))((hg_uint64_t) (chunk_n))(
(hg_uint64_t) (chunk_start))((hg_uint64_t) (chunk_end))(
(hg_uint64_t) (total_chunk_size))((hg_bulk_t) (bulk_handle)))
MERCURY_GEN_PROC(rpc_get_dirents_in_t,
((hg_const_string_t) (path))((hg_bulk_t) (bulk_handle)))
......
......@@ -35,6 +35,9 @@ extern "C" {
}
#include <string>
#include <iostream>
#include <sstream>
#include <vector>
namespace gkfs::rpc {
......@@ -49,6 +52,18 @@ std::string
get_host_by_name(const std::string& hostname);
#endif
bool
get_bitset(const std::vector<uint8_t>& data, const uint16_t position);
void
set_bitset(std::vector<uint8_t>& data, const uint16_t position);
std::string
compress_bitset(const std::vector<uint8_t>& bytes);
std::vector<uint8_t>
decompress_bitset(const std::string& compressedString);
} // namespace gkfs::rpc
#endif // GEKKOFS_COMMON_RPC_UTILS_HPP
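The bit layout these helpers assume can be seen in the client read path further below, which packs one bit per chunk into `(chnk_total + 7) / 8` bytes. As a minimal sketch consistent with that layout (the actual implementation, and the string encoding used by `compress_bitset`/`decompress_bitset`, are not part of this diff):

```cpp
#include <cstdint>
#include <vector>

// Sketch only: bit i lives in byte i / 8 at bit offset i % 8,
// mirroring the inline bit-set operation in forward_read() below.
bool
get_bitset(const std::vector<uint8_t>& data, const uint16_t position) {
    return data[position / 8] & (1 << (position % 8));
}

void
set_bitset(std::vector<uint8_t>& data, const uint16_t position) {
    data[position / 8] |= (1 << (position % 8));
}
```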
......@@ -298,11 +298,21 @@ gkfs_create(const std::string& path, mode_t mode) {
if(check_parent_dir(path)) {
return -1;
}
auto err = gkfs::rpc::forward_create(path, mode);
if(err) {
errno = err;
// Write to all replicas; at least one needs to succeed
bool success = false;
for(auto copy = 0; copy < CTX->get_replicas() + 1; copy++) {
auto err = gkfs::rpc::forward_create(path, mode, copy);
if(err) {
errno = err;
} else {
success = true;
errno = 0;
}
}
if(!success) {
return -1;
}
return 0;
}
......@@ -340,7 +350,7 @@ gkfs_remove(const std::string& path) {
return -1;
}
}
auto err = gkfs::rpc::forward_remove(new_path);
auto err = gkfs::rpc::forward_remove(new_path, CTX->get_replicas());
if(err) {
errno = err;
return -1;
......@@ -350,7 +360,7 @@ gkfs_remove(const std::string& path) {
#endif // HAS_RENAME
#endif // HAS_SYMLINKS
auto err = gkfs::rpc::forward_remove(path);
auto err = gkfs::rpc::forward_remove(path, CTX->get_replicas());
if(err) {
errno = err;
return -1;
......@@ -406,6 +416,7 @@ gkfs_access(const std::string& path, const int mask, bool follow_links) {
* We use blocks to determine if the file is a renamed file.
* If the file is re-renamed (a->b->a), a recovers the block of b
* and we delete b.
* There is no replication support for rename.
* @param old_path
* @param new_path
* @return 0 on success, -1 on failure
......@@ -441,14 +452,14 @@ gkfs_rename(const string& old_path, const string& new_path) {
md_old.value().target_path("");
auto err = gkfs::rpc::forward_update_metadentry(
new_path, md_old.value(), flags);
new_path, md_old.value(), flags, 0);
if(err) {
errno = err;
return -1;
}
// Delete old file
err = gkfs::rpc::forward_remove(old_path);
err = gkfs::rpc::forward_remove(old_path, CTX->get_replicas());
if(err) {
errno = err;
return -1;
......@@ -674,7 +685,9 @@ gkfs_lseek(shared_ptr<gkfs::filemap::OpenFile> gkfs_fd, off_t offset,
gkfs_fd->pos(gkfs_fd->pos() + offset);
break;
case SEEK_END: {
auto ret = gkfs::rpc::forward_get_metadentry_size(gkfs_fd->path());
// TODO: handle replicas
auto ret =
gkfs::rpc::forward_get_metadentry_size(gkfs_fd->path(), 0);
auto err = ret.first;
if(err) {
errno = err;
......@@ -723,14 +736,17 @@ gkfs_truncate(const std::string& path, off_t old_size, off_t new_size) {
if(new_size == old_size) {
return 0;
}
auto err = gkfs::rpc::forward_decr_size(path, new_size);
if(err) {
LOG(DEBUG, "Failed to decrease size");
errno = err;
return -1;
for(auto copy = 0; copy < (CTX->get_replicas() + 1); copy++) {
auto err = gkfs::rpc::forward_decr_size(path, new_size, copy);
if(err) {
LOG(DEBUG, "Failed to decrease size");
errno = err;
return -1;
}
}
err = gkfs::rpc::forward_truncate(path, old_size, new_size);
auto err = gkfs::rpc::forward_truncate(path, old_size, new_size,
CTX->get_replicas());
if(err) {
LOG(DEBUG, "Failed to truncate data");
errno = err;
......@@ -864,9 +880,11 @@ gkfs_pwrite(std::shared_ptr<gkfs::filemap::OpenFile> file, const char* buf,
}
auto path = make_unique<string>(file->path());
auto is_append = file->get_flag(gkfs::filemap::OpenFile_flags::append);
ssize_t write_size = 0;
auto num_replicas = CTX->get_replicas();
auto ret_offset = gkfs::rpc::forward_update_metadentry_size(
*path, count, offset, is_append);
*path, count, offset, is_append, num_replicas);
auto err = ret_offset.first;
if(err) {
LOG(ERROR, "update_metadentry_size() failed with err '{}'", err);
......@@ -888,8 +906,22 @@ gkfs_pwrite(std::shared_ptr<gkfs::filemap::OpenFile> file, const char* buf,
offset = ret_offset.second;
}
auto ret_write = gkfs::rpc::forward_write(*path, buf, offset, count);
auto ret_write = gkfs::rpc::forward_write(*path, buf, offset, count, 0);
err = ret_write.first;
write_size = ret_write.second;
if(num_replicas > 0) {
auto ret_write_repl = gkfs::rpc::forward_write(*path, buf, offset,
count, num_replicas);
if(err and ret_write_repl.first == 0) {
// We successfully wrote the data to some replica
err = ret_write_repl.first;
// The reported write size may be inaccurate in this case
write_size = ret_write_repl.second;
}
}
if(err) {
LOG(WARNING, "gkfs::rpc::forward_write() failed with err '{}'", err);
errno = err;
......@@ -897,14 +929,14 @@ gkfs_pwrite(std::shared_ptr<gkfs::filemap::OpenFile> file, const char* buf,
}
if(update_pos) {
// Update offset in file descriptor in the file map
file->pos(offset + ret_write.second);
file->pos(offset + write_size);
}
if(static_cast<size_t>(ret_write.second) != count) {
if(static_cast<size_t>(write_size) != count) {
LOG(WARNING,
"gkfs::rpc::forward_write() wrote '{}' bytes instead of '{}'",
ret_write.second, count);
write_size, count);
}
return ret_write.second; // return written size
return write_size; // return written size
}
/**
......@@ -1024,7 +1056,24 @@ gkfs_pread(std::shared_ptr<gkfs::filemap::OpenFile> file, char* buf,
if constexpr(gkfs::config::io::zero_buffer_before_read) {
memset(buf, 0, sizeof(char) * count);
}
auto ret = gkfs::rpc::forward_read(file->path(), buf, offset, count);
std::pair<int, off_t> ret;
std::set<int8_t> failed; // set with failed targets.
if(CTX->get_replicas() != 0) {
ret = gkfs::rpc::forward_read(file->path(), buf, offset, count,
CTX->get_replicas(), failed);
while(ret.first == EIO) {
ret = gkfs::rpc::forward_read(file->path(), buf, offset, count,
CTX->get_replicas(), failed);
LOG(WARNING, "gkfs::rpc::forward_read() failed with ret '{}'",
ret.first);
}
} else {
ret = gkfs::rpc::forward_read(file->path(), buf, offset, count, 0,
failed);
}
auto err = ret.first;
if(err) {
LOG(WARNING, "gkfs::rpc::forward_read() failed with ret '{}'", err);
......@@ -1192,7 +1241,7 @@ gkfs_rmdir(const std::string& path) {
errno = ENOTEMPTY;
return -1;
}
err = gkfs::rpc::forward_remove(path);
err = gkfs::rpc::forward_remove(path, CTX->get_replicas());
if(err) {
errno = err;
return -1;
......
......@@ -37,6 +37,8 @@
#include <common/rpc/distributor.hpp>
#include <common/common_defs.hpp>
#include <ctime>
#include <cstdlib>
#include <fstream>
#include <hermes.hpp>
......@@ -238,6 +240,11 @@ init_environment() {
EXIT_FAILURE,
"Unable to fetch file system configurations from daemon process through RPC.");
}
// Initialize random number generator and seed for replica selection
// in case of failure, a new replica will be selected
if(CTX->get_replicas() > 0) {
srand(time(nullptr));
}
LOG(INFO, "Environment initialization successful.");
}
......@@ -277,6 +284,7 @@ init_preload() {
gkfs::path::init_cwd();
LOG(DEBUG, "Current working directory: '{}'", CTX->cwd());
LOG(DEBUG, "Number of replicas : '{}'", CTX->get_replicas());
gkfs::preload::init_environment();
CTX->enable_interception();
......
......@@ -65,6 +65,8 @@ PreloadContext::PreloadContext()
char host[255];
gethostname(host, 255);
hostname = host;
PreloadContext::set_replicas(
std::stoi(gkfs::env::get_var(gkfs::env::NUM_REPL, "0")));
}
void
......@@ -452,5 +454,15 @@ PreloadContext::get_hostname() {
return hostname;
}
void
PreloadContext::set_replicas(const int repl) {
replicas_ = repl;
}
int
PreloadContext::get_replicas() {
return replicas_;
}
} // namespace preload
} // namespace gkfs
......@@ -200,16 +200,26 @@ namespace gkfs::utils {
optional<gkfs::metadata::Metadata>
get_metadata(const string& path, bool follow_links) {
std::string attr;
auto err = gkfs::rpc::forward_stat(path, attr);
auto err = gkfs::rpc::forward_stat(path, attr, 0);
// TODO: retry on failure
if(err) {
errno = err;
return {};
auto copy = 1;
while(copy < CTX->get_replicas() + 1 && err) {
LOG(ERROR, "Retrying Stat on replica {} {}", copy, follow_links);
err = gkfs::rpc::forward_stat(path, attr, copy);
copy++;
}
if(err) {
errno = err;
return {};
}
}
#ifdef HAS_SYMLINKS
if(follow_links) {
gkfs::metadata::Metadata md{attr};
while(md.is_link()) {
err = gkfs::rpc::forward_stat(md.target_path(), attr);
err = gkfs::rpc::forward_stat(md.target_path(), attr, 0);
if(err) {
errno = err;
return {};
......
......@@ -34,6 +34,7 @@
#include <common/rpc/distributor.hpp>
#include <common/arithmetic/arithmetic.hpp>
#include <common/rpc/rpc_util.hpp>
#include <unordered_set>
......@@ -42,21 +43,26 @@ using namespace std;
namespace gkfs::rpc {
/*
* This file includes all metadata RPC calls.
* This file includes all data RPC calls.
* NOTE: No errno is defined here!
*/
/**
* Send an RPC request to write from a buffer.
* A bitset of up to 1024 chunks tells the server
* which chunks to process. Writes exceeding this limit only work without
* replication. Another way would be to leverage Mercury segments.
* TODO: Decide how to manage a write to a replica that doesn't exist
* @param path
* @param buf
* @param offset
* @param write_size
* @param num_copies number of replicas
* @return pair<error code, written size>
*/
pair<int, ssize_t>
forward_write(const string& path, const void* buf, const off64_t offset,
const size_t write_size) {
const size_t write_size, const int8_t num_copies) {
// import pow2-optimized arithmetic functions
using namespace gkfs::utils::arithmetic;
......@@ -69,35 +75,50 @@ forward_write(const string& path, const void* buf, const off64_t offset,
auto chnk_end = block_index((offset + write_size) - 1,
gkfs::config::rpc::chunksize);
auto chnk_total = (chnk_end - chnk_start) + 1;
// Collect all chunk ids within count that have the same destination so
// that those are send in one rpc bulk transfer
std::map<uint64_t, std::vector<uint64_t>> target_chnks{};
// contains the target ids, used to access the target_chnks map.
// First idx is chunk with potential offset
std::vector<uint64_t> targets{};
// targets for the first and last chunk as they need special treatment
uint64_t chnk_start_target = 0;
uint64_t chnk_end_target = 0;
// We need a set to manage replicas.
std::set<uint64_t> chnk_start_target{};
std::set<uint64_t> chnk_end_target{};
for(uint64_t chnk_id = chnk_start; chnk_id <= chnk_end; chnk_id++) {
auto target = CTX->distributor()->locate_data(path, chnk_id);
std::unordered_map<uint64_t, std::vector<uint8_t>> write_ops_vect;
if(target_chnks.count(target) == 0) {
target_chnks.insert(
std::make_pair(target, std::vector<uint64_t>{chnk_id}));
targets.push_back(target);
} else {
target_chnks[target].push_back(chnk_id);
}
// If num_copies is 0, we do the normal write operation. Otherwise
// we process all the replicas.
for(uint64_t chnk_id = chnk_start; chnk_id <= chnk_end; chnk_id++) {
for(auto copy = num_copies ? 1 : 0; copy < num_copies + 1; copy++) {
auto target = CTX->distributor()->locate_data(path, chnk_id, copy);
if(write_ops_vect.find(target) == write_ops_vect.end())
write_ops_vect[target] =
std::vector<uint8_t>(((chnk_total + 7) / 8));
gkfs::rpc::set_bitset(write_ops_vect[target], chnk_id - chnk_start);
if(target_chnks.count(target) == 0) {
target_chnks.insert(
std::make_pair(target, std::vector<uint64_t>{chnk_id}));
targets.push_back(target);
} else {
target_chnks[target].push_back(chnk_id);
}
// set first and last chnk targets
if(chnk_id == chnk_start) {
chnk_start_target = target;
}
// set first and last chnk targets
if(chnk_id == chnk_start) {
chnk_start_target.insert(target);
}
if(chnk_id == chnk_end) {
chnk_end_target = target;
if(chnk_id == chnk_end) {
chnk_end_target.insert(target);
}
}
}
......@@ -133,13 +154,13 @@ forward_write(const string& path, const void* buf, const off64_t offset,
target_chnks[target].size() * gkfs::config::rpc::chunksize;
// receiver of first chunk must subtract the offset from first chunk
if(target == chnk_start_target) {
if(chnk_start_target.end() != chnk_start_target.find(target)) {
total_chunk_size -=
block_overrun(offset, gkfs::config::rpc::chunksize);
}
// receiver of last chunk must subtract
if(target == chnk_end_target &&
if(chnk_end_target.end() != chnk_end_target.find(target) &&
!is_aligned(offset + write_size, gkfs::config::rpc::chunksize)) {
total_chunk_size -= block_underrun(offset + write_size,
gkfs::config::rpc::chunksize);
......@@ -148,7 +169,6 @@ forward_write(const string& path, const void* buf, const off64_t offset,
auto endp = CTX->hosts().at(target);
try {
LOG(DEBUG, "Sending RPC ...");
gkfs::rpc::write_data::input in(
......@@ -158,6 +178,7 @@ forward_write(const string& path, const void* buf, const off64_t offset,
block_overrun(offset, gkfs::config::rpc::chunksize), target,
CTX->hosts().size(),
// number of chunks handled by that destination
gkfs::rpc::compress_bitset(write_ops_vect[target]),
target_chnks[target].size(),
// chunk start id of this write
chnk_start,
......@@ -175,25 +196,26 @@ forward_write(const string& path, const void* buf, const off64_t offset,
ld_network_service->post<gkfs::rpc::write_data>(endp, in));
LOG(DEBUG,
"host: {}, path: \"{}\", chunks: {}, size: {}, offset: {}",
target, path, in.chunk_n(), total_chunk_size, in.offset());
"host: {}, path: \"{}\", chunk_start: {}, chunk_end: {}, chunks: {}, size: {}, offset: {}",
target, path, chnk_start, chnk_end, in.chunk_n(),
total_chunk_size, in.offset());
} catch(const std::exception& ex) {
LOG(ERROR,
"Unable to send non-blocking rpc for "
"path \"{}\" [peer: {}]",
path, target);
return make_pair(EBUSY, 0);
if(num_copies == 0)
return make_pair(EBUSY, 0);
}
}
// Wait for RPC responses and then get response and add it to out_size
// which is the written size All potential outputs are served to free
// resources regardless of errors, although an errorcode is set.
auto err = 0;
ssize_t out_size = 0;
std::size_t idx = 0;
#ifdef REPLICA_CHECK
std::vector<uint8_t> fill(chnk_total);
auto write_ops = write_ops_vect.begin();
#endif
for(const auto& h : handles) {
try {
// XXX We might need a timeout here to not wait forever for an
......@@ -203,18 +225,52 @@ forward_write(const string& path, const void* buf, const off64_t offset,
if(out.err() != 0) {
LOG(ERROR, "Daemon reported error: {}", out.err());
err = out.err();
} else {
out_size += static_cast<size_t>(out.io_size());
#ifdef REPLICA_CHECK
if(num_copies) {
if(fill.size() == 0) {
fill = write_ops->second;
} else {
for(size_t i = 0; i < fill.size(); i++) {
fill[i] |= write_ops->second[i];
}
}
}
write_ops++;
#endif
}
out_size += static_cast<size_t>(out.io_size());
} catch(const std::exception& ex) {
LOG(ERROR, "Failed to get rpc output for path \"{}\" [peer: {}]",
path, targets[idx]);
err = EIO;
}
idx++;
}
// As servers can fail (and we cannot know if the total data is written), we
// send the updated size but check that at least one copy of every chunk was
// processed.
if(num_copies) {
// A bit-wise or should show that all the chunks are written (255)
out_size = write_size;
#ifdef REPLICA_CHECK
for(size_t i = 0; i < fill.size() - 1; i++) {
if(fill[i] != 255) {
err = EIO;
break;
}
}
// Process the leftover bytes
for(uint64_t chnk_id = (chnk_start + (fill.size() - 1) * 8);
chnk_id <= chnk_end; chnk_id++) {
if(!(fill[(chnk_id - chnk_start) / 8] &
(1 << ((chnk_id - chnk_start) % 8)))) {
err = EIO;
break;
}
}
#endif
}
/*
* Typically file systems return the size even if only a part of it was
* written. In our case, we do not keep track which daemon fully wrote its
......@@ -232,11 +288,14 @@ forward_write(const string& path, const void* buf, const off64_t offset,
* @param buf
* @param offset
* @param read_size
* @param num_copies number of copies available (0 is no replication)
* @param failed failed nodes that should not be used
* @return pair<error code, read size>
*/
pair<int, ssize_t>
forward_read(const string& path, void* buf, const off64_t offset,
const size_t read_size) {
const size_t read_size, const int8_t num_copies,
std::set<int8_t>& failed) {
// import pow2-optimized arithmetic functions
using namespace gkfs::utils::arithmetic;
......@@ -246,19 +305,35 @@ forward_read(const string& path, void* buf, const off64_t offset,
auto chnk_start = block_index(offset, gkfs::config::rpc::chunksize);
auto chnk_end =
block_index((offset + read_size - 1), gkfs::config::rpc::chunksize);
auto chnk_total = (chnk_end - chnk_start) + 1;
// Collect all chunk ids within count that have the same destination so
// that those are send in one rpc bulk transfer
std::map<uint64_t, std::vector<uint64_t>> target_chnks{};
// contains the recipient ids, used to access the target_chnks map.
// First idx is chunk with potential offset
std::vector<uint64_t> targets{};
// targets for the first and last chunk as they need special treatment
uint64_t chnk_start_target = 0;
uint64_t chnk_end_target = 0;
std::unordered_map<uint64_t, std::vector<uint8_t>> read_bitset_vect;
for(uint64_t chnk_id = chnk_start; chnk_id <= chnk_end; chnk_id++) {
auto target = CTX->distributor()->locate_data(path, chnk_id);
auto target = CTX->distributor()->locate_data(path, chnk_id, 0);
if(num_copies > 0) {
// If we have some failures we select another copy (randomly).
while(failed.find(target) != failed.end()) {
LOG(DEBUG, "Selecting another node, target: {} down", target);
target = CTX->distributor()->locate_data(path, chnk_id,
rand() % num_copies);
}
}
if(read_bitset_vect.find(target) == read_bitset_vect.end())
read_bitset_vect[target] =
std::vector<uint8_t>(((chnk_total + 7) / 8));
read_bitset_vect[target][(chnk_id - chnk_start) / 8] |=
1 << ((chnk_id - chnk_start) % 8); // set
if(target_chnks.count(target) == 0) {
target_chnks.insert(
......@@ -303,6 +378,7 @@ forward_read(const string& path, void* buf, const off64_t offset,
// TODO(amiranda): This could be simplified by adding a vector of inputs
// to async_engine::broadcast(). This would allow us to avoid manually
// looping over handles as we do below
for(const auto& target : targets) {
// total chunk_size for target
......@@ -334,6 +410,7 @@ forward_read(const string& path, void* buf, const off64_t offset,
// a potential offset
block_overrun(offset, gkfs::config::rpc::chunksize), target,
CTX->hosts().size(),
gkfs::rpc::compress_bitset(read_bitset_vect[target]),
// number of chunks handled by that destination
target_chnks[target].size(),
// chunk start id of this write
......@@ -343,11 +420,12 @@ forward_read(const string& path, void* buf, const off64_t offset,
// total size to write
total_chunk_size, local_buffers);
// TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that
// we can retry for RPC_TRIES (see old commits with margo)
// TODO(amiranda): hermes will eventually provide a post(endpoint)
// returning one result and a broadcast(endpoint_set) returning a
// result_set. When that happens we can remove the .at(0) :/
// TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so
// that we can retry for RPC_TRIES (see old commits with margo)
// TODO(amiranda): hermes will eventually provide a
// post(endpoint) returning one result and a
// broadcast(endpoint_set) returning a result_set. When that
// happens we can remove the .at(0) :/
handles.emplace_back(
ld_network_service->post<gkfs::rpc::read_data>(endp, in));
......@@ -394,9 +472,15 @@ forward_read(const string& path, void* buf, const off64_t offset,
LOG(ERROR, "Failed to get rpc output for path \"{}\" [peer: {}]",
path, targets[idx]);
err = EIO;
// We should take targets[idx] and remove it from the list of peers
failed.insert(targets[idx]);
// Then repeat the read with another peer (we repeat the full
// read; this could be optimised but it is a corner case)
}
idx++;
}
/*
* Typically file systems return the size even if only a part of it was
* read. In our case, we do not keep track which daemon fully read its
......@@ -413,11 +497,12 @@ forward_read(const string& path, void* buf, const off64_t offset,
* @param path
* @param current_size
* @param new_size
* @param num_copies Number of replicas
* @return error code
*/
int
forward_truncate(const std::string& path, size_t current_size,
size_t new_size) {
forward_truncate(const std::string& path, size_t current_size, size_t new_size,
const int8_t num_copies) {
// import pow2-optimized arithmetic functions
using namespace gkfs::utils::arithmetic;
......@@ -434,7 +519,9 @@ forward_truncate(const std::string& path, size_t current_size,
std::unordered_set<unsigned int> hosts;
for(unsigned int chunk_id = chunk_start; chunk_id <= chunk_end;
++chunk_id) {
hosts.insert(CTX->distributor()->locate_data(path, chunk_id));
for(auto copy = 0; copy < (num_copies + 1); ++copy) {
hosts.insert(CTX->distributor()->locate_data(path, chunk_id, copy));
}
}
std::vector<hermes::rpc_handle<gkfs::rpc::trunc_data>> handles;
......@@ -450,20 +537,23 @@ forward_truncate(const std::string& path, size_t current_size,
gkfs::rpc::trunc_data::input in(path, new_size);
// TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that
// we can retry for RPC_TRIES (see old commits with margo)
// TODO(amiranda): hermes will eventually provide a post(endpoint)
// returning one result and a broadcast(endpoint_set) returning a
// result_set. When that happens we can remove the .at(0) :/
// TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so
// that we can retry for RPC_TRIES (see old commits with margo)
// TODO(amiranda): hermes will eventually provide a
// post(endpoint) returning one result and a
// broadcast(endpoint_set) returning a result_set. When that
// happens we can remove the .at(0) :/
handles.emplace_back(
ld_network_service->post<gkfs::rpc::trunc_data>(endp, in));
} catch(const std::exception& ex) {
// TODO(amiranda): we should cancel all previously posted requests
// here, unfortunately, Hermes does not support it yet :/
// TODO(amiranda): we should cancel all previously posted
// requests here, unfortunately, Hermes does not support it yet
// :/
LOG(ERROR, "Failed to send request to host: {}", host);
err = EIO;
break; // We need to gather all responses so we can't return here
break; // We need to gather all responses so we can't return
// here
}
}
......@@ -503,20 +593,23 @@ forward_get_chunk_stat() {
gkfs::rpc::chunk_stat::input in(0);
// TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that
// we can retry for RPC_TRIES (see old commits with margo)
// TODO(amiranda): hermes will eventually provide a post(endpoint)
// returning one result and a broadcast(endpoint_set) returning a
// result_set. When that happens we can remove the .at(0) :/
// TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so
// that we can retry for RPC_TRIES (see old commits with margo)
// TODO(amiranda): hermes will eventually provide a
// post(endpoint) returning one result and a
// broadcast(endpoint_set) returning a result_set. When that
// happens we can remove the .at(0) :/
handles.emplace_back(
ld_network_service->post<gkfs::rpc::chunk_stat>(endp, in));
} catch(const std::exception& ex) {
// TODO(amiranda): we should cancel all previously posted requests
// here, unfortunately, Hermes does not support it yet :/
// TODO(amiranda): we should cancel all previously posted
// requests here, unfortunately, Hermes does not support it yet
// :/
LOG(ERROR, "Failed to send request to host: {}", endp.to_string());
err = EBUSY;
break; // We need to gather all responses so we can't return here
break; // We need to gather all responses so we can't return
// here
}
}
......@@ -547,9 +640,11 @@ forward_get_chunk_stat() {
chunk_free += out.chunk_free();
} catch(const std::exception& ex) {
LOG(ERROR, "Failed to get RPC output from host: {}", i);
err = EBUSY;
// Avoid setting err if a server fails.
// err = EBUSY;
}
}
if(err)
return make_pair(err, ChunkStat{});
else
......
......@@ -45,19 +45,30 @@ forward_get_fs_config() {
auto endp = CTX->hosts().at(CTX->local_host_id());
gkfs::rpc::fs_config::output out;
try {
LOG(DEBUG, "Retrieving file system configurations from daemon");
// TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we
// can retry for RPC_TRIES (see old commits with margo)
// TODO(amiranda): hermes will eventually provide a post(endpoint)
// returning one result and a broadcast(endpoint_set) returning a
// result_set. When that happens we can remove the .at(0) :/
out = ld_network_service->post<gkfs::rpc::fs_config>(endp).get().at(0);
} catch(const std::exception& ex) {
LOG(ERROR, "Retrieving fs configurations from daemon");
return false;
bool found = false;
size_t idx = 0;
while(!found && idx < CTX->hosts().size()) {
try {
LOG(DEBUG, "Retrieving file system configurations from daemon");
// TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that
// we can retry for RPC_TRIES (see old commits with margo)
// TODO(amiranda): hermes will eventually provide a post(endpoint)
// returning one result and a broadcast(endpoint_set) returning a
// result_set. When that happens we can remove the .at(0) :/
out = ld_network_service->post<gkfs::rpc::fs_config>(endp).get().at(
0);
found = true;
} catch(const std::exception& ex) {
LOG(ERROR,
    "Failed to retrieve fs configurations from daemon, retrying at peer: {}",
    idx);
endp = CTX->hosts().at(idx++);
}
}
if(!found)
return false;
CTX->mountdir(out.mountdir());
LOG(INFO, "Mountdir: '{}'", CTX->mountdir());
......
......@@ -51,12 +51,14 @@ namespace gkfs::rpc {
* Send an RPC for a create request
* @param path
* @param mode
* @param copy Number of replica to create
* @return error code
*/
int
forward_create(const std::string& path, const mode_t mode) {
forward_create(const std::string& path, const mode_t mode, const int copy) {
auto endp = CTX->hosts().at(CTX->distributor()->locate_file_metadata(path));
auto endp = CTX->hosts().at(
CTX->distributor()->locate_file_metadata(path, copy));
try {
LOG(DEBUG, "Sending RPC ...");
......@@ -81,12 +83,14 @@ forward_create(const std::string& path, const mode_t mode) {
* Send an RPC for a stat request
* @param path
* @param attr
* @param copy metadata replica to read from
* @return error code
*/
int
forward_stat(const std::string& path, string& attr) {
forward_stat(const std::string& path, string& attr, const int copy) {
auto endp = CTX->hosts().at(CTX->distributor()->locate_file_metadata(path));
auto endp = CTX->hosts().at(
CTX->distributor()->locate_file_metadata(path, copy));
try {
LOG(DEBUG, "Sending RPC ...");
......@@ -121,40 +125,44 @@ forward_stat(const std::string& path, string& attr) {
* This function only attempts data removal if data exists (determined when
* metadata is removed)
* @param path
* @param num_copies Number of replicas (0 means no replication)
* @return error code
*/
int
forward_remove(const std::string& path) {
auto endp = CTX->hosts().at(CTX->distributor()->locate_file_metadata(path));
forward_remove(const std::string& path, const int8_t num_copies) {
int64_t size = 0;
uint32_t mode = 0;
/*
* Send one RPC to metadata destination and remove metadata while retrieving
* size and mode to determine if data needs to be removed too
*/
try {
LOG(DEBUG, "Sending RPC ...");
// TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we
// can retry for RPC_TRIES (see old commits with margo)
// TODO(amiranda): hermes will eventually provide a post(endpoint)
// returning one result and a broadcast(endpoint_set) returning a
// result_set. When that happens we can remove the .at(0) :/
auto out =
ld_network_service->post<gkfs::rpc::remove_metadata>(endp, path)
.get()
.at(0);
LOG(DEBUG, "Got response success: {}", out.err());
for(auto copy = 0; copy < (num_copies + 1); copy++) {
auto endp = CTX->hosts().at(
CTX->distributor()->locate_file_metadata(path, copy));
if(out.err())
return out.err();
size = out.size();
mode = out.mode();
} catch(const std::exception& ex) {
LOG(ERROR, "while getting rpc output");
return EBUSY;
/*
* Send one RPC to metadata destination and remove metadata while
* retrieving size and mode to determine if data needs to be removed too
*/
try {
LOG(DEBUG, "Sending RPC ...");
// TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that
// we can retry for RPC_TRIES (see old commits with margo)
// TODO(amiranda): hermes will eventually provide a post(endpoint)
// returning one result and a broadcast(endpoint_set) returning a
// result_set. When that happens we can remove the .at(0) :/
auto out = ld_network_service
->post<gkfs::rpc::remove_metadata>(endp, path)
.get()
.at(0);
LOG(DEBUG, "Got response success: {}", out.err());
if(out.err())
return out.err();
size = out.size();
mode = out.mode();
} catch(const std::exception& ex) {
LOG(ERROR, "while getting rpc output");
return EBUSY;
}
}
// if file is not a regular file and its size is 0, data does not need to
// be removed, thus, we exit
......@@ -167,44 +175,54 @@ forward_remove(const std::string& path) {
// Small files
if(static_cast<std::size_t>(size / gkfs::config::rpc::chunksize) <
CTX->hosts().size()) {
const auto metadata_host_id =
CTX->distributor()->locate_file_metadata(path);
const auto endp_metadata = CTX->hosts().at(metadata_host_id);
try {
LOG(DEBUG, "Sending RPC to host: {}", endp_metadata.to_string());
gkfs::rpc::remove_data::input in(path);
handles.emplace_back(
ld_network_service->post<gkfs::rpc::remove_data>(
endp_metadata, in));
uint64_t chnk_start = 0;
uint64_t chnk_end = size / gkfs::config::rpc::chunksize;
for(uint64_t chnk_id = chnk_start; chnk_id <= chnk_end; chnk_id++) {
const auto chnk_host_id =
CTX->distributor()->locate_data(path, chnk_id);
if constexpr(gkfs::config::metadata::implicit_data_removal) {
/*
* If the chnk host matches the metadata host the remove
* request as already been sent as part of the metadata
* remove request.
*/
if(chnk_host_id == metadata_host_id)
continue;
}
const auto endp_chnk = CTX->hosts().at(chnk_host_id);
LOG(DEBUG, "Sending RPC to host: {}", endp_chnk.to_string());
for(auto copymd = 0; copymd < (num_copies + 1); copymd++) {
const auto metadata_host_id =
CTX->distributor()->locate_file_metadata(path, copymd);
const auto endp_metadata = CTX->hosts().at(metadata_host_id);
try {
LOG(DEBUG, "Sending RPC to host: {}",
endp_metadata.to_string());
gkfs::rpc::remove_data::input in(path);
handles.emplace_back(
ld_network_service->post<gkfs::rpc::remove_data>(
endp_chnk, in));
endp_metadata, in));
uint64_t chnk_start = 0;
uint64_t chnk_end = size / gkfs::config::rpc::chunksize;
for(uint64_t chnk_id = chnk_start; chnk_id <= chnk_end;
chnk_id++) {
for(auto copy = 0; copy < (num_copies + 1); copy++) {
const auto chnk_host_id =
CTX->distributor()->locate_data(path, chnk_id,
copy);
if constexpr(gkfs::config::metadata::
implicit_data_removal) {
/*
* If the chnk host matches the metadata host the
* remove request has already been sent as part of
* the metadata remove request.
*/
if(chnk_host_id == metadata_host_id)
continue;
}
const auto endp_chnk = CTX->hosts().at(chnk_host_id);
LOG(DEBUG, "Sending RPC to host: {}",
endp_chnk.to_string());
handles.emplace_back(
ld_network_service
->post<gkfs::rpc::remove_data>(
endp_chnk, in));
}
}
} catch(const std::exception& ex) {
LOG(ERROR,
"Failed to forward non-blocking rpc request reduced remove requests");
return EBUSY;
}
} catch(const std::exception& ex) {
LOG(ERROR,
"Failed to forward non-blocking rpc request reduced remove requests");
return EBUSY;
}
} else { // "Big" files
for(const auto& endp : CTX->hosts()) {
......@@ -260,12 +278,14 @@ forward_remove(const std::string& path) {
* during a truncate() call.
* @param path
* @param length
* @param copy Target replica (0 original)
* @return error code
*/
int
forward_decr_size(const std::string& path, size_t length) {
forward_decr_size(const std::string& path, size_t length, const int copy) {
auto endp = CTX->hosts().at(CTX->distributor()->locate_file_metadata(path));
auto endp = CTX->hosts().at(
CTX->distributor()->locate_file_metadata(path, copy));
try {
LOG(DEBUG, "Sending RPC ...");
......@@ -295,14 +315,17 @@ forward_decr_size(const std::string& path, size_t length) {
* @param path
* @param md
* @param md_flags
* @param copy Target replica (0 original)
* @return error code
*/
int
forward_update_metadentry(
const string& path, const gkfs::metadata::Metadata& md,
const gkfs::metadata::MetadentryUpdateFlags& md_flags) {
forward_update_metadentry(const string& path,
const gkfs::metadata::Metadata& md,
const gkfs::metadata::MetadentryUpdateFlags& md_flags,
const int copy) {
auto endp = CTX->hosts().at(CTX->distributor()->locate_file_metadata(path));
auto endp = CTX->hosts().at(
CTX->distributor()->locate_file_metadata(path, copy));
try {
LOG(DEBUG, "Sending RPC ...");
......@@ -348,6 +371,7 @@ forward_update_metadentry(
* This marks that this file doesn't have to be accessed directly
* Create a new md with the new name, which should have as value the old name
* All operations should check blockcnt and extract a NOTEXISTS
* The operation does not support replication
* @param oldpath
* @param newpath
* @param md
......@@ -358,8 +382,8 @@ int
forward_rename(const string& oldpath, const string& newpath,
const gkfs::metadata::Metadata& md) {
auto endp =
CTX->hosts().at(CTX->distributor()->locate_file_metadata(oldpath));
auto endp = CTX->hosts().at(
CTX->distributor()->locate_file_metadata(oldpath, 0));
try {
LOG(DEBUG, "Sending RPC ...");
......@@ -405,8 +429,8 @@ forward_rename(const string& oldpath, const string& newpath,
// TODO(amiranda): hermes will eventually provide a post(endpoint)
// returning one result and a broadcast(endpoint_set) returning a
// result_set. When that happens we can remove the .at(0) :/
auto endp2 =
CTX->hosts().at(CTX->distributor()->locate_file_metadata(newpath));
auto endp2 = CTX->hosts().at(
CTX->distributor()->locate_file_metadata(newpath, 0));
try {
LOG(DEBUG, "Sending RPC ...");
......@@ -479,53 +503,85 @@ forward_rename(const string& oldpath, const string& newpath,
/**
* Send an RPC request for an update to the file size.
* This is called during a write() call or similar
* Only a single successful call is needed to progress.
* @param path
* @param size
* @param offset
* @param append_flag
* @param num_copies number of replicas
* @return pair<error code, size after update>
*/
pair<int, off64_t>
forward_update_metadentry_size(const string& path, const size_t size,
const off64_t offset, const bool append_flag) {
const off64_t offset, const bool append_flag,
const int num_copies) {
auto endp = CTX->hosts().at(CTX->distributor()->locate_file_metadata(path));
try {
LOG(DEBUG, "Sending RPC ...");
// TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we
// can retry for RPC_TRIES (see old commits with margo)
// TODO(amiranda): hermes will eventually provide a post(endpoint)
// returning one result and a broadcast(endpoint_set) returning a
// result_set. When that happens we can remove the .at(0) :/
auto out = ld_network_service
->post<gkfs::rpc::update_metadentry_size>(
endp, path, size, offset,
bool_to_merc_bool(append_flag))
.get()
.at(0);
std::vector<hermes::rpc_handle<gkfs::rpc::update_metadentry_size>> handles;
LOG(DEBUG, "Got response success: {}", out.err());
for(auto copy = 0; copy < num_copies + 1; copy++) {
auto endp = CTX->hosts().at(
CTX->distributor()->locate_file_metadata(path, copy));
try {
LOG(DEBUG, "Sending RPC ...");
// TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that
// we can retry for RPC_TRIES (see old commits with margo)
// TODO(amiranda): hermes will eventually provide a post(endpoint)
// returning one result and a broadcast(endpoint_set) returning a
// result_set. When that happens we can remove the .at(0) :/
handles.emplace_back(
ld_network_service->post<gkfs::rpc::update_metadentry_size>(
endp, path, size, offset,
bool_to_merc_bool(append_flag)));
} catch(const std::exception& ex) {
LOG(ERROR, "while getting rpc output");
return make_pair(EBUSY, 0);
}
}
auto err = 0;
ssize_t out_size = 0;
auto idx = 0;
bool valid = false;
for(const auto& h : handles) {
try {
// XXX We might need a timeout here to not wait forever for an
// output that never comes?
auto out = h.get().at(0);
if(out.err())
return make_pair(out.err(), 0);
else
return make_pair(0, out.ret_size());
} catch(const std::exception& ex) {
LOG(ERROR, "while getting rpc output");
return make_pair(EBUSY, 0);
if(out.err() != 0) {
LOG(ERROR, "Daemon {} reported error: {}", idx, out.err());
} else {
valid = true;
out_size = out.ret_size();
}
} catch(const std::exception& ex) {
LOG(ERROR, "Failed to get rpc output");
if(!valid) {
err = EIO;
}
}
idx++;
}
if(!valid)
return make_pair(err, 0);
else
return make_pair(0, out_size);
}
/**
* Send an RPC request to get the current file size.
* This is called during a lseek() call
* @param path
* @param copy Target replica (0 original)
* @return pair<error code, file size>
*/
pair<int, off64_t>
forward_get_metadentry_size(const std::string& path) {
forward_get_metadentry_size(const std::string& path, const int copy) {
auto endp = CTX->hosts().at(CTX->distributor()->locate_file_metadata(path));
auto endp = CTX->hosts().at(
CTX->distributor()->locate_file_metadata(path, copy));
try {
LOG(DEBUG, "Sending RPC ...");
......@@ -831,7 +887,8 @@ forward_get_dirents_single(const string& path, int server) {
int
forward_mk_symlink(const std::string& path, const std::string& target_path) {
auto endp = CTX->hosts().at(CTX->distributor()->locate_file_metadata(path));
auto endp =
CTX->hosts().at(CTX->distributor()->locate_file_metadata(path, 0));
try {
LOG(DEBUG, "Sending RPC ...");
......
......@@ -47,27 +47,34 @@ SimpleHashDistributor::localhost() const {
return localhost_;
}
unsigned int
SimpleHashDistributor::hosts_size() const {
return hosts_size_;
}
host_t
SimpleHashDistributor::locate_data(const string& path,
const chunkid_t& chnk_id) const {
return str_hash(path + ::to_string(chnk_id)) % hosts_size_;
SimpleHashDistributor::locate_data(const string& path, const chunkid_t& chnk_id,
const int num_copy) const {
return (str_hash(path + ::to_string(chnk_id)) + num_copy) % hosts_size_;
}
host_t
SimpleHashDistributor::locate_data(const string& path, const chunkid_t& chnk_id,
unsigned int hosts_size) {
unsigned int hosts_size,
const int num_copy) {
if(hosts_size_ != hosts_size) {
hosts_size_ = hosts_size;
all_hosts_ = std::vector<unsigned int>(hosts_size);
::iota(all_hosts_.begin(), all_hosts_.end(), 0);
}
return str_hash(path + ::to_string(chnk_id)) % hosts_size_;
return (str_hash(path + ::to_string(chnk_id)) + num_copy) % hosts_size_;
}
host_t
SimpleHashDistributor::locate_file_metadata(const string& path) const {
return str_hash(path) % hosts_size_;
SimpleHashDistributor::locate_file_metadata(const string& path,
const int num_copy) const {
return (str_hash(path) + num_copy) % hosts_size_;
}
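Since the replica index is added to the hash before the modulo, replica `num_copy` of a chunk lands `num_copy` hosts after the primary on the host ring, so copies end up on distinct hosts as long as `num_copy` is smaller than `hosts_size_`. A hypothetical standalone illustration (not part of the patch; namespace and values are assumptions):

```cpp
#include <iostream>

#include <common/rpc/distributor.hpp>

int
main() {
    // Hypothetical setup: 4 hosts, this client is host 0.
    gkfs::rpc::SimpleHashDistributor d(0, 4);
    for(int copy = 0; copy < 3; ++copy) {
        // (str_hash("/file" + "7") + copy) % 4: the three copies of
        // chunk 7 map to three consecutive, distinct host ids.
        std::cout << d.locate_data("/file", 7, copy) << '\n';
    }
    return 0;
}
```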
::vector<host_t>
......@@ -83,14 +90,20 @@ LocalOnlyDistributor::localhost() const {
return localhost_;
}
unsigned int
LocalOnlyDistributor::hosts_size() const {
return hosts_size_;
}
host_t
LocalOnlyDistributor::locate_data(const string& path,
const chunkid_t& chnk_id) const {
LocalOnlyDistributor::locate_data(const string& path, const chunkid_t& chnk_id,
const int num_copy) const {
return localhost_;
}
host_t
LocalOnlyDistributor::locate_file_metadata(const string& path) const {
LocalOnlyDistributor::locate_file_metadata(const string& path,
const int num_copy) const {
return localhost_;
}
......@@ -110,24 +123,32 @@ ForwarderDistributor::localhost() const {
return fwd_host_;
}
unsigned int
ForwarderDistributor::hosts_size() const {
return hosts_size_;
}
host_t
ForwarderDistributor::locate_data(const std::string& path,
const chunkid_t& chnk_id) const {
const chunkid_t& chnk_id,
const int num_copy) const {
return fwd_host_;
}
host_t
ForwarderDistributor::locate_data(const std::string& path,
const chunkid_t& chnk_id,
unsigned int host_size) {
unsigned int host_size, const int num_copy) {
return fwd_host_;
}
host_t
ForwarderDistributor::locate_file_metadata(const std::string& path) const {
return str_hash(path) % hosts_size_;
ForwarderDistributor::locate_file_metadata(const std::string& path,
const int num_copy) const {
return (str_hash(path) + num_copy) % hosts_size_;
}
std::vector<host_t>
ForwarderDistributor::locate_directory_metadata(const std::string& path) const {
return all_hosts_;
......@@ -213,21 +234,26 @@ GuidedDistributor::localhost() const {
return localhost_;
}
unsigned int
GuidedDistributor::hosts_size() const {
return hosts_size_;
}
host_t
GuidedDistributor::locate_data(const string& path, const chunkid_t& chnk_id,
unsigned int hosts_size) {
unsigned int hosts_size, const int num_copy) {
if(hosts_size_ != hosts_size) {
hosts_size_ = hosts_size;
all_hosts_ = std::vector<unsigned int>(hosts_size);
::iota(all_hosts_.begin(), all_hosts_.end(), 0);
}
return (locate_data(path, chnk_id));
return (locate_data(path, chnk_id, num_copy));
}
host_t
GuidedDistributor::locate_data(const string& path,
const chunkid_t& chnk_id) const {
GuidedDistributor::locate_data(const string& path, const chunkid_t& chnk_id,
const int num_copy) const {
auto it = map_interval.find(path);
if(it != map_interval.end()) {
auto it_f = it->second.first.IsInsideInterval(chnk_id);
......@@ -245,14 +271,16 @@ GuidedDistributor::locate_data(const string& path,
}
auto locate = path + ::to_string(chnk_id);
return str_hash(locate) % hosts_size_;
return (str_hash(locate) + num_copy) % hosts_size_;
}
host_t
GuidedDistributor::locate_file_metadata(const string& path) const {
return str_hash(path) % hosts_size_;
GuidedDistributor::locate_file_metadata(const string& path,
const int num_copy) const {
return (str_hash(path) + num_copy) % hosts_size_;
}
::vector<host_t>
GuidedDistributor::locate_directory_metadata(const string& path) const {
return all_hosts_;
......