diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index e379d82ed506a399fe66a88a6b9a5d7f9d499bb9..d846bcd7f1ea01986383fa83852a826943ad02b6 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -58,7 +58,6 @@ compile GekkoFS:
       -DGKFS_INSTALL_TESTS:BOOL=ON
       -DGKFS_ENABLE_FORWARDING:BOOL=ON
       -DGKFS_ENABLE_AGIOS:BOOL=ON
-      -DRPC_PROTOCOL="ofi+sockets"
       -DCMAKE_PREFIX_PATH=${DEPS_INSTALL_PATH}
       -DCMAKE_INSTALL_PREFIX=${INSTALL_PATH}
       ${CI_PROJECT_DIR}
diff --git a/CMakeLists.txt b/CMakeLists.txt
index c4c1af28831ad6e381e4be30da9f3137173faaaa..9292a3cec14a207da123d163a379b37d7e188fa0 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -84,7 +84,7 @@ find_package(Snappy REQUIRED)
 find_package(ZStd REQUIRED)
 find_package(JeMalloc) # required if rocksdb has been build with jemalloc
 find_package(RocksDB REQUIRED)
-find_package(AGIOS REQUIRED)
+find_package(AGIOS) # optional
 # margo dependencies
 find_package(Mercury REQUIRED)
 find_package(Abt REQUIRED)
@@ -102,19 +102,6 @@ find_package(Threads REQUIRED)
 find_package(Date REQUIRED)
 
-set(RPC_PROTOCOL "ofi+sockets" CACHE STRING "Communication plugin used for RPCs")
-set_property(CACHE RPC_PROTOCOL PROPERTY STRINGS
-    "bmi+tcp"
-    "ofi+sockets"
-    "ofi+tcp"
-    "ofi+verbs"
-    "ofi+psm2"
-)
-message(STATUS "[gekkofs] RPC protocol: '${RPC_PROTOCOL}'")
-
-option(USE_SHM "Use shared memory for intra-node communication" OFF)
-message(STATUS "[gekkofs] Shared-memory communication: ${USE_SHM}")
-
 option(CREATE_CHECK_PARENTS "Check parent directory existance before creating child node" ON)
 message(STATUS "[gekkofs] Create checks parents: ${CREATE_CHECK_PARENTS}")
diff --git a/README.md b/README.md
index 7b36ace3a5a0d77c5c3d99a2f5deb1c8a37afdd2..81811f10e6a46cd91c235c8baec3616bceb06e76 100644
--- a/README.md
+++ b/README.md
@@ -1,31 +1,31 @@
 # GekkoFS
-This is a file system.
+GekkoFS is a file system capable of aggregating the local I/O capacity and performance of each compute node
+in an HPC cluster to produce a high-performance storage space that can be accessed in a distributed manner.
+This storage space allows HPC applications and simulations to run in isolation from each other with regard
+to I/O, which reduces interference and improves performance.
 
 # Dependencies
 
-## Rocksdb
+- gcc 4.8 or newer (for C++11 support)
+- CMake > 3.6 (> 3.11 for GekkoFS testing)
 
-### Debian/Ubuntu - Dependencies
+## Debian/Ubuntu - Dependencies
 
-- Upgrade your gcc to version at least 4.8 to get C++11 support.
-- Install snappy. This is usually as easy as: `sudo apt-get install libsnappy-dev`
-- Install zlib. Try: `sudo apt-get install zlib1g-dev`
-- Install bzip2: `sudo apt-get install libbz2-dev`
-- Install zstandard: `sudo apt-get install libzstd-dev`
-- Install lz4 `sudo apt-get install liblz4-dev`
+- snappy: `sudo apt-get install libsnappy-dev`
+- zlib: `sudo apt-get install zlib1g-dev`
+- bzip2: `sudo apt-get install libbz2-dev`
+- zstandard: `sudo apt-get install libzstd-dev`
+- lz4: `sudo apt-get install liblz4-dev`
+- uuid: `sudo apt-get install uuid-dev`
+- capstone: `sudo apt-get install libcapstone-dev`
 
 ### CentOS/Red Hat - Dependencies
 
-- Upgrade your gcc to version at least 4.8 to get C++11 support: yum install gcc48-c++
-- Install snappy:
-    `sudo yum install snappy snappy-devel`
-- Install zlib:
-    `sudo yum install zlib zlib-devel`
-- Install bzip2:
-    `sudo yum install bzip2 bzip2-devel`
-- Install ASAN (optional for debugging):
-    `sudo yum install libasan`
-- Install zstandard:
+
+- snappy: `sudo yum install snappy snappy-devel`
+- zlib: `sudo yum install zlib zlib-devel`
+- bzip2: `sudo yum install bzip2 bzip2-devel`
+- zstandard:
 ```bash
 wget https://github.com/facebook/zstd/archive/v1.1.3.tar.gz
 mv v1.1.3.tar.gz zstd-1.1.3.tar.gz
@@ -33,18 +33,22 @@ This is a file system.
 cd zstd-1.1.3
 make && sudo make install
 ```
+- lz4: `sudo yum install lz4 lz4-devel`
+- uuid: `sudo yum install libuuid-devel`
+- capstone: `sudo yum install capstone capstone-devel`
+
 # Usage
 
 ## Clone and compile direct GekkoFS dependencies
 
-- Go to the `scripts` folder and first clone all dependencies projects. You can choose the according na_plugin
+- Go to the `scripts` folder and clone all dependency projects first. You can choose the corresponding network (na) plugin
 (execute the script for help):
 
 ```bash
-usage: dl_dep.sh [-h] [-l] [-n ] [-c ] [-d ]
+usage: dl_dep.sh [-h] [-l] [-n ] [-c ] [-d ]
                  source_path
-
+
 This script gets all GekkoFS dependency sources (excluding the fs itself)
 
@@ -55,29 +59,32 @@ positional arguments:
 optional arguments:
   -h, --help              shows this help message and exits
   -l, --list-dependencies
-                          list dependencies available for download
+                          list dependencies available for download with descriptions
   -n , --na
                           network layer that is used for communication. Valid: {bmi,ofi,all}
-                          defaults to 'all'
-  -c , --cluster
-                          additional configurations for specific compute clusters
-                          supported clusters: {mogon1,mogon2,fh2}
+                          defaults to 'ofi'
+  -c , --config
+                          allows additional configurations, e.g., for specific clusters
+                          supported values: {mogon2, mogon1, ngio, direct, all}
+                          defaults to 'direct'
   -d , --dependency
-                          download a specific dependency. If unspecified
-                          all dependencies are built and installed.
+                          download a specific dependency and ignore the --config setting. If unspecified,
+                          all dependencies are downloaded based on the set --config setting.
+                          Multiple dependencies are supported: pass a space-separated string (e.g., "ofi mercury")
+  -v, --verbose           Increase download verbosity
 ```
 
- Now use the install script to compile them and install them to the desired directory. You can choose the according
-na_plugin
+(na) network plugin
 (execute the script for help):
 
 ```bash
-usage: compile_dep.sh [-h] [-l] [-n ] [-c ] [-d ] [-j ]
+usage: compile_dep.sh [-h] [-l] [-n ] [-c ] [-d ] [-j ]
                       source_path install_path
-
+
 This script compiles all GekkoFS dependencies (excluding the fs itself)
 
 positional arguments:
-    source_path      path to the cloned dependencies path from clone_dep.sh
+    source_path      path to the cloned dependencies path from clone_dep.sh
     install_path     path to the install path of the compiled dependencies
 
@@ -88,12 +95,14 @@ optional arguments:
   -n , --na
                           network layer that is used for communication. Valid: {bmi,ofi,all}
                           defaults to 'all'
-  -c , --cluster
-                          additional configurations for specific compute clusters
-                          supported clusters: {mogon1,mogon2,fh2}
+  -c , --config
+                          allows additional configurations, e.g., for specific clusters
+                          supported values: {mogon1, mogon2, ngio, direct, all}
+                          defaults to 'direct'
   -d , --dependency
-                          build and install a specific dependency. If unspecified
-                          all dependencies are built and installed.
+                          build and install a specific dependency and ignore the --config setting. If unspecified,
+                          all dependencies are built and installed based on the set --config setting.
+                          Multiple dependencies are supported: pass a space-separated string (e.g., "ofi mercury")
   -j , --compilecores
                           number of cores that are used to compile the dependencies
                           defaults to number of available cores
@@ -101,18 +110,14 @@ optional arguments:
 ```
 
 ## Compile GekkoFS
-You need to decide what Mercury NA plugin you want to use. The following NA plugins are available, although only BMI is considered stable at the moment.
- - `ofi+sockets` for using the libfabric plugin with TCP
- - `ofi+tcp` for using the libfabric plugin with TCP (new version)
- - `ofi+verbs` for using the libfabric plugin with Infiniband verbs (not threadsafe. Do not use.)
- - `ofi+psm2` for using the libfabric plugin with Intel Omni-Path
- - `bmi+tcp` for using the bmi plugin with the tcp protocol
+
+If the above dependencies have been installed outside the usual system paths, use CMake's `-DCMAKE_PREFIX_PATH` to
+make their location known to CMake.
 
 ```bash
 mkdir build && cd build
-cmake -DCMAKE_BUILD_TYPE=Release -DRPC_PROTOCOL='ofi+sockets' ..
+cmake -DCMAKE_BUILD_TYPE=Release ..
 make
-make install
 ```
 
 In order to build self-tests, the *optional* GKFS_BUILD_TESTS CMake option needs
@@ -121,35 +126,67 @@ to be enabled when building. Once that is done, tests can be run by running
 ```bash
 mkdir build && cd build
-cmake -DGKFS_BUILD_TESTS=ON -DCMAKE_BUILD_TYPE=Release -DRPC_PROTOCOL='ofi+sockets' ..
+cmake -DGKFS_BUILD_TESTS=ON -DCMAKE_BUILD_TYPE=Release ..
 make
 make test
 make install
 ```
 
-**IMPORTANT:** Please note that the testing framework requires Python 3.6 as an
-additional dependency in order to run.
+**IMPORTANT:** Please note that the testing framework requires Python 3.6 and CMake > 3.11 as
+additional dependencies in order to run.
 
 ## Run GekkoFS
 
-First on each node a daemon has to be started. This can be done in two ways using the `gkfs_daemon` binary directly or
-the corresponding startup and shutdown scripts. The latter is recommended for cluster usage. It requires pssh (or
-parallel-ssh) with python2.
+A daemon (the `gkfs_daemon` binary) has to be started on each node. Tools such as `srun`, `mpiexec/mpirun`,
+`pdsh`, or `pssh` can be used to launch the daemon on many nodes.
+
+You need to decide which Mercury NA plugin you want to use for network communication. `ofi+sockets` is the default.
+The `-P` argument selects a different RPC protocol. See below.
+
+ - `ofi+sockets` for using the libfabric plugin with TCP (stable)
+ - `ofi+tcp` for using the libfabric plugin with TCP (slower than sockets)
+ - `ofi+verbs` for using the libfabric plugin with Infiniband verbs (reasonably stable)
+ - `ofi+psm2` for using the libfabric plugin with Intel Omni-Path (unstable)
+ - `bmi+tcp` for using the bmi plugin with TCP (alternative to libfabric)
 
 ### Start and shut down daemon directly
 
-`./build/bin/gkfs_daemon -r -m `
-
-Shut it down by gracefully killing the process.
-
-### Startup and shutdown scripts
+`./build/src/daemon/gkfs_daemon -r <rootdir> -m <mountdir>`
+
+Further options:
+```bash
+Allowed options:
+  -h [ --help ]             Help message
+  -m [ --mountdir ] arg     pseudo mountdir
+  -r [ --rootdir ] arg      data directory
+  -i [ --metadir ] arg      metadata directory, if not set rootdir is used for
+                            metadata
+  -l [ --listen ] arg       Address or interface to bind the daemon on.
+                            Default: local hostname.
+                            When used with ofi+verbs the FI_VERBS_IFACE
+                            environment variable is set accordingly which
+                            associates the verbs device with the network
+                            interface. In case FI_VERBS_IFACE is already
+                            defined, the argument is ignored. Default 'ib'.
+  -H [ --hosts-file ] arg   Shared file used by daemons to register their
+                            endpoints. (default './gkfs_hosts.txt')
+  -P [ --rpc_protocol ] arg Used RPC protocol for inter-node communication.
+                            Available: {ofi+sockets, ofi+verbs, ofi+psm2} for
+                            TCP, Infiniband, and Omni-Path, respectively.
+                            (Default ofi+sockets)
+                            Libfabric must have verbs or psm2 support enabled.
+  --auto_sm                 Enables intra-node communication (IPCs) via the
+                            `na+sm` (shared memory) protocol, instead of using
+                            the RPC protocol. (Default off)
+  --version                 print version and exit
+```
 
-The scripts are located in `scripts/{startup_gkfs.py, shutdown_gkfs.py}`. Use the -h argument for their usage.
+Shut it down by gracefully killing the process (SIGTERM).
 
 ## Miscellaneous
 
 Metadata and actual data will be stored at the `<rootdir>`. The path where the application works on is set with
-`<mountdir>`
+`<mountdir>`. Run the application with the preload library: `LD_PRELOAD=/build/lib/libgkfs_intercept.so ./application`. In the case of an MPI application, use the `{mpirun, mpiexec} -x` argument.
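+
+For example, a minimal sketch of a two-node run (node counts, paths, and application names are placeholders; the
+default `ofi+sockets` protocol and the default `./gkfs_hosts.txt` hosts file are assumed):
+
+```bash
+# start one daemon per node (e.g., via Slurm), keeping data in a node-local directory
+srun -N 2 ./build/src/daemon/gkfs_daemon -r /dev/shm/gkfs_rootdir -m /tmp/gkfs_mountdir
+
+# run a serial application on top of the GekkoFS mount point
+LD_PRELOAD=/build/lib/libgkfs_intercept.so ./application /tmp/gkfs_mountdir/output.dat
+
+# for an MPI application, export LD_PRELOAD to all ranks (Open MPI syntax)
+mpirun -np 2 -x LD_PRELOAD=/build/lib/libgkfs_intercept.so ./mpi_application /tmp/gkfs_mountdir/output.dat
+```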
diff --git a/include/client/gkfs_functions.hpp b/include/client/gkfs_functions.hpp index 86ec304482d78e69ad1eebab59ecf7bca4a099dc..295281fbe6ef7bec9b2289ff1702472d3c422b18 100644 --- a/include/client/gkfs_functions.hpp +++ b/include/client/gkfs_functions.hpp @@ -42,7 +42,10 @@ int gkfs_stat(const std::string& path, struct stat* buf, bool follow_links = tru // Implementation of statx, it uses the normal stat and maps the information to the statx structure // Follow links is true by default #ifdef STATX_TYPE -int gkfs_statx(int dirfd, const std::string& path, int flags, unsigned int mask,struct statx* buf, bool follow_links = true ); + +int gkfs_statx(int dirfd, const std::string& path, int flags, unsigned int mask, struct statx* buf, + bool follow_links = true); + #endif int gkfs_statfs(struct statfs* buf); diff --git a/include/client/hooks.hpp b/include/client/hooks.hpp index fcc043717edd649c5fb94d18ed817544b19fb636..9fd959082427140322b23583bec083a462ee5742 100644 --- a/include/client/hooks.hpp +++ b/include/client/hooks.hpp @@ -44,10 +44,10 @@ int hook_read(unsigned int fd, void* buf, size_t count); int hook_pread(unsigned int fd, char* buf, size_t count, loff_t pos); -int hook_readv(unsigned long fd, const struct iovec * iov, unsigned long iovcnt); +int hook_readv(unsigned long fd, const struct iovec* iov, unsigned long iovcnt); -int hook_preadv(unsigned long fd, const struct iovec * iov, unsigned long iovcnt, - unsigned long pos_l, unsigned long pos_h); +int hook_preadv(unsigned long fd, const struct iovec* iov, unsigned long iovcnt, + unsigned long pos_l, unsigned long pos_h); int hook_write(unsigned int fd, const char* buf, size_t count); diff --git a/include/client/preload_context.hpp b/include/client/preload_context.hpp index 616a92966c4c3b58558b6c53f5564e852046bff7..07e873a0a4f00e5c01a57cf63db343d579b920b7 100644 --- a/include/client/preload_context.hpp +++ b/include/client/preload_context.hpp @@ -83,6 +83,8 @@ private: std::vector hosts_; uint64_t local_host_id_; uint64_t fwd_host_id_; + std::string rpc_protocol_; + bool auto_sm_{false}; bool interception_enabled_; @@ -119,7 +121,6 @@ public: void clear_hosts(); - uint64_t local_host_id() const; void local_host_id(uint64_t id); @@ -128,6 +129,14 @@ public: void fwd_host_id(uint64_t id); + const std::string& rpc_protocol() const; + + void rpc_protocol(const std::string& rpc_protocol); + + bool auto_sm() const; + + void auto_sm(bool auto_sm); + RelativizeStatus relativize_fd_path(int dirfd, const char* raw_path, std::string& relative_path, diff --git a/include/client/preload_util.hpp b/include/client/preload_util.hpp index f2e000ddd717bbcef5e4eecc2e155766f45be927..592b1ede8a150f9c275c6f50d33b61f86d3fb622 100644 --- a/include/client/preload_util.hpp +++ b/include/client/preload_util.hpp @@ -59,11 +59,14 @@ std::shared_ptr get_metadata(const std::string& path, int metadata_to_stat(const std::string& path, const gkfs::metadata::Metadata& md, struct stat& attr); -std::vector> load_hostfile(const std::string& lfpath); - void load_hosts(); + void load_forwarding_map(); +std::vector> read_hosts_file(); + +void connect_to_hosts(const std::vector>& hosts); + } // namespace util } // namespace gkfs diff --git a/include/client/rpc/forward_metadata.hpp b/include/client/rpc/forward_metadata.hpp index a8ab600ff1e7bba0c0f49affa2d2730fa578d7e1..e8c119d913a4863bfa5634df411bb53d34e46f38 100644 --- a/include/client/rpc/forward_metadata.hpp +++ b/include/client/rpc/forward_metadata.hpp @@ -25,6 +25,7 @@ class OpenDir; } namespace metadata { struct 
MetadentryUpdateFlags; + class Metadata; } diff --git a/include/daemon/classes/fs_data.hpp b/include/daemon/classes/fs_data.hpp index 8f96f11211ea726096dbe829b9eae1756350d53a..074a70b5d421ca2978699f9112f731801c1c98a9 100644 --- a/include/daemon/classes/fs_data.hpp +++ b/include/daemon/classes/fs_data.hpp @@ -38,7 +38,7 @@ class FsData { private: FsData() {} - //logger + // logger std::shared_ptr spdlogger_; // paths @@ -46,8 +46,11 @@ private: std::string mountdir_; std::string metadir_; + // RPC management + std::string rpc_protocol_; std::string bind_addr_; std::string hosts_file_; + bool use_auto_sm_; // Database std::shared_ptr mdb_; @@ -99,12 +102,20 @@ public: void storage(const std::shared_ptr& storage); + const std::string& rpc_protocol() const; + + void rpc_protocol(const std::string& rpc_protocol); + const std::string& bind_addr() const; void bind_addr(const std::string& addr); const std::string& hosts_file() const; + bool use_auto_sm() const; + + void use_auto_sm(bool use_auto_sm); + void hosts_file(const std::string& lookup_file); bool atime_state() const; diff --git a/include/daemon/scheduler/agios.hpp b/include/daemon/scheduler/agios.hpp index 9ff1104f16c8e36c41eb06569ca300b9707e8a89..457538f75bc700444c983e710ed8c938ace43b0d 100644 --- a/include/daemon/scheduler/agios.hpp +++ b/include/daemon/scheduler/agios.hpp @@ -4,11 +4,14 @@ #include void agios_initialize(); + void agios_shutdown(); -void *agios_callback(int64_t request_id); -void *agios_callback_aggregated(int64_t *requests, int32_t total); -void *agios_eventual_callback(int64_t request_id, void *info); +void* agios_callback(int64_t request_id); + +void* agios_callback_aggregated(int64_t* requests, int32_t total); + +void* agios_eventual_callback(int64_t request_id, void* info); unsigned long long int generate_unique_id(); diff --git a/include/global/cmake_configure.hpp.in b/include/global/cmake_configure.hpp.in index ac06265a12caee43e1dbb0b10ea602ad9d0d360a..0a810132ee89a0d5c56adebfa7e58a9516aeb25e 100644 --- a/include/global/cmake_configure.hpp.in +++ b/include/global/cmake_configure.hpp.in @@ -14,8 +14,6 @@ #ifndef GKFS_CMAKE_CONFIGURE_HPP #define GKFS_CMAKE_CONFIGURE_HPP -#define RPC_PROTOCOL "@RPC_PROTOCOL@" -#cmakedefine01 USE_SHM #cmakedefine01 CREATE_CHECK_PARENTS #cmakedefine01 LOG_SYSCALLS diff --git a/include/global/global_defs.hpp b/include/global/global_defs.hpp index ae099824f5b864f6663659c16137c7c7f7dfd9b7..c2dcf5442e599348965680720cb122fa88fbb844 100644 --- a/include/global/global_defs.hpp +++ b/include/global/global_defs.hpp @@ -46,6 +46,7 @@ constexpr auto ofi_psm2 = "ofi+psm2"; constexpr auto ofi_sockets = "ofi+sockets"; constexpr auto ofi_tcp = "ofi+tcp"; constexpr auto ofi_verbs = "ofi+verbs"; +constexpr auto na_sm = "na+sm"; } // namespace protocol } // namespace rpc diff --git a/scripts/compile_dep.sh b/scripts/compile_dep.sh index 1d4cb98ca267638bfb01df2de558320219645e2e..f15ff66c5688b446b6baf822d6c9f81d991a758e 100755 --- a/scripts/compile_dep.sh +++ b/scripts/compile_dep.sh @@ -13,12 +13,12 @@ VALID_DEP_OPTIONS="mogon2 mogon1 ngio direct all" MOGON1_DEPS=( "zstd" "lz4" "snappy" "capstone" "ofi" "mercury" "argobots" "margo" "rocksdb" - "syscall_intercept" "date" "verbs" "agios" + "syscall_intercept" "date" "verbs" ) MOGON2_DEPS=( - "zstd" "lz4" "snappy" "capstone" "ofi" "mercury" "argobots" "margo" "rocksdb" - "syscall_intercept" "date" "agios" "psm2" + "bzip2" "zstd" "lz4" "snappy" "capstone" "ofi" "mercury" "argobots" "margo" "rocksdb" + "syscall_intercept" "date" "psm2" ) NGIO_DEPS=( @@ 
-27,11 +27,11 @@ NGIO_DEPS=( ) DIRECT_DEPS=( - "ofi" "mercury" "argobots" "margo" "rocksdb" "syscall_intercept" "date" "agios" + "ofi" "mercury" "argobots" "margo" "rocksdb" "syscall_intercept" "date" ) ALL_DEPS=( - "zstd" "lz4" "snappy" "capstone" "bmi" "ofi" "mercury" "argobots" "margo" "rocksdb" + "bzip2" "zstd" "lz4" "snappy" "capstone" "bmi" "ofi" "mercury" "argobots" "margo" "rocksdb" "syscall_intercept" "date" "agios" ) @@ -314,6 +314,14 @@ if check_dependency "snappy" "${DEP_CONFIG[@]}"; then make install fi +# build bzip2 for rocksdb +if check_dependency "bzip2" "${DEP_CONFIG[@]}"; then + echo "############################################################ Installing: bzip2" + CURR=${SOURCE}/bzip2 + cd "${CURR}" + make install PREFIX="${INSTALL}" +fi + # build capstone for syscall-intercept if check_dependency "capstone" "${DEP_CONFIG[@]}"; then echo "############################################################ Installing: capstone" diff --git a/scripts/dl_dep.sh b/scripts/dl_dep.sh index bf6c6e6a47b1bd4c8b73322580ec35715e5dd73a..d299330f2c2052538198caa42a97c96e8ac8ef67 100755 --- a/scripts/dl_dep.sh +++ b/scripts/dl_dep.sh @@ -13,25 +13,25 @@ VALID_DEP_OPTIONS="mogon2 mogon1 ngio direct all" MOGON1_DEPS=( "zstd" "lz4" "snappy" "capstone" "ofi-verbs" "mercury" "argobots" "margo" "rocksdb" - "syscall_intercept" "date" "agios" + "syscall_intercept" "date" ) MOGON2_DEPS=( - "zstd" "lz4" "snappy" "capstone" "ofi-experimental" "mercury" "argobots" "margo" "rocksdb-experimental" - "syscall_intercept-glibc3" "date" "agios" "psm2" + "bzip2" "zstd" "lz4" "snappy" "capstone" "ofi-experimental" "mercury" "argobots" "margo" "rocksdb-experimental" + "syscall_intercept-glibc3" "date" "psm2" ) NGIO_DEPS=( "zstd" "lz4" "snappy" "capstone" "ofi-experimental" "mercury" "argobots" "margo" "rocksdb" - "syscall_intercept" "date" "psm2" + "syscall_intercept" "date" "psm2" "agios" ) DIRECT_DEPS=( - "ofi" "mercury" "argobots" "margo" "rocksdb" "syscall_intercept" "date" "agios" + "ofi" "mercury" "argobots" "margo" "rocksdb" "syscall_intercept" "date" ) ALL_DEPS=( - "zstd" "lz4" "snappy" "capstone" "bmi" "ofi" "mercury" "argobots" "margo" "rocksdb" + "bzip2" "zstd" "lz4" "snappy" "capstone" "bmi" "ofi" "mercury" "argobots" "margo" "rocksdb" "syscall_intercept" "date" "agios" ) @@ -322,6 +322,11 @@ if check_dependency "snappy" "${DEP_CONFIG[@]}"; then wgetdeps "snappy" "https://github.com/google/snappy/archive/1.1.7.tar.gz" & fi +# get bzip2 for rocksdb +if check_dependency "bzip2" "${DEP_CONFIG[@]}"; then + wgetdeps "bzip2" "https://sourceforge.net/projects/bzip2/files/bzip2-1.0.6.tar.gz" & +fi + # get capstone for syscall-intercept if check_dependency "capstone" "${DEP_CONFIG[@]}"; then wgetdeps "capstone" "https://github.com/aquynh/capstone/archive/4.0.1.tar.gz" & @@ -369,21 +374,17 @@ if check_dependency "margo" "${DEP_CONFIG[@]}"; then fi # get rocksdb -if check_dependency "rocksdb" "${DEP_CONFIG[@]}"; then - if check_dependency "rocksdb-experimental" "${DEP_CONFIG[@]}"; then - wgetdeps "rocksdb" "https://github.com/facebook/rocksdb/archive/v6.11.4.tar.gz" & - else - wgetdeps "rocksdb" "https://github.com/facebook/rocksdb/archive/v6.2.2.tar.gz" & - fi +if check_dependency "rocksdb-experimental" "${DEP_CONFIG[@]}"; then + wgetdeps "rocksdb" "https://github.com/facebook/rocksdb/archive/v6.11.4.tar.gz" & +elif check_dependency "rocksdb" "${DEP_CONFIG[@]}"; then + wgetdeps "rocksdb" "https://github.com/facebook/rocksdb/archive/v6.2.2.tar.gz" & fi # get syscall_intercept -if check_dependency 
"syscall_intercept" "${DEP_CONFIG[@]}"; then - if check_dependency "syscall_intercept-glibc3" "${DEP_CONFIG[@]}"; then - clonedeps "syscall_intercept" "https://github.com/GBuella/syscall_intercept" "ea124fb4ab9eb56bc22a0e94f2b90928c7a88e8c" "-b add_endbr64_and_lea" "syscall_intercept.patch" & - else - clonedeps "syscall_intercept" "https://github.com/pmem/syscall_intercept.git" "cc3412a2ad39f2e26cc307d5b155232811d7408e" "" "syscall_intercept.patch" & - fi +if check_dependency "syscall_intercept-glibc3" "${DEP_CONFIG[@]}"; then + clonedeps "syscall_intercept" "https://github.com/GBuella/syscall_intercept" "ea124fb4ab9eb56bc22a0e94f2b90928c7a88e8c" "-b add_endbr64_and_lea" "syscall_intercept.patch" & +elif check_dependency "syscall_intercept" "${DEP_CONFIG[@]}"; then + clonedeps "syscall_intercept" "https://github.com/pmem/syscall_intercept.git" "cc3412a2ad39f2e26cc307d5b155232811d7408e" "" "syscall_intercept.patch" & fi # get AGIOS diff --git a/src/client/CMakeLists.txt b/src/client/CMakeLists.txt index d477ec99c24bde135d29cf45c1d310bd4fe553d9..4569b6183c1d90d03bbc1675770fe806d924e6ae 100644 --- a/src/client/CMakeLists.txt +++ b/src/client/CMakeLists.txt @@ -65,7 +65,7 @@ target_link_libraries(gkfs_intercept Boost::boost # needed for tokenizer header Threads::Threads Date::TZ -) + ) target_include_directories(gkfs_intercept PRIVATE @@ -77,9 +77,9 @@ install(TARGETS gkfs_intercept LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR} ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR} PUBLIC_HEADER DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}/gkfs -) + ) -if(GKFS_ENABLE_FORWARDING) +if (GKFS_ENABLE_FORWARDING) set(FWD_PRELOAD_SRC gkfs_functions.cpp hooks.cpp @@ -133,18 +133,18 @@ if(GKFS_ENABLE_FORWARDING) add_library(gkfwd_intercept SHARED ${FWD_PRELOAD_SRC} ${FWD_PRELOAD_HEADERS}) - if(GKFS_ENABLE_AGIOS) + if (GKFS_ENABLE_AGIOS) target_compile_definitions(gkfwd_daemon PUBLIC GKFS_ENABLE_FORWARDING DGKFS_ENABLE_AGIOS - ) - else() + ) + else () target_compile_definitions(gkfwd_daemon PUBLIC GKFS_ENABLE_FORWARDING - ) - endif() + ) + endif () message(STATUS "[gekkofs] Forwarding mode: ${GKFS_ENABLE_FORWARDING}") message(STATUS "[gekkofs] AGIOS scheduling: ${GKFS_ENABLE_AGIOS}") @@ -163,7 +163,7 @@ if(GKFS_ENABLE_FORWARDING) Boost::boost # needed for tokenizer header Threads::Threads Date::TZ - ) + ) target_include_directories(gkfwd_intercept PRIVATE @@ -175,5 +175,5 @@ if(GKFS_ENABLE_FORWARDING) LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR} ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR} PUBLIC_HEADER DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}/gkfs - ) -endif() \ No newline at end of file + ) +endif () \ No newline at end of file diff --git a/src/client/gkfs_functions.cpp b/src/client/gkfs_functions.cpp index aa399c209ded596fd2cca1e892a7fdd120f91735..3156d8f61ba1ae7f986a95cf012de7d21e749535 100644 --- a/src/client/gkfs_functions.cpp +++ b/src/client/gkfs_functions.cpp @@ -309,8 +309,8 @@ int gkfs_statx(int dirfs, const std::string& path, int flags, unsigned int mask, buf->stx_nlink = tmp.st_nlink; buf->stx_uid = tmp.st_uid; buf->stx_gid = tmp.st_gid; - buf->stx_mode = tmp.st_mode; - buf->stx_ino = tmp.st_ino; + buf->stx_mode = tmp.st_mode; + buf->stx_ino = tmp.st_ino; buf->stx_size = tmp.st_size; buf->stx_blocks = tmp.st_blocks; buf->stx_attributes_mask = 0; @@ -328,6 +328,7 @@ int gkfs_statx(int dirfs, const std::string& path, int flags, unsigned int mask, return 0; } + #endif /** diff --git a/src/client/hooks.cpp b/src/client/hooks.cpp index 
a16d9ea9670301ae6c21e40e61293618dc2c5231..788076d39a4abcad344505dc9ab097d317109fb9 100644 --- a/src/client/hooks.cpp +++ b/src/client/hooks.cpp @@ -99,6 +99,7 @@ int hook_stat(const char* path, struct stat* buf) { } #ifdef STATX_TYPE + int hook_statx(int dirfd, const char* path, int flags, unsigned int mask, struct ::statx* buf) { LOG(DEBUG, "{}() called with dirfd: '{}', path: \"{}\", flags: '{}', mask: '{}', buf: '{}'", @@ -108,25 +109,26 @@ int hook_statx(int dirfd, const char* path, int flags, unsigned int mask, struct auto rstatus = CTX->relativize_fd_path(dirfd, path, resolved); switch (rstatus) { case gkfs::preload::RelativizeStatus::fd_unknown: - return syscall_no_intercept(SYS_statx, dirfd, path, flags, mask, buf); + return syscall_no_intercept(SYS_statx, dirfd, path, flags, mask, buf); case gkfs::preload::RelativizeStatus::external: - return syscall_no_intercept(SYS_statx, dirfd, resolved.c_str(), flags, mask, buf); + return syscall_no_intercept(SYS_statx, dirfd, resolved.c_str(), flags, mask, buf); case gkfs::preload::RelativizeStatus::fd_not_a_dir: return -ENOTDIR; case gkfs::preload::RelativizeStatus::internal: - return with_errno(gkfs::syscall::gkfs_statx(dirfd, resolved.c_str() , flags, mask, buf)); + return with_errno(gkfs::syscall::gkfs_statx(dirfd, resolved.c_str(), flags, mask, buf)); default: LOG(ERROR, "{}() relativize status unknown: {}", __func__); return -EINVAL; } - - return syscall_no_intercept(SYS_statx, dirfd, path, flags, mask, buf); + + return syscall_no_intercept(SYS_statx, dirfd, path, flags, mask, buf); } + #endif int hook_lstat(const char* path, struct stat* buf) { @@ -219,7 +221,7 @@ int hook_readv(unsigned long fd, const struct iovec* iov, unsigned long iovcnt) } int hook_preadv(unsigned long fd, const struct iovec* iov, unsigned long iovcnt, - unsigned long pos_l, unsigned long pos_h) { + unsigned long pos_l, unsigned long pos_h) { LOG(DEBUG, "{}() called with fd: {}, iov: {}, iovcnt: {}, " "pos_l: {}," "pos_h: {}", diff --git a/src/client/intercept.cpp b/src/client/intercept.cpp index 19f786a9bb164f28df471d3568e0fe62306c618e..55bf4b5c0c225b9a437936ebfe32c5afd854ea61 100644 --- a/src/client/intercept.cpp +++ b/src/client/intercept.cpp @@ -446,7 +446,7 @@ int hook(long syscall_number, case SYS_creat: *result = gkfs::hook::hook_openat(AT_FDCWD, reinterpret_cast(arg0), - O_WRONLY | O_CREAT | O_TRUNC, + O_WRONLY | O_CREAT | O_TRUNC, static_cast(arg1)); break; @@ -469,10 +469,10 @@ int hook(long syscall_number, #ifdef STATX_TYPE case SYS_statx: *result = gkfs::hook::hook_statx(static_cast(arg0), - reinterpret_cast(arg1), - static_cast(arg2), - static_cast(arg3), - reinterpret_cast(arg4)); + reinterpret_cast(arg1), + static_cast(arg2), + static_cast(arg3), + reinterpret_cast(arg4)); break; #endif @@ -508,16 +508,16 @@ int hook(long syscall_number, case SYS_readv: *result = gkfs::hook::hook_readv(static_cast(arg0), - reinterpret_cast(arg1), - static_cast(arg2)); + reinterpret_cast(arg1), + static_cast(arg2)); break; case SYS_preadv: *result = gkfs::hook::hook_preadv(static_cast(arg0), - reinterpret_cast(arg1), - static_cast(arg2), - static_cast(arg3), - static_cast(arg4)); + reinterpret_cast(arg1), + static_cast(arg2), + static_cast(arg3), + static_cast(arg4)); break; case SYS_pwrite64: diff --git a/src/client/preload.cpp b/src/client/preload.cpp index 7e5dce8a67bc0ae758f0c9da7e2e1c1912d50c8b..cf2e5b48f1eda4f0ab0d65cf8cce3b10c07a572d 100644 --- a/src/client/preload.cpp +++ b/src/client/preload.cpp @@ -60,25 +60,23 @@ inline void exit_error_msg(int errcode, 
const string& msg) { /** * Initializes the Hermes client for a given transport prefix - * @param transport_prefix - * @return true if succesfully initialized; false otherwise + * @return true if successfully initialized; false otherwise */ -bool init_hermes_client(const std::string& transport_prefix) { +bool init_hermes_client() { try { hermes::engine_options opts{}; -#if USE_SHM - opts |= hermes::use_auto_sm; -#endif - if (gkfs::rpc::protocol::ofi_psm2 == string(RPC_PROTOCOL)) { + if (CTX->auto_sm()) + opts |= hermes::use_auto_sm; + if (gkfs::rpc::protocol::ofi_psm2 == CTX->rpc_protocol()) { opts |= hermes::force_no_block_progress; } ld_network_service = std::make_unique( - hermes::get_transport_type(transport_prefix), opts); + hermes::get_transport_type(CTX->rpc_protocol()), opts); ld_network_service->run(); } catch (const std::exception& ex) { fmt::print(stderr, "Failed to initialize Hermes RPC client {}\n", @@ -94,22 +92,29 @@ bool init_hermes_client(const std::string& transport_prefix) { */ void init_ld_environment_() { + vector> hosts{}; + try { + LOG(INFO, "Loading peer addresses..."); + hosts = gkfs::util::read_hosts_file(); + } catch (const std::exception& e) { + exit_error_msg(EXIT_FAILURE, "Failed to load hosts addresses: "s + e.what()); + } + // initialize Hermes interface to Mercury LOG(INFO, "Initializing RPC subsystem..."); - if (!init_hermes_client(RPC_PROTOCOL)) { + if (!init_hermes_client()) { exit_error_msg(EXIT_FAILURE, "Unable to initialize RPC subsystem"); } try { - LOG(INFO, "Loading peer addresses..."); - gkfs::util::load_hosts(); + gkfs::util::connect_to_hosts(hosts); } catch (const std::exception& e) { - exit_error_msg(EXIT_FAILURE, "Failed to load hosts addresses: "s + e.what()); + exit_error_msg(EXIT_FAILURE, "Failed to connect to hosts: "s + e.what()); } /* Setup distributor */ - #ifdef GKFS_ENABLE_FORWARDING +#ifdef GKFS_ENABLE_FORWARDING try { gkfs::util::load_forwarding_map(); @@ -120,11 +125,11 @@ void init_ld_environment_() { auto forwarder_dist = std::make_shared(CTX->fwd_host_id(), CTX->hosts().size()); CTX->distributor(forwarder_dist); - #else +#else auto simple_hash_dist = std::make_shared(CTX->local_host_id(), CTX->hosts().size()); CTX->distributor(simple_hash_dist); - #endif +#endif LOG(INFO, "Retrieving file system configuration..."); @@ -242,9 +247,9 @@ void init_preload() { CTX->unprotect_user_fds(); - #ifdef GKFS_ENABLE_FORWARDING +#ifdef GKFS_ENABLE_FORWARDING init_forwarding_mapper(); - #endif +#endif gkfs::preload::start_interception(); } @@ -253,9 +258,9 @@ void init_preload() { * Called last when preload library is used with the LD_PRELOAD environment variable */ void destroy_preload() { - #ifdef GKFS_ENABLE_FORWARDING +#ifdef GKFS_ENABLE_FORWARDING destroy_forwarding_mapper(); - #endif +#endif CTX->clear_hosts(); LOG(DEBUG, "Peer information deleted"); diff --git a/src/client/preload_context.cpp b/src/client/preload_context.cpp index 9b9f3eca678508206c9f5819adc909f15ffc0bfe..59a2b1f1ae17af7e3d7ffdac009a2184a962a325 100644 --- a/src/client/preload_context.cpp +++ b/src/client/preload_context.cpp @@ -128,6 +128,23 @@ uint64_t PreloadContext::fwd_host_id() const { void PreloadContext::fwd_host_id(uint64_t id) { fwd_host_id_ = id; } + +const std::string& PreloadContext::rpc_protocol() const { + return rpc_protocol_; +} + +void PreloadContext::rpc_protocol(const std::string& rpc_protocol) { + rpc_protocol_ = rpc_protocol; +} + +bool PreloadContext::auto_sm() const { + return auto_sm_; +} + +void PreloadContext::auto_sm(bool auto_sm) { + 
PreloadContext::auto_sm_ = auto_sm; +} + RelativizeStatus PreloadContext::relativize_fd_path(int dirfd, const char* raw_path, std::string& relative_path, @@ -330,7 +347,7 @@ PreloadContext::protect_user_fds() { const auto fd_is_open = [](int fd) -> bool { const int ret = ::syscall_no_intercept(SYS_fcntl, fd, F_GETFD); return ::syscall_error_code(ret) == 0 || - ::syscall_error_code(ret) != EBADF; + ::syscall_error_code(ret) != EBADF; }; for (int fd = 0; fd < MAX_USER_FDS; ++fd) { @@ -367,4 +384,4 @@ PreloadContext::unprotect_user_fds() { } } // namespace preload -} // namespace gkfs +} // namespace gkfs \ No newline at end of file diff --git a/src/client/preload_util.cpp b/src/client/preload_util.cpp index 9bc721f01acc0c73eeab9c3a48136535b2c1d199..cf19a030ae81f6a20ef8e9d842710e6f7745d15f 100644 --- a/src/client/preload_util.cpp +++ b/src/client/preload_util.cpp @@ -19,6 +19,7 @@ #include #include #include +#include #include @@ -36,6 +37,13 @@ using namespace std; namespace { +/** + * Looks up a host endpoint via Hermes + * @param uri + * @param max_retries + * @return hermes endpoint, if successful + * @throws std::runtime_error + */ hermes::endpoint lookup_endpoint(const std::string& uri, std::size_t max_retries = 3) { @@ -67,6 +75,82 @@ hermes::endpoint lookup_endpoint(const std::string& uri, uri, error_msg)); } +/** + * extracts protocol from a given URI generated by the RPC server of the daemon + * @param uri + * @throws std::runtime_error + */ +void extract_protocol(const string& uri) { + if (uri.rfind("://") == string::npos) { + // invalid format. kill client + throw runtime_error(fmt::format("Invalid format for URI: '{}'", uri)); + } + string protocol{}; + + if (uri.find(gkfs::rpc::protocol::ofi_sockets) != string::npos) { + protocol = gkfs::rpc::protocol::ofi_sockets; + } else if (uri.find(gkfs::rpc::protocol::ofi_psm2) != string::npos) { + protocol = gkfs::rpc::protocol::ofi_psm2; + } else if (uri.find(gkfs::rpc::protocol::ofi_verbs) != string::npos) { + protocol = gkfs::rpc::protocol::ofi_verbs; + } + // check for shared memory protocol. Can be plain shared memory or real ofi protocol + auto_sm + if (uri.find(gkfs::rpc::protocol::na_sm) != string::npos) { + if (protocol.empty()) + protocol = gkfs::rpc::protocol::na_sm; + else + CTX->auto_sm(true); + } + if (protocol.empty()) { + // unsupported protocol. kill client + throw runtime_error(fmt::format("Unsupported RPC protocol found in hosts file with URI: '{}'", uri)); + } + LOG(INFO, "RPC protocol '{}' extracted from hosts file. 
Using auto_sm is '{}'", protocol, CTX->auto_sm()); + CTX->rpc_protocol(protocol); +} + +/** + * Reads the daemon generator hosts file by a given path, returning hosts and URI addresses + * @param path to hosts file + * @return vector> + * @throws std::runtime_error + */ +vector> load_hostfile(const std::string& path) { + + LOG(DEBUG, "Loading hosts file: \"{}\"", path); + + ifstream lf(path); + if (!lf) { + throw runtime_error(fmt::format("Failed to open hosts file '{}': {}", + path, strerror(errno))); + } + vector> hosts; + const regex line_re("^(\\S+)\\s+(\\S+)$", + regex::ECMAScript | regex::optimize); + string line; + string host; + string uri; + std::smatch match; + while (getline(lf, line)) { + if (!regex_match(line, match, line_re)) { + + LOG(ERROR, "Unrecognized line format: [path: '{}', line: '{}']", + path, line); + + throw runtime_error( + fmt::format("unrecognized line format: '{}'", line)); + } + host = match[1]; + uri = match[2]; + hosts.emplace_back(host, uri); + } + if (hosts.empty()) { + throw runtime_error("Hosts file found but no suitable addresses could be extracted"); + } + extract_protocol(hosts[0].second); + return hosts; +} + } // namespace namespace gkfs { @@ -152,38 +236,6 @@ int metadata_to_stat(const std::string& path, const gkfs::metadata::Metadata& md return 0; } -vector> load_hostfile(const std::string& lfpath) { - - LOG(DEBUG, "Loading hosts file: \"{}\"", lfpath); - - ifstream lf(lfpath); - if (!lf) { - throw runtime_error(fmt::format("Failed to open hosts file '{}': {}", - lfpath, strerror(errno))); - } - vector> hosts; - const regex line_re("^(\\S+)\\s+(\\S+)$", - regex::ECMAScript | regex::optimize); - string line; - string host; - string uri; - std::smatch match; - while (getline(lf, line)) { - if (!regex_match(line, match, line_re)) { - - LOG(ERROR, "Unrecognized line format: [path: '{}', line: '{}']", - lfpath, line); - - throw runtime_error( - fmt::format("unrecognized line format: '{}'", line)); - } - host = match[1]; - uri = match[2]; - hosts.emplace_back(host, uri); - } - return hosts; -} - #ifdef GKFS_ENABLE_FORWARDING map load_forwarding_map_file(const std::string& lfpath) { @@ -203,7 +255,7 @@ map load_forwarding_map_file(const std::string& lfpath) { std::smatch match; while (getline(lf, line)) { if (!regex_match(line, match, line_re)) { - + LOG(ERROR, "Unrecognized line format: [path: '{}', line: '{}']", lfpath, line); @@ -221,7 +273,7 @@ map load_forwarding_map_file(const std::string& lfpath) { #ifdef GKFS_ENABLE_FORWARDING void load_forwarding_map() { string forwarding_map_file; - + forwarding_map_file = gkfs::env::get_var(gkfs::env::FORWARDING_MAP_FILE, gkfs::config::forwarding_file_path); map forwarding_map; @@ -242,7 +294,7 @@ void load_forwarding_map() { auto local_hostname = get_my_hostname(true); if (forwarding_map.find(local_hostname) == forwarding_map.end()) { - throw runtime_error(fmt::format("Unable to determine the forwarder for host: '{}'", local_hostname)); + throw runtime_error(fmt::format("Unable to determine the forwarder for host: '{}'", local_hostname)); } LOG(INFO, "Forwarding map loaded for '{}' as '{}'", local_hostname, forwarding_map[local_hostname]); @@ -250,7 +302,7 @@ void load_forwarding_map() { } #endif -void load_hosts() { +vector> read_hosts_file() { string hostfile; hostfile = gkfs::env::get_var(gkfs::env::HOSTS_FILE, gkfs::config::hostfile_path); @@ -269,6 +321,15 @@ void load_hosts() { LOG(INFO, "Hosts pool size: {}", hosts.size()); + return hosts; +} + +/** + * Connects to daemons and lookup Mercury URI 
addresses via Hermes + * @param hosts vector> + * @throws std::runtime_error through lookup_endpoint() + */ +void connect_to_hosts(const vector>& hosts) { auto local_hostname = gkfs::rpc::get_my_hostname(true); bool local_host_found = false; @@ -313,4 +374,4 @@ void load_hosts() { } } // namespace util -} // namespace gkfs +} // namespace gkfs \ No newline at end of file diff --git a/src/client/rpc/forward_data.cpp b/src/client/rpc/forward_data.cpp index 0821b1b6356180a691c773584d2a8f58d3eee3b3..0c2c775cc39a994cd742756eb0e73b8a66cbf6a7 100644 --- a/src/client/rpc/forward_data.cpp +++ b/src/client/rpc/forward_data.cpp @@ -45,8 +45,8 @@ namespace rpc { * @return pair */ pair forward_write(const string& path, const void* buf, const bool append_flag, - const off64_t in_offset, const size_t write_size, - const int64_t updated_metadentry_size) { + const off64_t in_offset, const size_t write_size, + const int64_t updated_metadentry_size) { assert(write_size > 0); diff --git a/src/daemon/CMakeLists.txt b/src/daemon/CMakeLists.txt index ad35b89e8eef0db06526bff7b4ebf73b1703254a..689e0c01ef777607a0356fc5ce9dcb978e527011 100644 --- a/src/daemon/CMakeLists.txt +++ b/src/daemon/CMakeLists.txt @@ -61,9 +61,9 @@ target_include_directories(gkfs_daemon install(TARGETS gkfs_daemon RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} -) + ) -if(GKFS_ENABLE_FORWARDING) +if (GKFS_ENABLE_FORWARDING) set(FWD_DAEMON_SRC ../global/rpc/rpc_util.cpp ../global/path_util.cpp @@ -98,18 +98,18 @@ if(GKFS_ENABLE_FORWARDING) ) add_executable(gkfwd_daemon ${FWD_DAEMON_SRC} ${FWD_DAEMON_HEADERS}) - if(GKFS_ENABLE_AGIOS) + if (GKFS_ENABLE_AGIOS) target_compile_definitions(gkfwd_daemon PUBLIC GKFS_ENABLE_FORWARDING DGKFS_ENABLE_AGIOS - ) - else() + ) + else () target_compile_definitions(gkfwd_daemon PUBLIC GKFS_ENABLE_FORWARDING - ) - endif() + ) + endif () message(STATUS "[gekkofs] Forwarding mode: ${GKFS_ENABLE_FORWARDING}") message(STATUS "[gekkofs] AGIOS scheduling: ${GKFS_ENABLE_AGIOS}") @@ -145,5 +145,5 @@ if(GKFS_ENABLE_FORWARDING) install(TARGETS gkfwd_daemon RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} - ) -endif() + ) +endif () diff --git a/src/daemon/backend/metadata/CMakeLists.txt b/src/daemon/backend/metadata/CMakeLists.txt index 9c41ace789d279a96712ac601d568a15998658bb..8fdfce5b9f6010f9ae459d36d0b27fd666b090cf 100644 --- a/src/daemon/backend/metadata/CMakeLists.txt +++ b/src/daemon/backend/metadata/CMakeLists.txt @@ -17,4 +17,4 @@ target_link_libraries(metadata_db metadata RocksDB spdlog -) + ) diff --git a/src/daemon/classes/fs_data.cpp b/src/daemon/classes/fs_data.cpp index 6422ab645d0be2bb96956fb91d3be859f46298f5..24117252571131434fd44719aacbfc9757df194e 100644 --- a/src/daemon/classes/fs_data.cpp +++ b/src/daemon/classes/fs_data.cpp @@ -72,6 +72,14 @@ void FsData::metadir(const std::string& metadir) { FsData::metadir_ = metadir; } +const std::string& FsData::rpc_protocol() const { + return rpc_protocol_; +} + +void FsData::rpc_protocol(const std::string& rpc_protocol) { + rpc_protocol_ = rpc_protocol; +} + const std::string& FsData::bind_addr() const { return bind_addr_; } @@ -88,6 +96,14 @@ void FsData::hosts_file(const std::string& lookup_file) { hosts_file_ = lookup_file; } +bool FsData::use_auto_sm() const { + return use_auto_sm_; +} + +void FsData::use_auto_sm(bool use_auto_sm) { + use_auto_sm_ = use_auto_sm; +} + bool FsData::atime_state() const { return atime_state_; } diff --git a/src/daemon/daemon.cpp b/src/daemon/daemon.cpp index 
665b2cf11e14924906842f4b95318ab19afe35fa..e963b469e894287b193fc0db5d975f4ad0294774 100644 --- a/src/daemon/daemon.cpp +++ b/src/daemon/daemon.cpp @@ -24,6 +24,7 @@ #include #include #include + #ifdef GKFS_ENABLE_AGIOS #include #endif @@ -39,7 +40,7 @@ extern "C" { #include -#include +#include } using namespace std; @@ -50,7 +51,7 @@ static condition_variable shutdown_please; static mutex mtx; void init_io_tasklet_pool() { - assert(gkfs::config::rpc::daemon_io_xstreams >= 0); + static_assert(gkfs::config::rpc::daemon_io_xstreams >= 0, "Daemon IO Execution streams must be higher than 0!"); unsigned int xstreams_num = gkfs::config::rpc::daemon_io_xstreams; //retrieve the pool of the just created scheduler @@ -102,22 +103,18 @@ void register_server_rpcs(margo_instance_id mid) { rpc_srv_get_chunk_stat); } -void init_rpc_server(const string& protocol_port) { +void init_rpc_server() { hg_addr_t addr_self; hg_size_t addr_self_cstring_sz = 128; char addr_self_cstring[128]; struct hg_init_info hg_options = HG_INIT_INFO_INITIALIZER; -#if USE_SHM - hg_options.auto_sm = HG_TRUE; -#else - hg_options.auto_sm = HG_FALSE; -#endif + hg_options.auto_sm = GKFS_DATA->use_auto_sm() ? HG_TRUE : HG_FALSE; hg_options.stats = HG_FALSE; hg_options.na_class = nullptr; - if (gkfs::rpc::protocol::ofi_psm2 == string(RPC_PROTOCOL)) + if (gkfs::rpc::protocol::ofi_psm2 == GKFS_DATA->rpc_protocol()) hg_options.na_init_info.progress_mode = NA_NO_BLOCK; // Start Margo (this will also initialize Argobots and Mercury internally) - auto mid = margo_init_opt(protocol_port.c_str(), + auto mid = margo_init_opt(GKFS_DATA->bind_addr().c_str(), MARGO_SERVER_MODE, &hg_options, HG_TRUE, @@ -163,20 +160,20 @@ void init_environment() { throw; } - #ifdef GKFS_ENABLE_FORWARDING +#ifdef GKFS_ENABLE_FORWARDING GKFS_DATA->spdlogger()->debug("{}() Enable I/O forwarding mode", __func__); - #endif +#endif - #ifdef GKFS_ENABLE_AGIOS +#ifdef GKFS_ENABLE_AGIOS // Initialize AGIOS scheduler GKFS_DATA->spdlogger()->debug("{}() Initializing AGIOS scheduler: '{}'", __func__, "/tmp/agios.conf"); try { - agios_initialize(); + agios_initialize(); } catch (const std::exception & e) { GKFS_DATA->spdlogger()->error("{}() Failed to initialize AGIOS scheduler: {}", __func__, e.what()); throw; } - #endif +#endif // Initialize data backend std::string chunk_storage_path = GKFS_DATA->rootdir() + "/data/chunks"s; GKFS_DATA->spdlogger()->debug("{}() Initializing storage backend: '{}'", __func__, chunk_storage_path); @@ -190,10 +187,9 @@ void init_environment() { } // Init margo for RPC - GKFS_DATA->spdlogger()->debug("{}() Initializing RPC server: '{}'", - __func__, GKFS_DATA->bind_addr()); + GKFS_DATA->spdlogger()->debug("{}() Initializing RPC server: '{}'", __func__, GKFS_DATA->bind_addr()); try { - init_rpc_server(GKFS_DATA->bind_addr()); + init_rpc_server(); } catch (const std::exception& e) { GKFS_DATA->spdlogger()->error("{}() Failed to initialize RPC server: {}", __func__, e.what()); throw; @@ -227,15 +223,16 @@ void init_environment() { } GKFS_DATA->spdlogger()->info("Startup successful. 
Daemon is ready."); } + #ifdef GKFS_ENABLE_AGIOS /** * Initialize the AGIOS scheduling library */ void agios_initialize() { char configuration[] = "/tmp/agios.conf"; - + if (!agios_init(NULL, NULL, configuration, 0)) { - GKFS_DATA->spdlogger()->error("{}() Failed to initialize AGIOS scheduler: '{}'", __func__, configuration); + GKFS_DATA->spdlogger()->error("{}() Failed to initialize AGIOS scheduler: '{}'", __func__, configuration); agios_exit(); @@ -308,68 +305,37 @@ void initialize_loggers() { gkfs::log::setup(logger_names, level, path); } -int main(int argc, const char* argv[]) { - - // Parse input - po::options_description desc("Allowed options"); - desc.add_options() - ("help,h", "Help message") - ("mountdir,m", po::value()->required(), "User Fuse mountdir") - ("rootdir,r", po::value()->required(), "data directory") - ("metadir,i", po::value(), "metadata directory, if not set rootdir is used for metadata ") - ("listen,l", po::value(), "Address or interface to bind the daemon on. Default: local hostname.\n" - "When used with ofi+verbs the FI_VERBS_IFACE environment variable is set accordingly " - "which associates the verbs device with the network interface. In case FI_VERBS_IFACE " - "is already defined, the argument is ignored. Default 'ib'.") - ("hosts-file,H", po::value(), - "Shared file used by deamons to register their " - "enpoints. (default './gkfs_hosts.txt')") - ("version", "print version and exit"); - po::variables_map vm; - po::store(po::parse_command_line(argc, argv, desc), vm); - - if (vm.count("help")) { - cout << desc << "\n"; - return 1; - } - - if (vm.count("version")) { - cout << GKFS_VERSION_STRING << endl; -#ifndef NDEBUG - cout << "Debug: ON" << endl; -#else - cout << "Debug: OFF" << endl; -#endif - cout << "RPC protocol: " << RPC_PROTOCOL << endl; -#if USE_SHM - cout << "Shared-memory comm: ON" << endl; -#else - cout << "Shared-memory comm: OFF" << endl; -#endif -#if CREATE_CHECK_PARENTS - cout << "Create check parents: ON" << endl; -#else - cout << "Create check parents: OFF" << endl; -#endif - cout << "Chunk size: " << gkfs::config::rpc::chunksize << " bytes" << endl; - return 0; +/** + * + * @param vm + * @throws runtime_error + */ +void parse_input(const po::variables_map& vm) { + auto rpc_protocol = string(gkfs::rpc::protocol::ofi_sockets); + if (vm.count("rpc_protocol")) { + rpc_protocol = vm["rpc_protocol"].as(); + if (rpc_protocol != gkfs::rpc::protocol::ofi_verbs && + rpc_protocol != gkfs::rpc::protocol::ofi_sockets && + rpc_protocol != gkfs::rpc::protocol::ofi_psm2) { + throw runtime_error( + fmt::format("Given RPC protocol '{}' not supported. 
Check --help for supported protocols.", + rpc_protocol)); + } } - try { - po::notify(vm); - } catch (po::required_option& e) { - std::cerr << "Error: " << e.what() << "\n"; - return 1; + auto use_auto_sm = false; + if (vm.count("auto_sm")) { + use_auto_sm = true; } - - initialize_loggers(); - GKFS_DATA->spdlogger(spdlog::get("main")); + GKFS_DATA->use_auto_sm(use_auto_sm); + GKFS_DATA->spdlogger()->debug("{}() Shared memory (auto_sm) for intra-node communication (IPCs) set to '{}'.", + __func__, use_auto_sm); string addr{}; if (vm.count("listen")) { addr = vm["listen"].as(); // ofi+verbs requires an empty addr to bind to the ib interface - if (RPC_PROTOCOL == string(gkfs::rpc::protocol::ofi_verbs)) { + if (rpc_protocol == gkfs::rpc::protocol::ofi_verbs) { /* * FI_VERBS_IFACE : The prefix or the full name of the network interface associated with the verbs device (default: ib) * Mercury does not allow to bind to an address when ofi+verbs is used @@ -379,22 +345,21 @@ int main(int argc, const char* argv[]) { addr = ""s; } } else { - if (RPC_PROTOCOL != string(gkfs::rpc::protocol::ofi_verbs)) + if (rpc_protocol != gkfs::rpc::protocol::ofi_verbs) addr = gkfs::rpc::get_my_hostname(true); } - GKFS_DATA->bind_addr(fmt::format("{}://{}", RPC_PROTOCOL, addr)); + + GKFS_DATA->rpc_protocol(rpc_protocol); + GKFS_DATA->bind_addr(fmt::format("{}://{}", rpc_protocol, addr)); string hosts_file; if (vm.count("hosts-file")) { hosts_file = vm["hosts-file"].as(); } else { - hosts_file = - gkfs::env::get_var(gkfs::env::HOSTS_FILE, gkfs::config::hostfile_path); + hosts_file = gkfs::env::get_var(gkfs::env::HOSTS_FILE, gkfs::config::hostfile_path); } GKFS_DATA->hosts_file(hosts_file); - GKFS_DATA->spdlogger()->info("{}() Initializing environment", __func__); - assert(vm.count("mountdir")); auto mountdir = vm["mountdir"].as(); // Create mountdir. 
We use this dir to get some information on the underlying fs with statfs in gkfs_statfs @@ -403,12 +368,14 @@ int main(int argc, const char* argv[]) { assert(vm.count("rootdir")); auto rootdir = vm["rootdir"].as(); - #ifdef GKFS_ENABLE_FORWARDING + +#ifdef GKFS_ENABLE_FORWARDING // In forwarding mode, the backend is shared auto rootdir_path = bfs::path(rootdir); - #else +#else auto rootdir_path = bfs::path(rootdir) / fmt::format_int(getpid()).str(); - #endif +#endif + GKFS_DATA->spdlogger()->debug("{}() Root directory: '{}'", __func__, rootdir_path.native()); bfs::create_directories(rootdir_path); @@ -417,31 +384,104 @@ int main(int argc, const char* argv[]) { if (vm.count("metadir")) { auto metadir = vm["metadir"].as(); - #ifdef GKFS_ENABLE_FORWARDING +#ifdef GKFS_ENABLE_FORWARDING auto metadir_path = bfs::path(metadir) / fmt::format_int(getpid()).str(); - #else +#else auto metadir_path = bfs::path(metadir); - #endif +#endif bfs::create_directories(metadir_path); GKFS_DATA->metadir(bfs::canonical(metadir_path).native()); GKFS_DATA->spdlogger()->debug("{}() Meta directory: '{}'", - __func__, metadir_path.native()); + __func__, metadir_path.native()); } else { // use rootdir as metadata dir auto metadir = vm["rootdir"].as(); - #ifdef GKFS_ENABLE_FORWARDING +#ifdef GKFS_ENABLE_FORWARDING auto metadir_path = bfs::path(metadir) / fmt::format_int(getpid()).str(); bfs::create_directories(metadir_path); GKFS_DATA->metadir(bfs::canonical(metadir_path).native()); - #else +#else GKFS_DATA->metadir(GKFS_DATA->rootdir()); - #endif +#endif + } + + +} + +int main(int argc, const char* argv[]) { + + // Define arg parsing + po::options_description desc("Allowed options"); + desc.add_options() + ("help,h", "Help message") + ("mountdir,m", po::value()->required(), "User Fuse mountdir") + ("rootdir,r", po::value()->required(), "data directory") + ("metadir,i", po::value(), "metadata directory, if not set rootdir is used for metadata ") + ("listen,l", po::value(), "Address or interface to bind the daemon on. Default: local hostname.\n" + "When used with ofi+verbs the FI_VERBS_IFACE environment variable is set accordingly " + "which associates the verbs device with the network interface. In case FI_VERBS_IFACE " + "is already defined, the argument is ignored. Default 'ib'.") + ("hosts-file,H", po::value(), + "Shared file used by deamons to register their " + "enpoints. (default './gkfs_hosts.txt')") + ("rpc_protocol,P", po::value(), "Used RPC protocol for inter-node communication.\n" + "Available: {ofi+sockets, ofi+verbs, ofi+psm2} for (TCP, Infiniband, " + "and Omni-Path, respectively. (Default ofi+sockets)\n" + "Libfabric must have enabled support verbs or psm2") + ("auto_sm", "Enables intra-node communication (IPCs) via the `na+sm` (shared memory) protocol, " + "instead of using the RPC protocol. 
(Default off)") + ("version", "print version and exit"); + po::variables_map vm{}; + po::store(po::parse_command_line(argc, argv, desc), vm); + + if (vm.count("help")) { + cout << desc << "\n"; + return 1; + } + + if (vm.count("version")) { + cout << GKFS_VERSION_STRING << endl; +#ifndef NDEBUG + cout << "Debug: ON" << endl; +#else + cout << "Debug: OFF" << endl; +#endif +#if CREATE_CHECK_PARENTS + cout << "Create check parents: ON" << endl; +#else + cout << "Create check parents: OFF" << endl; +#endif + cout << "Chunk size: " << gkfs::config::rpc::chunksize << " bytes" << endl; + return 0; + } + + try { + po::notify(vm); + } catch (po::required_option& e) { + std::cerr << "Error: " << e.what() << "\n"; + return 1; + } + + // intitialize logging framework + initialize_loggers(); + GKFS_DATA->spdlogger(spdlog::get("main")); + + // parse all input parameters and populate singleton structures + try { + parse_input(vm); + } catch (const std::exception& e) { + cerr << fmt::format("Parsing arguments failed: '{}'. Exiting.", e.what()); + exit(EXIT_FAILURE); } + /* + * Initialize environment and start daemon. Wait until signaled to cancel before shutting down + */ try { + GKFS_DATA->spdlogger()->info("{}() Initializing environment", __func__); init_environment(); } catch (const std::exception& e) { auto emsg = fmt::format("Failed to initialize environment: {}", e.what()); diff --git a/src/daemon/handler/srv_data.cpp b/src/daemon/handler/srv_data.cpp index 63ff3c53af88d1851577fa7a17854cf35877d39d..94e355c0a2efbc25241932a459431034c9cba2c2 100644 --- a/src/daemon/handler/srv_data.cpp +++ b/src/daemon/handler/srv_data.cpp @@ -65,7 +65,7 @@ hg_return_t rpc_srv_write(hg_handle_t handle) { GKFS_DATA->spdlogger()->debug( "{}() path: '{}' chunk_start '{}' chunk_end '{}' chunk_n '{}' total_chunk_size '{}' bulk_size: '{}' offset: '{}'", __func__, in.path, in.chunk_start, in.chunk_end, in.chunk_n, in.total_chunk_size, bulk_size, in.offset); - #ifdef GKFS_ENABLE_AGIOS +#ifdef GKFS_ENABLE_AGIOS int *data; ABT_eventual eventual = ABT_EVENTUAL_NULL; @@ -94,7 +94,7 @@ hg_return_t rpc_srv_write(hg_handle_t handle) { if (!agios_release_request(agios_path, AGIOS_WRITE, in.total_chunk_size, in.offset)) { GKFS_DATA->spdlogger()->error("{}() Failed to release request from AGIOS", __func__); } - #endif +#endif /* * 2. Set up buffers for pull bulk transfers @@ -152,14 +152,14 @@ hg_return_t rpc_srv_write(hg_handle_t handle) { for (auto chnk_id_file = in.chunk_start; chnk_id_file <= in.chunk_end && chnk_id_curr < in.chunk_n; chnk_id_file++) { // Continue if chunk does not hash to this host - #ifndef GKFS_ENABLE_FORWARDING +#ifndef GKFS_ENABLE_FORWARDING if (distributor.locate_data(in.path, chnk_id_file) != host_id) { GKFS_DATA->spdlogger()->trace( "{}() chunkid '{}' ignored as it does not match to this host with id '{}'. chnk_id_curr '{}'", __func__, chnk_id_file, host_id, chnk_id_curr); continue; } - #endif +#endif chnk_ids_host[chnk_id_curr] = chnk_id_file; // save this id to host chunk list // offset case. 
Only relevant in the first iteration of the loop and if the chunk hashes to this host @@ -188,7 +188,7 @@ hg_return_t rpc_srv_write(hg_handle_t handle) { // origin offset of a chunk is dependent on a given offset in a write operation if (in.offset > 0) origin_offset = (gkfs::config::rpc::chunksize - in.offset) + - ((chnk_id_file - in.chunk_start) - 1) * gkfs::config::rpc::chunksize; + ((chnk_id_file - in.chunk_start) - 1) * gkfs::config::rpc::chunksize; else origin_offset = (chnk_id_file - in.chunk_start) * gkfs::config::rpc::chunksize; // last chunk might have different transfer_size @@ -279,7 +279,7 @@ hg_return_t rpc_srv_read(hg_handle_t handle) { GKFS_DATA->spdlogger()->debug( "{}() path: '{}' chunk_start '{}' chunk_end '{}' chunk_n '{}' total_chunk_size '{}' bulk_size: '{}' offset: '{}'", __func__, in.path, in.chunk_start, in.chunk_end, in.chunk_n, in.total_chunk_size, bulk_size, in.offset); - #ifdef GKFS_ENABLE_AGIOS +#ifdef GKFS_ENABLE_AGIOS int *data; ABT_eventual eventual = ABT_EVENTUAL_NULL; @@ -308,7 +308,7 @@ hg_return_t rpc_srv_read(hg_handle_t handle) { if (!agios_release_request(agios_path, AGIOS_READ, in.total_chunk_size, in.offset)) { GKFS_DATA->spdlogger()->error("{}() Failed to release request from AGIOS", __func__); } - #endif +#endif /* * 2. Set up buffers for pull bulk transfers @@ -329,11 +329,11 @@ hg_return_t rpc_srv_read(hg_handle_t handle) { GKFS_DATA->spdlogger()->error("{}() Failed to access allocated buffer from bulk handle", __func__); return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle); } - #ifndef GKFS_ENABLE_FORWARDING +#ifndef GKFS_ENABLE_FORWARDING auto const host_id = in.host_id; auto const host_size = in.host_size; gkfs::rpc::SimpleHashDistributor distributor(host_id, host_size); - #endif +#endif // chnk_ids used by this host vector chnk_ids_host(in.chunk_n); @@ -359,14 +359,14 @@ hg_return_t rpc_srv_read(hg_handle_t handle) { for (auto chnk_id_file = in.chunk_start; chnk_id_file <= in.chunk_end && chnk_id_curr < in.chunk_n; chnk_id_file++) { // Continue if chunk does not hash to this host - #ifndef GKFS_ENABLE_FORWARDING +#ifndef GKFS_ENABLE_FORWARDING if (distributor.locate_data(in.path, chnk_id_file) != host_id) { GKFS_DATA->spdlogger()->trace( "{}() chunkid '{}' ignored as it does not match to this host with id '{}'. chnk_id_curr '{}'", __func__, chnk_id_file, host_id, chnk_id_curr); continue; } - #endif +#endif chnk_ids_host[chnk_id_curr] = chnk_id_file; // save this id to host chunk list // Only relevant in the first iteration of the loop and if the chunk hashes to this host @@ -512,6 +512,7 @@ DEFINE_MARGO_RPC_HANDLER(rpc_srv_write) DEFINE_MARGO_RPC_HANDLER(rpc_srv_read) DEFINE_MARGO_RPC_HANDLER(rpc_srv_truncate) + DEFINE_MARGO_RPC_HANDLER(rpc_srv_get_chunk_stat) #ifdef GKFS_ENABLE_AGIOS diff --git a/src/daemon/handler/srv_metadata.cpp b/src/daemon/handler/srv_metadata.cpp index 2a9a188fb0fc091a400629a1a727983ad24bba19..57ec47f99deca08918ae12bd964a09795b90ae4e 100644 --- a/src/daemon/handler/srv_metadata.cpp +++ b/src/daemon/handler/srv_metadata.cpp @@ -320,7 +320,7 @@ hg_return_t rpc_srv_get_dirents(hg_handle_t handle) { } GKFS_DATA->spdlogger()->trace("{}() path '{}' Read database with '{}' entries", __func__, in.path, - entries.size()); + entries.size()); if (entries.empty()) { out.err = 0; @@ -372,7 +372,8 @@ hg_return_t rpc_srv_get_dirents(hg_handle_t handle) { for (auto const& e: entries) { if (e.first.empty()) { - GKFS_DATA->spdlogger()->warn("{}() Entry in readdir() empty. 
If this shows up, something else is very wrong.", __func__); + GKFS_DATA->spdlogger()->warn( + "{}() Entry in readdir() empty. If this shows up, something else is very wrong.", __func__); } *bool_ptr = e.second; bool_ptr++; @@ -400,13 +401,12 @@ hg_return_t rpc_srv_get_dirents(hg_handle_t handle) { out.dirents_size = entries.size(); out.err = 0; GKFS_DATA->spdlogger()->debug("{}() Sending output response err '{}' dirents_size '{}'. DONE", __func__, - out.err, - out.dirents_size); + out.err, + out.dirents_size); return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle); } - #ifdef HAS_SYMLINKS hg_return_t rpc_srv_mk_symlink(hg_handle_t handle) { diff --git a/src/daemon/scheduler/agios.cpp b/src/daemon/scheduler/agios.cpp index 2648c1c93e33e2bad2108b9f15ada9805df5bbbc..818a3fd711077041d6f372081b95ebf0cdd162f2 100644 --- a/src/daemon/scheduler/agios.cpp +++ b/src/daemon/scheduler/agios.cpp @@ -4,7 +4,7 @@ unsigned long long int generate_unique_id() { // Calculates the hash of this request timespec ts; clock_gettime(CLOCK_MONOTONIC, &ts); - unsigned long long int id = ts.tv_sec*1000000000L + ts.tv_nsec; + unsigned long long int id = ts.tv_sec * 1000000000L + ts.tv_nsec; return id; } \ No newline at end of file diff --git a/src/global/metadata.cpp b/src/global/metadata.cpp index 6f56fa26c13b369b475622e228573cba75723feb..a85c1647df0998496b0d86ed08ba2b134851ce2b 100644 --- a/src/global/metadata.cpp +++ b/src/global/metadata.cpp @@ -252,7 +252,8 @@ void Metadata::target_path(const std::string& target_path) { bool Metadata::is_link() const { return S_ISLNK(mode_); } -#endif + +#endif } // namespace metadata } // namespace gkfs diff --git a/src/global/rpc/distributor.cpp b/src/global/rpc/distributor.cpp index ba8d700cf6cc33ddcb3618bd582082471d27fe88..e69f1fb71049ec276c78504b172f2ef5a348ccce 100644 --- a/src/global/rpc/distributor.cpp +++ b/src/global/rpc/distributor.cpp @@ -69,10 +69,10 @@ locate_directory_metadata(const string& path) const { } ForwarderDistributor:: -ForwarderDistributor(host_t fwhost, unsigned int hosts_size) : - fwd_host_(fwhost), - hosts_size_(hosts_size), - all_hosts_(hosts_size) { +ForwarderDistributor(host_t fwhost, unsigned int hosts_size) : + fwd_host_(fwhost), + hosts_size_(hosts_size), + all_hosts_(hosts_size) { ::iota(all_hosts_.begin(), all_hosts_.end(), 0); }