diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index 1a2457ca37deaba250fc1567daf328e945055e2f..e379d82ed506a399fe66a88a6b9a5d7f9d499bb9 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -56,6 +56,8 @@ compile GekkoFS:
       -DCMAKE_BUILD_TYPE=Debug
       -DGKFS_BUILD_TESTS:BOOL=ON
       -DGKFS_INSTALL_TESTS:BOOL=ON
+      -DGKFS_ENABLE_FORWARDING:BOOL=ON
+      -DGKFS_ENABLE_AGIOS:BOOL=ON
       -DRPC_PROTOCOL="ofi+sockets"
       -DCMAKE_PREFIX_PATH=${DEPS_INSTALL_PATH}
       -DCMAKE_INSTALL_PREFIX=${INSTALL_PATH}
diff --git a/CMake/FindAGIOS.cmake b/CMake/FindAGIOS.cmake
new file mode 100644
index 0000000000000000000000000000000000000000..a11910d335d667da25d0abdb15b0de89cc83e241
--- /dev/null
+++ b/CMake/FindAGIOS.cmake
@@ -0,0 +1,18 @@
+find_path(AGIOS_INCLUDE_DIR
+    NAMES agios.h
+)
+
+find_library(AGIOS_LIBRARY
+    NAMES agios
+)
+
+set(AGIOS_INCLUDE_DIRS ${AGIOS_INCLUDE_DIR})
+set(AGIOS_LIBRARIES ${AGIOS_LIBRARY})
+
+include(FindPackageHandleStandardArgs)
+find_package_handle_standard_args(AGIOS DEFAULT_MSG AGIOS_LIBRARIES AGIOS_INCLUDE_DIRS)
+
+mark_as_advanced(
+    AGIOS_LIBRARY
+    AGIOS_INCLUDE_DIR
+)
\ No newline at end of file
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 60f9230b019618a6413267863f93af21aa8afc6d..c4c1af28831ad6e381e4be30da9f3137173faaaa 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -84,6 +84,7 @@ find_package(Snappy REQUIRED)
 find_package(ZStd REQUIRED)
 find_package(JeMalloc) # required if rocksdb has been build with jemalloc
 find_package(RocksDB REQUIRED)
+find_package(AGIOS REQUIRED)
 # margo dependencies
 find_package(Mercury REQUIRED)
 find_package(Abt REQUIRED)
@@ -142,6 +143,9 @@ if(ENABLE_CLIENT_LOG)
 endif()
 message(STATUS "[gekkofs] Client logging output: ${ENABLE_CLIENT_LOG}")
 
+option(GKFS_ENABLE_FORWARDING "Enable forwarding mode" OFF)
+option(GKFS_ENABLE_AGIOS "Enable AGIOS scheduling library" OFF)
+
 set(CLIENT_LOG_MESSAGE_SIZE 1024 CACHE STRING "Maximum size of a log message in the client library")
 add_definitions(-DLIBGKFS_LOG_MESSAGE_SIZE=${CLIENT_LOG_MESSAGE_SIZE})
 message(STATUS "[gekkofs] Maximum log message size in the client library: ${CLIENT_LOG_MESSAGE_SIZE}")
@@ -232,6 +236,15 @@ if(GKFS_BUILD_TESTS)
     message(STATUS "[gekkofs] Preparing tests...")
     set(GKFS_TESTS_INTERFACE "lo" CACHE STRING "Network interface to use when running tests (default: lo)")
     message(STATUS "[gekkofs] Network interface for tests: ${GKFS_TESTS_INTERFACE}")
+
+    message(STATUS "[gekkofs] Check for forwarding tests...")
+    if(GKFS_ENABLE_FORWARDING)
+        set(GKFS_TESTS_FORWARDING "ON" CACHE STRING "Enable I/O forwarding tests (default: OFF)")
+    else()
+        set(GKFS_TESTS_FORWARDING "OFF" CACHE STRING "Enable I/O forwarding tests (default: OFF)")
+    endif()
+    message(STATUS "[gekkofs] Forwarding tests: ${GKFS_TESTS_FORWARDING}")
+
     add_subdirectory(tests)
 else()
     unset(GKFS_TESTS_INTERFACE CACHE)
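Both features introduced by this change are off by default; they are enabled at configure time with the same flags the CI job above passes, -DGKFS_ENABLE_FORWARDING:BOOL=ON and -DGKFS_ENABLE_AGIOS:BOOL=ON. AGIOS is only consumed by the forwarding daemon (gkfwd_daemon), which is why the CI enables the two options together.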
diff --git a/config/GekkoFWD/agios/access_times.func b/config/GekkoFWD/agios/access_times.func
new file mode 100644
index 0000000000000000000000000000000000000000..a6accf35569dd46591dea5fad94b27241e429e1a
--- /dev/null
+++ b/config/GekkoFWD/agios/access_times.func
@@ -0,0 +1,11 @@
+2
+8 64
+566.8405 6784.471
+238.2483 1173.456
+145.6056 1018.143
+187.2437 -201.8793
+64 128
+571.2364 -740.0433
+234.6933 41.99119
+218.3765 -6295.179
+113.1743 2235.442
diff --git a/config/GekkoFWD/agios/agios.conf b/config/GekkoFWD/agios/agios.conf
new file mode 100644
index 0000000000000000000000000000000000000000..e0fc38d0a854d12d19a73597e2eab05ac3dc9df5
--- /dev/null
+++ b/config/GekkoFWD/agios/agios.conf
@@ -0,0 +1,92 @@
+library_options:
+{
+	#ATTENTION: with my most recent modification on statistics, the prediction module is NOT WORKING, DO NOT USE IT.
+	#should we generate trace files during execution?
+	trace = false ;
+
+	#should we trace predicted requests? (for debug purposes, this trace will not be useful for future executions)
+	trace_predict = false ;
+
+	#should we make a complete trace, with all scheduler's operations, such as waiting times? (for debug purposes)
+	trace_full = false ;
+
+	#should the prediction module read trace files? (if false, the Prediction Module is useless)
+	predict_read_traces = false ;
+
+	#should the prediction module try to predict aggregations? (requires predict_read_traces = true)
+	predict_request_aggregation = false ;
+
+	#should the prediction module create simplified traces with information (the metrics) it obtained from the real traces?
+	predict_write_simplified_traces = false ;
+
+	#the tolerance for arrival time differences when checking if two predicted requests are the same (in %)
+	prediction_time_error = 10 ;
+
+	#this parameter gives the frequency with which the prediction module will redo its predicted aggregations (in number of requests that must be processed between refreshes). This is necessary because these predictions use a factor that represents the ability to overlap waiting times with processing of other requests. At initialization, this factor will be calculated from the provided trace files, but during execution it can be recalculated using measurements of this ability taken during the actual scheduling. If the parameter is set to -1, aggregations will not be recalculated during execution. (in number of requests)
+	prediction_recalculate_alpha_period = -1 ;
+
+	#prefix and suffix for trace files (with path). Their names must be trace_file_prefix+"."+number+"."+trace_file_sufix, with ordered numbers (no holes)
+	trace_file_prefix = "/tmp/agios_tracefile" ;
+	trace_file_sufix = "out" ;
+	#prefix for simple trace files (with path). Their names will be prefix+"."+number+"."+trace_file_sufix
+	simple_trace_prefix = "/tmp/agios_simpletracefile" ;
+
+	#parameters used by aIOLi and MLF
+	waiting_time = 900000 ;
+	aioli_quantum = 65536 ;
+	mlf_quantum = 8192 ;
+
+	#parameter used by TW (ms)
+	time_window_size = 1000 ; #the paper proposing TW recommends 1000 for HDD and 250 for SSD.
+
+	#file (with path) with access time functions (generated by SeRRa - http://serratoool.bitbucket.org/). Used by aIOLi for quantum assignment and by the mechanism that automatically selects the best scheduling algorithm to use. If you are using a static algorithm which is not aIOLi, this does not matter, but you need to provide it anyway. In that case, you can use the example file shipped with the library source code
+	access_times_func_file = "/tmp/access_times.func" ;
+
+	#for how many scheduling algorithms the performance module keeps measurements. When we are changing scheduling algorithms, we may observe new measurements (through the agios_release_request function) for the previous algorithms, so we can update the information we have for them. It makes no sense to use a big value for performance_values if we do not change algorithms often
+	performance_values = 5 ;
+
+	#how many scheduling algorithm changes do we register in the stats file?
+	proc_algs = 1000 ;
+
+	#default I/O scheduling algorithm to use (the one to be used if the previous value was set to false)
+	#existing algorithms (case sensitive): "MLF", "aIOLi", "SJF", "TO", "TO-agg", "SRTF", "TW", "NOOP", "DYN_TREE", "ARMED_BANDIT"
+	# SRTF is experimental and uses information from trace files (don't use it if these are not available)
+	# NOOP is the NO operation scheduling algorithm; you should not observe performance improvements by using this one
+	# TW only makes sense if the user is providing AGIOS the correct application id for each request. Don't use it otherwise
+	# DYN_TREE is a dynamic scheduling algorithm which selects the best (among MLF, aIOLi, SJF, TO, TO-agg, and NOOP) according to a decision tree. Use it only when using AGIOS to schedule requests to parallel file system servers.
+	# ARMED_BANDIT is another dynamic algorithm which periodically selects between MLF, aIOLi, SJF, TO, TO-agg, and NOOP. It keeps performance measurements for these algorithms and gives more probability to the better ones.
+	default_algorithm = "TO";
+
+	# TWINS window size, in us
+	twins_window_duration = 1000;
+	dynamic_twins = false;
+	# time between window size adaptation, in ms
+	dynamic_twins_period = 1000;
+
+	dynamic_twins_clients = 32;
+	dynamic_twins_processes = 128;
+
+	# Only relevant if default_algorithm is a dynamic one. This parameter gives the frequency with which the automatic scheduling algorithm selection will recalculate the scheduling algorithm. This selection will be done using the access pattern from this period. If -1 is provided, then the selection will be done at the beginning of execution only (using information from traces for DYN_TREE). The next parameter gives the minimum number of requests which need to happen in this period for the selection to be done (otherwise we will wait longer before recalculating). (in msec). Notice that it makes no sense to use ARMED_BANDIT without setting period and min_reqnumber, as it needs to be interactive.
+	select_algorithm_period = 1000 ;
+
+	select_algorithm_min_reqnumber = 1 ;
+
+	#also for dynamic algorithms. Says whether TW should be one of the options considered by the dynamic scheduler. TW requires identifying requests according to the applications they come from. If this is not possible, don't use TW!
+	enable_TW = false ;
+
+	#if default_algorithm is a dynamic algorithm, you need to indicate which static algorithm to use at first (before automatically selecting the next one). Only relevant if you are using ARMED_BANDIT
+	starting_algorithm = "SJF" ;
+
+	# ARMED_BANDIT parameters - only relevant if default_algorithm is ARMED_BANDIT, but they need to be provided anyway. min_ab_probability gives the minimum probability given to scheduling algorithms which performed poorly. The algorithm needs to maintain some probability to accommodate changes in the access pattern. validity_window is the period of time (in msec) for which performance measurements are still valid; after this time period we discard them (so we adapt to new situations). performance_window determines how many performance measurements the ARMED_BANDIT algorithm keeps for each scheduling algorithm option (taking the average of them). Keeping a huge window takes more memory. Moreover, it is related to validity_window and select_algorithm_period: if we discard measurements too often, we will never fill the whole window.
+	min_ab_probability = 3 ;
+	validity_window = 360000 ;
+	performance_window = 10 ;
+};
+user_info:
+{
+	#stripe size used by the library's users (in bytes). This is used for detecting the access pattern at a parallel file system server. Useless for other uses.
+	stripe_size = 32768 ;
+
+	#maximum buffer size used for storing trace parts (in KB). Having a buffer avoids generating requests to the local file system, which interfere with performance. On the other hand, having a large buffer can affect performance and decrease the available space for the data buffer.
+	max_trace_buffer_size = 32768 ;
+
+};
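AGIOS parses this file with libconfig, which is why the build images below gain libconfig-devel/libconfig-dev. A malformed file only surfaces once the daemon calls agios_init(), so it can be cheaper to syntax-check it up front. The following is a minimal standalone sketch, not part of this patch; the path mirrors the one the daemon hard-codes:

    // check_agios_conf.cpp -- illustrative only; build with: g++ check_agios_conf.cpp -lconfig++
    #include <iostream>
    #include <libconfig.h++>

    int main() {
        libconfig::Config cfg;
        try {
            cfg.readFile("/tmp/agios.conf");   // same path gkfwd_daemon passes to agios_init()
            // probe one setting to verify the expected structure is present
            std::string alg = cfg.lookup("library_options.default_algorithm");
            std::cout << "config OK, default_algorithm = " << alg << "\n";
        } catch (const libconfig::FileIOException&) {
            std::cerr << "cannot read /tmp/agios.conf\n";
            return 1;
        } catch (const libconfig::ParseException& e) {
            std::cerr << "parse error at " << e.getFile() << ":" << e.getLine() << "\n";
            return 1;
        }
        return 0;
    }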
diff --git a/docker/build_env.docker b/docker/build_env.docker
index 46094c08ef0c1975c51da3330dd7ace54671487d..346807b86f756312b0c24843cd7af93197a59545 100644
--- a/docker/build_env.docker
+++ b/docker/build_env.docker
@@ -25,6 +25,7 @@ RUN yum -y -q update && yum -y -q install \
     gcc \
     gcc-c++ \
     openssl-devel \
+    libconfig-devel \
     # Mercury dependencies
     libtool \
     libtool-ltdl-devel \
@@ -34,6 +35,7 @@ RUN yum -y -q update && yum -y -q install \
     snappy-devel \
     libzstd-devel \
     lz4-devel \
+    bzip2 \
     bzip2-devel \
     # ada-fs requires C++ 14
     devtoolset-8-gcc \
@@ -81,6 +83,7 @@ RUN yum -y -q update && yum -y -q install \
     gcc \
     gcc-c++ \
     openssl-devel \
+    libconfig-devel \
     # Mercury dependencies
     libtool \
     libtool-ltdl-devel \
@@ -90,6 +93,7 @@ RUN yum -y -q update && yum -y -q install \
     snappy-devel \
    libzstd-devel \
     lz4-devel \
+    bzip2 \
     bzip2-devel \
     # ada-fs requires C++ 14
     devtoolset-8-gcc \
diff --git a/docker/debian_build_env.docker b/docker/debian_build_env.docker
index b697b49e999a176ef05e1abae885f5e37d5c614b..743e10d22718d65cef27eb67a56e98dd71704750 100644
--- a/docker/debian_build_env.docker
+++ b/docker/debian_build_env.docker
@@ -19,6 +19,8 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
     cmake \
     gcc \
     g++ \
+    # AGIOS dependencies
+    libconfig-dev \
     # Mercury dependencies
     libltdl-dev \
     lbzip2 \
diff --git a/include/client/env.hpp b/include/client/env.hpp
index e438ab7f4e84bd74eeb9e98266a7436c8df49a87..0555371f78994c07f0bb3277c176b2984b8a7d7c 100644
--- a/include/client/env.hpp
+++ b/include/client/env.hpp
@@ -33,6 +33,9 @@ static constexpr auto LOG_OUTPUT = ADD_PREFIX("LOG_OUTPUT");
 static constexpr auto LOG_OUTPUT_TRUNC = ADD_PREFIX("LOG_OUTPUT_TRUNC");
 static constexpr auto CWD = ADD_PREFIX("CWD");
 static constexpr auto HOSTS_FILE = ADD_PREFIX("HOSTS_FILE");
+#ifdef GKFS_ENABLE_FORWARDING
+static constexpr auto FORWARDING_MAP_FILE = ADD_PREFIX("FORWARDING_MAP_FILE");
+#endif
 } // namespace env
 } // namespace gkfs
diff --git a/include/client/preload_context.hpp b/include/client/preload_context.hpp
index 0eb43be26b87a1638050336b4c06bd592dada04b..616a92966c4c3b58558b6c53f5564e852046bff7 100644
--- a/include/client/preload_context.hpp
+++ b/include/client/preload_context.hpp
@@ -82,6 +82,7 @@ private:
     std::vector hosts_;
     uint64_t local_host_id_;
+    uint64_t fwd_host_id_;
 
     bool interception_enabled_;
@@ -123,6 +124,10 @@ public:
     void local_host_id(uint64_t id);
 
+    uint64_t fwd_host_id() const;
+
+    void fwd_host_id(uint64_t id);
+
     RelativizeStatus relativize_fd_path(int dirfd,
                                         const char* raw_path,
                                         std::string& relative_path,
diff --git a/include/client/preload_util.hpp b/include/client/preload_util.hpp
index 37cf4fc51dd79984ac1be4bd15738df5409980d5..f2e000ddd717bbcef5e4eecc2e155766f45be927 100644
--- a/include/client/preload_util.hpp
+++ b/include/client/preload_util.hpp
@@ -62,6 +62,7 @@ int metadata_to_stat(const std::string& path, const gkfs::metadata::Metadata& md
 std::vector<std::pair<std::string, std::string>>
 load_hostfile(const std::string& lfpath);
 void load_hosts();
+void load_forwarding_map();
 } // namespace util
 } // namespace gkfs
diff --git a/include/config.hpp b/include/config.hpp
index 982eddcf42a7e068cca53ac40409c0b1a370acf0..83b5f4bc2ce42991464d770eeb70bba7eb45b9aa 100644
--- a/include/config.hpp
+++ b/include/config.hpp
@@ -24,6 +24,7 @@ namespace gkfs {
 namespace config {
 
 constexpr auto hostfile_path = "./gkfs_hosts.txt";
+constexpr auto forwarding_file_path = "./gkfs_forwarding.map";
 
 namespace io {
 /*
diff --git a/include/daemon/scheduler/agios.hpp b/include/daemon/scheduler/agios.hpp
new file mode 100644
index 0000000000000000000000000000000000000000..9ff1104f16c8e36c41eb06569ca300b9707e8a89
--- /dev/null
+++ b/include/daemon/scheduler/agios.hpp
@@ -0,0 +1,15 @@
+#ifndef IFS_SCHEDULER_HPP
+#define IFS_SCHEDULER_HPP
+
+#include
+
+void agios_initialize();
+void agios_shutdown();
+
+void *agios_callback(int64_t request_id);
+void *agios_callback_aggregated(int64_t *requests, int32_t total);
+void *agios_eventual_callback(int64_t request_id, void *info);
+
+unsigned long long int generate_unique_id();
+
+#endif
\ No newline at end of file
diff --git a/include/global/rpc/distributor.hpp b/include/global/rpc/distributor.hpp
index 2a0b79ff3ed41783f34bc5e56b64cd42dc485acd..d01a79412fcc8b400bdaf0435e1df69300710b3f 100644
--- a/include/global/rpc/distributor.hpp
+++ b/include/global/rpc/distributor.hpp
@@ -69,6 +69,24 @@ public:
     std::vector<host_t> locate_directory_metadata(const std::string& path) const override;
 };
 
+class ForwarderDistributor : public Distributor {
+private:
+    host_t fwd_host_;
+    unsigned int hosts_size_;
+    std::vector<host_t> all_hosts_;
+    std::hash<std::string> str_hash;
+public:
+    ForwarderDistributor(host_t fwhost, unsigned int hosts_size);
+
+    host_t localhost() const override final;
+
+    host_t locate_data(const std::string& path, const chunkid_t& chnk_id) const override final;
+
+    host_t locate_file_metadata(const std::string& path) const override;
+
+    std::vector<host_t> locate_directory_metadata(const std::string& path) const override;
+};
+
 } // namespace rpc
 } // namespace gkfs
diff --git a/scripts/compile_dep.sh b/scripts/compile_dep.sh
index 79a0ad69249b46e149067b1e270eae5db313781e..1d4cb98ca267638bfb01df2de558320219645e2e 100755
--- a/scripts/compile_dep.sh
+++ b/scripts/compile_dep.sh
@@ -13,26 +13,26 @@ VALID_DEP_OPTIONS="mogon2 mogon1 ngio direct all"
 
 MOGON1_DEPS=(
     "zstd" "lz4" "snappy" "capstone" "ofi" "mercury" "argobots" "margo" "rocksdb"
-    "syscall_intercept" "date" "verbs"
+    "syscall_intercept" "date" "verbs" "agios"
 )
 
 MOGON2_DEPS=(
     "zstd" "lz4" "snappy" "capstone" "ofi" "mercury" "argobots" "margo" "rocksdb"
-    "syscall_intercept" "date" "psm2"
+    "syscall_intercept" "date" "agios" "psm2"
 )
 
 NGIO_DEPS=(
     "zstd" "lz4" "snappy" "capstone" "ofi" "mercury" "argobots" "margo" "rocksdb"
-    "syscall_intercept" "date" "psm2"
+    "syscall_intercept" "date" "agios" "psm2"
 )
 
 DIRECT_DEPS=(
-    "ofi" "mercury" "argobots" "margo" "rocksdb" "syscall_intercept" "date"
+    "ofi" "mercury" "argobots" "margo" "rocksdb" "syscall_intercept" "date" "agios"
 )
 
 ALL_DEPS=(
     "zstd" "lz4" "snappy" "capstone" "bmi" "ofi" "mercury" "argobots" "margo" "rocksdb"
-    "syscall_intercept" "date"
+    "syscall_intercept" "date" "agios"
 )
 
 usage_short() {
@@ -367,6 +367,16 @@ if check_dependency "ofi" "${DEP_CONFIG[@]}"; then
     fi
 fi
 
+# AGIOS
+if check_dependency "agios" "${DEP_CONFIG[@]}"; then
+    echo "############################################################ Installing:  AGIOS"
+    CURR=${SOURCE}/agios
+    prepare_build_dir "${CURR}"
+    cd "${CURR}"/build
+    $CMAKE -DCMAKE_INSTALL_PREFIX="${INSTALL}" ..
+    make install
+fi
+
 # Mercury
 if check_dependency "mercury" "${DEP_CONFIG[@]}"; then
 
@@ -442,7 +452,7 @@ if check_dependency "syscall_intercept" "${DEP_CONFIG[@]}"; then
     CURR=${SOURCE}/syscall_intercept
     prepare_build_dir "${CURR}"
     cd "${CURR}"/build
-    $CMAKE -DCMAKE_INSTALL_PREFIX="${INSTALL}" -DCMAKE_BUILD_TYPE:STRING=Debug -DBUILD_EXAMPLES:BOOL=OFF -DBUILD_TESTS:BOOK=OFF ..
+    $CMAKE -DCMAKE_PREFIX_PATH="${INSTALL}" -DCMAKE_INSTALL_PREFIX="${INSTALL}" -DCMAKE_BUILD_TYPE:STRING=Debug -DBUILD_EXAMPLES:BOOL=OFF -DBUILD_TESTS:BOOL=OFF ..
     make install
 fi
diff --git a/scripts/dl_dep.sh b/scripts/dl_dep.sh
index d8cbff6a78daa4570abee50362a6a23fdbff639c..bf6c6e6a47b1bd4c8b73322580ec35715e5dd73a 100755
--- a/scripts/dl_dep.sh
+++ b/scripts/dl_dep.sh
@@ -13,12 +13,12 @@ VALID_DEP_OPTIONS="mogon2 mogon1 ngio direct all"
 
 MOGON1_DEPS=(
     "zstd" "lz4" "snappy" "capstone" "ofi-verbs" "mercury" "argobots" "margo" "rocksdb"
-    "syscall_intercept" "date"
+    "syscall_intercept" "date" "agios"
 )
 
 MOGON2_DEPS=(
     "zstd" "lz4" "snappy" "capstone" "ofi-experimental" "mercury" "argobots" "margo" "rocksdb-experimental"
-    "syscall_intercept-glibc3" "date" "psm2"
+    "syscall_intercept-glibc3" "date" "agios" "psm2"
 )
 
 NGIO_DEPS=(
@@ -27,12 +27,12 @@ NGIO_DEPS=(
 )
 
 DIRECT_DEPS=(
-    "ofi" "mercury" "argobots" "margo" "rocksdb" "syscall_intercept" "date"
+    "ofi" "mercury" "argobots" "margo" "rocksdb" "syscall_intercept" "date" "agios"
 )
 
 ALL_DEPS=(
     "zstd" "lz4" "snappy" "capstone" "bmi" "ofi" "mercury" "argobots" "margo" "rocksdb"
-    "syscall_intercept" "date"
+    "syscall_intercept" "date" "agios"
 )
 
 # Stop all backround jobs on interruption.
@@ -386,6 +386,11 @@ if check_dependency "syscall_intercept" "${DEP_CONFIG[@]}"; then
     fi
 fi
 
+# get AGIOS
+if check_dependency "agios" "${DEP_CONFIG[@]}"; then
+clonedeps "agios" "https://github.com/francielizanon/agios.git" "c26a6544200f823ebb8f890dd94e653d148bf226" "-b development" &
+fi
+
 # get date
 if check_dependency "date" "${DEP_CONFIG[@]}"; then
 clonedeps "date" "https://github.com/HowardHinnant/date.git" "e7e1482087f58913b80a20b04d5c58d9d6d90155" &
diff --git a/src/client/CMakeLists.txt b/src/client/CMakeLists.txt
index 0be20f8eae41115f6c389c81ed93cd48e35ed4d8..d477ec99c24bde135d29cf45c1d310bd4fe553d9 100644
--- a/src/client/CMakeLists.txt
+++ b/src/client/CMakeLists.txt
@@ -79,3 +79,101 @@ install(TARGETS gkfs_intercept
     PUBLIC_HEADER DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}/gkfs
 )
+if(GKFS_ENABLE_FORWARDING)
+    set(FWD_PRELOAD_SRC
+        gkfs_functions.cpp
+        hooks.cpp
+        intercept.cpp
+        logging.cpp
+        open_file_map.cpp
+        open_dir.cpp
+        path.cpp
+        preload.cpp
+        preload_context.cpp
+        preload_util.cpp
+        ../global/path_util.cpp
+        ../global/rpc/rpc_util.cpp
+        rpc/rpc_types.cpp
+        rpc/forward_data.cpp
+        rpc/forward_management.cpp
+        rpc/forward_metadata.cpp
+        syscalls/detail/syscall_info.c
+    )
+    set(FWD_PRELOAD_HEADERS
+        ../../include/client/gkfs_functions.hpp
+        ../../include/config.hpp
+        ../../include/client/env.hpp
+        ../../include/client/hooks.hpp
+        ../../include/client/intercept.hpp
+        ../../include/client/logging.hpp
+        ../../include/client/make_array.hpp
+        ../../include/client/open_file_map.hpp
+        ../../include/client/open_dir.hpp
+        ../../include/client/path.hpp
+        ../../include/client/preload.hpp
+        ../../include/client/preload_context.hpp
+        ../../include/client/preload_util.hpp
+        ../../include/client/rpc/rpc_types.hpp
+        ../../include/client/rpc/forward_management.hpp
+        ../../include/client/rpc/forward_metadata.hpp
+        ../../include/client/rpc/forward_data.hpp
+        ../../include/client/syscalls/args.hpp
+        ../../include/client/syscalls/decoder.hpp
+        ../../include/client/syscalls/errno.hpp
+        ../../include/client/syscalls/rets.hpp
+        ../../include/client/syscalls/syscall.hpp
+        ../../include/client/syscalls/detail/syscall_info.h
+        ../../include/global/cmake_configure.hpp
+        ../../include/global/chunk_calc_util.hpp
+        ../../include/global/global_defs.hpp
+        ../../include/global/path_util.hpp
+        ../../include/global/rpc/rpc_types.hpp
+        ../../include/global/rpc/rpc_util.hpp
+    )
+
+    add_library(gkfwd_intercept SHARED ${FWD_PRELOAD_SRC} ${FWD_PRELOAD_HEADERS})
+
+    if(GKFS_ENABLE_AGIOS)
+        target_compile_definitions(gkfwd_intercept
+            PUBLIC
+            GKFS_ENABLE_FORWARDING
+            GKFS_ENABLE_AGIOS
+        )
+    else()
+        target_compile_definitions(gkfwd_intercept
+            PUBLIC
+            GKFS_ENABLE_FORWARDING
+        )
+    endif()
+
+    message(STATUS "[gekkofs] Forwarding mode: ${GKFS_ENABLE_FORWARDING}")
+    message(STATUS "[gekkofs] AGIOS scheduling: ${GKFS_ENABLE_AGIOS}")
+
+    target_link_libraries(gkfwd_intercept
+        # internal
+        metadata
+        distributor
+        env_util
+        # external
+        Syscall_intercept::Syscall_intercept
+        dl
+        mercury
+        hermes
+        fmt::fmt
+        Boost::boost # needed for tokenizer header
+        Threads::Threads
+        Date::TZ
+    )
+
+    target_include_directories(gkfwd_intercept
+        PRIVATE
+        ${ABT_INCLUDE_DIRS}
+        ${MARGO_INCLUDE_DIRS}
+    )
+
+    install(TARGETS gkfwd_intercept
+        LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR}
+        ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR}
+        PUBLIC_HEADER DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}/gkfs
+    )
+endif()
\ No newline at end of file
diff --git a/src/client/gkfs_functions.cpp b/src/client/gkfs_functions.cpp
index 2f69aa0c4e3afe2a64195702e93055f1a864a12a..fbef20130671a60f113f4007c2bc1d34f1782313 100644
--- a/src/client/gkfs_functions.cpp
+++ b/src/client/gkfs_functions.cpp
@@ -468,7 +468,7 @@ ssize_t gkfs_write(int fd, const void* buf, size_t count) {
 
 ssize_t gkfs_pwritev(int fd, const struct iovec* iov, int iovcnt, off_t offset) {
     auto file = CTX->file_map()->get(fd);
-    auto pos = offset; // keep truck of current position
+    auto pos = offset; // keep track of current position
     ssize_t written = 0;
     ssize_t ret;
     for (int i = 0; i < iovcnt; ++i) {
@@ -542,7 +542,7 @@ ssize_t gkfs_read(int fd, void* buf, size_t count) {
 
 ssize_t gkfs_preadv(int fd, const struct iovec* iov, int iovcnt, off_t offset) {
     auto file = CTX->file_map()->get(fd);
-    auto pos = offset; // keep truck of current position
+    auto pos = offset; // keep track of current position
     ssize_t read = 0;
     ssize_t ret;
     for (int i = 0; i < iovcnt; ++i) {
diff --git a/src/client/preload.cpp b/src/client/preload.cpp
index a48c36c25c9c6902de56d6419c942ec48b2bbf77..7e5dce8a67bc0ae758f0c9da7e2e1c1912d50c8b 100644
--- a/src/client/preload.cpp
+++ b/src/client/preload.cpp
@@ -37,6 +37,13 @@ namespace {
 
 // make sure that things are only initialized once
 pthread_once_t init_env_thread = PTHREAD_ONCE_INIT;
+#ifdef GKFS_ENABLE_FORWARDING
+pthread_t mapper;
+bool forwarding_running;
+
+pthread_mutex_t remap_mutex;
+pthread_cond_t remap_signal;
+#endif
 
 inline void exit_error_msg(int errcode, const string& msg) {
 
@@ -102,9 +109,22 @@ void init_ld_environment_() {
     }
 
     /* Setup distributor */
+    #ifdef GKFS_ENABLE_FORWARDING
+    try {
+        gkfs::util::load_forwarding_map();
+
+        LOG(INFO, "{}() Forward to {}", __func__, CTX->fwd_host_id());
+    } catch (std::exception& e){
+        exit_error_msg(EXIT_FAILURE, fmt::format("Unable to set the forwarding host '{}'", e.what()));
+    }
+
+    auto forwarder_dist = std::make_shared<gkfs::rpc::ForwarderDistributor>(CTX->fwd_host_id(), CTX->hosts().size());
+    CTX->distributor(forwarder_dist);
+    #else
     auto simple_hash_dist = std::make_shared<gkfs::rpc::SimpleHashDistributor>(CTX->local_host_id(), CTX->hosts().size());
     CTX->distributor(simple_hash_dist);
+    #endif
 
     LOG(INFO, "Retrieving file system configuration...");
 
@@ -115,6 +135,54 @@ void init_ld_environment_() {
     LOG(INFO, "Environment initialization successful.");
 }
 
+#ifdef GKFS_ENABLE_FORWARDING
+void *forwarding_mapper(void* p) {
+    struct timespec timeout;
+
+    int64_t previous = -1;
+
+    while (forwarding_running) {
+        try {
+            gkfs::util::load_forwarding_map();
+
+            if (previous != (int64_t) CTX->fwd_host_id()) {
+                LOG(INFO, "{}() Forward to {}", __func__, CTX->fwd_host_id());
+
+                previous = CTX->fwd_host_id();
+            }
+        } catch (std::exception& e) {
+            exit_error_msg(EXIT_FAILURE, fmt::format("Unable to set the forwarding host '{}'", e.what()));
+        }
+
+        // pthread_cond_timedwait() takes an absolute time, so the deadline must
+        // be refreshed on every iteration (a stale deadline would make the wait
+        // return immediately after the first period has passed)
+        clock_gettime(CLOCK_REALTIME, &timeout);
+        timeout.tv_sec += 10; // remap every 10 seconds
+
+        pthread_mutex_lock(&remap_mutex);
+        pthread_cond_timedwait(&remap_signal, &remap_mutex, &timeout);
+        pthread_mutex_unlock(&remap_mutex);
+    }
+
+    return nullptr;
+}
+#endif
+
+#ifdef GKFS_ENABLE_FORWARDING
+void init_forwarding_mapper() {
+    forwarding_running = true;
+
+    pthread_create(&mapper, NULL, forwarding_mapper, NULL);
+}
+#endif
+
+#ifdef GKFS_ENABLE_FORWARDING
+void destroy_forwarding_mapper() {
+    forwarding_running = false;
+
+    pthread_cond_signal(&remap_signal);
+
+    pthread_join(mapper, NULL);
+}
+#endif
+
 void log_prog_name() {
     std::string line;
     std::ifstream cmdline("/proc/self/cmdline");
@@ -174,6 +242,10 @@ void init_preload() {
 
     CTX->unprotect_user_fds();
 
+    #ifdef GKFS_ENABLE_FORWARDING
+    init_forwarding_mapper();
+    #endif
+
     gkfs::preload::start_interception();
 }
 
@@ -181,6 +253,9 @@ void init_preload() {
  * Called last when preload library is used with the LD_PRELOAD environment variable
  */
 void destroy_preload() {
+    #ifdef GKFS_ENABLE_FORWARDING
+    destroy_forwarding_mapper();
+    #endif
     CTX->clear_hosts();
     LOG(DEBUG, "Peer information deleted");
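The mapper thread above uses the absolute-deadline idiom of pthread_cond_timedwait: the deadline must be recomputed on every round, and the condition variable doubles as an early wake-up channel for shutdown. In isolation the pattern looks like this (a minimal sketch, independent of GekkoFS):

    // periodic worker that can be woken early for shutdown -- illustrative only
    #include <ctime>
    #include <pthread.h>

    static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t cv = PTHREAD_COND_INITIALIZER;
    static bool running = true; // another thread sets this to false and signals cv

    static void do_periodic_work() { /* e.g. reload the forwarding map */ }

    void* worker(void*) {
        while (running) {
            do_periodic_work();

            struct timespec deadline;
            clock_gettime(CLOCK_REALTIME, &deadline); // fresh absolute deadline each round
            deadline.tv_sec += 10;

            pthread_mutex_lock(&mtx);
            pthread_cond_timedwait(&cv, &mtx, &deadline); // early wake-up on shutdown
            pthread_mutex_unlock(&mtx);
        }
        return nullptr;
    }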
diff --git a/src/client/preload_context.cpp b/src/client/preload_context.cpp
index ce696e74e01b000bd8c017c7c4d6dcf95b56cb59..9b9f3eca678508206c9f5819adc909f15ffc0bfe 100644
--- a/src/client/preload_context.cpp
+++ b/src/client/preload_context.cpp
@@ -121,6 +121,13 @@ void PreloadContext::local_host_id(uint64_t id) {
     local_host_id_ = id;
 }
 
+uint64_t PreloadContext::fwd_host_id() const {
+    return fwd_host_id_;
+}
+
+void PreloadContext::fwd_host_id(uint64_t id) {
+    fwd_host_id_ = id;
+}
 
 RelativizeStatus PreloadContext::relativize_fd_path(int dirfd,
                                                     const char* raw_path,
                                                     std::string& relative_path,
@@ -360,4 +367,4 @@ PreloadContext::unprotect_user_fds() {
 }
 
 } // namespace preload
-} // namespace gkfs
\ No newline at end of file
+} // namespace gkfs
diff --git a/src/client/preload_util.cpp b/src/client/preload_util.cpp
index e9663604ad51a1d7c2662aab0626c3d30b926d6f..12c94357271db325c9cb2ffc8a43e269eb7862fe 100644
--- a/src/client/preload_util.cpp
+++ b/src/client/preload_util.cpp
@@ -176,6 +176,72 @@ vector<pair<string, string>> load_hostfile(const std::string& lfpath) {
     return hosts;
 }
 
+#ifdef GKFS_ENABLE_FORWARDING
+map<string, uint64_t> load_forwarding_map_file(const std::string& lfpath) {
+
+    LOG(DEBUG, "Loading forwarding map file: \"{}\"", lfpath);
+
+    ifstream lf(lfpath);
+    if (!lf) {
+        throw runtime_error(fmt::format("Failed to open forwarding map file '{}': {}",
+                                        lfpath, strerror(errno)));
+    }
+    map<string, uint64_t> forwarding_map;
+    const regex line_re("^(\\S+)\\s+(\\S+)$",
+                        regex::ECMAScript | regex::optimize);
+    string line;
+    string host;
+    uint64_t forwarder;
+    std::smatch match;
+    while (getline(lf, line)) {
+        if (!regex_match(line, match, line_re)) {
+
+            LOG(ERROR, "Unrecognized line format: [path: '{}', line: '{}']",
+                lfpath, line);
+
+            throw runtime_error(
+                    fmt::format("unrecognized line format: '{}'", line));
+        }
+        host = match[1];
+        forwarder = std::stoull(match[2].str());
+        forwarding_map[host] = forwarder;
+    }
+    return forwarding_map;
+}
+#endif
+
+#ifdef GKFS_ENABLE_FORWARDING
+void load_forwarding_map() {
+    string forwarding_map_file;
+
+    forwarding_map_file = gkfs::env::get_var(gkfs::env::FORWARDING_MAP_FILE, gkfs::config::forwarding_file_path);
+
+    map<string, uint64_t> forwarding_map;
+
+    while (forwarding_map.size() == 0) {
+        try {
+            forwarding_map = load_forwarding_map_file(forwarding_map_file);
+        } catch (const exception& e) {
+            auto emsg = fmt::format("Failed to load forwarding map file: {}", e.what());
+            throw runtime_error(emsg);
+        }
+    }
+
+    //if (forwarding_map.size() == 0) {
+    //    throw runtime_error(fmt::format("Forwarding map file is empty: '{}'", forwarding_map_file));
+    //}
+
+    auto local_hostname = get_my_hostname(true);
+
+    if (forwarding_map.find(local_hostname) == forwarding_map.end()) {
+        throw runtime_error(fmt::format("Unable to determine the forwarder for host: '{}'", local_hostname));
+    }
+    LOG(INFO, "Forwarding map loaded for '{}' as '{}'", local_hostname, forwarding_map[local_hostname]);
+
+    CTX->fwd_host_id(forwarding_map[local_hostname]);
+}
+#endif
+
 void load_hosts() {
     string hostfile;
 
@@ -239,4 +305,4 @@ void load_hosts() {
 }
 
 } // namespace util
-} // namespace gkfs
\ No newline at end of file
+} // namespace gkfs
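load_forwarding_map_file() accepts one `hostname forwarder-id` pair per line (the ^(\S+)\s+(\S+)$ regex above). Four client nodes sharing two forwarding daemons could, for example, use a map like this (hostnames are illustrative):

    node-01 0
    node-02 0
    node-03 1
    node-04 1

Because the mapper thread re-reads this file roughly every 10 seconds, rewriting a line is enough to migrate a client to another forwarder at runtime; the remap test further below exercises exactly that.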
diff --git a/src/daemon/CMakeLists.txt b/src/daemon/CMakeLists.txt
index 945a21983032a05ed2ef1675e2e6f96f5ba85c86..ad35b89e8eef0db06526bff7b4ebf73b1703254a 100644
--- a/src/daemon/CMakeLists.txt
+++ b/src/daemon/CMakeLists.txt
@@ -62,3 +62,88 @@ target_include_directories(gkfs_daemon
 install(TARGETS gkfs_daemon
     RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR}
 )
+
+if(GKFS_ENABLE_FORWARDING)
+    set(FWD_DAEMON_SRC
+        ../global/rpc/rpc_util.cpp
+        ../global/path_util.cpp
+        daemon.cpp
+        util.cpp
+        ops/metadentry.cpp
+        ops/data.cpp
+        classes/fs_data.cpp
+        classes/rpc_data.cpp
+        handler/srv_data.cpp
+        handler/srv_metadata.cpp
+        handler/srv_management.cpp
+        scheduler/agios.cpp
+    )
+    set(FWD_DAEMON_HEADERS
+        ../../include/config.hpp
+        ../../include/version.hpp
+        ../../include/global/cmake_configure.hpp
+        ../../include/global/global_defs.hpp
+        ../../include/global/rpc/rpc_types.hpp
+        ../../include/global/rpc/rpc_util.hpp
+        ../../include/global/path_util.hpp
+        ../../include/daemon/daemon.hpp
+        ../../include/daemon/util.hpp
+        ../../include/daemon/scheduler/agios.hpp
+        ../../include/daemon/ops/data.hpp
+        ../../include/daemon/ops/metadentry.hpp
+        ../../include/daemon/classes/fs_data.hpp
+        ../../include/daemon/classes/rpc_data.hpp
+        ../../include/daemon/handler/rpc_defs.hpp
+        ../../include/daemon/handler/rpc_util.hpp
+    )
+    add_executable(gkfwd_daemon ${FWD_DAEMON_SRC} ${FWD_DAEMON_HEADERS})
+
+    if(GKFS_ENABLE_AGIOS)
+        target_compile_definitions(gkfwd_daemon
+            PUBLIC
+            GKFS_ENABLE_FORWARDING
+            GKFS_ENABLE_AGIOS
+        )
+    else()
+        target_compile_definitions(gkfwd_daemon
+            PUBLIC
+            GKFS_ENABLE_FORWARDING
+        )
+    endif()
+
+    message(STATUS "[gekkofs] Forwarding mode: ${GKFS_ENABLE_FORWARDING}")
+    message(STATUS "[gekkofs] AGIOS scheduling: ${GKFS_ENABLE_AGIOS}")
+
+    target_link_libraries(gkfwd_daemon
+        # internal libs
+        metadata
+        metadata_db
+        storage
+        distributor
+        log_util
+        env_util
+        spdlog
+        fmt::fmt
+        ${AGIOS_LIBRARIES}
+        # margo libs
+        ${ABT_LIBRARIES}
+        mercury
+        ${MARGO_LIBRARIES}
+        # others
+        Boost::boost # needed for tokenizer header
+        Boost::program_options
+        Boost::filesystem
+        Threads::Threads
+    )
+
+    target_include_directories(gkfwd_daemon
+        PRIVATE
+        ${AGIOS_INCLUDE_DIRS}
+        ${ABT_INCLUDE_DIRS}
+        ${MARGO_INCLUDE_DIRS}
+    )
+
+    install(TARGETS gkfwd_daemon
+        RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR}
+    )
+endif()
diff --git a/src/daemon/daemon.cpp b/src/daemon/daemon.cpp
index 5caf65c13ef5fdcf6cdaca92c09e3aa04ed4308e..665b2cf11e14924906842f4b95318ab19afe35fa 100644
--- a/src/daemon/daemon.cpp
+++ b/src/daemon/daemon.cpp
@@ -24,6 +24,9 @@
 #include
 #include
 #include
+#ifdef GKFS_ENABLE_AGIOS
+#include
+#endif
 
 #include
 #include
@@ -160,6 +163,20 @@ void init_environment() {
         throw;
     }
 
+    #ifdef GKFS_ENABLE_FORWARDING
+    GKFS_DATA->spdlogger()->debug("{}() Enable I/O forwarding mode", __func__);
+    #endif
+
+    #ifdef GKFS_ENABLE_AGIOS
+    // Initialize AGIOS scheduler
+    GKFS_DATA->spdlogger()->debug("{}() Initializing AGIOS scheduler: '{}'", __func__, "/tmp/agios.conf");
+    try {
+        agios_initialize();
+    } catch (const std::exception & e) {
+        GKFS_DATA->spdlogger()->error("{}() Failed to initialize AGIOS scheduler: {}", __func__, e.what());
+        throw;
+    }
+    #endif
     // Initialize data backend
     std::string chunk_storage_path = GKFS_DATA->rootdir() + "/data/chunks"s;
     GKFS_DATA->spdlogger()->debug("{}() Initializing storage backend: '{}'", __func__, chunk_storage_path);
@@ -210,6 +227,22 @@ void init_environment() {
     }
     GKFS_DATA->spdlogger()->info("Startup successful. Daemon is ready.");
 }
+#ifdef GKFS_ENABLE_AGIOS
+/**
+ * Initialize the AGIOS scheduling library
+ */
+void agios_initialize() {
+    char configuration[] = "/tmp/agios.conf";
+
+    if (!agios_init(NULL, NULL, configuration, 0)) {
+        GKFS_DATA->spdlogger()->error("{}() Failed to initialize AGIOS scheduler: '{}'", __func__, configuration);
+
+        agios_exit();
+
+        // a bare `throw;` with no active exception would call std::terminate()
+        throw std::runtime_error("Failed to initialize the AGIOS scheduler");
+    }
+}
+#endif
 
 /**
  * Destroys the margo, argobots, and mercury environments
@@ -370,7 +403,12 @@ int main(int argc, const char* argv[]) {
     assert(vm.count("rootdir"));
     auto rootdir = vm["rootdir"].as<std::string>();
 
+    #ifdef GKFS_ENABLE_FORWARDING
+    // In forwarding mode, the backend is shared
+    auto rootdir_path = bfs::path(rootdir);
+    #else
     auto rootdir_path = bfs::path(rootdir) / fmt::format_int(getpid()).str();
+    #endif
 
     GKFS_DATA->spdlogger()->debug("{}() Root directory: '{}'",
                                   __func__, rootdir_path.native());
     bfs::create_directories(rootdir_path);
@@ -378,11 +416,29 @@ int main(int argc, const char* argv[]) {
 
     if (vm.count("metadir")) {
         auto metadir = vm["metadir"].as<std::string>();
-        bfs::create_directories(metadir);
-        GKFS_DATA->metadir(bfs::canonical(metadir).native());
+
+        #ifdef GKFS_ENABLE_FORWARDING
+        auto metadir_path = bfs::path(metadir) / fmt::format_int(getpid()).str();
+        #else
+        auto metadir_path = bfs::path(metadir);
+        #endif
+
+        bfs::create_directories(metadir_path);
+        GKFS_DATA->metadir(bfs::canonical(metadir_path).native());
+
+        GKFS_DATA->spdlogger()->debug("{}() Meta directory: '{}'",
+                                      __func__, metadir_path.native());
     } else {
         // use rootdir as metadata dir
+        auto metadir = vm["rootdir"].as<std::string>();
+
+        #ifdef GKFS_ENABLE_FORWARDING
+        auto metadir_path = bfs::path(metadir) / fmt::format_int(getpid()).str();
+        bfs::create_directories(metadir_path);
+        GKFS_DATA->metadir(bfs::canonical(metadir_path).native());
+        #else
         GKFS_DATA->metadir(GKFS_DATA->rootdir());
+        #endif
     }
 
     try {
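agios_init() and agios_exit() bracket the scheduler's lifetime, and the daemon pairs them manually above. Where exception safety matters, the pair could be wrapped in a small RAII guard; this is only a sketch layered on the two calls the patch uses (same argument pattern as agios_initialize()), not something the patch introduces:

    // illustrative RAII wrapper around the AGIOS lifetime, assuming <agios.h>
    #include <agios.h>
    #include <stdexcept>

    class AgiosGuard {
    public:
        explicit AgiosGuard(char* config_file) {
            // same call the daemon makes: no callbacks, a config file, queue id 0
            if (!agios_init(nullptr, nullptr, config_file, 0)) {
                throw std::runtime_error("agios_init() failed");
            }
        }
        ~AgiosGuard() { agios_exit(); }
        AgiosGuard(const AgiosGuard&) = delete;
        AgiosGuard& operator=(const AgiosGuard&) = delete;
    };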
diff --git a/src/daemon/handler/srv_data.cpp b/src/daemon/handler/srv_data.cpp
index 9b8cca9d71c543f1343e02b041c6f71fecf40c13..27863fd4b4ef1ed9b72a7052a8c36b52385f56d6 100644
--- a/src/daemon/handler/srv_data.cpp
+++ b/src/daemon/handler/srv_data.cpp
@@ -22,6 +22,14 @@
 #include
 #include
 
+#ifdef GKFS_ENABLE_AGIOS
+#include
+
+#define AGIOS_READ 0
+#define AGIOS_WRITE 1
+#define AGIOS_SERVER_ID_IGNORE 0
+#endif
+
 using namespace std;
 
 /**
@@ -51,6 +59,37 @@ static hg_return_t rpc_srv_write(hg_handle_t handle) {
     GKFS_DATA->spdlogger()->debug(
             "{}() path: '{}' chunk_start '{}' chunk_end '{}' chunk_n '{}' total_chunk_size '{}' bulk_size: '{}' offset: '{}'",
             __func__, in.path, in.chunk_start, in.chunk_end, in.chunk_n, in.total_chunk_size, bulk_size, in.offset);
+    #ifdef GKFS_ENABLE_AGIOS
+    int64_t *data;
+    ABT_eventual eventual = ABT_EVENTUAL_NULL;
+
+    /* creating eventual */
+    ABT_eventual_create(sizeof(int64_t), &eventual);
+
+    unsigned long long int request_id = generate_unique_id();
+    char *agios_path = (char*) in.path;
+
+    // We should call AGIOS before chunking (chunking is an internal detail of how requests are handled)
+    if (!agios_add_request(agios_path, AGIOS_WRITE, in.offset, in.total_chunk_size, request_id, AGIOS_SERVER_ID_IGNORE, agios_eventual_callback, eventual)) {
+        GKFS_DATA->spdlogger()->error("{}() Failed to send request to AGIOS", __func__);
+    } else {
+        GKFS_DATA->spdlogger()->debug("{}() request {} was sent to AGIOS", __func__, request_id);
+    }
+
+    /* Block until the eventual is signaled */
+    ABT_eventual_wait(eventual, (void **)&data);
+
+    int64_t result = *data;
+    GKFS_DATA->spdlogger()->debug("{}() request {} was unblocked (offset = {})!", __func__, result, in.offset);
+
+    ABT_eventual_free(&eventual);
+
+    // Let AGIOS know it can release the request, as it is completed
+    if (!agios_release_request(agios_path, AGIOS_WRITE, in.total_chunk_size, in.offset)) {
+        GKFS_DATA->spdlogger()->error("{}() Failed to release request from AGIOS", __func__);
+    }
+    #endif
+
     /*
      * 2. Set up buffers for pull bulk transfers
      */
@@ -107,12 +146,15 @@ static hg_return_t rpc_srv_write(hg_handle_t handle) {
     for (auto chnk_id_file = in.chunk_start; chnk_id_file <= in.chunk_end && chnk_id_curr < in.chunk_n; chnk_id_file++) {
         // Continue if chunk does not hash to this host
         #ifndef GKFS_ENABLE_FORWARDING
         if (distributor.locate_data(in.path, chnk_id_file) != host_id) {
             GKFS_DATA->spdlogger()->trace(
                     "{}() chunkid '{}' ignored as it does not match to this host with id '{}'. chnk_id_curr '{}'",
                     __func__, chnk_id_file, host_id, chnk_id_curr);
             continue;
         }
         #endif
+
         chnk_ids_host[chnk_id_curr] = chnk_id_file; // save this id to host chunk list
         // offset case. Only relevant in the first iteration of the loop and if the chunk hashes to this host
         if (chnk_id_file == in.chunk_start && in.offset > 0) {
@@ -229,9 +271,40 @@ static hg_return_t rpc_srv_read(hg_handle_t handle) {
     auto hgi = margo_get_info(handle);
     auto mid = margo_hg_info_get_instance(hgi);
     auto bulk_size = margo_bulk_get_size(in.bulk_handle);
+
     GKFS_DATA->spdlogger()->debug(
             "{}() path: '{}' chunk_start '{}' chunk_end '{}' chunk_n '{}' total_chunk_size '{}' bulk_size: '{}' offset: '{}'",
             __func__, in.path, in.chunk_start, in.chunk_end, in.chunk_n, in.total_chunk_size, bulk_size, in.offset);
+    #ifdef GKFS_ENABLE_AGIOS
+    int64_t *data;
+    ABT_eventual eventual = ABT_EVENTUAL_NULL;
+
+    /* creating eventual */
+    ABT_eventual_create(sizeof(int64_t), &eventual);
+
+    unsigned long long int request_id = generate_unique_id();
+    char *agios_path = (char*) in.path;
+
+    // We should call AGIOS before chunking (chunking is an internal detail of how requests are handled)
+    if (!agios_add_request(agios_path, AGIOS_READ, in.offset, in.total_chunk_size, request_id, AGIOS_SERVER_ID_IGNORE, agios_eventual_callback, eventual)) {
+        GKFS_DATA->spdlogger()->error("{}() Failed to send request to AGIOS", __func__);
+    } else {
+        GKFS_DATA->spdlogger()->debug("{}() request {} was sent to AGIOS", __func__, request_id);
+    }
+
+    /* block until the eventual is signaled */
+    ABT_eventual_wait(eventual, (void **)&data);
+
+    int64_t result = *data;
+    GKFS_DATA->spdlogger()->debug("{}() request {} was unblocked (offset = {})!", __func__, result, in.offset);
+
+    ABT_eventual_free(&eventual);
+
+    // let AGIOS know it can release the request, as it is completed
+    if (!agios_release_request(agios_path, AGIOS_READ, in.total_chunk_size, in.offset)) {
+        GKFS_DATA->spdlogger()->error("{}() Failed to release request from AGIOS", __func__);
+    }
+    #endif
 
     /*
      * 2. Set up buffers for pull bulk transfers
@@ -252,9 +325,11 @@ static hg_return_t rpc_srv_read(hg_handle_t handle) {
         GKFS_DATA->spdlogger()->error("{}() Failed to access allocated buffer from bulk handle", __func__);
         return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle);
     }
+    #ifndef GKFS_ENABLE_FORWARDING
     auto const host_id = in.host_id;
     auto const host_size = in.host_size;
     gkfs::rpc::SimpleHashDistributor distributor(host_id, host_size);
+    #endif
 
     // chnk_ids used by this host
     vector chnk_ids_host(in.chunk_n);
@@ -280,12 +355,15 @@ static hg_return_t rpc_srv_read(hg_handle_t handle) {
     for (auto chnk_id_file = in.chunk_start; chnk_id_file <= in.chunk_end && chnk_id_curr < in.chunk_n; chnk_id_file++) {
         // Continue if chunk does not hash to this host
         #ifndef GKFS_ENABLE_FORWARDING
         if (distributor.locate_data(in.path, chnk_id_file) != host_id) {
             GKFS_DATA->spdlogger()->trace(
                     "{}() chunkid '{}' ignored as it does not match to this host with id '{}'. chnk_id_curr '{}'",
                     __func__, chnk_id_file, host_id, chnk_id_curr);
             continue;
         }
         #endif
+
         chnk_ids_host[chnk_id_curr] = chnk_id_file; // save this id to host chunk list
         // Only relevant in the first iteration of the loop and if the chunk hashes to this host
         if (chnk_id_file == in.chunk_start && in.offset > 0) {
@@ -427,4 +505,13 @@ static hg_return_t rpc_srv_get_chunk_stat(hg_handle_t handle) {
 
 DEFINE_MARGO_RPC_HANDLER(rpc_srv_get_chunk_stat)
 
+#ifdef GKFS_ENABLE_AGIOS
+void *agios_eventual_callback(int64_t request_id, void* info) {
+    GKFS_DATA->spdlogger()->debug("{}() custom callback request {} is ready", __func__, request_id);
+
+    ABT_eventual_set((ABT_eventual) info, &request_id, sizeof(int64_t));
+
+    return nullptr;
+}
+#endif
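Both handlers block on an Argobots eventual until AGIOS releases the request: agios_add_request() registers the request with the eventual as callback payload, and agios_eventual_callback() publishes the request id into it from the scheduler's side. Stripped of all AGIOS and RPC details, the synchronization core is the following self-contained sketch (in the daemon the set happens on AGIOS's thread, not inline):

    // illustrative sketch of the eventual hand-off (error checking omitted)
    #include <abt.h>
    #include <cstdint>
    #include <cstdio>

    int main(int argc, char** argv) {
        ABT_init(argc, argv);

        ABT_eventual eventual;
        ABT_eventual_create(sizeof(int64_t), &eventual);   // room for the request id

        // what agios_eventual_callback() does once the scheduler releases a request
        int64_t request_id = 42;
        ABT_eventual_set(eventual, &request_id, sizeof(int64_t));

        // what the RPC handler does: block until the value is published
        int64_t* data;
        ABT_eventual_wait(eventual, (void**)&data);
        std::printf("request %lld unblocked\n", (long long)*data);

        ABT_eventual_free(&eventual);
        ABT_finalize();
        return 0;
    }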
diff --git a/src/daemon/scheduler/agios.cpp b/src/daemon/scheduler/agios.cpp
new file mode 100644
index 0000000000000000000000000000000000000000..2648c1c93e33e2bad2108b9f15ada9805df5bbbc
--- /dev/null
+++ b/src/daemon/scheduler/agios.cpp
@@ -0,0 +1,10 @@
+#include
+
+unsigned long long int generate_unique_id() {
+    // Use a nanosecond-resolution timestamp as a (sufficiently) unique request id
+    timespec ts;
+    clock_gettime(CLOCK_MONOTONIC, &ts);
+    unsigned long long int id = ts.tv_sec*1000000000L + ts.tv_nsec;
+
+    return id;
+}
\ No newline at end of file
diff --git a/src/global/rpc/distributor.cpp b/src/global/rpc/distributor.cpp
index ab059785302aec0a56fa224a5632c0d6ac066d44..ba8d700cf6cc33ddcb3618bd582082471d27fe88 100644
--- a/src/global/rpc/distributor.cpp
+++ b/src/global/rpc/distributor.cpp
@@ -68,5 +68,32 @@ locate_directory_metadata(const string& path) const {
     return {localhost_};
 }
 
+ForwarderDistributor::
+ForwarderDistributor(host_t fwhost, unsigned int hosts_size) :
+    fwd_host_(fwhost),
+    hosts_size_(hosts_size),
+    all_hosts_(hosts_size) {
+    ::iota(all_hosts_.begin(), all_hosts_.end(), 0);
+}
+
+host_t ForwarderDistributor::
+localhost() const {
+    return fwd_host_;
+}
+
+host_t ForwarderDistributor::
+locate_data(const std::string& path, const chunkid_t& chnk_id) const {
+    return fwd_host_;
+}
+
+host_t ForwarderDistributor::
+locate_file_metadata(const std::string& path) const {
+    return str_hash(path) % hosts_size_;
+}
+
+std::vector<host_t> ForwarderDistributor::
+locate_directory_metadata(const std::string& path) const {
+    return all_hosts_;
+}
 } // namespace rpc
-} // namespace gkfs
\ No newline at end of file
+} // namespace gkfs
diff --git a/tests/integration/CMakeLists.txt b/tests/integration/CMakeLists.txt
index f29f079e9bbb7081e3e16505ec179e41b3b3e924..60e8540dcb3311d484a60857566adb08988f7db1 100644
--- a/tests/integration/CMakeLists.txt
+++ b/tests/integration/CMakeLists.txt
@@ -32,12 +32,14 @@ gkfs_add_python_test(
     SOURCE operations/
 )
 
+if (GLIBC_HAS_STATX)
 gkfs_add_python_test(
     NAME test_lseek
     PYTHON_VERSION 3.6
     WORKING_DIRECTORY ${PROJECT_SOURCE_DIR}/tests/integration
     SOURCE position/test_lseek.py
 )
+endif()
 
 gkfs_add_python_test(
     NAME test_shell
@@ -46,6 +48,12 @@ gkfs_add_python_test(
     SOURCE shell/
 )
 
+gkfs_add_python_test(
+    NAME forwarding
+    PYTHON_VERSION 3.6
+    WORKING_DIRECTORY ${PROJECT_SOURCE_DIR}/tests/integration
+    SOURCE forwarding/
+)
 
 gkfs_add_python_test(
     NAME test_data
@@ -54,7 +62,6 @@ gkfs_add_python_test(
     SOURCE data/
 )
 
-
 if(GKFS_INSTALL_TESTS)
     install(DIRECTORY harness
         DESTINATION ${CMAKE_INSTALL_DATAROOTDIR}/gkfs/tests/integration
@@ -97,13 +104,12 @@ if(GKFS_INSTALL_TESTS)
         PATTERN ".pytest_cache" EXCLUDE
     )
 
-    install(DIRECTORY data
-        DESTINATION ${CMAKE_INSTALL_DATAROOTDIR}/gkfs/tests/integration
-        FILES_MATCHING
-        REGEX ".*\\.py"
-        PATTERN "__pycache__" EXCLUDE
-        PATTERN ".pytest_cache" EXCLUDE
+        DESTINATION ${CMAKE_INSTALL_DATAROOTDIR}/gkfs/tests/integration
+        FILES_MATCHING
+        REGEX ".*\\.py"
+        PATTERN "__pycache__" EXCLUDE
+        PATTERN ".pytest_cache" EXCLUDE
     )
 
     install(DIRECTORY shell
@@ -113,4 +119,12 @@ if(GKFS_INSTALL_TESTS)
         PATTERN "__pycache__" EXCLUDE
         PATTERN ".pytest_cache" EXCLUDE
     )
+
+    install(DIRECTORY forwarding
+        DESTINATION ${CMAKE_INSTALL_DATAROOTDIR}/gkfs/tests/integration
+        FILES_MATCHING
+        REGEX ".*\\.py"
+        PATTERN "__pycache__" EXCLUDE
+        PATTERN ".pytest_cache" EXCLUDE
+    )
 endif()
diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py
index 91d1ffe2af12c11d79ece510b07f74a749892bd2..4dd28b8d6fbcdf892c7818798493194986ada04e 100644
--- a/tests/integration/conftest.py
+++ b/tests/integration/conftest.py
@@ -19,7 +19,7 @@ from pathlib import Path
 from harness.logger import logger, initialize_logging, finalize_logging
 from harness.cli import add_cli_options, set_default_log_formatter
 from harness.workspace import Workspace, FileCreator
-from harness.gkfs import Daemon, Client, ShellClient
+from harness.gkfs import Daemon, Client, ShellClient, FwdDaemon, FwdClient, ShellFwdClient, FwdDaemonCreator, FwdClientCreator
 from harness.reporter import report_test_status, report_test_headline, report_assertion_pass
 
 def pytest_configure(config):
@@ -86,8 +86,8 @@ def gkfs_daemon(test_workspace, request):
     """
 
     interface = request.config.getoption('--interface')
-
     daemon = Daemon(interface, test_workspace)
+
     yield daemon.run()
     daemon.shutdown()
 
@@ -118,3 +118,24 @@ def file_factory(test_workspace):
     """
 
     return FileCreator(test_workspace)
+
+@pytest.fixture
+def gkfwd_daemon_factory(test_workspace, request):
+    """
+    Returns a factory that can create forwarding daemons
+    in the test workspace.
+    """
+
+    interface = request.config.getoption('--interface')
+
+    return FwdDaemonCreator(interface, test_workspace)
+
+@pytest.fixture
+def gkfwd_client_factory(test_workspace):
+    """
+    Returns a factory that can create forwarding clients
+    in the test workspace, so that operations (system calls,
+    library calls, ...) can be requested from a co-running daemon.
+    """
+
+    return FwdClientCreator(test_workspace)
diff --git a/tests/integration/forwarding/test_map.py b/tests/integration/forwarding/test_map.py
new file mode 100644
index 0000000000000000000000000000000000000000..5a2d33a0b306f96f8afddb0cabd44acfe7bee654
--- /dev/null
+++ b/tests/integration/forwarding/test_map.py
@@ -0,0 +1,292 @@
+################################################################################
+# Copyright 2018-2020, Barcelona Supercomputing Center (BSC), Spain            #
+# Copyright 2015-2020, Johannes Gutenberg Universitaet Mainz, Germany          #
+#                                                                              #
+# This software was partially supported by the                                 #
+# EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu).    #
+#                                                                              #
+# This software was partially supported by the                                 #
+# ADA-FS project under the SPPEXA project funded by the DFG.                   #
+#                                                                              #
+# SPDX-License-Identifier: MIT                                                 #
+################################################################################
+
+import harness
+from pathlib import Path
+import errno
+import stat
+import os
+import time
+import ctypes
+import socket
+import sh
+import sys
+import pytest
+from harness.logger import logger
+
+nonexisting = "nonexisting"
+
+
+def test_two_io_nodes(gkfwd_daemon_factory, gkfwd_client_factory):
+    """Write files from two clients using two daemons"""
+
+    d00 = gkfwd_daemon_factory.create()
+    d01 = gkfwd_daemon_factory.create()
+
+    c00 = gkfwd_client_factory.create('c-0')
+    c01 = gkfwd_client_factory.create('c-1')
+
+    file = d00.mountdir / "file-c00"
+
+    # create a file in gekkofs
+    ret = c00.open(file,
+                   os.O_CREAT | os.O_WRONLY,
+                   stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
+
+    assert ret.retval == 10000
+    assert ret.errno == 115 #FIXME: Should be 0!
+
+    # write a buffer we know
+    buf = b'42'
+    ret = c00.write(file, buf, len(buf))
+
+    assert ret.retval == len(buf) # Return the number of written bytes
+    assert ret.errno == 115 #FIXME: Should be 0!
+
+    # open the file to read
+    ret = c00.open(file,
+                   os.O_RDONLY,
+                   stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
+
+    assert ret.retval == 10000
+    assert ret.errno == 115 #FIXME: Should be 0!
+
+    # read the file
+    ret = c00.read(file, len(buf))
+
+    assert ret.buf == buf
+    assert ret.retval == len(buf) # Return the number of read bytes
+    assert ret.errno == 115 #FIXME: Should be 0!
+
+
+    file = d01.mountdir / "file-c01"
+
+    # create a file in gekkofs
+    ret = c01.open(file,
+                   os.O_CREAT | os.O_WRONLY,
+                   stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
+
+    assert ret.retval == 10000
+    assert ret.errno == 115 #FIXME: Should be 0!
+
+    # write a buffer we know
+    buf = b'42'
+    ret = c01.write(file, buf, len(buf))
+
+    assert ret.retval == len(buf) # Return the number of written bytes
+    assert ret.errno == 115 #FIXME: Should be 0!
+
+    # open the file to read
+    ret = c01.open(file,
+                   os.O_RDONLY,
+                   stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
+
+    assert ret.retval == 10000
+    assert ret.errno == 115 #FIXME: Should be 0!
+
+    # read the file
+    ret = c01.read(file, len(buf))
+
+    assert ret.buf == buf
+    assert ret.retval == len(buf) # Return the number of read bytes
+    assert ret.errno == 115 #FIXME: Should be 0!
+
+    # both files should be there and accessible by the two clients
+    ret = c00.readdir(d00.mountdir)
+
+    assert len(ret.dirents) == 2
+
+    assert ret.dirents[0].d_name == 'file-c00'
+    assert ret.dirents[0].d_type == 8 # DT_REG
+    assert ret.errno == 115 #FIXME: Should be 0!
+
+    assert ret.dirents[1].d_name == 'file-c01'
+    assert ret.dirents[1].d_type == 8 # DT_REG
+    assert ret.errno == 115 #FIXME: Should be 0!
+
+    with open(c00.log) as f:
+        lines = f.readlines()
+
+        for line in lines:
+            if 'Forward to' in line:
+                ion = line.split()[-1]
+
+        assert ion == '0'
+
+    with open(c01.log) as f:
+        lines = f.readlines()
+
+        for line in lines:
+            if 'Forward to' in line:
+                ion = line.split()[-1]
+
+        assert ion == '1'
+def test_two_io_nodes_remap(gkfwd_daemon_factory, gkfwd_client_factory):
+    """Write a file, remap the client to the other daemon, and write again"""
+
+    d00 = gkfwd_daemon_factory.create()
+    d01 = gkfwd_daemon_factory.create()
+
+    c00 = gkfwd_client_factory.create('c-0')
+    c01 = gkfwd_client_factory.create('c-1')
+
+    file = d00.mountdir / "file-c00-1"
+
+    # create a file in gekkofs
+    ret = c00.open(file,
+                   os.O_CREAT | os.O_WRONLY,
+                   stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
+
+    assert ret.retval == 10000
+    assert ret.errno == 115 #FIXME: Should be 0!
+
+    # write a buffer we know
+    buf = b'42'
+    ret = c00.write(file, buf, len(buf))
+
+    assert ret.retval == len(buf) # Return the number of written bytes
+    assert ret.errno == 115 #FIXME: Should be 0!
+
+    with open(c00.log) as f:
+        lines = f.readlines()
+
+        for line in lines:
+            if 'Forward to' in line:
+                ion = line.split()[-1]
+
+        assert ion == '0'
+
+    # recreate the mapping so that the server that wrote will now read
+    c00.remap('c-1')
+
+    # we need to wait for at least the number of seconds between remap calls
+    time.sleep(10)
+
+    file = d00.mountdir / "file-c00-2"
+
+    # open the file to write
+    ret = c00.open(file,
+                   os.O_CREAT | os.O_WRONLY,
+                   stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
+
+    assert ret.retval == 10000
+    assert ret.errno == 115 #FIXME: Should be 0!
+
+    # write a second buffer we know
+    buf = b'24'
+    ret = c00.write(file, buf, len(buf))
+
+    assert ret.retval == len(buf) # Return the number of written bytes
+    assert ret.errno == 115 #FIXME: Should be 0!
+
+    with open(c00.log) as f:
+        lines = f.readlines()
+
+        for line in lines:
+            if 'Forward to' in line:
+                ion = line.split()[-1]
+
+        assert ion == '1'
+def test_two_io_nodes_operations(gkfwd_daemon_factory, gkfwd_client_factory):
+    """Write files from one client and read in the other using two daemons"""
+
+    d00 = gkfwd_daemon_factory.create()
+    d01 = gkfwd_daemon_factory.create()
+
+    c00 = gkfwd_client_factory.create('c-0')
+    c01 = gkfwd_client_factory.create('c-1')
+
+    file = d00.mountdir / "file-c00"
+
+    # create a file in gekkofs
+    ret = c00.open(file,
+                   os.O_CREAT | os.O_WRONLY,
+                   stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
+
+    assert ret.retval == 10000
+    assert ret.errno == 115 #FIXME: Should be 0!
+
+    # write a buffer we know
+    buf = b'42'
+    ret = c00.write(file, buf, len(buf))
+
+    assert ret.retval == len(buf) # Return the number of written bytes
+    assert ret.errno == 115 #FIXME: Should be 0!
+
+    # open the file to read
+    ret = c00.open(file,
+                   os.O_RDONLY,
+                   stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
+
+    assert ret.retval == 10000
+    assert ret.errno == 115 #FIXME: Should be 0!
+
+    # read the file
+    ret = c00.read(file, len(buf))
+
+    assert ret.buf == buf
+    assert ret.retval == len(buf) # Return the number of read bytes
+    assert ret.errno == 115 #FIXME: Should be 0!
+
+    # open the file to read
+    ret = c01.open(file,
+                   os.O_RDONLY,
+                   stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
+
+    assert ret.retval == 10000
+    assert ret.errno == 115 #FIXME: Should be 0!
+
+    # read the file
+    ret = c01.read(file, len(buf))
+
+    assert ret.buf == buf
+    assert ret.retval == len(buf) # Return the number of read bytes
+    assert ret.errno == 115 #FIXME: Should be 0!
+
+    # the file should be there and accessible by the two clients
+    ret = c00.readdir(d00.mountdir)
+
+    assert len(ret.dirents) == 1
+
+    assert ret.dirents[0].d_name == 'file-c00'
+    assert ret.dirents[0].d_type == 8 # DT_REG
+    assert ret.errno == 115 #FIXME: Should be 0!
+
+    # the file should be there and accessible by the two clients
+    ret = c01.readdir(d01.mountdir)
+
+    assert len(ret.dirents) == 1
+
+    assert ret.dirents[0].d_name == 'file-c00'
+    assert ret.dirents[0].d_type == 8 # DT_REG
+    assert ret.errno == 115 #FIXME: Should be 0!
+
+    with open(c00.log) as f:
+        lines = f.readlines()
+
+        for line in lines:
+            if 'Forward to' in line:
+                ion = line.split()[-1]
+
+        assert ion == '0'
+
+    with open(c01.log) as f:
+        lines = f.readlines()
+
+        for line in lines:
+            if 'Forward to' in line:
+                ion = line.split()[-1]
+
+        assert ion == '1'
\ No newline at end of file
diff --git a/tests/integration/harness/gkfs.py b/tests/integration/harness/gkfs.py
index 36abe40ec6828b031d7e3063ebbd3e7ac22014f3..07373ddb0015418e33cc19fa04acec88da8bdbcc 100644
--- a/tests/integration/harness/gkfs.py
+++ b/tests/integration/harness/gkfs.py
@@ -32,6 +32,18 @@ gkfs_client_log_file = 'gkfs_client.log'
 gkfs_client_log_level = 'all'
 gkfs_daemon_active_log_pattern = r'Startup successful. Daemon is ready.'
 
+gkfwd_daemon_cmd = 'gkfwd_daemon'
+gkfwd_client_cmd = 'gkfs.io'
+gkfwd_client_lib_file = 'libgkfwd_intercept.so'
+gkfwd_hosts_file = 'gkfs_hosts.txt'
+gkfwd_forwarding_map_file = 'gkfs_forwarding.map'
+gkfwd_daemon_log_file = 'gkfwd_daemon.log'
+gkfwd_daemon_log_level = '100'
+gkfwd_client_log_file = 'gkfwd_client.log'
+gkfwd_client_log_level = 'all'
+gkfwd_daemon_active_log_pattern = r'Startup successful. Daemon is ready.'
+
+
 def get_ip_addr(iface):
     return netifaces.ifaddresses(iface)[netifaces.AF_INET][0]['addr']
 
@@ -128,6 +140,49 @@ def _process_exists(pid):
 
     return True
 
+class FwdDaemonCreator:
+    """
+    Factory that allows tests to create forwarding daemons in a workspace.
+    """
+
+    def __init__(self, interface, workspace):
+        self._interface = interface
+        self._workspace = workspace
+
+    def create(self):
+        """
+        Create a forwarding daemon in the tests workspace.
+
+        Returns
+        -------
+        The `FwdDaemon` object to interact with the daemon.
+        """
+
+        daemon = FwdDaemon(self._interface, self._workspace)
+        daemon.run()
+
+        return daemon
+
+class FwdClientCreator:
+    """
+    Factory that allows tests to create forwarding clients in a workspace.
+    """
+
+    def __init__(self, workspace):
+        self._workspace = workspace
+
+    def create(self, identifier):
+        """
+        Create a forwarding client in the tests workspace.
+
+        Returns
+        -------
+        The `FwdClient` object to interact with the daemon.
+        """
+
+        return FwdClient(self._workspace, identifier)
+
+
+
+class FwdClient:
+    """
+    A class to represent a GekkoFS client process with a patched LD_PRELOAD.
+    This class allows tests to interact with the file system using I/O-related
+    function calls, be they system calls (e.g. read()) or glibc I/O functions
+    (e.g. opendir()).
+    """
+    def __init__(self, workspace, identifier):
+        self._parser = IOParser()
+        self._workspace = workspace
+        self._identifier = identifier
+        self._cmd = sh.Command(gkfwd_client_cmd, self._workspace.bindirs)
+        self._env = os.environ.copy()
+
+        gkfwd_forwarding_map_file_local = '{}-{}'.format(identifier, gkfwd_forwarding_map_file)
+
+        # create the forwarding map file: each line maps a hostname to the
+        # ID of the daemon its clients should forward I/O to
+        with open(self.cwd / gkfwd_forwarding_map_file_local, 'w') as fwd_map_file:
+            fwd_map_file.write('{} {}\n'.format(socket.gethostname(), int(identifier.split('-')[1])))
+
+        # record the map so we can modify it later if needed
+        self._map = self.cwd / gkfwd_forwarding_map_file_local
+
+        # we need to ensure each client will have a distinct log
+        gkfwd_client_log_file_local = '{}-{}'.format(identifier, gkfwd_client_log_file)
+        self._log = self._workspace.logdir / gkfwd_client_log_file_local
+
+        libdirs = ':'.join(
+                filter(None, [os.environ.get('LD_LIBRARY_PATH', '')] +
+                             [str(p) for p in self._workspace.libdirs]))
+
+        # ensure the client interception library is available:
+        # to avoid running code with potentially installed libraries,
+        # it must be found in one (and only one) of the workspace's bindirs
+        preloads = []
+        for d in self._workspace.bindirs:
+            search_path = Path(d) / gkfwd_client_lib_file
+            if search_path.exists():
+                preloads.append(search_path)
+
+        if len(preloads) == 0:
+            logger.error(f'No client libraries found in the test\'s binary directories:')
+            pytest.exit("Aborted due to initialization error. Check test logs.")
+
+        if len(preloads) != 1:
+            logger.error(f'Multiple client libraries found in the test\'s binary directories:')
+            for p in preloads:
+                logger.error(f'  {p}')
+            logger.error(f'Make sure that only one copy of the client library is available.')
+            pytest.exit("Aborted due to initialization error. Check test logs.")
+
+        self._preload_library = preloads[0]
+
+        self._patched_env = {
+            'LD_LIBRARY_PATH'             : libdirs,
+            'LD_PRELOAD'                  : self._preload_library,
+            'LIBGKFS_HOSTS_FILE'          : self.cwd / gkfwd_hosts_file,
+            'LIBGKFS_FORWARDING_MAP_FILE' : self.cwd / gkfwd_forwarding_map_file_local,
+            'LIBGKFS_LOG'                 : gkfwd_client_log_level,
+            'LIBGKFS_LOG_OUTPUT'          : self._workspace.logdir / gkfwd_client_log_file_local
+        }
+
+        self._env.update(self._patched_env)
+
+    @property
+    def preload_library(self):
+        """
+        Return the preload library detected for this client.
+        """
+
+        return self._preload_library
+
+    def run(self, cmd, *args):
+
+        logger.debug(f"running client")
+        logger.debug(f"cmdline: {self._cmd} " + " ".join(map(str, list(args))))
+        logger.debug(f"patched env: {pformat(self._patched_env)}")
+
+        out = self._cmd(
+            [ cmd ] + list(args),
+            _env = self._env,
+#            _out=sys.stdout,
+#            _err=sys.stderr,
+            )
+
+        logger.debug(f"command output: {out.stdout}")
+        return self._parser.parse(cmd, out.stdout)
+
+    def remap(self, identifier):
+        # rewrite the forwarding map so the client targets another daemon
+        with open(self._map, 'w') as fwd_map_file:
+            fwd_map_file.write('{} {}\n'.format(socket.gethostname(), int(identifier.split('-')[1])))
+
+    def __getattr__(self, name):
+        return _proxy_exec(self, name)
+
+    @property
+    def cwd(self):
+        return self._workspace.twd
+
+    @property
+    def log(self):
+        return self._log
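The forwarding map written above is a plain-text file with one `<hostname> <daemon-id>` entry per line, and `remap()` simply rewrites it to point the client at a different daemon. For clarity, here is a small helper that parses this format back into a dict; the function is illustrative and not part of the harness.

```python
# Illustrative helper (not part of the harness): parse a forwarding map
# file of the form "<hostname> <daemon-id>", one entry per line.
def parse_forwarding_map(path):
    mapping = {}
    with open(path) as f:
        for line in f:
            if not line.strip():
                continue  # tolerate blank lines
            host, ident = line.split()
            mapping[host] = int(ident)
    return mapping

# e.g. after FwdClient(workspace, 'c-1') on host 'node01', the map
# contains the single line "node01 1" and parses to {'node01': 1}
```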
Check test logs.") + + self._preload_library = preloads[0] + + self._patched_env = { + 'LD_LIBRARY_PATH' : libdirs, + 'LD_PRELOAD' : self._preload_library, + 'LIBGKFS_HOSTS_FILE' : self.cwd / gkfwd_hosts_file, + 'LIBGKFS_FORWARDING_MAP_FILE' : self.cwd / gkfwd_forwarding_map_file_local, + 'LIBGKFS_LOG' : gkfs_client_log_level, + 'LIBGKFS_LOG_OUTPUT' : self._workspace.logdir / gkfwd_client_log_file_local + } + + self._env.update(self._patched_env) + + @property + def preload_library(self): + """ + Return the preload library detected for this client + """ + + return self._preload_library + + def run(self, cmd, *args): + + logger.debug(f"running client") + logger.debug(f"cmdline: {self._cmd} " + " ".join(map(str, list(args)))) + logger.debug(f"patched env: {pformat(self._patched_env)}") + + out = self._cmd( + [ cmd ] + list(args), + _env = self._env, + # _out=sys.stdout, + # _err=sys.stderr, + ) + + logger.debug(f"command output: {out.stdout}") + return self._parser.parse(cmd, out.stdout) + + def remap(self, identifier): + fwd_map_file = open(self.cwd / self._map, 'w') + fwd_map_file.write('{} {}\n'.format(socket.gethostname(), int(identifier.split('-')[1]))) + fwd_map_file.close() + + def __getattr__(self, name): + return _proxy_exec(self, name) + + @property + def cwd(self): + return self._workspace.twd + + @property + def log(self): + return self._log + +class ShellFwdClient: + """ + A class to represent a GekkoFS shell client process. + This class allows tests to execute shell commands or scripts via bash -c + on a GekkoFS instance. + """ + + def __init__(self, workspace): + self._workspace = workspace + self._cmd = sh.Command("bash") + self._env = os.environ.copy() + + # create the forwarding map file + fwd_map_file = open(self.cwd / gkfwd_forwarding_map_file, 'w') + fwd_map_file.write('{} {}\n'.format(socket.gethostname(), 0)) + fwd_map_file.close() + + libdirs = ':'.join( + filter(None, [os.environ.get('LD_LIBRARY_PATH', '')] + + [str(p) for p in self._workspace.libdirs])) + + # ensure the client interception library is available: + # to avoid running code with potentially installed libraries, + # it must be found in one (and only one) of the workspace's bindirs + preloads = [] + for d in self._workspace.bindirs: + search_path = Path(d) / gkfwd_client_lib_file + if search_path.exists(): + preloads.append(search_path) + + if len(preloads) != 1: + logger.error(f'Multiple client libraries found in the test\'s binary directories:') + for p in preloads: + logger.error(f' {p}') + logger.error(f'Make sure that only one copy of the client library is available.') + pytest.exit("Aborted due to initialization error") + + self._preload_library = preloads[0] + + self._patched_env = { + 'LD_LIBRARY_PATH' : libdirs, + 'LD_PRELOAD' : self._preload_library, + 'LIBGKFS_HOSTS_FILE' : self.cwd / gkfwd_hosts_file, + 'LIBGKFS_FORWARDING_MAP_FILE' : self.cwd / gkfwd_forwarding_map_file, + 'LIBGKFS_LOG' : gkfwd_client_log_level, + 'LIBGKFS_LOG_OUTPUT' : self._workspace.logdir / gkfwd_client_log_file + } + + self._env.update(self._patched_env) + + @property + def patched_environ(self): + """ + Return the patched environment required to run a test as a string that + can be prepended to a shell command. + """ + + return ' '.join(f'{k}="{v}"' for k,v in self._patched_env.items()) + + def script(self, code, intercept_shell=True, timeout=60, timeout_signal=signal.SIGKILL): + """ + Execute a shell script passed as an argument in bash. 
+
+    def script(self, code, intercept_shell=True, timeout=60, timeout_signal=signal.SIGKILL):
+        """
+        Execute a shell script passed as an argument in bash.
+
+        For instance, the following snippet:
+
+            mountdir = pathlib.Path('/tmp')
+            file01 = 'file01'
+
+            ShellClient().script(
+                f'''
+                    expected_pathname={mountdir / file01}
+                    if [[ -e ${{expected_pathname}} ]];
+                    then
+                        exit 0
+                    fi
+                    exit 1
+                ''')
+
+        transforms into:
+
+            bash -c '
+                expected_pathname=/tmp/file01
+                if [[ -e ${expected_pathname} ]];
+                then
+                    exit 0
+                fi
+                exit 1
+            '
+
+        Note that since we are using Python's f-strings, for variable
+        expansions to work correctly, they need to be written with double
+        braces, e.g. ${{expected_pathname}}.
+
+        Parameters
+        ----------
+        code: `str`
+            The script code to be passed to 'bash -c'.
+
+        intercept_shell: `bool`
+            Controls whether the shell executing the script should be
+            executed with LD_PRELOAD=libgkfwd_intercept.so (default: True).
+
+        timeout: `int`
+            How much time, in seconds, we should give the process to complete.
+            If the process does not finish within the timeout, it will be sent
+            the signal defined by `timeout_signal`.
+
+            Default value: 60
+
+        timeout_signal: `int`
+            The signal to be sent to the process if `timeout` is not None.
+
+            Default value: signal.SIGKILL
+
+        Returns
+        -------
+        A sh.RunningCommand instance that allows interacting with
+        the finished process.
+        """
+
+        logger.debug(f"running bash")
+        logger.debug(f"cmd: bash -c '{code}'")
+        logger.debug(f"timeout: {timeout} seconds")
+        logger.debug(f"timeout_signal: {signal.Signals(timeout_signal).name}")
+
+        if intercept_shell:
+            logger.debug(f"patched env: {self._patched_env}")
+
+        # 'sh' raises an exception if the return code is not zero;
+        # if we'd rather check for return codes explicitly, we could
+        # whitelist all exit codes from 1 to 255 as 'ok' using the
+        # _ok_code argument
+        return self._cmd('-c',
+                         code,
+                         _env = (self._env if intercept_shell else os.environ),
+#                        _out=sys.stdout,
+#                        _err=sys.stderr,
+                         _timeout=timeout,
+                         _timeout_signal=timeout_signal,
+#                        _ok_code=list(range(0, 256))
+                         )
+
+    def run(self, cmd, *args, timeout=60, timeout_signal=signal.SIGKILL):
+        """
+        Execute a shell command with arguments.
+
+        For example, the following snippet:
+
+            mountdir = pathlib.Path('/tmp')
+            file01 = 'file01'
+
+            ShellClient().stat('--terse', mountdir / file01)
+
+        transforms into:
+
+            bash -c 'stat --terse /tmp/file01'
+
+        Parameters
+        ----------
+        cmd: `str`
+            The command to execute.
+
+        args: `list`
+            The list of arguments for the command.
+
+        timeout: `number`
+            How much time, in seconds, we should give the process to complete.
+            If the process does not finish within the timeout, it will be sent
+            the signal defined by `timeout_signal`.
+
+            Default value: 60
+
+        timeout_signal: `int`
+            The signal to be sent to the process if `timeout` is not None.
+
+            Default value: signal.SIGKILL
+
+        Returns
+        -------
+        A ShellCommand instance that allows interacting with the finished
+        process. Note that ShellCommand wraps sh.RunningCommand and adds some
+        extra properties to it.
+        """
+
+        bash_c_args = f"{cmd} {' '.join(str(a) for a in args)}"
+        logger.debug(f"running bash")
+        logger.debug(f"cmd: bash -c '{bash_c_args}'")
+        logger.debug(f"timeout: {timeout} seconds")
+        logger.debug(f"timeout_signal: {signal.Signals(timeout_signal).name}")
+        logger.debug(f"patched env:\n{pformat(self._patched_env)}")
+
+        # 'sh' raises an exception if the return code is not zero;
+        # if we'd rather check for return codes explicitly, we could
+        # whitelist all exit codes from 1 to 255 as 'ok' using the
+        # _ok_code argument
+        proc = self._cmd('-c',
+                         bash_c_args,
+                         _env = self._env,
+#                        _out=sys.stdout,
+#                        _err=sys.stderr,
+                         _timeout=timeout,
+                         _timeout_signal=timeout_signal,
+#                        _ok_code=list(range(0, 256))
+                         )
+
+        return ShellCommand(cmd, proc)
+
+    def __getattr__(self, name):
+        return _proxy_exec(self, name)
+
+    @property
+    def cwd(self):
+        return self._workspace.twd
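Because the `_ok_code` whitelist above is left commented out, `sh` raises `sh.ErrorReturnCode` whenever the wrapped command exits non-zero, and `sh.TimeoutException` when `_timeout` expires. A test that expects a failing command therefore needs to catch these; a minimal sketch, where `shell` is an assumed `ShellFwdClient` instance:

```python
import sh

# 'shell' is assumed to be a ShellFwdClient instance (see above).
try:
    shell.run('stat', '--terse', '/nonexistent/file')
except sh.ErrorReturnCode as err:
    # sh attaches the exit code and captured output to the exception
    print(f'stat failed with exit code {err.exit_code}')
    print(err.stderr.decode())
except sh.TimeoutException:
    print('command did not finish within the timeout')
```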
+ """ + + bash_c_args = f"{cmd} {' '.join(str(a) for a in args)}" + logger.debug(f"running bash") + logger.debug(f"cmd: bash -c '{bash_c_args}'") + logger.debug(f"timeout: {timeout} seconds") + logger.debug(f"timeout_signal: {signal.Signals(timeout_signal).name}") + logger.debug(f"patched env:\n{pformat(self._patched_env)}") + + # 'sh' raises an exception if the return code is not zero; + # since we'd rather check for return codes explictly, we + # whitelist all exit codes from 1 to 255 as 'ok' using the + # _ok_code argument + proc = self._cmd('-c', + bash_c_args, + _env = self._env, + # _out=sys.stdout, + # _err=sys.stderr, + _timeout=timeout, + _timeout_signal=timeout_signal, + # _ok_code=list(range(0, 256)) + ) + + return ShellCommand(cmd, proc) + + def __getattr__(self, name): + return _proxy_exec(self, name) + + @property + def cwd(self): + return self._workspace.twd diff --git a/tests/integration/harness/workspace.py b/tests/integration/harness/workspace.py index 319b55a906223302cb2e3b05cfe7b65cf6559515..50f135c74f48bb0d4152fe6bcd6c33d7dae27287 100644 --- a/tests/integration/harness/workspace.py +++ b/tests/integration/harness/workspace.py @@ -48,6 +48,7 @@ class Workspace: self._libdirs = libdirs self._logdir = self._twd / 'logs' self._rootdir = self._twd / 'root' + self._metadir = self._twd / 'meta' self._mountdir = self._twd / 'mnt' self._tmpdir = self._twd / 'tmp' @@ -76,6 +77,10 @@ class Workspace: def rootdir(self): return self._rootdir + @property + def metadir(self): + return self._metadir + @property def mountdir(self): return self._mountdir