Commit c300a359 authored by Marc Vef

Merge branch 'marc/62-shared-file-metadata-congestion-2' into 'master'

Resolve "Shared file metadata congestion"

During write operations, the client must update the file size on the responsible metadata daemon. The write size cache
can reduce the metadata load on the daemon and reduce the number of RPCs during write operations, especially for many
small I/O operations. In the past, we have observed that a daemon can become network-congested, in particular for a single shared file accessed by many processes with small I/O operations, which bottlenecks the overall I/O throughput. Beyond this congestion case, the cache also benefits small I/O in general: removing one size-update RPC per write already improves small-file I/O on a single node.

Note that this cache may affect file size consistency: stat operations may not reflect the actual file size
until the file is flushed or closed. The cache does not affect the consistency of the file data itself. We did not observe any issues with the cache for HPC applications and benchmarks, but it technically breaks POSIX semantics. Therefore, for now, I suggest keeping it experimental and opt-in.

- `LIBGKFS_WRITE_SIZE_CACHE` - Enable caching the write size of files (default: OFF).
- `LIBGKFS_WRITE_SIZE_CACHE_THRESHOLD` - Set the number of write operations after which the file size is synchronized
  with the corresponding daemon (default: 1000). The file size is further synchronized when the file is `close()`d or
  when `fsync()` is called.
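
For illustration, the following is a minimal, self-contained sketch of the bookkeeping this cache performs (the class name `TinyWriteSizeCache`, the demo path, and the simplified eviction behavior are made up for this example; the actual implementation is `gkfs::cache::file::WriteSizeCache` added in this merge request):

```cpp
// Simplified sketch: count writes per path, remember the largest observed
// size, and only synchronize with the metadata daemon when the per-file
// counter exceeds the threshold (or on flush at close()/fsync()).
#include <cstddef>
#include <iostream>
#include <mutex>
#include <string>
#include <unordered_map>
#include <utility>

class TinyWriteSizeCache {
    // path -> <write op counter, largest observed size>
    std::unordered_map<std::string, std::pair<size_t, size_t>> cache_;
    std::mutex mtx_;
    size_t flush_threshold_;

public:
    explicit TinyWriteSizeCache(size_t threshold) : flush_threshold_(threshold) {}

    // Record a write that extends the file to `size`. Returns true when the
    // caller should synchronize the size with the metadata daemon.
    bool record(const std::string& path, size_t size) {
        std::lock_guard<std::mutex> lock(mtx_);
        auto& [cnt, cached_size] = cache_[path];
        ++cnt;
        if(cached_size < size)
            cached_size = size;
        return cnt > flush_threshold_;
    }

    // Drop the entry and return the size that would be sent to the daemon.
    size_t flush(const std::string& path) {
        std::lock_guard<std::mutex> lock(mtx_);
        auto it = cache_.find(path);
        if(it == cache_.end())
            return 0;
        auto size = it->second.second;
        cache_.erase(it);
        return size;
    }
};

int main() {
    TinyWriteSizeCache cache{1000};
    // 2500 sequential 4 KiB writes to one shared file: only 2 size-update
    // RPCs would be triggered instead of 2500 (plus one final flush on close).
    for(size_t i = 1; i <= 2500; ++i) {
        if(cache.record("/gkfs/shared.dat", i * 4096))
            std::cout << "flush at write " << i << ": size "
                      << cache.flush("/gkfs/shared.dat") << '\n';
    }
    std::cout << "flush on close: size " << cache.flush("/gkfs/shared.dat") << '\n';
}
```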

Depends on !194

Closes #62

See merge request !193
parents 950ba459 680fe6b5
Pipeline #4754 passed
Showing with 347 additions and 37 deletions
......@@ -8,9 +8,18 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]
### New
- Added a write size cache to the file system client to reduce potential metadata network bottlenecks during small I/O
operations ([!193](https://storage.bsc.es/gitlab/hpc/gekkofs/-/merge_requests/193)).
- The cache is experimental and thus disabled by default. Added the following environment variables.
- `LIBGKFS_WRITE_SIZE_CACHE` - Enable caching the write size of files (default: OFF).
- `LIBGKFS_WRITE_SIZE_CACHE_THRESHOLD` - Set the number of write operations after which the file size is synchronized
with the corresponding daemon (default: 1000). The file size is further synchronized when the file is `close()`d or
when `fsync()` is called.
- Added a directory cache for the file system client to improve `ls -l` type operations by avoiding consecutive stat calls
([!194](https://storage.bsc.es/gitlab/hpc/gekkofs/-/merge_requests/194)).
- The cache is experimental and thus disabled by default and can be enabled with the env variable `LIBGKFS_DISABLE_DIR_CACHE` set to `ON`.
- The cache is experimental and thus disabled by default. Added the following environment variables.
- `LIBGKFS_DENTRY_CACHE` - Enable caching directory entries until closing the directory (default: OFF).
Further compile-time settings available at `include/config.hpp`.
- Added file system expansion support ([!196](https://storage.bsc.es/gitlab/hpc/gekkofs/-/merge_requests/196)).
- Added the tool `gkfs_malleability` to steer start, status, and finalize requests for expansion operations.
- `-DGKFS_BUILD_TOOLS=ON` must be set for CMake to build the tool.
......
......@@ -517,8 +517,24 @@ Client-metrics require the CMake argument `-DGKFS_ENABLE_CLIENT_METRICS=ON` (see
- `LIBGKFS_PROXY_PID_FILE` - Path to the proxy pid file (when using the GekkoFS proxy).
- `LIBGKFS_NUM_REPL` - Number of replicas for data.
#### Caching
- `LIBGKFS_DENTRY_CACHE` - Enable caching directory entries until closing the directory (default: OFF).
Improves performance for `ls -l` type operations. Further compile-time settings available at `include/config.hpp`.
##### Dentry cache
Improves performance for `ls -l` type operations by caching file metadata for subsequent `stat()` operations during
`readdir()`. Depending on the size of the directory, this can avoid a significant number of stat RPCs.
- `LIBGKFS_DENTRY_CACHE` - Enable caching directory entries until closing the directory (default: OFF).
Further compile-time settings available at `include/config.hpp`.
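
As a rough illustration of the idea, here is a simplified, self-contained sketch (the names are made up for this example and do not reflect the actual `DentryCache` interface):

```cpp
// Conceptual sketch of a per-directory stat cache: metadata collected during
// readdir() answers subsequent stat() calls without extra RPCs, and the cache
// is dropped when the directory is closed.
#include <iostream>
#include <optional>
#include <string>
#include <unordered_map>

struct EntryMeta { // subset of what a stat() would need
    size_t size;
    bool is_dir;
};

class TinyDentryCache {
    // "<dir path>/<entry name>" -> cached metadata, populated during readdir()
    std::unordered_map<std::string, EntryMeta> entries_;

public:
    void insert(const std::string& dir, const std::string& name, EntryMeta md) {
        entries_[dir + "/" + name] = md;
    }
    // stat() consults the cache first and only falls back to an RPC on a miss
    std::optional<EntryMeta> lookup(const std::string& path) const {
        auto it = entries_.find(path);
        if(it == entries_.end())
            return std::nullopt;
        return it->second;
    }
    // called when the directory is closed
    void clear() { entries_.clear(); }
};

int main() {
    TinyDentryCache cache;
    cache.insert("/gkfs/dir", "a.txt", {4096, false}); // filled while listing the directory
    if(auto md = cache.lookup("/gkfs/dir/a.txt"))      // cache hit: no stat RPC needed
        std::cout << "cached size: " << md->size << '\n';
    cache.clear();                                      // closedir() drops the entries
}
```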
##### Write size cache
During write operations, the client must update the file size on the responsible metadata daemon. The write size cache
can reduce the metadata load on the daemon and reduce the number of RPCs during write operations, especially for many
small I/O operations.
Note that this cache may affect file size consistency: stat operations may not reflect the actual file size
until the file is flushed (`fsync()`) or closed. The cache does not affect the consistency of the file data itself.
- `LIBGKFS_WRITE_SIZE_CACHE` - Enable caching the write size of files (default: OFF).
- `LIBGKFS_WRITE_SIZE_CACHE_THRESHOLD` - Set the number of write operations after which the file size is synchronized
with the corresponding daemon (default: 1000). The file size is further synchronized when the file is `close()`d or
when `fsync()` is called.
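
To illustrate the consistency caveat, here is a sketch using plain POSIX calls (assuming the program runs through the GekkoFS client with `LIBGKFS_WRITE_SIZE_CACHE=ON`; the mount path is made up, and on a regular file system both `stat()` calls simply agree):

```cpp
// Sketch: with the write size cache enabled, stat() between small writes may
// lag behind the real size; fsync() or close() pushes the cached size to the
// responsible daemon.
#include <cstdio>
#include <fcntl.h>
#include <sys/stat.h>
#include <unistd.h>

int main() {
    const char* path = "/tmp/gkfs_mnt/sizecache_demo"; // hypothetical GekkoFS mount point
    int fd = open(path, O_CREAT | O_WRONLY | O_TRUNC, 0644);
    if(fd < 0)
        return 1;

    char block[512] = {0};
    for(int i = 0; i < 100; ++i)          // 100 small writes, well below the
        write(fd, block, sizeof(block));  // default threshold of 1000

    struct stat st{};
    stat(path, &st);                      // may still report a stale (smaller) size
    std::printf("size before fsync: %lld\n", (long long) st.st_size);

    fsync(fd);                            // flushes the cached size to the daemon
    stat(path, &st);                      // should now report 100 * 512 = 51200 bytes
    std::printf("size after fsync:  %lld\n", (long long) st.st_size);

    close(fd);                            // close() also flushes the cached size
    return 0;
}
```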
### Daemon
#### Logging
......
......@@ -39,6 +39,7 @@
#include <mutex>
#include <optional>
#include <cstdint>
#include <utility>
namespace gkfs::cache {
......@@ -132,6 +133,60 @@ public:
};
} // namespace dir
namespace file {
class WriteSizeCache {
private:
// path -> <write op counter, cached size>
std::unordered_map<std::string, std::pair<size_t, size_t>> size_cache;
std::mutex mtx_;
// Flush threshold in number of write ops per file
size_t flush_threshold_{0};
public:
WriteSizeCache() = default;
virtual ~WriteSizeCache() = default;
/**
* @brief Record the size of a file and add it to the cache
* @param path gekkofs path
* @param size current size to set for given path
* @return [size_update counter, current cached size]
*/
std::pair<size_t, size_t>
record(std::string path, size_t size);
/**
* @brief Reset an entry in the cache
* @param path gekkofs path
* @param evict if true, the entry is removed from the cache; otherwise its
* counter is reset to 0 and the cached size is kept
* @return [size_update counter, current cached size]
*/
std::pair<size_t, size_t>
reset(const std::string& path, bool evict);
/**
* @brief Flush the cached size for a given path, contacting the corresponding
* daemon
* @param path gekkofs path
* @param evict if true, the entry is removed from the cache during the flush;
* otherwise its counter is reset to 0 and the cached size is kept
* @return error code and flushed size
*/
std::pair<int, off64_t>
flush(const std::string& path, bool evict = true);
// GETTER/SETTER
size_t
flush_threshold() const;
void
flush_threshold(size_t flush_threshold);
};
} // namespace file
} // namespace gkfs::cache
#endif // GKFS_CLIENT_CACHE
......@@ -60,7 +60,12 @@ static constexpr auto METRICS_IP_PORT = ADD_PREFIX("METRICS_IP_PORT");
static constexpr auto NUM_REPL = ADD_PREFIX("NUM_REPL");
static constexpr auto PROXY_PID_FILE = ADD_PREFIX("PROXY_PID_FILE");
static constexpr auto DENTRY_CACHE = ADD_PREFIX("DENTRY_CACHE");
namespace cache {
static constexpr auto DENTRY = ADD_PREFIX("DENTRY_CACHE");
static constexpr auto WRITE_SIZE = ADD_PREFIX("WRITE_SIZE_CACHE");
static constexpr auto WRITE_SIZE_THRESHOLD =
ADD_PREFIX("WRITE_SIZE_CACHE_THRESHOLD");
} // namespace cache
} // namespace gkfs::env
......
......@@ -157,6 +157,9 @@ gkfs_getdents64(unsigned int fd, struct linux_dirent64* dirp,
int
gkfs_rmdir(const std::string& path);
int
gkfs_fsync(unsigned int fd);
int
gkfs_close(unsigned int fd);
......
......@@ -59,6 +59,9 @@ namespace cache {
namespace dir {
class DentryCache;
}
namespace file {
class WriteSizeCache;
}
} // namespace cache
namespace preload {
......@@ -98,6 +101,9 @@ private:
std::shared_ptr<FsConfig> fs_conf_;
std::shared_ptr<gkfs::cache::dir::DentryCache> dentry_cache_;
bool use_dentry_cache_{false};
std::shared_ptr<gkfs::cache::file::WriteSizeCache> write_size_cache_;
bool use_write_size_cache_{false};
std::string cwd_;
std::vector<std::string> mountdir_components_;
......@@ -249,6 +255,18 @@ public:
void
use_dentry_cache(bool use_dentry_cache);
std::shared_ptr<gkfs::cache::file::WriteSizeCache>
write_size_cache() const;
void
write_size_cache(std::shared_ptr<gkfs::cache::file::WriteSizeCache>
write_size_cache);
bool
use_write_size_cache() const;
void
use_write_size_cache(bool use_write_size_cache);
void
enable_interception();
......
......@@ -78,6 +78,18 @@ int
metadata_to_stat(const std::string& path, const gkfs::metadata::Metadata& md,
struct stat& attr);
/**
* @brief Updates the file size on the responsible metadata daemon
* @param path
* @param count
* @param offset
* @param is_append
* @return <err, return_offset>
*/
std::pair<int, off64_t>
update_file_size(const std::string& path, size_t count, off64_t offset,
bool is_append);
void
load_hosts();
......
......@@ -36,6 +36,9 @@ namespace gkfs::env {
std::string
get_var(const std::string& name, const std::string& default_value = "");
int
get_var(const std::string& name, int default_value);
bool
var_is_set(const std::string& name);
......
......@@ -52,6 +52,11 @@ constexpr bool use_dentry_cache = false;
// When enabled, the dentry cache is cleared when a directory is closed.
// Disabling this may cause semantic issues.
constexpr bool clear_dentry_cache_on_close = true;
// When enabled, write operations no longer update the file size on each write.
// Instead, the size is updated every `write_size_flush_threshold` writes per
// file. fsync/close flushes the size to the server immediately.
constexpr bool use_write_size_cache = false;
constexpr auto write_size_flush_threshold = 1000;
} // namespace cache
namespace client_metrics {
......
......@@ -126,4 +126,60 @@ DentryCache::clear() {
} // namespace dir
namespace file {
std::pair<size_t, size_t>
WriteSizeCache::record(std::string path, size_t size) {
std::lock_guard<std::mutex> const lock(mtx_);
auto& pair = size_cache.try_emplace(std::move(path), std::make_pair(0, 0))
.first->second;
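// bump the per-file write counter and keep the largest size seen so far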
pair.first++;
if(pair.second < size) {
pair.second = size;
}
return pair;
}
std::pair<size_t, size_t>
WriteSizeCache::reset(const std::string& path, bool evict) {
std::lock_guard<std::mutex> const lock(mtx_);
auto it = size_cache.find(path);
if(it == size_cache.end()) {
return {};
}
auto entry = it->second;
if(evict) {
// remove entry from cache and discard cached size
size_cache.erase(it);
} else {
// reset counter and keep cached size
it->second.first = 0;
}
return entry;
}
std::pair<int, off64_t>
WriteSizeCache::flush(const std::string& path, bool evict) {
// the mutex is acquired in reset(); no need to lock here
auto [latest_entry_cnt, latest_entry_size] = reset(path, false);
// no new updates in cache, don't return size
if(latest_entry_cnt == 0) {
return {};
}
LOG(DEBUG,
"WriteSizeCache::{}() Flushing and updating size for path '{}' size '{}' on the server. Evict: {}",
__func__, path, latest_entry_size, evict);
return gkfs::utils::update_file_size(path, latest_entry_size, 0, false);
}
size_t
WriteSizeCache::flush_threshold() const {
return flush_threshold_;
}
void
WriteSizeCache::flush_threshold(size_t flush_threshold) {
flush_threshold_ = flush_threshold;
}
} // namespace file
} // namespace gkfs::cache
......@@ -124,6 +124,7 @@ check_parent_dir(const std::string& path) {
#endif // GKFS_CREATE_CHECK_PARENTS
return 0;
}
} // namespace
namespace gkfs::syscall {
......@@ -921,44 +922,57 @@ gkfs_dup2(const int oldfd, const int newfd) {
ssize_t
gkfs_do_write(gkfs::filemap::OpenFile& file, const char* buf, size_t count,
off64_t offset, bool update_pos) {
if(file.type() != gkfs::filemap::FileType::regular) {
assert(file.type() == gkfs::filemap::FileType::directory);
LOG(WARNING, "Cannot write to directory");
errno = EISDIR;
return -1;
}
int err;
auto path = make_unique<string>(file.path());
auto is_append = file.get_flag(gkfs::filemap::OpenFile_flags::append);
auto write_size = 0;
auto num_replicas = CTX->get_replicas();
pair<int, long> ret_offset;
if(gkfs::config::proxy::fwd_update_size && CTX->use_proxy()) {
ret_offset = gkfs::rpc::forward_update_metadentry_size_proxy(
*path, count, offset, is_append);
LOG(DEBUG, "{}() path: '{}', count: '{}', offset: '{}', is_append: '{}'",
__func__, *path, count, offset, is_append);
if(CTX->use_write_size_cache() && !is_append) {
auto [size_update_cnt, cached_size] =
CTX->write_size_cache()->record(*path, offset + count);
if(size_update_cnt > CTX->write_size_cache()->flush_threshold()) {
err = CTX->write_size_cache()->flush(*path, false).first;
if(err) {
LOG(ERROR,
"update_metadentry_size() during cache flush failed with err '{}'",
err);
errno = err;
return -1;
}
}
} else {
ret_offset = gkfs::rpc::forward_update_metadentry_size(
*path, count, offset, is_append, num_replicas);
}
auto err = ret_offset.first;
if(err) {
LOG(ERROR, "update_metadentry_size() failed with err '{}'", err);
errno = err;
return -1;
}
if(is_append) {
// When append is set the EOF is set to the offset
// forward_update_metadentry_size returns. This is because it is an
// atomic operation on the server and reserves the space for this append
if(ret_offset.second == -1) {
LOG(ERROR,
"update_metadentry_size() received -1 as starting offset. "
"This occurs when the staring offset could not be extracted "
"from RocksDB's merge operations. Inform GekkoFS devs.");
errno = EIO;
auto ret_offset =
gkfs::utils::update_file_size(*path, count, offset, is_append);
err = ret_offset.first;
if(err) {
LOG(ERROR, "update_metadentry_size() failed with err '{}'", err);
errno = err;
return -1;
}
offset = ret_offset.second;
if(is_append) {
// When append is set the EOF is set to the offset
// forward_update_metadentry_size returns. This is because it is an
// atomic operation on the server and reserves the space for this
// append
if(ret_offset.second == -1) {
LOG(ERROR,
"update_metadentry_size() received -1 as starting offset. "
"This occurs when the staring offset could not be extracted "
"from RocksDB's merge operations. Inform GekkoFS devs.");
errno = EIO;
return -1;
}
offset = ret_offset.second;
}
}
pair<int, long> ret_write;
......@@ -1571,6 +1585,27 @@ gkfs_getdents64(unsigned int fd, struct linux_dirent64* dirp,
return written;
}
int
gkfs_fsync(unsigned int fd) {
auto file = CTX->file_map()->get(fd);
if(!file) {
errno = 0;
return 0;
}
// flush write size cache to be server consistent
if(CTX->use_write_size_cache()) {
auto err = CTX->write_size_cache()->flush(file->path(), true).first;
if(err) {
LOG(ERROR, "{}() write_size_cache() failed with err '{}'", __func__,
err);
errno = err;
return -1;
}
}
errno = 0;
return 0;
}
/**
* @brief Closes an fd. To be used externally
*
......@@ -1579,7 +1614,18 @@ gkfs_getdents64(unsigned int fd, struct linux_dirent64* dirp,
*/
int
gkfs_close(unsigned int fd) {
if(CTX->file_map()->exist(fd)) {
auto file = CTX->file_map()->get(fd);
if(file) {
// flush write size cache to be server consistent
if(CTX->use_write_size_cache()) {
auto err = CTX->write_size_cache()->flush(file->path(), true).first;
if(err) {
LOG(ERROR, "{}() write_size_cache() failed with err '{}'",
__func__, err);
errno = err;
return -1;
}
}
if(CTX->use_dentry_cache() &&
gkfs::config::cache::clear_dentry_cache_on_close) {
// clear cache for directory
......
......@@ -959,14 +959,11 @@ hook_fstatfs(unsigned int fd, struct statfs* buf) {
* application needs the capabilities*/
int
hook_fsync(unsigned int fd) {
LOG(DEBUG, "{}() called with fd: {}", __func__, fd);
if(CTX->file_map()->exist(fd)) {
errno = 0;
return 0;
return with_errno(gkfs::syscall::gkfs_fsync(fd));
}
return syscall_no_intercept_wrapper(SYS_fsync, fd);
}
......
......@@ -259,7 +259,7 @@ init_environment() {
CTX->distributor(distributor);
}
auto use_dcache = gkfs::env::get_var(gkfs::env::DENTRY_CACHE,
auto use_dcache = gkfs::env::get_var(gkfs::env::cache::DENTRY,
gkfs::config::cache::use_dentry_cache
? "ON"
: "OFF") == "ON";
......@@ -276,13 +276,49 @@ init_environment() {
"Failed to initialize dentry cache: "s + e.what());
}
} else {
if(gkfs::env::var_is_set(gkfs::env::DENTRY_CACHE)) {
if(gkfs::env::var_is_set(gkfs::env::cache::DENTRY)) {
LOG(INFO, "Dentry cache is disabled by environment variable.");
} else {
LOG(INFO, "Dentry cache is disabled by configuration.");
}
}
auto use_write_size_cache =
gkfs::env::get_var(gkfs::env::cache::WRITE_SIZE,
gkfs::config::cache::use_write_size_cache
? "ON"
: "OFF") == "ON";
if(use_write_size_cache) {
try {
LOG(INFO, "Initializing write size cache...");
auto write_size_cache =
std::make_shared<gkfs::cache::file::WriteSizeCache>();
CTX->write_size_cache(write_size_cache);
CTX->write_size_cache()->flush_threshold(gkfs::env::get_var(
gkfs::env::cache::WRITE_SIZE_THRESHOLD,
gkfs::config::cache::write_size_flush_threshold));
CTX->use_write_size_cache(true);
if(CTX->write_size_cache()->flush_threshold() == 0) {
LOG(WARNING,
"Write size cache is enabled but flush threshold is set to 0. Cache is disabled as a result.");
CTX->use_write_size_cache(false);
} else {
LOG(INFO, "Write size cache enabled. Flushing at '{}' writes",
CTX->write_size_cache()->flush_threshold());
}
} catch(const std::exception& e) {
exit_error_msg(EXIT_FAILURE,
"Failed to initialize write size cache: "s +
e.what());
}
} else {
if(gkfs::env::var_is_set(gkfs::env::cache::WRITE_SIZE)) {
LOG(INFO, "Write size cache is disabled by environment variable.");
} else {
LOG(INFO, "Write size cache is disabled by configuration.");
}
}
LOG(INFO, "Retrieving file system configuration...");
if(!gkfs::rpc::forward_get_fs_config()) {
......
......@@ -433,6 +433,27 @@ PreloadContext::use_dentry_cache(bool use_dentry_cache) {
use_dentry_cache_ = use_dentry_cache;
}
std::shared_ptr<gkfs::cache::file::WriteSizeCache>
PreloadContext::write_size_cache() const {
return write_size_cache_;
}
void
PreloadContext::write_size_cache(
std::shared_ptr<gkfs::cache::file::WriteSizeCache> write_size_cache) {
write_size_cache_ = write_size_cache;
}
bool
PreloadContext::use_write_size_cache() const {
return use_write_size_cache_;
}
void
PreloadContext::use_write_size_cache(bool use_write_size_cache) {
use_write_size_cache_ = use_write_size_cache;
}
void
PreloadContext::enable_interception() {
interception_enabled_ = true;
......
......@@ -326,6 +326,23 @@ metadata_to_stat(const std::string& path, const gkfs::metadata::Metadata& md,
return 0;
}
pair<int, off64_t>
update_file_size(const std::string& path, size_t count, off64_t offset,
bool is_append) {
LOG(DEBUG, "{}() path: '{}', count: '{}', offset: '{}', is_append: '{}'",
__func__, path, count, offset, is_append);
pair<int, long> ret_offset;
auto num_replicas = CTX->get_replicas();
if(gkfs::config::proxy::fwd_update_size && CTX->use_proxy()) {
ret_offset = gkfs::rpc::forward_update_metadentry_size_proxy(
path, count, offset, is_append);
} else {
ret_offset = gkfs::rpc::forward_update_metadentry_size(
path, count, offset, is_append, num_replicas);
}
return ret_offset;
}
map<string, uint64_t>
load_forwarding_map_file(const std::string& lfpath) {
......
......@@ -40,6 +40,14 @@ get_var(const string& name, const string& default_value) {
return val != nullptr ? string(val) : default_value;
}
int
get_var(const string& name, const int default_value) {
auto* const val_str = ::secure_getenv(name.c_str());
if(!val_str)
return default_value;
return stoi(val_str);
}
bool
var_is_set(const string& name) {
const char* const val = ::secure_getenv(name.c_str());
......
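
One caveat of the new integer `get_var()` overload: `std::stoi` throws `std::invalid_argument` or `std::out_of_range` for malformed values such as `LIBGKFS_WRITE_SIZE_CACHE_THRESHOLD=abc`; the enclosing `try`/`catch` in `init_environment()` turns this into a startup error. A more forgiving variant could fall back to the default instead. A sketch (not part of this merge request; `get_var_or_default` is a made-up name):

```cpp
// Sketch of a defensive integer environment lookup: malformed values fall
// back to the compiled-in default instead of propagating an exception.
#include <cstdlib>
#include <stdexcept>
#include <string>

int
get_var_or_default(const std::string& name, const int default_value) {
    auto* const val_str = ::secure_getenv(name.c_str());
    if(!val_str)
        return default_value;
    try {
        return std::stoi(val_str);
    } catch(const std::logic_error&) { // invalid_argument or out_of_range
        return default_value;
    }
}
```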
......@@ -131,7 +131,10 @@ ChunkStorage::destroy_chunk_space(const string& file_path) const {
auto err_str = fmt::format(
"{}() Failed to remove chunk directory. Path: '{}', Error: '{}'",
__func__, chunk_dir, e.what());
throw ChunkStorageException(e.code().value(), err_str);
if(e.code().value() != ENOENT) {
throw ChunkStorageException(e.code().value(), err_str);
}
// throw ChunkStorageException(e.code().value(), err_str);
}
}
......