Commits on Source (13)
  • Ramon Nou's avatar
    Changelog change and branch · c0ee81e7
    Ramon Nou authored and Marc Vef's avatar Marc Vef committed
    Added NUM_REPL env variable. (0 no replicas)
    
    NUM_REPL  num replicas  (Replicas < servers)
    
    Remove and truncate
    
    Metadata replication
    
    Metadata replication - Reattempt on stat
    
    minimal compilation issues (c++20), srand for repl
    
    Bitset
    
    Bit set proposal (WIP)
    
    Read - Write with bitset (<1024 chunks)
    
    Changed bitset to vector
    
    Added get_fs_config reattempt
    
    Some more resilience on create
    
    Added Replica_Check on write (disabled)
    
    Added helper vector-bitset functions
    c0ee81e7
  • Ramon Nou's avatar
    Write parameters change · 938b436d
    Ramon Nou authored and Marc Vef's avatar Marc Vef committed
    938b436d
  • Marc Vef's avatar
    Review changes · a66ae9eb
    Marc Vef authored
    a66ae9eb
  • Marc Vef's avatar
    Cleanup RPC code · d11575d6
    Marc Vef authored
    d11575d6
  • Marc Vef's avatar
    Review · 2835322a
    Marc Vef authored
    2835322a
  • Marc Vef's avatar
    Merge branch 'rnou/replication' into 'master' · 137dd33f
    Marc Vef authored
    Data replication (client side, synchronous)
    
    This MR adds support for data replication using one environment variable:
    
    `LIBGKFS_NUM_REPL=<num repl>`
    The number of replicas should go from 0 to the number of servers-1. The replicas are guided by the 
    client, so it reduces write performance but we mantain the same level of consistency. 
    On the other hand, it may increase read performance on some corner scenearios. 
    Metadata replication is also implemented
    The replication environment variable can be set up for each client, independently.
    
    If a server is down, the data will be read from another replica. The metadata management is also done from another replica.
    
    The replication is done in a synchronous way. A new function forward_write is used to sent to the different replicas. The reads are distributed, but this shouldn't produce an performance improvement as the distribution is similar to the original. 
    
    In the case of the write, the original is sent to the target servers, and then the replicas are processed. This is done to avoid issues if a server, that should host a replica, is not available. 
    
    In order to process the replicas a new method to check that a chunk needs to be processed inside a server is included, a bitset of 1024 is sent (coded in base-64 in a string). This represents 1024-chunks per write-read operation. If that is exceeded the normal hash check per chunk is done in the server. Exceeding this value, will disable the replica capabilities and produce unknown behaviours.
    
    This can be potentially increased.
    
    Finally, most of the operations are replica-aware, but some of them are missing yet. i.e., dirent.
    
    See merge request !166
    137dd33f
  • Marc Vef's avatar
    Remove debugging output from srv_data.cpp · a3605d52
    Marc Vef authored
    a3605d52
  • Marc Vef's avatar
    Merge branch 'marc/hotfix' into 'master' · f2092c3b
    Marc Vef authored
    Remove debugging output from srv_data.cpp
    
    Leftovers from replication branch that was missed during review.
    
    Currently, prints one error message per chunk on the daemon.
    
    See merge request !184
    f2092c3b
  • Julius Athenstaedt's avatar
    add submodule GSL v4.0.0.0 · e0efe538
    Julius Athenstaedt authored and Marc Vef's avatar Marc Vef committed
    e0efe538
  • Julius Athenstaedt's avatar
    use GSL to narrow down syscalls in hooks.cpp · eda70f12
    Julius Athenstaedt authored and Marc Vef's avatar Marc Vef committed
    eda70f12
  • Julius Athenstaedt's avatar
    correcting syscall return types · 213fa499
    Julius Athenstaedt authored and Marc Vef's avatar Marc Vef committed
    213fa499
  • Marc Vef's avatar
    Review · dd9dab7a
    Marc Vef authored
    dd9dab7a
  • Julius Athenstaedt's avatar
    update Changelog · 957f7a63
    Julius Athenstaedt authored and Marc Vef's avatar Marc Vef committed
    957f7a63
......@@ -22,3 +22,6 @@
[submodule "external/CLI11"]
path = external/CLI11
url = https://github.com/CLIUtils/CLI11.git
[submodule "external/GSL"]
path = external/GSL
url = https://github.com/microsoft/GSL
......@@ -6,6 +6,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]
- Replication without using the server. NUM_REPL (0 < NUM_REPL < num_servers) env variable defines the number of
replicas ([!166](https://storage.bsc.es/gitlab/hpc/gekkofs/-/merge_requests/141)).
- Modified write and reads to use a bitset instead of the traditional hash per chunk in the server.
- Added reattemp support in get_fs_config to other servers, when the initial server fails.
### New
### Changed
......@@ -16,6 +20,8 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
### New
- Use of GSL to narrow cast syscall return types, corrected syscall return types
([!182](https://storage.bsc.es/gitlab/hpc/gekkofs/-/merge_requests/182)).
- Support for client-side per process logging, activated
with `LIBGKFS_LOG_PER_PROCESS` ([!179](https://storage.bsc.es/gitlab/hpc/gekkofs/-/merge_requests/179)).
- Support mtime with option gkfs::config::metadata::
......
......@@ -247,6 +247,13 @@ include_from_source(cli11
GIT_TAG v2.2.0
)
### GSL: Guidelines Support Library
include_from_source(gsl
MESSAGE "[${PROJECT_NAME}] Searching for GSL"
SOURCE_DIR ${GKFS_DEPENDENCIES_PATH}/GSL
GIT_REPOSITORY https://github.com/microsoft/GSL
GIT_TAG v4.0.0
)
################################################################################
## Check configured variables/options and act accordingly
......
......@@ -320,6 +320,13 @@ Support for fstat in renamed files is included.
This is disabled by default.
### Replication
The user can enable the data replication feature by setting the replication environment variable:
`LIBGKFS_NUM_REPL=<num repl>`.
The number of replicas should go from `0` to the `number of servers - 1`. The replication environment variable can be
set up for each client independently.
## Acknowledgment
This software was partially supported by the EC H2020 funded NEXTGenIO project (Project ID: 671951, www.nextgenio.eu).
......@@ -330,7 +337,8 @@ the DFG.
This software is partially supported by the FIDIUM project funded by the DFG.
This work was partially funded by the European Union’s Horizon 2020 and the German Ministry of Education and Research (
BMBF) under the ``Adaptive multi-tier intelligent data manager for Exascale (ADMIRE)'' project (https://www.admire-eurohpc.eu/); Grant Agreement number:
BMBF) under the ``Adaptive multi-tier intelligent data manager for Exascale (ADMIRE)''
project (https://www.admire-eurohpc.eu/); Grant Agreement number:
956748-ADMIRE-H2020-JTI-EuroHPC-2019-1. Further, this work was partially supported by the Spanish Ministry of Economy
and Competitiveness (MINECO) under grants PID2019-107255GB, and the Generalitat de Catalunya under contract
2021-SGR-00412. This publication is part of the project ADMIRE PCI2021-121952, funded by MCIN/AEI/10.13039/501100011033.
......@@ -218,3 +218,89 @@ the logging subsystem to truncate the file used for logging, rather than append
For the daemon, the `GKFS_DAEMON_LOG_PATH=<path/to/file>` environment variable can be provided to set the path to the
log file, and the log module can be selected with the `GKFS_DAEMON_LOG_LEVEL={off,critical,err,warn,info,debug,trace}`
environment variable whereas `trace` produces the most trace records while `info` is the default value.
## Miscellaneous
### External functions
GekkoFS allows to use external functions on your client code, via LD_PRELOAD.
Source code needs to be compiled with -fPIC. We include a pfind io500 substitution,
`examples/gfind/gfind.cpp` and a non-mpi version `examples/gfind/sfind.cpp`
### Data distributors
The data distribution can be selected at compilation time, we have 2 distributors available:
#### Simple Hash (Default)
Chunks are distributed randomly to the different GekkoFS servers.
#### Guided Distributor
The guided distributor allows defining a specific distribution of data on a per directory or file basis.
The distribution configurations are defined within a shared file (called `guided_config.txt` henceforth) with the
following format:
`<path> <chunk_number> <host>`
To enable the distributor, the following CMake compilation flags are required:
* `GKFS_USE_GUIDED_DISTRIBUTION` ON
* `GKFS_USE_GUIDED_DISTRIBUTION_PATH` `<path_guided_config.txt>`
To use a custom distribution, a path needs to have the prefix `#` (e.g., `#/mdt-hard 0 0`), in which all the data of all
files in that directory goes to the same place as the metadata.
Note, that a chunk/host configuration is inherited to all children files automatically even if not using the prefix.
In this example, `/mdt-hard/file1` is therefore also using the same distribution as the `/mdt-hard` directory.
If no prefix is used, the Simple Hash distributor is used.
##### Guided configuration file
Creating a guided configuration file is based on an I/O trace file of a previous execution of the application.
For this the `trace_reads` tracing module is used (see above).
The `trace_reads` module enables a `TRACE_READS` level log at the clients writing the I/O information of the client
which is used as the input for a script that creates the guided distributor setting.
Note that capturing the necessary trace records can involve performance degradation.
To capture the I/O of each client within a SLURM environment, i.e., enabling the `trace_reads` module and print its
output to a user-defined path, the following example can be used:
`srun -N 10 -n 320 --export="ALL" /bin/bash -c "export LIBGKFS_LOG=trace_reads;LIBGKFS_LOG_OUTPUT=${HOME}/test/GLOBAL.txt;LD_PRELOAD=${GKFS_PRLD} <app>"`
Then, the `examples/distributors/guided/generate.py` scrpt is used to create the guided distributor configuration file:
* `python examples/distributors/guided/generate.py ~/test/GLOBAL.txt >> guided_config.txt`
Finally, modify `guided_config.txt` to your distribution requirements.
### Metadata Backends
There are two different metadata backends in GekkoFS. The default one uses `rocksdb`, however an alternative based
on `PARALLAX` from `FORTH`
is available. To enable it use the `-DGKFS_ENABLE_PARALLAX:BOOL=ON` option, you can also disable `rocksdb`
with `-DGKFS_ENABLE_ROCKSDB:BOOL=OFF`.
Once it is enabled, `--dbbackend` option will be functional.
### Statistics
GekkoFS daemons are able to output general operations (`--enable-collection`) and data chunk
statistics (`--enable-chunkstats`) to a specified output file via `--output-stats <FILE>`. Prometheus can also be used
instead or in addition to the output file. It must be enabled at compile time via the CMake
argument `-DGKFS_ENABLE_PROMETHEUS` and the daemon argument `--enable-prometheus`. The corresponding statistics are then
pushed to the Prometheus instance.
### Advanced experimental features
#### Rename
`-DGKFS_RENAME_SUPPORT` allows the application to rename files.
This is an experimental feature, and some scenarios may not work properly.
Support for fstat in renamed files is included.
This is disabled by default.
#### Replication
The user can enable the data replication feature by setting the replication environment variable:
`LIBGKFS_NUM_REPL=<num repl>`.
The number of replicas should go from `0` to the `number of servers - 1`. The replication environment variable can be
set up for each client independently.
\ No newline at end of file
Subproject commit a3534567187d2edc428efd3f13466ff75fe5805c
......@@ -52,7 +52,7 @@ static constexpr auto HOSTS_FILE = ADD_PREFIX("HOSTS_FILE");
#ifdef GKFS_ENABLE_FORWARDING
static constexpr auto FORWARDING_MAP_FILE = ADD_PREFIX("FORWARDING_MAP_FILE");
#endif
static constexpr auto NUM_REPL = ADD_PREFIX("NUM_REPL");
} // namespace gkfs::env
#undef ADD_PREFIX
......
......@@ -88,29 +88,29 @@ hook_fstat(unsigned int fd, struct stat* buf);
int
hook_fstatat(int dirfd, const char* cpath, struct stat* buf, int flags);
int
ssize_t
hook_read(unsigned int fd, void* buf, size_t count);
int
ssize_t
hook_pread(unsigned int fd, char* buf, size_t count, loff_t pos);
int
ssize_t
hook_readv(unsigned long fd, const struct iovec* iov, unsigned long iovcnt);
int
ssize_t
hook_preadv(unsigned long fd, const struct iovec* iov, unsigned long iovcnt,
unsigned long pos_l, unsigned long pos_h);
int
ssize_t
hook_write(unsigned int fd, const char* buf, size_t count);
int
ssize_t
hook_pwrite(unsigned int fd, const char* buf, size_t count, loff_t pos);
int
ssize_t
hook_writev(unsigned long fd, const struct iovec* iov, unsigned long iovcnt);
int
ssize_t
hook_pwritev(unsigned long fd, const struct iovec* iov, unsigned long iovcnt,
unsigned long pos_l, unsigned long pos_h);
......@@ -152,10 +152,10 @@ hook_dup2(unsigned int oldfd, unsigned int newfd);
int
hook_dup3(unsigned int oldfd, unsigned int newfd, int flags);
int
long
hook_getdents(unsigned int fd, struct linux_dirent* dirp, unsigned int count);
int
ssize_t
hook_getdents64(unsigned int fd, struct linux_dirent64* dirp,
unsigned int count);
......@@ -177,7 +177,7 @@ hook_fchdir(unsigned int fd);
int
hook_getcwd(char* buf, unsigned long size);
int
ssize_t
hook_readlinkat(int dirfd, const char* cpath, char* buf, int bufsiz);
int
......@@ -196,7 +196,7 @@ hook_fstatfs(unsigned int fd, struct statfs* buf);
int
hook_fsync(unsigned int fd);
int
ssize_t
hook_getxattr(const char* path, const char* name, void* value, size_t size);
} // namespace gkfs::hook
......
......@@ -105,6 +105,7 @@ private:
bool internal_fds_must_relocate_;
std::bitset<MAX_USER_FDS> protected_fds_;
std::string hostname;
int replicas_;
public:
static PreloadContext*
......@@ -216,6 +217,12 @@ public:
std::string
get_hostname();
void
set_replicas(const int repl);
int
get_replicas();
};
} // namespace preload
......
......@@ -30,6 +30,9 @@
#ifndef GEKKOFS_CLIENT_FORWARD_DATA_HPP
#define GEKKOFS_CLIENT_FORWARD_DATA_HPP
#include <string>
#include <memory>
#include <set>
namespace gkfs::rpc {
struct ChunkStat {
......@@ -43,14 +46,16 @@ struct ChunkStat {
std::pair<int, ssize_t>
forward_write(const std::string& path, const void* buf, off64_t offset,
size_t write_size);
size_t write_size, const int8_t num_copy = 0);
std::pair<int, ssize_t>
forward_read(const std::string& path, void* buf, off64_t offset,
size_t read_size);
size_t read_size, const int8_t num_copies,
std::set<int8_t>& failed);
int
forward_truncate(const std::string& path, size_t current_size, size_t new_size);
forward_truncate(const std::string& path, size_t current_size, size_t new_size,
const int8_t num_copies);
std::pair<int, ChunkStat>
forward_get_chunk_stat();
......
......@@ -50,10 +50,10 @@ class Metadata;
namespace rpc {
int
forward_create(const std::string& path, mode_t mode);
forward_create(const std::string& path, mode_t mode, const int copy);
int
forward_stat(const std::string& path, std::string& attr);
forward_stat(const std::string& path, std::string& attr, const int copy);
#ifdef HAS_RENAME
int
......@@ -62,22 +62,24 @@ forward_rename(const std::string& oldpath, const std::string& newpath,
#endif // HAS_RENAME
int
forward_remove(const std::string& path);
forward_remove(const std::string& path, const int8_t num_copies);
int
forward_decr_size(const std::string& path, size_t length);
forward_decr_size(const std::string& path, size_t length, const int copy);
int
forward_update_metadentry(
const std::string& path, const gkfs::metadata::Metadata& md,
const gkfs::metadata::MetadentryUpdateFlags& md_flags);
forward_update_metadentry(const std::string& path,
const gkfs::metadata::Metadata& md,
const gkfs::metadata::MetadentryUpdateFlags& md_flags,
const int copy);
std::pair<int, off64_t>
forward_update_metadentry_size(const std::string& path, size_t size,
off64_t offset, bool append_flag);
off64_t offset, bool append_flag,
const int num_copies);
std::pair<int, off64_t>
forward_get_metadentry_size(const std::string& path);
forward_get_metadentry_size(const std::string& path, const int copy);
std::pair<int, std::shared_ptr<gkfs::filemap::OpenDir>>
forward_get_dirents(const std::string& path);
......
......@@ -1469,11 +1469,11 @@ struct write_data {
public:
input(const std::string& path, int64_t offset, uint64_t host_id,
uint64_t host_size, uint64_t chunk_n, uint64_t chunk_start,
uint64_t chunk_end, uint64_t total_chunk_size,
const hermes::exposed_memory& buffers)
uint64_t host_size, const std::string& wbitset, uint64_t chunk_n,
uint64_t chunk_start, uint64_t chunk_end,
uint64_t total_chunk_size, const hermes::exposed_memory& buffers)
: m_path(path), m_offset(offset), m_host_id(host_id),
m_host_size(host_size), m_chunk_n(chunk_n),
m_host_size(host_size), m_wbitset(wbitset), m_chunk_n(chunk_n),
m_chunk_start(chunk_start), m_chunk_end(chunk_end),
m_total_chunk_size(total_chunk_size), m_buffers(buffers) {}
......@@ -1512,6 +1512,11 @@ struct write_data {
return m_chunk_n;
}
std::string
wbitset() const {
return m_wbitset;
}
uint64_t
chunk_start() const {
return m_chunk_start;
......@@ -1535,15 +1540,16 @@ struct write_data {
explicit input(const rpc_write_data_in_t& other)
: m_path(other.path), m_offset(other.offset),
m_host_id(other.host_id), m_host_size(other.host_size),
m_chunk_n(other.chunk_n), m_chunk_start(other.chunk_start),
m_chunk_end(other.chunk_end),
m_wbitset(other.wbitset), m_chunk_n(other.chunk_n),
m_chunk_start(other.chunk_start), m_chunk_end(other.chunk_end),
m_total_chunk_size(other.total_chunk_size),
m_buffers(other.bulk_handle) {}
explicit operator rpc_write_data_in_t() {
return {m_path.c_str(), m_offset, m_host_id,
m_host_size, m_chunk_n, m_chunk_start,
m_chunk_end, m_total_chunk_size, hg_bulk_t(m_buffers)};
return {m_path.c_str(), m_offset, m_host_id,
m_host_size, m_wbitset.c_str(), m_chunk_n,
m_chunk_start, m_chunk_end, m_total_chunk_size,
hg_bulk_t(m_buffers)};
}
private:
......@@ -1551,6 +1557,7 @@ struct write_data {
int64_t m_offset;
uint64_t m_host_id;
uint64_t m_host_size;
std::string m_wbitset;
uint64_t m_chunk_n;
uint64_t m_chunk_start;
uint64_t m_chunk_end;
......@@ -1647,11 +1654,11 @@ struct read_data {
public:
input(const std::string& path, int64_t offset, uint64_t host_id,
uint64_t host_size, uint64_t chunk_n, uint64_t chunk_start,
uint64_t chunk_end, uint64_t total_chunk_size,
const hermes::exposed_memory& buffers)
uint64_t host_size, const std::string& wbitset, uint64_t chunk_n,
uint64_t chunk_start, uint64_t chunk_end,
uint64_t total_chunk_size, const hermes::exposed_memory& buffers)
: m_path(path), m_offset(offset), m_host_id(host_id),
m_host_size(host_size), m_chunk_n(chunk_n),
m_host_size(host_size), m_wbitset(wbitset), m_chunk_n(chunk_n),
m_chunk_start(chunk_start), m_chunk_end(chunk_end),
m_total_chunk_size(total_chunk_size), m_buffers(buffers) {}
......@@ -1685,6 +1692,11 @@ struct read_data {
return m_host_size;
}
std::string
wbitset() const {
return m_wbitset;
}
uint64_t
chunk_n() const {
return m_chunk_n;
......@@ -1713,15 +1725,16 @@ struct read_data {
explicit input(const rpc_read_data_in_t& other)
: m_path(other.path), m_offset(other.offset),
m_host_id(other.host_id), m_host_size(other.host_size),
m_chunk_n(other.chunk_n), m_chunk_start(other.chunk_start),
m_chunk_end(other.chunk_end),
m_wbitset(other.wbitset), m_chunk_n(other.chunk_n),
m_chunk_start(other.chunk_start), m_chunk_end(other.chunk_end),
m_total_chunk_size(other.total_chunk_size),
m_buffers(other.bulk_handle) {}
explicit operator rpc_read_data_in_t() {
return {m_path.c_str(), m_offset, m_host_id,
m_host_size, m_chunk_n, m_chunk_start,
m_chunk_end, m_total_chunk_size, hg_bulk_t(m_buffers)};
return {m_path.c_str(), m_offset, m_host_id,
m_host_size, m_wbitset.c_str(), m_chunk_n,
m_chunk_start, m_chunk_end, m_total_chunk_size,
hg_bulk_t(m_buffers)};
}
private:
......@@ -1729,6 +1742,7 @@ struct read_data {
int64_t m_offset;
uint64_t m_host_id;
uint64_t m_host_size;
std::string m_wbitset;
uint64_t m_chunk_n;
uint64_t m_chunk_start;
uint64_t m_chunk_end;
......
......@@ -37,7 +37,6 @@
#include <string>
#include <cstdint>
namespace gkfs::metadata {
constexpr mode_t LINK_MODE = ((S_IRWXU | S_IRWXG | S_IRWXO) | S_IFLNK);
......
......@@ -48,15 +48,20 @@ public:
localhost() const = 0;
virtual host_t
locate_data(const std::string& path, const chunkid_t& chnk_id) const = 0;
locate_data(const std::string& path, const chunkid_t& chnk_id,
const int num_copy) const = 0;
// TODO: We need to pass hosts_size in the server side, because the number
// of servers are not defined (in startup)
virtual unsigned int
hosts_size() const = 0;
virtual host_t
locate_data(const std::string& path, const chunkid_t& chnk_id,
unsigned int hosts_size) = 0;
unsigned int hosts_size, const int num_copy) = 0;
virtual host_t
locate_file_metadata(const std::string& path) const = 0;
locate_file_metadata(const std::string& path, const int num_copy) const = 0;
virtual std::vector<host_t>
locate_directory_metadata(const std::string& path) const = 0;
......@@ -75,19 +80,23 @@ public:
SimpleHashDistributor(host_t localhost, unsigned int hosts_size);
unsigned int
hosts_size() const override;
host_t
localhost() const override;
host_t
locate_data(const std::string& path,
const chunkid_t& chnk_id) const override;
locate_data(const std::string& path, const chunkid_t& chnk_id,
const int num_copy) const override;
host_t
locate_data(const std::string& path, const chunkid_t& chnk_id,
unsigned int host_size);
unsigned int host_size, const int num_copy);
host_t
locate_file_metadata(const std::string& path) const override;
locate_file_metadata(const std::string& path,
const int num_copy) const override;
std::vector<host_t>
locate_directory_metadata(const std::string& path) const override;
......@@ -96,6 +105,7 @@ public:
class LocalOnlyDistributor : public Distributor {
private:
host_t localhost_;
unsigned int hosts_size_{0};
public:
explicit LocalOnlyDistributor(host_t localhost);
......@@ -103,12 +113,16 @@ public:
host_t
localhost() const override;
unsigned int
hosts_size() const override;
host_t
locate_data(const std::string& path,
const chunkid_t& chnk_id) const override;
locate_data(const std::string& path, const chunkid_t& chnk_id,
const int num_copy) const override;
host_t
locate_file_metadata(const std::string& path) const override;
locate_file_metadata(const std::string& path,
const int num_copy) const override;
std::vector<host_t>
locate_directory_metadata(const std::string& path) const override;
......@@ -117,7 +131,7 @@ public:
class ForwarderDistributor : public Distributor {
private:
host_t fwd_host_;
unsigned int hosts_size_;
unsigned int hosts_size_{0};
std::vector<host_t> all_hosts_;
std::hash<std::string> str_hash;
......@@ -127,16 +141,20 @@ public:
host_t
localhost() const override final;
unsigned int
hosts_size() const override;
host_t
locate_data(const std::string& path,
const chunkid_t& chnk_id) const override final;
locate_data(const std::string& path, const chunkid_t& chnk_id,
const int num_copy) const override final;
host_t
locate_data(const std::string& path, const chunkid_t& chnk_id,
unsigned int host_size) override final;
unsigned int host_size, const int num_copy) override final;
host_t
locate_file_metadata(const std::string& path) const override;
locate_file_metadata(const std::string& path,
const int num_copy) const override;
std::vector<host_t>
locate_directory_metadata(const std::string& path) const override;
......@@ -176,16 +194,20 @@ public:
host_t
localhost() const override;
unsigned int
hosts_size() const override;
host_t
locate_data(const std::string& path,
const chunkid_t& chnk_id) const override;
locate_data(const std::string& path, const chunkid_t& chnk_id,
const int num_copy) const override;
host_t
locate_data(const std::string& path, const chunkid_t& chnk_id,
unsigned int host_size);
unsigned int host_size, const int num_copy);
host_t
locate_file_metadata(const std::string& path) const override;
locate_file_metadata(const std::string& path,
const int num_copy) const override;
std::vector<host_t>
locate_directory_metadata(const std::string& path) const override;
......
......@@ -89,9 +89,9 @@ MERCURY_GEN_PROC(
rpc_read_data_in_t,
((hg_const_string_t) (path))((int64_t) (offset))(
(hg_uint64_t) (host_id))((hg_uint64_t) (host_size))(
(hg_uint64_t) (chunk_n))((hg_uint64_t) (chunk_start))(
(hg_uint64_t) (chunk_end))((hg_uint64_t) (total_chunk_size))(
(hg_bulk_t) (bulk_handle)))
(hg_const_string_t) (wbitset))((hg_uint64_t) (chunk_n))(
(hg_uint64_t) (chunk_start))((hg_uint64_t) (chunk_end))(
(hg_uint64_t) (total_chunk_size))((hg_bulk_t) (bulk_handle)))
MERCURY_GEN_PROC(rpc_data_out_t, ((int32_t) (err))((hg_size_t) (io_size)))
......@@ -99,9 +99,9 @@ MERCURY_GEN_PROC(
rpc_write_data_in_t,
((hg_const_string_t) (path))((int64_t) (offset))(
(hg_uint64_t) (host_id))((hg_uint64_t) (host_size))(
(hg_uint64_t) (chunk_n))((hg_uint64_t) (chunk_start))(
(hg_uint64_t) (chunk_end))((hg_uint64_t) (total_chunk_size))(
(hg_bulk_t) (bulk_handle)))
(hg_const_string_t) (wbitset))((hg_uint64_t) (chunk_n))(
(hg_uint64_t) (chunk_start))((hg_uint64_t) (chunk_end))(
(hg_uint64_t) (total_chunk_size))((hg_bulk_t) (bulk_handle)))
MERCURY_GEN_PROC(rpc_get_dirents_in_t,
((hg_const_string_t) (path))((hg_bulk_t) (bulk_handle)))
......
......@@ -35,6 +35,9 @@ extern "C" {
}
#include <string>
#include <iostream>
#include <sstream>
#include <vector>
namespace gkfs::rpc {
......@@ -49,6 +52,18 @@ std::string
get_host_by_name(const std::string& hostname);
#endif
bool
get_bitset(const std::vector<uint8_t>& data, const uint16_t position);
void
set_bitset(std::vector<uint8_t>& data, const uint16_t position);
std::string
compress_bitset(const std::vector<uint8_t>& bytes);
std::vector<uint8_t>
decompress_bitset(const std::string& compressedString);
} // namespace gkfs::rpc
#endif // GEKKOFS_COMMON_RPC_UTILS_HPP
......@@ -62,6 +62,7 @@ target_link_libraries(
fmt::fmt
Threads::Threads
Date::TZ
Microsoft.GSL::GSL
)
install(
......@@ -113,6 +114,7 @@ if(GKFS_ENABLE_FORWARDING)
fmt::fmt
Threads::Threads
Date::TZ
Microsoft.GSL::GSL
)
install(
......
......@@ -298,11 +298,21 @@ gkfs_create(const std::string& path, mode_t mode) {
if(check_parent_dir(path)) {
return -1;
}
auto err = gkfs::rpc::forward_create(path, mode);
if(err) {
errno = err;
// Write to all replicas, at least one need to success
bool success = false;
for(auto copy = 0; copy < CTX->get_replicas() + 1; copy++) {
auto err = gkfs::rpc::forward_create(path, mode, copy);
if(err) {
errno = err;
} else {
success = true;
errno = 0;
}
}
if(!success) {
return -1;
}
return 0;
}
......@@ -340,7 +350,7 @@ gkfs_remove(const std::string& path) {
return -1;
}
}
auto err = gkfs::rpc::forward_remove(new_path);
auto err = gkfs::rpc::forward_remove(new_path, CTX->get_replicas());
if(err) {
errno = err;
return -1;
......@@ -350,7 +360,7 @@ gkfs_remove(const std::string& path) {
#endif // HAS_RENAME
#endif // HAS_SYMLINKS
auto err = gkfs::rpc::forward_remove(path);
auto err = gkfs::rpc::forward_remove(path, CTX->get_replicas());
if(err) {
errno = err;
return -1;
......@@ -406,6 +416,7 @@ gkfs_access(const std::string& path, const int mask, bool follow_links) {
* We use blocks to determine if the file is a renamed file.
* If the file is re-renamed (a->b->a) a recovers the block of b
* and we delete b.
* There is no support for replication in rename
* @param old_path
* @param new_path
* @return 0 on success, -1 on failure
......@@ -441,14 +452,14 @@ gkfs_rename(const string& old_path, const string& new_path) {
md_old.value().target_path("");
auto err = gkfs::rpc::forward_update_metadentry(
new_path, md_old.value(), flags);
new_path, md_old.value(), flags, 0);
if(err) {
errno = err;
return -1;
}
// Delete old file
err = gkfs::rpc::forward_remove(old_path);
err = gkfs::rpc::forward_remove(old_path, CTX->get_replicas());
if(err) {
errno = err;
return -1;
......@@ -674,7 +685,9 @@ gkfs_lseek(shared_ptr<gkfs::filemap::OpenFile> gkfs_fd, off_t offset,
gkfs_fd->pos(gkfs_fd->pos() + offset);
break;
case SEEK_END: {
auto ret = gkfs::rpc::forward_get_metadentry_size(gkfs_fd->path());
// TODO: handle replicas
auto ret =
gkfs::rpc::forward_get_metadentry_size(gkfs_fd->path(), 0);
auto err = ret.first;
if(err) {
errno = err;
......@@ -723,14 +736,17 @@ gkfs_truncate(const std::string& path, off_t old_size, off_t new_size) {
if(new_size == old_size) {
return 0;
}
auto err = gkfs::rpc::forward_decr_size(path, new_size);
if(err) {
LOG(DEBUG, "Failed to decrease size");
errno = err;
return -1;
for(auto copy = 0; copy < (CTX->get_replicas() + 1); copy++) {
auto err = gkfs::rpc::forward_decr_size(path, new_size, copy);
if(err) {
LOG(DEBUG, "Failed to decrease size");
errno = err;
return -1;
}
}
err = gkfs::rpc::forward_truncate(path, old_size, new_size);
auto err = gkfs::rpc::forward_truncate(path, old_size, new_size,
CTX->get_replicas());
if(err) {
LOG(DEBUG, "Failed to truncate data");
errno = err;
......@@ -864,9 +880,11 @@ gkfs_pwrite(std::shared_ptr<gkfs::filemap::OpenFile> file, const char* buf,
}
auto path = make_unique<string>(file->path());
auto is_append = file->get_flag(gkfs::filemap::OpenFile_flags::append);
auto write_size = 0;
auto num_replicas = CTX->get_replicas();
auto ret_offset = gkfs::rpc::forward_update_metadentry_size(
*path, count, offset, is_append);
*path, count, offset, is_append, num_replicas);
auto err = ret_offset.first;
if(err) {
LOG(ERROR, "update_metadentry_size() failed with err '{}'", err);
......@@ -888,8 +906,22 @@ gkfs_pwrite(std::shared_ptr<gkfs::filemap::OpenFile> file, const char* buf,
offset = ret_offset.second;
}
auto ret_write = gkfs::rpc::forward_write(*path, buf, offset, count);
auto ret_write = gkfs::rpc::forward_write(*path, buf, offset, count, 0);
err = ret_write.first;
write_size = ret_write.second;
if(num_replicas > 0) {
auto ret_write_repl = gkfs::rpc::forward_write(*path, buf, offset,
count, num_replicas);
if(err and ret_write_repl.first == 0) {
// We succesfully write the data to some replica
err = ret_write_repl.first;
// Write size will be wrong
write_size = ret_write_repl.second;
}
}
if(err) {
LOG(WARNING, "gkfs::rpc::forward_write() failed with err '{}'", err);
errno = err;
......@@ -897,14 +929,14 @@ gkfs_pwrite(std::shared_ptr<gkfs::filemap::OpenFile> file, const char* buf,
}
if(update_pos) {
// Update offset in file descriptor in the file map
file->pos(offset + ret_write.second);
file->pos(offset + write_size);
}
if(static_cast<size_t>(ret_write.second) != count) {
if(static_cast<size_t>(write_size) != count) {
LOG(WARNING,
"gkfs::rpc::forward_write() wrote '{}' bytes instead of '{}'",
ret_write.second, count);
write_size, count);
}
return ret_write.second; // return written size
return write_size; // return written size
}
/**
......@@ -1024,7 +1056,24 @@ gkfs_pread(std::shared_ptr<gkfs::filemap::OpenFile> file, char* buf,
if constexpr(gkfs::config::io::zero_buffer_before_read) {
memset(buf, 0, sizeof(char) * count);
}
auto ret = gkfs::rpc::forward_read(file->path(), buf, offset, count);
std::pair<int, off_t> ret;
std::set<int8_t> failed; // set with failed targets.
if(CTX->get_replicas() != 0) {
ret = gkfs::rpc::forward_read(file->path(), buf, offset, count,
CTX->get_replicas(), failed);
while(ret.first == EIO) {
ret = gkfs::rpc::forward_read(file->path(), buf, offset, count,
CTX->get_replicas(), failed);
LOG(WARNING, "gkfs::rpc::forward_read() failed with ret '{}'",
ret.first);
}
} else {
ret = gkfs::rpc::forward_read(file->path(), buf, offset, count, 0,
failed);
}
auto err = ret.first;
if(err) {
LOG(WARNING, "gkfs::rpc::forward_read() failed with ret '{}'", err);
......@@ -1192,7 +1241,7 @@ gkfs_rmdir(const std::string& path) {
errno = ENOTEMPTY;
return -1;
}
err = gkfs::rpc::forward_remove(path);
err = gkfs::rpc::forward_remove(path, CTX->get_replicas());
if(err) {
errno = err;
return -1;
......
......@@ -37,19 +37,23 @@
#include <common/path_util.hpp>
#include <memory>
#include <string>
#include <gsl/util>
extern "C" {
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/statfs.h>
#include <sys/syscall.h>
}
namespace {
// TODO replace all internal gkfs errno variable usage with LEAF
inline int
with_errno(int ret) {
template <typename T>
inline T
with_errno(T ret) {
return (ret < 0) ? -errno : ret;
}
......@@ -67,12 +71,12 @@ hook_openat(int dirfd, const char* cpath, int flags, mode_t mode) {
auto rstatus = CTX->relativize_fd_path(dirfd, cpath, resolved);
switch(rstatus) {
case gkfs::preload::RelativizeStatus::fd_unknown:
return syscall_no_intercept_wrapper(SYS_openat, dirfd, cpath, flags,
mode);
return gsl::narrow_cast<int>(syscall_no_intercept_wrapper(
SYS_openat, dirfd, cpath, flags, mode));
case gkfs::preload::RelativizeStatus::external:
return syscall_no_intercept_wrapper(SYS_openat, dirfd,
resolved.c_str(), flags, mode);
return gsl::narrow_cast<int>(syscall_no_intercept_wrapper(
SYS_openat, dirfd, resolved.c_str(), flags, mode));
case gkfs::preload::RelativizeStatus::fd_not_a_dir:
return -ENOTDIR;
......@@ -103,7 +107,7 @@ hook_close(int fd) {
return 0;
}
return syscall_no_intercept_wrapper(SYS_close, fd);
return gsl::narrow_cast<int>(syscall_no_intercept_wrapper(SYS_close, fd));
}
#ifdef SYS_stat
int
......@@ -117,7 +121,8 @@ hook_stat(const char* path, struct stat* buf) {
return with_errno(gkfs::syscall::gkfs_stat(rel_path, buf));
}
return syscall_no_intercept_wrapper(SYS_stat, rel_path.c_str(), buf);
return gsl::narrow_cast<int>(
syscall_no_intercept_wrapper(SYS_stat, rel_path.c_str(), buf));
}
#endif
......@@ -135,12 +140,12 @@ hook_statx(int dirfd, const char* path, int flags, unsigned int mask,
auto rstatus = CTX->relativize_fd_path(dirfd, path, resolved);
switch(rstatus) {
case gkfs::preload::RelativizeStatus::fd_unknown:
return syscall_no_intercept_wrapper(SYS_statx, dirfd, path, flags,
mask, buf);
return gsl::narrow_cast<int>(syscall_no_intercept_wrapper(
SYS_statx, dirfd, path, flags, mask, buf));
case gkfs::preload::RelativizeStatus::external:
return syscall_no_intercept_wrapper(
SYS_statx, dirfd, resolved.c_str(), flags, mask, buf);
return gsl::narrow_cast<int>(syscall_no_intercept_wrapper(
SYS_statx, dirfd, resolved.c_str(), flags, mask, buf));
case gkfs::preload::RelativizeStatus::fd_not_a_dir:
return -ENOTDIR;
......@@ -153,8 +158,6 @@ hook_statx(int dirfd, const char* path, int flags, unsigned int mask,
LOG(ERROR, "{}() relativize status unknown: {}", __func__);
return -EINVAL;
}
return syscall_no_intercept(SYS_statx, dirfd, path, flags, mask, buf);
}
#endif
......@@ -170,7 +173,8 @@ hook_lstat(const char* path, struct stat* buf) {
if(CTX->relativize_path(path, rel_path)) {
return with_errno(gkfs::syscall::gkfs_stat(rel_path, buf));
}
return syscall_no_intercept_wrapper(SYS_lstat, rel_path.c_str(), buf);
return gsl::narrow_cast<int>(
syscall_no_intercept_wrapper(SYS_lstat, rel_path.c_str(), buf));
}
#endif
......@@ -191,7 +195,8 @@ hook_fstat(unsigned int fd, struct stat* buf) {
#endif
return with_errno(gkfs::syscall::gkfs_stat(path, buf));
}
return syscall_no_intercept_wrapper(SYS_fstat, fd, buf);
return gsl::narrow_cast<int>(
syscall_no_intercept_wrapper(SYS_fstat, fd, buf));
}
int
......@@ -204,12 +209,12 @@ hook_fstatat(int dirfd, const char* cpath, struct stat* buf, int flags) {
auto rstatus = CTX->relativize_fd_path(dirfd, cpath, resolved, flags);
switch(rstatus) {
case gkfs::preload::RelativizeStatus::fd_unknown:
return syscall_no_intercept_wrapper(SYS_newfstatat, dirfd, cpath,
buf, flags);
return gsl::narrow_cast<int>(syscall_no_intercept_wrapper(
SYS_newfstatat, dirfd, cpath, buf, flags));
case gkfs::preload::RelativizeStatus::external:
return syscall_no_intercept_wrapper(SYS_newfstatat, dirfd,
resolved.c_str(), buf, flags);
return gsl::narrow_cast<int>(syscall_no_intercept_wrapper(
SYS_newfstatat, dirfd, resolved.c_str(), buf, flags));
case gkfs::preload::RelativizeStatus::fd_not_a_dir:
return -ENOTDIR;
......@@ -223,7 +228,7 @@ hook_fstatat(int dirfd, const char* cpath, struct stat* buf, int flags) {
}
}
int
ssize_t
hook_read(unsigned int fd, void* buf, size_t count) {
LOG(DEBUG, "{}() called with fd: {}, buf: {} count: {}", __func__, fd,
......@@ -235,7 +240,7 @@ hook_read(unsigned int fd, void* buf, size_t count) {
return syscall_no_intercept_wrapper(SYS_read, fd, buf, count);
}
int
ssize_t
hook_pread(unsigned int fd, char* buf, size_t count, loff_t pos) {
LOG(DEBUG, "{}() called with fd: {}, buf: {}, count: {}, pos: {}", __func__,
......@@ -249,7 +254,7 @@ hook_pread(unsigned int fd, char* buf, size_t count, loff_t pos) {
return syscall_no_intercept_wrapper(SYS_pread64, fd, buf, count, pos);
}
int
ssize_t
hook_readv(unsigned long fd, const struct iovec* iov, unsigned long iovcnt) {
LOG(DEBUG, "{}() called with fd: {}, iov: {}, iovcnt: {}", __func__, fd,
......@@ -261,7 +266,7 @@ hook_readv(unsigned long fd, const struct iovec* iov, unsigned long iovcnt) {
return syscall_no_intercept_wrapper(SYS_readv, fd, iov, iovcnt);
}
int
ssize_t
hook_preadv(unsigned long fd, const struct iovec* iov, unsigned long iovcnt,
unsigned long pos_l, unsigned long pos_h) {
......@@ -277,7 +282,7 @@ hook_preadv(unsigned long fd, const struct iovec* iov, unsigned long iovcnt,
return syscall_no_intercept_wrapper(SYS_preadv, fd, iov, iovcnt, pos_l);
}
int
ssize_t
hook_write(unsigned int fd, const char* buf, size_t count) {
LOG(DEBUG, "{}() called with fd: {}, buf: {}, count {}", __func__, fd,
......@@ -289,7 +294,7 @@ hook_write(unsigned int fd, const char* buf, size_t count) {
return syscall_no_intercept_wrapper(SYS_write, fd, buf, count);
}
int
ssize_t
hook_pwrite(unsigned int fd, const char* buf, size_t count, loff_t pos) {
LOG(DEBUG, "{}() called with fd: {}, buf: {}, count: {}, pos: {}", __func__,
......@@ -303,7 +308,7 @@ hook_pwrite(unsigned int fd, const char* buf, size_t count, loff_t pos) {
return syscall_no_intercept_wrapper(SYS_pwrite64, fd, buf, count, pos);
}
int
ssize_t
hook_writev(unsigned long fd, const struct iovec* iov, unsigned long iovcnt) {
LOG(DEBUG, "{}() called with fd: {}, iov: {}, iovcnt: {}", __func__, fd,
......@@ -315,7 +320,7 @@ hook_writev(unsigned long fd, const struct iovec* iov, unsigned long iovcnt) {
return syscall_no_intercept_wrapper(SYS_writev, fd, iov, iovcnt);
}
int
ssize_t
hook_pwritev(unsigned long fd, const struct iovec* iov, unsigned long iovcnt,
unsigned long pos_l, unsigned long pos_h) {
......@@ -346,12 +351,12 @@ hook_unlinkat(int dirfd, const char* cpath, int flags) {
auto rstatus = CTX->relativize_fd_path(dirfd, cpath, resolved, false);
switch(rstatus) {
case gkfs::preload::RelativizeStatus::fd_unknown:
return syscall_no_intercept_wrapper(SYS_unlinkat, dirfd, cpath,
flags);
return gsl::narrow_cast<int>(syscall_no_intercept_wrapper(
SYS_unlinkat, dirfd, cpath, flags));
case gkfs::preload::RelativizeStatus::external:
return syscall_no_intercept_wrapper(SYS_unlinkat, dirfd,
resolved.c_str(), flags);
return gsl::narrow_cast<int>(syscall_no_intercept_wrapper(
SYS_unlinkat, dirfd, resolved.c_str(), flags));
case gkfs::preload::RelativizeStatus::fd_not_a_dir:
return -ENOTDIR;
......@@ -386,12 +391,12 @@ hook_symlinkat(const char* oldname, int newdfd, const char* newname) {
CTX->relativize_fd_path(newdfd, newname, newname_resolved, false);
switch(rstatus) {
case gkfs::preload::RelativizeStatus::fd_unknown:
return syscall_no_intercept_wrapper(SYS_symlinkat, oldname, newdfd,
newname);
return gsl::narrow_cast<int>(syscall_no_intercept_wrapper(
SYS_symlinkat, oldname, newdfd, newname));
case gkfs::preload::RelativizeStatus::external:
return syscall_no_intercept_wrapper(SYS_symlinkat, oldname, newdfd,
newname_resolved.c_str());
return gsl::narrow_cast<int>(syscall_no_intercept_wrapper(
SYS_symlinkat, oldname, newdfd, newname_resolved.c_str()));
case gkfs::preload::RelativizeStatus::fd_not_a_dir:
return -ENOTDIR;
......@@ -431,7 +436,8 @@ hook_access(const char* path, int mask) {
}
return ret;
}
return syscall_no_intercept_wrapper(SYS_access, rel_path.c_str(), mask);
return gsl::narrow_cast<int>(
syscall_no_intercept_wrapper(SYS_access, rel_path.c_str(), mask));
}
#endif
......@@ -445,12 +451,12 @@ hook_faccessat(int dirfd, const char* cpath, int mode) {
auto rstatus = CTX->relativize_fd_path(dirfd, cpath, resolved);
switch(rstatus) {
case gkfs::preload::RelativizeStatus::fd_unknown:
return syscall_no_intercept_wrapper(SYS_faccessat, dirfd, cpath,
mode);
return gsl::narrow_cast<int>(syscall_no_intercept_wrapper(
SYS_faccessat, dirfd, cpath, mode));
case gkfs::preload::RelativizeStatus::external:
return syscall_no_intercept_wrapper(SYS_faccessat, dirfd,
resolved.c_str(), mode);
return gsl::narrow_cast<int>(syscall_no_intercept_wrapper(
SYS_faccessat, dirfd, resolved.c_str(), mode));
case gkfs::preload::RelativizeStatus::fd_not_a_dir:
return -ENOTDIR;
......@@ -476,12 +482,12 @@ hook_faccessat2(int dirfd, const char* cpath, int mode, int flags) {
auto rstatus = CTX->relativize_fd_path(dirfd, cpath, resolved);
switch(rstatus) {
case gkfs::preload::RelativizeStatus::fd_unknown:
return syscall_no_intercept_wrapper(SYS_faccessat2, dirfd, cpath,
mode, flags);
return gsl::narrow_cast<int>(syscall_no_intercept_wrapper(
SYS_faccessat2, dirfd, cpath, mode, flags));
case gkfs::preload::RelativizeStatus::external:
return syscall_no_intercept_wrapper(SYS_faccessat2, dirfd,
resolved.c_str(), mode, flags);
return gsl::narrow_cast<int>(syscall_no_intercept_wrapper(
SYS_faccessat2, dirfd, resolved.c_str(), mode, flags));
case gkfs::preload::RelativizeStatus::fd_not_a_dir:
return -ENOTDIR;
......@@ -527,7 +533,8 @@ hook_truncate(const char* path, long length) {
if(CTX->relativize_path(path, rel_path)) {
return with_errno(gkfs::syscall::gkfs_truncate(rel_path, length));
}
return syscall_no_intercept_wrapper(SYS_truncate, rel_path.c_str(), length);
return gsl::narrow_cast<int>(syscall_no_intercept_wrapper(
SYS_truncate, rel_path.c_str(), length));
}
int
......@@ -539,7 +546,8 @@ hook_ftruncate(unsigned int fd, unsigned long length) {
auto path = CTX->file_map()->get(fd)->path();
return with_errno(gkfs::syscall::gkfs_truncate(path, length));
}
return syscall_no_intercept_wrapper(SYS_ftruncate, fd, length);
return gsl::narrow_cast<int>(
syscall_no_intercept_wrapper(SYS_ftruncate, fd, length));
}
int
......@@ -550,7 +558,7 @@ hook_dup(unsigned int fd) {
if(CTX->file_map()->exist(fd)) {
return with_errno(gkfs::syscall::gkfs_dup(fd));
}
return syscall_no_intercept_wrapper(SYS_dup, fd);
return gsl::narrow_cast<int>(syscall_no_intercept_wrapper(SYS_dup, fd));
}
#ifdef SYS_dup2
int
......@@ -561,7 +569,8 @@ hook_dup2(unsigned int oldfd, unsigned int newfd) {
if(CTX->file_map()->exist(oldfd)) {
return with_errno(gkfs::syscall::gkfs_dup2(oldfd, newfd));
}
return syscall_no_intercept_wrapper(SYS_dup2, oldfd, newfd);
return gsl::narrow_cast<int>(
syscall_no_intercept_wrapper(SYS_dup2, oldfd, newfd));
}
#endif
int
......@@ -576,10 +585,11 @@ hook_dup3(unsigned int oldfd, unsigned int newfd, int flags) {
LOG(WARNING, "{}() Not supported", __func__);
return -ENOTSUP;
}
return syscall_no_intercept_wrapper(SYS_dup3, oldfd, newfd, flags);
return gsl::narrow_cast<int>(
syscall_no_intercept_wrapper(SYS_dup3, oldfd, newfd, flags));
}
#ifdef SYS_getdents
int
long
hook_getdents(unsigned int fd, struct linux_dirent* dirp, unsigned int count) {
LOG(DEBUG, "{}() called with fd: {}, dirp: {}, count: {}", __func__, fd,
......@@ -592,7 +602,7 @@ hook_getdents(unsigned int fd, struct linux_dirent* dirp, unsigned int count) {
}
#endif
int
ssize_t
hook_getdents64(unsigned int fd, struct linux_dirent64* dirp,
unsigned int count) {
......@@ -616,12 +626,12 @@ hook_mkdirat(int dirfd, const char* cpath, mode_t mode) {
auto rstatus = CTX->relativize_fd_path(dirfd, cpath, resolved);
switch(rstatus) {
case gkfs::preload::RelativizeStatus::external:
return syscall_no_intercept_wrapper(SYS_mkdirat, dirfd,
resolved.c_str(), mode);
return gsl::narrow_cast<int>(syscall_no_intercept_wrapper(
SYS_mkdirat, dirfd, resolved.c_str(), mode));
case gkfs::preload::RelativizeStatus::fd_unknown:
return syscall_no_intercept_wrapper(SYS_mkdirat, dirfd, cpath,
mode);
return gsl::narrow_cast<int>(syscall_no_intercept_wrapper(
SYS_mkdirat, dirfd, cpath, mode));
case gkfs::preload::RelativizeStatus::fd_not_a_dir:
return -ENOTDIR;
......@@ -646,12 +656,12 @@ hook_fchmodat(int dirfd, const char* cpath, mode_t mode) {
auto rstatus = CTX->relativize_fd_path(dirfd, cpath, resolved);
switch(rstatus) {
case gkfs::preload::RelativizeStatus::fd_unknown:
return syscall_no_intercept_wrapper(SYS_fchmodat, dirfd, cpath,
mode);
return gsl::narrow_cast<int>(syscall_no_intercept_wrapper(
SYS_fchmodat, dirfd, cpath, mode));
case gkfs::preload::RelativizeStatus::external:
return syscall_no_intercept_wrapper(SYS_fchmodat, dirfd,
resolved.c_str(), mode);
return gsl::narrow_cast<int>(syscall_no_intercept_wrapper(
SYS_fchmodat, dirfd, resolved.c_str(), mode));
case gkfs::preload::RelativizeStatus::fd_not_a_dir:
return -ENOTDIR;
......@@ -675,7 +685,8 @@ hook_fchmod(unsigned int fd, mode_t mode) {
LOG(WARNING, "{}() operation not supported", __func__);
return -ENOTSUP;
}
return syscall_no_intercept_wrapper(SYS_fchmod, fd, mode);
return gsl::narrow_cast<int>(
syscall_no_intercept_wrapper(SYS_fchmod, fd, mode));
}
int
......@@ -766,7 +777,7 @@ hook_getcwd(char* buf, unsigned long size) {
return (CTX->cwd().size() + 1);
}
int
ssize_t
hook_readlinkat(int dirfd, const char* cpath, char* buf, int bufsiz) {
LOG(DEBUG, "{}() called with dirfd: {}, path \"{}\", buf: {}, bufsize: {}",
......@@ -803,7 +814,8 @@ hook_fcntl(unsigned int fd, unsigned int cmd, unsigned long arg) {
arg);
if(!CTX->file_map()->exist(fd)) {
return syscall_no_intercept_wrapper(SYS_fcntl, fd, cmd, arg);
return gsl::narrow_cast<int>(
syscall_no_intercept_wrapper(SYS_fcntl, fd, cmd, arg));
}
int ret;
switch(cmd) {
......@@ -934,8 +946,8 @@ hook_renameat(int olddfd, const char* oldname, int newdfd, const char* newname,
return -EINVAL;
}
return syscall_no_intercept_wrapper(SYS_renameat2, olddfd, oldpath_pass,
newdfd, newpath_pass, flags);
return gsl::narrow_cast<int>(syscall_no_intercept_wrapper(
SYS_renameat2, olddfd, oldpath_pass, newdfd, newpath_pass, flags));
}
int
......@@ -948,7 +960,8 @@ hook_statfs(const char* path, struct statfs* buf) {
if(CTX->relativize_path(path, rel_path)) {
return with_errno(gkfs::syscall::gkfs_statfs(buf));
}
return syscall_no_intercept_wrapper(SYS_statfs, rel_path.c_str(), buf);
return gsl::narrow_cast<int>(
syscall_no_intercept_wrapper(SYS_statfs, rel_path.c_str(), buf));
}
int
......@@ -959,7 +972,8 @@ hook_fstatfs(unsigned int fd, struct statfs* buf) {
if(CTX->file_map()->exist(fd)) {
return with_errno(gkfs::syscall::gkfs_statfs(buf));
}
return syscall_no_intercept_wrapper(SYS_fstatfs, fd, buf);
return gsl::narrow_cast<int>(
syscall_no_intercept_wrapper(SYS_fstatfs, fd, buf));
}
/* The function should broadcast a flush message (pmem_persist i.e.) if the
......@@ -974,10 +988,10 @@ hook_fsync(unsigned int fd) {
return 0;
}
return syscall_no_intercept_wrapper(SYS_fsync, fd);
return gsl::narrow_cast<int>(syscall_no_intercept_wrapper(SYS_fsync, fd));
}
int
ssize_t
hook_getxattr(const char* path, const char* name, void* value, size_t size) {
LOG(DEBUG, "{}() called with path '{}' name '{}' value '{}' size '{}'",
......
......@@ -37,6 +37,8 @@
#include <common/rpc/distributor.hpp>
#include <common/common_defs.hpp>
#include <ctime>
#include <cstdlib>
#include <fstream>
#include <hermes.hpp>
......@@ -238,6 +240,11 @@ init_environment() {
EXIT_FAILURE,
"Unable to fetch file system configurations from daemon process through RPC.");
}
// Initialize random number generator and seed for replica selection
// in case of failure, a new replica will be selected
if(CTX->get_replicas() > 0) {
srand(time(nullptr));
}
LOG(INFO, "Environment initialization successful.");
}
......@@ -277,6 +284,7 @@ init_preload() {
gkfs::path::init_cwd();
LOG(DEBUG, "Current working directory: '{}'", CTX->cwd());
LOG(DEBUG, "Number of replicas : '{}'", CTX->get_replicas());
gkfs::preload::init_environment();
CTX->enable_interception();
......