diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 1a2457ca37deaba250fc1567daf328e945055e2f..9b1ac1266cc67409a9da6d39533f43ba2fc84edb 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -2,28 +2,44 @@ stages: - download deps - build deps - build + - build forwarding - test + - test forwarding variables: - DEPS_SRC_PATH: "${CI_PROJECT_DIR}/deps/src" - DEPS_INSTALL_PATH: "${CI_PROJECT_DIR}/deps/install" - DEPS_COMMIT: "${CI_PROJECT_DIR}/deps/install/gkfs_deps_commit" - BUILD_PATH: "${CI_PROJECT_DIR}/gkfs/build" - INSTALL_PATH: "${CI_PROJECT_DIR}/gkfs/install" - INTEGRATION_TESTS_BIN_PATH: "${CI_PROJECT_DIR}/gkfs/install/share/gkfs/tests/integration" - INTEGRATION_TESTS_RUN_PATH: "${CI_PROJECT_DIR}/gkfs/install/share/gkfs/tests/integration/run" - TESTS_BUILD_PATH: "${CI_PROJECT_DIR}/test/build" - PYTEST: "${CI_PROJECT_DIR}/gkfs/install/share/gkfs/tests/integration/pytest-venv/bin/py.test" - LOG_PATH: "${CI_PROJECT_DIR}/logs" - LD_LIBRARY_PATH: "${CI_PROJECT_DIR}/deps/install/lib;${CI_PROJECT_DIR}/deps/install/lib64" + DEPS_SRC_PATH: "${CI_PROJECT_DIR}/deps/src" + DEPS_INSTALL_PATH: "${CI_PROJECT_DIR}/deps/install" + DEPS_COMMIT: "${CI_PROJECT_DIR}/deps/install/gkfs_deps_commit" + + BUILD_PATH: "${CI_PROJECT_DIR}/gkfs/build" + INSTALL_PATH: "${CI_PROJECT_DIR}/gkfs/install" + INTEGRATION_TESTS_BIN_PATH: "${CI_PROJECT_DIR}/gkfs/install/share/gkfs/tests/integration" + INTEGRATION_TESTS_RUN_PATH: "${CI_PROJECT_DIR}/gkfs/install/share/gkfs/tests/integration/run" + + BUILD_FWD_PATH: "${CI_PROJECT_DIR}/gkfwd/build" + INSTALL_FWD_PATH: "${CI_PROJECT_DIR}/gkfwd/install" + INTEGRATION_TESTS_FWD_BIN_PATH: "${CI_PROJECT_DIR}/gkfwd/install/share/gkfs/tests/integration" + INTEGRATION_TESTS_FWD_RUN_PATH: "${CI_PROJECT_DIR}/gkfwd/install/share/gkfs/tests/integration/run" + + TESTS_BUILD_PATH: "${CI_PROJECT_DIR}/test/gkfs/build" + PYTEST: "${CI_PROJECT_DIR}/gkfs/install/share/gkfs/tests/integration/pytest-venv/bin/py.test" + + TESTS_BUILD_FWD_PATH: "${CI_PROJECT_DIR}/test/gkfwd/build" 
+ PYTEST_FWD: "${CI_PROJECT_DIR}/gkfwd/install/share/gkfs/tests/integration/pytest-venv/bin/py.test" + + LOG_PATH: "${CI_PROJECT_DIR}/logs" + + LD_LIBRARY_PATH: "${CI_PROJECT_DIR}/deps/install/lib;${CI_PROJECT_DIR}/deps/install/lib64" # Configuration variables - GKFS_LOG_LEVEL: "100" - GKFS_DAEMON_LOG_PATH: "${CI_PROJECT_DIR}/logs/daemon.log" - LIBGKFS_LOG: "all" - LIBGKFS_LOG_OUTPUT: "${CI_PROJECT_DIR}/logs/gkfs_client.log" - GIT_SUBMODULE_STRATEGY: recursive + GKFS_LOG_LEVEL: "100" + GKFS_DAEMON_LOG_PATH: "${CI_PROJECT_DIR}/logs/daemon.log" + LIBGKFS_LOG: "all" + LIBGKFS_LOG_OUTPUT: "${CI_PROJECT_DIR}/logs/gkfs_client.log" + GIT_SUBMODULE_STRATEGY: recursive -image: gekkofs/gekkofs:build_env-0.8.0 +# Temporary new image file +# image: gekkofs/gekkofs:build_env-0.8.0 +image: jeanbez/gekkofs-forwarding:latest compile dependencies: stage: build deps @@ -65,6 +81,29 @@ compile GekkoFS: paths: - ${INSTALL_PATH} +compile GekkoFWD: + stage: build + dependencies: + - "compile dependencies" + script: + - mkdir -p ${BUILD_FWD_PATH} && cd ${BUILD_FWD_PATH} + - cmake + -Wdev + -Wdeprecate + -DCMAKE_BUILD_TYPE=Debug + -DGKFS_BUILD_TESTS:BOOL=ON + -DGKFS_INSTALL_TESTS:BOOL=ON + -DRPC_PROTOCOL="ofi+sockets" + -DENABLE_FORWARDING:BOOL=ON \ + -DENABLE_AGIOS:BOOL=ON \ + -DCMAKE_PREFIX_PATH=${DEPS_INSTALL_PATH} + -DCMAKE_INSTALL_PREFIX=${INSTALL_FWD_PATH} + ${CI_PROJECT_DIR} + - make -j$(nproc) install + artifacts: + paths: + - ${INSTALL_FWD_PATH} + compile tests: stage: build dependencies: @@ -147,3 +186,85 @@ test lseek: when: on_failure paths: - "${LOG_PATH}" + +test wr: + stage: test forwarding + dependencies: + - "compile GekkoFWD" + script: + - mkdir -p "${LOG_PATH}" + - cp agios/* /tmp + - echo "`hostname` 0" > gkfs_forwarding.map + - ${INSTALL_FWD_PATH}/bin/gkfs_daemon --mount /tmp/mountdir --root /tmp/root & + - sleep 4 + - LD_PRELOAD=${INSTALL_FWD_PATH}/lib/libgkfs_intercept.so ${TESTS_BUILD_FWD_PATH}/gkfs_test_wr + artifacts: + when: on_failure + paths: + - "${LOG_PATH}" 
+ +test directories: + stage: test forwarding + dependencies: + - "compile GekkoFWD" + script: + - mkdir -p "${LOG_PATH}" + - cp agios/* /tmp + - echo "`hostname` 0" > gkfs_forwarding.map + - ${INSTALL_FWD_PATH}/bin/gkfs_daemon --mount /tmp/mountdir --root /tmp/root & + - sleep 4 + - LD_PRELOAD=${INSTALL_FWD_PATH}/lib/libgkfs_intercept.so ${TESTS_BUILD_FWD_PATH}/gkfs_test_dir + artifacts: + when: on_failure + paths: + - "${LOG_PATH}" + +test truncate: + stage: test forwarding + dependencies: + - "compile GekkoFWD" + script: + - mkdir -p "${LOG_PATH}" + - cp agios/* /tmp + - echo "`hostname` 0" > gkfs_forwarding.map + - ${INSTALL_FWD_PATH}/bin/gkfs_daemon --mount /tmp/mountdir --root /tmp/root & + - sleep 4 + - LD_PRELOAD=${INSTALL_FWD_PATH}/lib/libgkfs_intercept.so ${TESTS_BUILD_FWD_PATH}/gkfs_test_truncate + artifacts: + when: on_failure + paths: + - "${LOG_PATH}" + +test path resolution: + stage: test forwarding + dependencies: + - "compile GekkoFWD" + script: + - mkdir -p "${LOG_PATH}" + - cp agios/* /tmp + - echo "`hostname` 0" > gkfs_forwarding.map + - ${INSTALL_FWD_PATH}/bin/gkfs_daemon --mount /tmp/mountdir --root /tmp/root & + - sleep 4 + - LD_PRELOAD=${INSTALL_FWD_PATH}/lib/libgkfs_intercept.so ${TESTS_BUILD_FWD_PATH}/gkfs_test_path_resolution + artifacts: + when: on_failure + paths: + - "${LOG_PATH}" + +test lseek: + stage: test forwarding + dependencies: + - "compile GekkoFWD" + script: + - mkdir -p "${LOG_PATH}" + - cp agios/* /tmp + - echo "`hostname` 0" > gkfs_forwarding.map + - ${INSTALL_FWD_PATH}/bin/gkfs_daemon --mount /tmp/mountdir --root /tmp/root & + - sleep 4 + - LD_PRELOAD=${INSTALL_FWD_PATH}/lib/libgkfs_intercept.so ${TESTS_BUILD_FWD_PATH}/gkfs_test_lseek + artifacts: + when: on_failure + paths: + - "${LOG_PATH}" + + diff --git a/CMake/FindAGIOS.cmake b/CMake/FindAGIOS.cmake new file mode 100644 index 0000000000000000000000000000000000000000..a11910d335d667da25d0abdb15b0de89cc83e241 --- /dev/null +++ b/CMake/FindAGIOS.cmake @@ -0,0 +1,18 @@ 
+find_path(AGIOS_INCLUDE_DIR + NAMES agios.h +) + +find_library(AGIOS_LIBRARY + NAMES agios +) + +set(AGIOS_INCLUDE_DIRS ${AGIOS_INCLUDE_DIR}) +set(AGIOS_LIBRARIES ${AGIOS_LIBRARY}) + +include(FindPackageHandleStandardArgs) +find_package_handle_standard_args(AGIOS DEFAULT_MSG AGIOS_LIBRARIES AGIOS_INCLUDE_DIRS) + +mark_as_advanced( + AGIOS_LIBRARY + AGIOS_INCLUDE_DIR +) \ No newline at end of file diff --git a/CMakeLists.txt b/CMakeLists.txt index 0e6d61290c9cd164b509ad2abb17b0d1b793380b..c515b50bd1c24ef5135e9d6c809b130c21f1430d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -84,6 +84,7 @@ find_package(Snappy REQUIRED) find_package(ZStd REQUIRED) find_package(JeMalloc) # required if rocksdb has been build with jemalloc find_package(RocksDB REQUIRED) +find_package(AGIOS REQUIRED) # margo dependencies find_package(Mercury REQUIRED) find_package(Abt REQUIRED) @@ -141,6 +142,17 @@ if(ENABLE_CLIENT_LOG) add_definitions(-DGKFS_ENABLE_LOGGING) endif() message(STATUS "[gekkofs] Client logging output: ${ENABLE_CLIENT_LOG}") +option(ENABLE_FORWARDING "Enable forwarding mode" OFF) +if(ENABLE_FORWARDING) + add_definitions(-DGKFS_ENABLE_FORWARDING) +endif() +message(STATUS "[gekkofs] Forwarding mode: ${ENABLE_FORWARDING}") + +option(ENABLE_AGIOS "Enable AGIOS scheduling library" OFF) +if(ENABLE_AGIOS) + add_definitions(-DGKFS_ENABLE_AGIOS) +endif() +message(STATUS "[gekkofs] AGIOS scheduling: ${ENABLE_AGIOS}") set(CLIENT_LOG_MESSAGE_SIZE 1024 CACHE STRING "Maximum size of a log message in the client library") add_definitions(-DLIBGKFS_LOG_MESSAGE_SIZE=${CLIENT_LOG_MESSAGE_SIZE}) diff --git a/agios/access_times.func b/agios/access_times.func new file mode 100644 index 0000000000000000000000000000000000000000..a6accf35569dd46591dea5fad94b27241e429e1a --- /dev/null +++ b/agios/access_times.func @@ -0,0 +1,11 @@ +2 +8 64 +566.8405 6784.471 +238.2483 1173.456 +145.6056 1018.143 +187.2437 -201.8793 +64 128 +571.2364 -740.0433 +234.6933 41.99119 +218.3765 -6295.179 +113.1743 
2235.442 diff --git a/agios/agios.conf b/agios/agios.conf new file mode 100644 index 0000000000000000000000000000000000000000..e0fc38d0a854d12d19a73597e2eab05ac3dc9df5 --- /dev/null +++ b/agios/agios.conf @@ -0,0 +1,92 @@ +library_options: +{ #ATTENTION: with my most recent modification on statistics, the prediction module is NOT WORKING, DO NOT USE IT. + #should we generate trace files during execution? + trace = false ; + + #should we trace predicted requests? (for debug purposes, this trace will not be useful for future executions) + trace_predict = false ; + + #should we make a complete trace, with all scheduler's operations, such as waiting times? (for debug purposes) + trace_full = false ; + + #should the prediction module read trace files? (if false, the Prediction Module is useless) + predict_read_traces = false ; + + #should the prediction module try to predict aggregations? (requires predict_read_traces = true) + predict_request_aggregation = false ; + + #should the prediction module create simplified traces with information (the metrics) it obtained from the real traces? + predict_write_simplified_traces = false; + + #the tolerance for arrival times difference when checking if two predicted requests are the same (in %) + prediction_time_error = 10 + + #this parameter gives the frequency with which the prediction module will redo its predicted aggregations (in number of requests that must be processed between refreshs). This is necessary because these predictions use a factor that represents the ability to overlap waiting times with processing of other requests. At initialization, this factor will be calculated from the provided trace files, but during execution it can be recalculated using measurements for this ability during the actual scheduling. If the parameter is set to -1, aggregations will not be recalculated during execution. (in number of requests) + prediction_recalculate_alpha_period = -1 + + #prefix and sufix for trace files (with path). 
Their names must be trace_file_prefix+"."+number+"."+trace_file_sufix, with ordered numbers (no holes) + trace_file_prefix = "/tmp/agios_tracefile" + trace_file_sufix = "out" + #prefix for simple trace files (with path). Their names will be prefix+"."+number+"."+trace_file_sufix + simple_trace_prefix = "/tmp/agios_simpletracefile" + + #parameters used by aIOLi and MLF + waiting_time = 900000 + aioli_quantum = 65536 + mlf_quantum = 8192 + + #parameter used by TW (ms) + time_window_size = 1000 #the paper proposing TW recommends 1000 for HDD and 250 for SSD. + + #file (with path) with access times functions (generated by SeRRa - http://serratoool.bitbucket.org/). Used by aIOLi to quantum assignment and by the mechanism that automatically selects the best scheduling algorithm to use. If you are using a static algorithm which is not aIOLi, this does not matter, but you need to provide it anyway. In this case, you can use the one provided as example with the library source code + access_times_func_file = "/tmp/access_times.func" + + #to how many scheduling algorithms the performance module keeps measurements. When we are changing scheduling algorithms, we may observe new measurements (through the agios_release_request function) to the previous algorithms, so we could update information we have for them. It makes no sense to have a big value for performance_values if we don't change algorithms too often + performance_values = 5 + + #how many scheduling algorithms changes we register in the stats file? 
+ proc_algs = 1000 + + #default I/O scheduling algorithm to use (the one to be used if the previous value was set to false) + #existing algorithms (case sensitive): "MLF", "aIOLi", "SJF", "TO", "TO-agg", "SRTF", "TW", "NOOP", "DYN_TREE", "ARMED_BANDIT" (case sensitive) + # SRTF it's experimental and uses information from trace files (don't use it if these are not available) + # NOOP is the NO operation scheduling algorithm, you should not observe performance improvements by using this one + # TW only makes sense if the user is providing AGIOS correct application id for each request. Don't use it otherwise + # DYN_TREE is a dynamic scheduling algorithm which selects the best (among MLF, AIOLI, SFJ, TO, TO-AGG, and NOOP) according to a decision tree. Use it only when using AGIOS to schedule requests to parallel file system' servers. + # ARMED_BANDIT is another dynamic algorithm which periodically selects between MLF, aIOLi, SJF, TO, TO-agg, and NOOP. It keeps performance measurements for these algorithms and gives more probability to the better ones. + default_algorithm = "TO"; + + # TWINS window size, in us + twins_window_duration = 1000; + dynamic_twins = false; + # time between window size adaptation, in ms + dynamic_twins_period = 1000; + + dynamic_twins_clients = 32; + dynamic_twins_processes = 128; + + # Only relevant if default_algorithm is a dynamic one. this parameter gives the frequency with which the automatic scheduling algorithm selection will recalculate the scheduling algorithm. This selection will be done using the access pattern from this period. If -1 is provided, then the selection will be done at the beginning of execution only (using information from traces for DYN_TREE). The next parameter gives the minimum number of requests which need to happen in this period for the selection to be done (otherwise we will wait longer before recalculating). (in msec). 
Notice that it makes no sense to use ARMED_BANDIT without setting period and min_reqnumber, as it needs to be interactive. + select_algorithm_period = 1000 + + select_algorithm_min_reqnumber=1 + + #also for dynamic algorithms. says if TW should be one of the options considered by the dynamic scheduler. TW requires identifying requests according to the applications they come from. If this is not possible, don't use TW! + enable_TW = false ; + + #if default_algorithm is a dynamic algorithm, you need to indicate which static algorithm to use at first (before automatically selecting the next one). Only relevant if you are using ARMED_BANDIT + starting_algorithm = "SJF" ; + + # ARMED_BANDIT parameters - only relevant if default_algorithm is ARMED_BANDIT, but need to be provided anyway. min_ab_probability gives the minimum probability given to scheduling algorithms which performed poorly. The algorithm needs to maintain some probability to accommodate changes in the access pattern. validity_window is the period of time (in msec) for which performance measurements are still valid, after this time period we discard them (so we adapt to new situations). performance_window determines how many performance measurements the ARMED_BANDIT algorithms keeps for each scheduling algorithm option (taking the average of them). Keeping a huge window takes more memory. Moreover, it is related with validity_window and select_algorithm_period as if we discard measurements too often, we will never fill the whole window. + min_ab_probability = 3 + validity_window = 360000 + performance_window = 10 +}; +user_info: +{ + #stripe size used by the library's users (in bytes). This is used for detecting the access pattern at a parallel file system server. Useless for other uses. + stripe_size = 32768 ; + + #maximum buffer size used for storing trace parts (in KB). Having a buffer avoids generating requests to the local file system, which interfere in performance. 
On the other hand, having a large buffer can affect performance and decrease available space for data buffer. + max_trace_buffer_size = 32768 ; + +}; diff --git a/docker/build_env.docker b/docker/build_env.docker index 46094c08ef0c1975c51da3330dd7ace54671487d..ef83cbaa9c13f09950ec5b7698e83eacea95ef72 100644 --- a/docker/build_env.docker +++ b/docker/build_env.docker @@ -25,6 +25,7 @@ RUN yum -y -q update && yum -y -q install \ gcc \ gcc-c++ \ openssl-devel \ + libconfig-devel \ # Mercury dependencies libtool \ libtool-ltdl-devel \ @@ -34,6 +35,7 @@ RUN yum -y -q update && yum -y -q install \ snappy-devel \ libzstd-devel \ lz4-devel \ + bzip2 \ bzip2-devel \ # ada-fs requires C++ 14 devtoolset-8-gcc \ @@ -78,9 +80,11 @@ RUN yum -y -q update && yum -y -q install \ automake \ cmake \ cmake3 \ + sudo \ gcc \ gcc-c++ \ openssl-devel \ + libconfig-devel \ # Mercury dependencies libtool \ libtool-ltdl-devel \ @@ -90,6 +94,7 @@ RUN yum -y -q update && yum -y -q install \ snappy-devel \ libzstd-devel \ lz4-devel \ + bzip2 \ bzip2-devel \ # ada-fs requires C++ 14 devtoolset-8-gcc \ diff --git a/docker/debian_build_env.docker b/docker/debian_build_env.docker index b697b49e999a176ef05e1abae885f5e37d5c614b..743e10d22718d65cef27eb67a56e98dd71704750 100644 --- a/docker/debian_build_env.docker +++ b/docker/debian_build_env.docker @@ -19,6 +19,8 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ cmake \ gcc \ g++ \ + # AGIOS dependencies + libconfig-dev \ # Mercury dependencies libltdl-dev \ lbzip2 \ diff --git a/include/client/env.hpp b/include/client/env.hpp index e438ab7f4e84bd74eeb9e98266a7436c8df49a87..0555371f78994c07f0bb3277c176b2984b8a7d7c 100644 --- a/include/client/env.hpp +++ b/include/client/env.hpp @@ -33,6 +33,9 @@ static constexpr auto LOG_OUTPUT = ADD_PREFIX("LOG_OUTPUT"); static constexpr auto LOG_OUTPUT_TRUNC = ADD_PREFIX("LOG_OUTPUT_TRUNC"); static constexpr auto CWD = ADD_PREFIX("CWD"); static constexpr auto HOSTS_FILE = 
ADD_PREFIX("HOSTS_FILE"); +#ifdef GKFS_ENABLE_FORWARDING +static constexpr auto FORWARDING_MAP_FILE = ADD_PREFIX("FORWARDING_MAP_FILE"); +#endif } // namespace env } // namespace gkfs diff --git a/include/client/gkfs_functions.hpp b/include/client/gkfs_functions.hpp index 7ae71ce99300cb5f32188b1b14445a6de368e330..674174c1ca6d9e11912e05065f346e756480dc70 100644 --- a/include/client/gkfs_functions.hpp +++ b/include/client/gkfs_functions.hpp @@ -76,6 +76,9 @@ ssize_t gkfs_pread_ws(int fd, void* buf, size_t count, off64_t offset); ssize_t gkfs_read(int fd, void* buf, size_t count); +ssize_t gkfs_readv(int fd, const struct iovec* iov, int iovcnt); + +ssize_t gkfs_preadv(int fd, const struct iovec* iov, int iovcnt, off_t offset); int gkfs_opendir(const std::string& path); diff --git a/include/client/hooks.hpp b/include/client/hooks.hpp index a0aacee421d46c30ee1060d40f54dff6c341aabf..d23127bc46fa7f41e973dcdb62ea073b701cfab9 100644 --- a/include/client/hooks.hpp +++ b/include/client/hooks.hpp @@ -40,6 +40,11 @@ int hook_read(unsigned int fd, void* buf, size_t count); int hook_pread(unsigned int fd, char* buf, size_t count, loff_t pos); +int hook_readv(unsigned long fd, const struct iovec * iov, unsigned long iovcnt); + +int hook_preadv(unsigned long fd, const struct iovec * iov, unsigned long iovcnt, + unsigned long pos_l, unsigned long pos_h); + int hook_write(unsigned int fd, const char* buf, size_t count); int hook_pwrite(unsigned int fd, const char* buf, size_t count, loff_t pos); diff --git a/include/client/preload_context.hpp b/include/client/preload_context.hpp index 0eb43be26b87a1638050336b4c06bd592dada04b..616a92966c4c3b58558b6c53f5564e852046bff7 100644 --- a/include/client/preload_context.hpp +++ b/include/client/preload_context.hpp @@ -82,6 +82,7 @@ private: std::vector<hermes::endpoint> hosts_; uint64_t local_host_id_; + uint64_t fwd_host_id_; bool interception_enabled_; @@ -123,6 +124,10 @@ public: void local_host_id(uint64_t id); + uint64_t fwd_host_id() 
const; + + void fwd_host_id(uint64_t id); + RelativizeStatus relativize_fd_path(int dirfd, const char* raw_path, std::string& relative_path, diff --git a/include/client/preload_util.hpp b/include/client/preload_util.hpp index 37cf4fc51dd79984ac1be4bd15738df5409980d5..f2e000ddd717bbcef5e4eecc2e155766f45be927 100644 --- a/include/client/preload_util.hpp +++ b/include/client/preload_util.hpp @@ -62,6 +62,7 @@ int metadata_to_stat(const std::string& path, const gkfs::metadata::Metadata& md std::vector<std::pair<std::string, std::string>> load_hostfile(const std::string& lfpath); void load_hosts(); +void load_forwarding_map(); } // namespace util } // namespace gkfs diff --git a/include/config.hpp b/include/config.hpp index 982eddcf42a7e068cca53ac40409c0b1a370acf0..83b5f4bc2ce42991464d770eeb70bba7eb45b9aa 100644 --- a/include/config.hpp +++ b/include/config.hpp @@ -24,6 +24,7 @@ namespace gkfs { namespace config { constexpr auto hostfile_path = "./gkfs_hosts.txt"; +constexpr auto forwarding_file_path = "./gkfs_forwarding.map"; namespace io { /* diff --git a/include/daemon/scheduler/agios.hpp b/include/daemon/scheduler/agios.hpp new file mode 100644 index 0000000000000000000000000000000000000000..9ff1104f16c8e36c41eb06569ca300b9707e8a89 --- /dev/null +++ b/include/daemon/scheduler/agios.hpp @@ -0,0 +1,15 @@ +#ifndef IFS_SCHEDULER_HPP +#define IFS_SCHEDULER_HPP + +#include <agios.h> + +void agios_initialize(); +void agios_shutdown(); + +void *agios_callback(int64_t request_id); +void *agios_callback_aggregated(int64_t *requests, int32_t total); +void *agios_eventual_callback(int64_t request_id, void *info); + +unsigned long long int generate_unique_id(); + +#endif \ No newline at end of file diff --git a/include/global/rpc/distributor.hpp b/include/global/rpc/distributor.hpp index 2a0b79ff3ed41783f34bc5e56b64cd42dc485acd..e2641d2989835959bb4720eaf16ee4072d87b7be 100644 --- a/include/global/rpc/distributor.hpp +++ b/include/global/rpc/distributor.hpp @@ -69,6 +69,23 @@ 
public: std::vector<host_t> locate_directory_metadata(const std::string& path) const override; }; +class ForwarderDistributor : public Distributor { +private: + host_t fwd_host_; + unsigned int hosts_size_; + std::hash<std::string> str_hash; +public: + ForwarderDistributor(host_t fwhost, unsigned int hosts_size); + + host_t localhost() const override; + + host_t locate_data(const std::string& path, const chunkid_t& chnk_id) const override; + + host_t locate_file_metadata(const std::string& path) const override; + + std::vector<host_t> locate_directory_metadata(const std::string& path) const override; +}; + } // namespace rpc } // namespace gkfs diff --git a/scripts/compile_dep.sh b/scripts/compile_dep.sh index 1a37a58a7732a0ddb91eb83f484c5778a8f68b3b..4332685dedaf8153e8b8dec5bbfaa2e0597e4e21 100755 --- a/scripts/compile_dep.sh +++ b/scripts/compile_dep.sh @@ -13,21 +13,21 @@ VALID_DEP_OPTIONS="mogon2 mogon1 direct all" MOGON1_DEPS=( "zstd" "lz4" "snappy" "capstone" "ofi" "mercury" "argobots" "margo" "rocksdb" - "syscall_intercept" "date" "verbs" + "syscall_intercept" "date" "verbs" "agios" ) MOGON2_DEPS=( "zstd" "lz4" "snappy" "capstone" "ofi" "mercury" "argobots" "margo" "rocksdb" - "syscall_intercept" "date" + "syscall_intercept" "date" "agios" ) DIRECT_DEPS=( - "ofi" "mercury" "argobots" "margo" "rocksdb" "syscall_intercept" "date" + "ofi" "mercury" "argobots" "margo" "rocksdb" "syscall_intercept" "date" "agios" ) ALL_DEPS=( "zstd" "lz4" "snappy" "capstone" "bmi" "ofi" "mercury" "argobots" "margo" "rocksdb" - "syscall_intercept" "date" + "syscall_intercept" "date" "agios" ) usage_short() { @@ -343,6 +343,16 @@ if check_dependency "ofi" "${DEP_CONFIG[@]}"; then fi fi +# AGIOS +if check_dependency "agios" "${DEP_CONFIG[@]}"; then + echo "############################################################ Installing: AGIOS" + CURR=${SOURCE}/agios + prepare_build_dir "${CURR}" + cd "${CURR}"/build + $CMAKE -DCMAKE_INSTALL_PREFIX="${INSTALL}" .. 
+ make install +fi + # Mercury if check_dependency "mercury" "${DEP_CONFIG[@]}"; then @@ -418,7 +428,7 @@ if check_dependency "syscall_intercept" "${DEP_CONFIG[@]}"; then CURR=${SOURCE}/syscall_intercept prepare_build_dir "${CURR}" cd "${CURR}"/build - $CMAKE -DCMAKE_INSTALL_PREFIX="${INSTALL}" -DCMAKE_BUILD_TYPE:STRING=Debug -DBUILD_EXAMPLES:BOOL=OFF -DBUILD_TESTS:BOOK=OFF .. + $CMAKE -DCMAKE_PREFIX_PATH="${INSTALL}" -DCMAKE_INSTALL_PREFIX="${INSTALL}" -DCMAKE_BUILD_TYPE:STRING=Debug -DBUILD_EXAMPLES:BOOL=OFF -DBUILD_TESTS:BOOK=OFF .. make install fi diff --git a/scripts/dl_dep.sh b/scripts/dl_dep.sh index ca859984b2efde51095b93898413e223e349d970..60fe48420a3c62be7716e739d4e369adc7979ee4 100755 --- a/scripts/dl_dep.sh +++ b/scripts/dl_dep.sh @@ -13,21 +13,21 @@ VALID_DEP_OPTIONS="mogon2 mogon1 direct all" MOGON1_DEPS=( "zstd" "lz4" "snappy" "capstone" "ofi-verbs" "mercury" "argobots" "margo" "rocksdb" - "syscall_intercept" "date" + "syscall_intercept" "date" "agios" ) MOGON2_DEPS=( "zstd" "lz4" "snappy" "capstone" "ofi" "mercury" "argobots" "margo" "rocksdb" - "syscall_intercept" "date" + "syscall_intercept" "date" "agios" ) DIRECT_DEPS=( - "ofi" "mercury" "argobots" "margo" "rocksdb" "syscall_intercept" "date" + "ofi" "mercury" "argobots" "margo" "rocksdb" "syscall_intercept" "date" "agios" ) ALL_DEPS=( "zstd" "lz4" "snappy" "capstone" "bmi" "ofi" "mercury" "argobots" "margo" "rocksdb" - "syscall_intercept" "date" + "syscall_intercept" "date" "agios" ) # Stop all backround jobs on interruption. 
@@ -360,6 +360,11 @@ if check_dependency "syscall_intercept" "${DEP_CONFIG[@]}"; then clonedeps "syscall_intercept" "https://github.com/pmem/syscall_intercept.git" "cc3412a2ad39f2e26cc307d5b155232811d7408e" "" "syscall_intercept.patch" & fi +# get AGIOS +if check_dependency "agios" "${DEP_CONFIG[@]}"; then +clonedeps "agios" "https://github.com/francielizanon/agios.git" "c26a6544200f823ebb8f890dd94e653d148bf226" "-b development" & +fi + # get date if check_dependency "date" "${DEP_CONFIG[@]}"; then clonedeps "date" "https://github.com/HowardHinnant/date.git" "e7e1482087f58913b80a20b04d5c58d9d6d90155" & fi diff --git a/src/client/gkfs_functions.cpp b/src/client/gkfs_functions.cpp index 029304a7726f2474a81cd60fe492bdd854703dde..0e1037a2b83e3ef09eee94bf3e075be1586dada6 100644 --- a/src/client/gkfs_functions.cpp +++ b/src/client/gkfs_functions.cpp @@ -485,6 +485,49 @@ ssize_t gkfs_read(int fd, void* buf, size_t count) { return ret; } +ssize_t gkfs_preadv(int fd, const struct iovec* iov, int iovcnt, off_t offset) { + + auto file = CTX->file_map()->get(fd); + auto pos = offset; // keep track of current position + ssize_t read = 0; + ssize_t ret; + for (int i = 0; i < iovcnt; ++i) { + auto count = (iov + i)->iov_len; + if (count == 0) { + continue; + } + auto buf = (iov + i)->iov_base; + ret = gkfs_pread(file, reinterpret_cast<char*>(buf), count, pos); + if (ret == -1) { + break; + } + read += ret; + pos += ret; + + if (static_cast<size_t>(ret) < count) { + break; + } + } + + if (read == 0) { + return -1; + } + return read; +} + +ssize_t gkfs_readv(int fd, const struct iovec* iov, int iovcnt) { + + auto gkfs_fd = CTX->file_map()->get(fd); + auto pos = gkfs_fd->pos(); // retrieve the current offset + auto ret = gkfs_preadv(fd, iov, iovcnt, pos); + assert(ret != 0); + if (ret < 0) { + return -1; + } + gkfs_fd->pos(pos + ret); + return ret; +} + ssize_t gkfs_pread_ws(int fd, void* buf, size_t count, off64_t offset) { auto gkfs_fd = CTX->file_map()->get(fd); return
gkfs_pread(gkfs_fd, reinterpret_cast<char*>(buf), count, offset); diff --git a/src/client/hooks.cpp b/src/client/hooks.cpp index fa437d1ede9fb044896a706305642ab79fc21b48..6c2539a765e5e3e616ec312b3c6c96b647568130 100644 --- a/src/client/hooks.cpp +++ b/src/client/hooks.cpp @@ -176,6 +176,30 @@ int hook_pread(unsigned int fd, char* buf, size_t count, loff_t pos) { return syscall_no_intercept(SYS_pread64, fd, buf, count, pos); } +int hook_readv(unsigned long fd, const struct iovec* iov, unsigned long iovcnt) { + + LOG(DEBUG, "{}() called with fd: {}, iov: {}, iovcnt: {}", + __func__, fd, fmt::ptr(iov), iovcnt); + + if (CTX->file_map()->exist(fd)) { + return with_errno(gkfs::syscall::gkfs_readv(fd, iov, iovcnt)); + } + return syscall_no_intercept(SYS_readv, fd, iov, iovcnt); +} + +int hook_preadv(unsigned long fd, const struct iovec* iov, unsigned long iovcnt, + unsigned long pos_l, unsigned long pos_h) { + + LOG(DEBUG, "{}() called with fd: {}, iov: {}, iovcnt: {}, " + "pos_l: {}," "pos_h: {}", + __func__, fd, fmt::ptr(iov), iovcnt, pos_l, pos_h); + + if (CTX->file_map()->exist(fd)) { + return with_errno(gkfs::syscall::gkfs_preadv(fd, iov, iovcnt, pos_l)); + } + return syscall_no_intercept(SYS_preadv, fd, iov, iovcnt, pos_l); +} + int hook_write(unsigned int fd, const char* buf, size_t count) { LOG(DEBUG, "{}() called with fd: {}, buf: {}, count {}", @@ -220,7 +244,7 @@ int hook_pwritev(unsigned long fd, const struct iovec* iov, unsigned long iovcnt if (CTX->file_map()->exist(fd)) { return with_errno(gkfs::syscall::gkfs_pwritev(fd, iov, iovcnt, pos_l)); } - return syscall_no_intercept(SYS_pwritev, fd, iov, iovcnt); + return syscall_no_intercept(SYS_pwritev, fd, iov, iovcnt, pos_l); } int hook_unlinkat(int dirfd, const char* cpath, int flags) { @@ -760,4 +784,4 @@ int hook_fstatfs(unsigned int fd, struct statfs* buf) { } } // namespace hook -} // namespace gkfs \ No newline at end of file +} // namespace gkfs diff --git a/src/client/intercept.cpp 
b/src/client/intercept.cpp index 9905a0b5db5282a5e61b7e738aa7b50664bd8f3f..70434b4c12f00b58721119d75df3276bf0276ca6 100644 --- a/src/client/intercept.cpp +++ b/src/client/intercept.cpp @@ -496,6 +496,20 @@ int hook(long syscall_number, static_cast<loff_t>(arg3)); break; + case SYS_readv: + *result = gkfs::hook::hook_readv(static_cast<unsigned long>(arg0), + reinterpret_cast<const struct iovec*>(arg1), + static_cast<unsigned long>(arg2)); + break; + + case SYS_preadv: + *result = gkfs::hook::hook_preadv(static_cast<unsigned long>(arg0), + reinterpret_cast<const struct iovec*>(arg1), + static_cast<unsigned long>(arg2), + static_cast<unsigned long>(arg3), + static_cast<unsigned long>(arg4)); + break; + case SYS_pwrite64: *result = gkfs::hook::hook_pwrite(static_cast<unsigned int>(arg0), reinterpret_cast<const char*>(arg1), @@ -914,4 +928,4 @@ void stop_interception() { } } // namespace preload -} // namespace gkfs \ No newline at end of file +} // namespace gkfs diff --git a/src/client/preload.cpp b/src/client/preload.cpp index ad09cff3291be0ce05df43a4135e746eba5be6d9..e4374db766330d833080198f0afdd2e2b119d7f3 100644 --- a/src/client/preload.cpp +++ b/src/client/preload.cpp @@ -36,6 +36,8 @@ namespace { // make sure that things are only initialized once pthread_once_t init_env_thread = PTHREAD_ONCE_INIT; +pthread_t mapper; +bool forwarding_running; inline void exit_error_msg(int errcode, const string& msg) { @@ -98,9 +100,22 @@ void init_ld_environment_() { } /* Setup distributor */ + #ifdef GKFS_ENABLE_FORWARDING + try { + gkfs::util::load_forwarding_map(); + + LOG(INFO, "{}() Forward to {}", __func__, CTX->fwd_host_id()); + } catch (std::exception& e){ + exit_error_msg(EXIT_FAILURE, fmt::format("Unable set the forwarding host '{}'", e.what())); + } + + auto forwarder_dist = std::make_shared<gkfs::rpc::ForwarderDistributor>(CTX->fwd_host_id(), CTX->hosts().size()); + CTX->distributor(forwarder_dist); + #else auto simple_hash_dist = 
std::make_shared<gkfs::rpc::SimpleHashDistributor>(CTX->local_host_id(), CTX->hosts().size()); CTX->distributor(simple_hash_dist); + #endif LOG(INFO, "Retrieving file system configuration..."); @@ -111,6 +126,35 @@ void init_ld_environment_() { LOG(INFO, "Environment initialization successful."); } +#ifdef GKFS_ENABLE_FORWARDING +void *forwarding_mapper(void *p) { + while (forwarding_running) { + try { + gkfs::util::load_forwarding_map(); + + LOG(INFO, "{}() Forward to {}", __func__, CTX->fwd_host_id()); + } catch (std::exception& e){ + exit_error_msg(EXIT_FAILURE, fmt::format("Unable set the forwarding host '{}'", e.what())); + } + + // Sleeps for 10 seconds + sleep(10); + } +} +#endif + +void init_forwarding_mapper() { + forwarding_running = true; + + pthread_create(&mapper, NULL, forwarding_mapper, NULL); +} + +void destroy_forwarding_mapper() { + forwarding_running = false; + + pthread_join(mapper, NULL); +} + void log_prog_name() { std::string line; std::ifstream cmdline("/proc/self/cmdline"); @@ -170,6 +214,8 @@ void init_preload() { CTX->unprotect_user_fds(); + init_forwarding_mapper(); + gkfs::preload::start_interception(); } @@ -177,6 +223,7 @@ void init_preload() { * Called last when preload library is used with the LD_PRELOAD environment variable */ void destroy_preload() { + destroy_forwarding_mapper(); CTX->clear_hosts(); LOG(DEBUG, "Peer information deleted"); diff --git a/src/client/preload_context.cpp b/src/client/preload_context.cpp index ce696e74e01b000bd8c017c7c4d6dcf95b56cb59..9b9f3eca678508206c9f5819adc909f15ffc0bfe 100644 --- a/src/client/preload_context.cpp +++ b/src/client/preload_context.cpp @@ -121,6 +121,13 @@ void PreloadContext::local_host_id(uint64_t id) { local_host_id_ = id; } +uint64_t PreloadContext::fwd_host_id() const { + return fwd_host_id_; +} + +void PreloadContext::fwd_host_id(uint64_t id) { + fwd_host_id_ = id; +} RelativizeStatus PreloadContext::relativize_fd_path(int dirfd, const char* raw_path, std::string& relative_path, 
@@ -360,4 +367,4 @@ PreloadContext::unprotect_user_fds() { } } // namespace preload -} // namespace gkfs \ No newline at end of file +} // namespace gkfs diff --git a/src/client/preload_util.cpp b/src/client/preload_util.cpp index 4db07f20e7eb71405b93bd22b3766ecc57986850..16d956e9f0f9a5b8c4b68cddf7879d4a28b61af4 100644 --- a/src/client/preload_util.cpp +++ b/src/client/preload_util.cpp @@ -176,6 +176,72 @@ vector<pair<string, string>> load_hostfile(const std::string& lfpath) { return hosts; } +#ifdef GKFS_ENABLE_FORWARDING +map<string, uint64_t> load_forwarding_map_file(const std::string& lfpath) { + + LOG(DEBUG, "Loading forwarding map file: \"{}\"", lfpath); + + ifstream lf(lfpath); + if (!lf) { + throw runtime_error(fmt::format("Failed to open forwarding map file '{}': {}", + lfpath, strerror(errno))); + } + map<string, uint64_t> forwarding_map; + const regex line_re("^(\\S+)\\s+(\\S+)$", + regex::ECMAScript | regex::optimize); + string line; + string host; + uint64_t forwarder; + std::smatch match; + while (getline(lf, line)) { + if (!regex_match(line, match, line_re)) { + + LOG(ERROR, "Unrecognized line format: [path: '{}', line: '{}']", + lfpath, line); + + throw runtime_error( + fmt::format("unrecognized line format: '{}'", line)); + } + host = match[1]; + forwarder = std::stoi(match[2].str()); + forwarding_map[host] = forwarder; + } + return forwarding_map; +} +#endif + +#ifdef GKFS_ENABLE_FORWARDING +void load_forwarding_map() { + string forwarding_map_file; + + forwarding_map_file = gkfs::env::get_var(gkfs::env::FORWARDING_MAP_FILE, gkfs::config::forwarding_file_path); + + map<string, uint64_t> forwarding_map; + + while (forwarding_map.size() == 0) { + try { + forwarding_map = load_forwarding_map_file(forwarding_map_file); + } catch (const exception& e) { + auto emsg = fmt::format("Failed to load forwarding map file: {}", e.what()); + throw runtime_error(emsg); + } + } + + //if (forwarding_map.size() == 0) { + // throw
runtime_error(fmt::format("Forwarding map file is empty: '{}'", forwarding_map_file)); + //} + + auto local_hostname = get_my_hostname(true); + + if (forwarding_map.find(local_hostname) == forwarding_map.end()) { + throw runtime_error(fmt::format("Unable to determine the forwarder for host: '{}'", local_hostname)); + } + LOG(INFO, "Forwarding map loaded for '{}' as '{}'", local_hostname, forwarding_map[local_hostname]); + + CTX->fwd_host_id(forwarding_map[local_hostname]); +} +#endif + void load_hosts() { string hostfile; @@ -239,4 +305,4 @@ void load_hosts() { } } // namespace util -} // namespace gkfs \ No newline at end of file +} // namespace gkfs diff --git a/src/daemon/CMakeLists.txt b/src/daemon/CMakeLists.txt index e4ba89177c10f43f57806913a89844e6f43347a1..4df524b9dbdc0e9c7dc1e616046a254e140043af 100644 --- a/src/daemon/CMakeLists.txt +++ b/src/daemon/CMakeLists.txt @@ -12,6 +12,7 @@ set(DAEMON_SRC handler/srv_metadata.cpp handler/srv_data.cpp handler/srv_management.cpp + scheduler/agios.cpp ) set(DAEMON_HEADERS ../../include/config.hpp @@ -23,6 +24,7 @@ set(DAEMON_HEADERS ../../include/global/path_util.hpp ../../include/daemon/daemon.hpp ../../include/daemon/util.hpp + ../../include/daemon/scheduler/agios.hpp ../../include/daemon/ops/metadentry.hpp ../../include/daemon/classes/fs_data.hpp ../../include/daemon/classes/rpc_data.hpp @@ -40,6 +42,7 @@ target_link_libraries(gkfs_daemon env_util spdlog fmt::fmt + ${AGIOS_LIBRARIES} # margo libs ${ABT_LIBRARIES} mercury @@ -53,6 +56,7 @@ target_link_libraries(gkfs_daemon target_include_directories(gkfs_daemon PRIVATE + ${AGIOS_INCLUDE_DIRS} ${ABT_INCLUDE_DIRS} ${MARGO_INCLUDE_DIRS} ) diff --git a/src/daemon/daemon.cpp b/src/daemon/daemon.cpp index 5a709f0e385a6b3da6bd1c04dcff8b758e70e4e2..187aa292608e13ae287a4a4275d953e5d3b1f1e6 100644 --- a/src/daemon/daemon.cpp +++ b/src/daemon/daemon.cpp @@ -24,6 +24,9 @@ #include <daemon/backend/metadata/db.hpp> #include <daemon/backend/data/chunk_storage.hpp> #include 
<daemon/util.hpp> +#ifdef GKFS_ENABLE_AGIOS +#include <daemon/scheduler/agios.hpp> +#endif #include <boost/filesystem.hpp> #include <boost/program_options.hpp> @@ -209,6 +212,22 @@ void init_environment() { } GKFS_DATA->spdlogger()->info("Startup successful. Daemon is ready."); } +#ifdef GKFS_ENABLE_AGIOS +/** + * Initialize the AGIOS scheduling library + */ +void agios_initialize() { + char configuration[] = "/tmp/agios.conf"; + + if (!agios_init(NULL, NULL, configuration, 0)) { + GKFS_DATA->spdlogger()->error("{}() Failed to initialize AGIOS scheduler: '{}'", __func__, configuration); + + agios_exit(); + + throw; + } +} +#endif /** * Destroys the margo, argobots, and mercury environments diff --git a/src/daemon/handler/srv_data.cpp b/src/daemon/handler/srv_data.cpp index 553a9eaa5391e2e7ff9235f9c069e94cc0e7eb3f..2c3226abbd2db39cc4328693883e5330b28e9be0 100644 --- a/src/daemon/handler/srv_data.cpp +++ b/src/daemon/handler/srv_data.cpp @@ -21,6 +21,13 @@ #include <global/rpc/distributor.hpp> #include <global/chunk_calc_util.hpp> +#ifdef GKFS_ENABLE_AGIOS +#include <daemon/scheduler/agios.hpp> + +#define AGIOS_READ 0 +#define AGIOS_WRITE 1 +#define AGIOS_SERVER_ID_IGNORE 0 +#endif using namespace std; @@ -142,6 +149,36 @@ static hg_return_t rpc_srv_write(hg_handle_t handle) { auto bulk_size = margo_bulk_get_size(in.bulk_handle); GKFS_DATA->spdlogger()->debug("{}() path: {}, size: {}, offset: {}", __func__, in.path, bulk_size, in.offset); + #ifdef GKFS_ENABLE_AGIOS + int *data; + ABT_eventual eventual = ABT_EVENTUAL_NULL; + + /* creating eventual */ + ABT_eventual_create(sizeof(int64_t), &eventual); + + unsigned long long int request_id = generate_unique_id(); + char *agios_path = (char*) in.path; + + // We should call AGIOS before chunking (as that is an internal way to handle the requests) + if (!agios_add_request(agios_path, AGIOS_WRITE, in.offset, in.total_chunk_size, request_id, AGIOS_SERVER_ID_IGNORE, agios_eventual_callback, eventual)) { + 
GKFS_DATA->spdlogger()->error("{}() Failed to send request to AGIOS", __func__); + } else { + GKFS_DATA->spdlogger()->debug("{}() request {} was sent to AGIOS", __func__, request_id); + } + + /* Block until the eventual is signaled */ + ABT_eventual_wait(eventual, (void **)&data); + + unsigned long long int result = *data; + GKFS_DATA->spdlogger()->debug("{}() request {} was unblocked (offset = {})!", __func__, result, in.offset); + + ABT_eventual_free(&eventual); + + // Let AGIOS know it can release the request, as it is completed + if (!agios_release_request(agios_path, AGIOS_WRITE, in.total_chunk_size, in.offset)) { + GKFS_DATA->spdlogger()->error("{}() Failed to release request from AGIOS", __func__); + } + #endif /* * 2. Set up buffers for pull bulk transfers */ @@ -199,8 +236,10 @@ static hg_return_t rpc_srv_write(hg_handle_t handle) { // Start to look for a chunk that hashes to this host with the first chunk in the buffer for (auto chnk_id_file = in.chunk_start; chnk_id_file < in.chunk_end || chnk_id_curr < in.chunk_n; chnk_id_file++) { // Continue if chunk does not hash to this host + #ifndef GKFS_ENABLE_FORWARDING if (distributor.locate_data(in.path, chnk_id_file) != host_id) continue; + #endif chnk_ids_host[chnk_id_curr] = chnk_id_file; // save this id to host chunk list // offset case.
Only relevant in the first iteration of the loop and if the chunk hashes to this host if (chnk_id_file == in.chunk_start && in.offset > 0) { @@ -347,6 +386,36 @@ static hg_return_t rpc_srv_read(hg_handle_t handle) { auto bulk_size = margo_bulk_get_size(in.bulk_handle); GKFS_DATA->spdlogger()->debug("{}() path: {}, size: {}, offset: {}", __func__, in.path, bulk_size, in.offset); + #ifdef GKFS_ENABLE_AGIOS + int *data; + ABT_eventual eventual = ABT_EVENTUAL_NULL; + + /* creating eventual */ + ABT_eventual_create(sizeof(int64_t), &eventual); + + unsigned long long int request_id = generate_unique_id(); + char *agios_path = (char*) in.path; + + // We should call AGIOS before chunking (as that is an internal way to handle the requests) + if (!agios_add_request(agios_path, AGIOS_READ, in.offset, in.total_chunk_size, request_id, AGIOS_SERVER_ID_IGNORE, agios_eventual_callback, eventual)) { + GKFS_DATA->spdlogger()->error("{}() Failed to send request to AGIOS", __func__); + } else { + GKFS_DATA->spdlogger()->debug("{}() request {} was sent to AGIOS", __func__, request_id); + } + + /* block until the eventual is signaled */ + ABT_eventual_wait(eventual, (void **)&data); + + unsigned long long int result = *data; + GKFS_DATA->spdlogger()->debug("{}() request {} was unblocked (offset = {})!", __func__, result, in.offset); + + ABT_eventual_free(&eventual); + + // let AGIOS know it can release the request, as it is completed + if (!agios_release_request(agios_path, AGIOS_READ, in.total_chunk_size, in.offset)) { + GKFS_DATA->spdlogger()->error("{}() Failed to release request from AGIOS", __func__); + } + #endif /* * 2.
Set up buffers for pull bulk transfers @@ -367,9 +436,11 @@ static hg_return_t rpc_srv_read(hg_handle_t handle) { GKFS_DATA->spdlogger()->error("{}() Failed to access allocated buffer from bulk handle", __func__); return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle); } + #ifndef GKFS_ENABLE_FORWARDING auto const host_id = in.host_id; auto const host_size = in.host_size; gkfs::rpc::SimpleHashDistributor distributor(host_id, host_size); + #endif auto path = make_shared<string>(in.path); // chnk_ids used by this host @@ -397,8 +468,10 @@ static hg_return_t rpc_srv_read(hg_handle_t handle) { // Start to look for a chunk that hashes to this host with the first chunk in the buffer for (auto chnk_id_file = in.chunk_start; chnk_id_file < in.chunk_end || chnk_id_curr < in.chunk_n; chnk_id_file++) { // Continue if chunk does not hash to this host + #ifndef GKFS_ENABLE_FORWARDING if (distributor.locate_data(in.path, chnk_id_file) != host_id) continue; + #endif chnk_ids_host[chnk_id_curr] = chnk_id_file; // save this id to host chunk list // Only relevant in the first iteration of the loop and if the chunk hashes to this host if (chnk_id_file == in.chunk_start && in.offset > 0) { @@ -570,4 +643,13 @@ static hg_return_t rpc_srv_get_chunk_stat(hg_handle_t handle) { DEFINE_MARGO_RPC_HANDLER(rpc_srv_get_chunk_stat) +#ifdef GKFS_ENABLE_AGIOS +void *agios_eventual_callback(int64_t request_id, void *info) { + GKFS_DATA->spdlogger()->debug("{}() custom callback request {} is ready", __func__, request_id); + + ABT_eventual_set((ABT_eventual) info, &request_id, sizeof(int64_t)); + + return 0; +} +#endif diff --git a/src/daemon/scheduler/agios.cpp b/src/daemon/scheduler/agios.cpp new file mode 100644 index 0000000000000000000000000000000000000000..2648c1c93e33e2bad2108b9f15ada9805df5bbbc --- /dev/null +++ b/src/daemon/scheduler/agios.cpp @@ -0,0 +1,10 @@ +#include <daemon/scheduler/agios.hpp> + +unsigned long long int generate_unique_id() { + // Calculates the hash of this 
request + timespec ts; + clock_gettime(CLOCK_MONOTONIC, &ts); + unsigned long long int id = ts.tv_sec*1000000000L + ts.tv_nsec; + + return id; +} \ No newline at end of file diff --git a/src/global/rpc/distributor.cpp b/src/global/rpc/distributor.cpp index ab059785302aec0a56fa224a5632c0d6ac066d44..cbb2f428f4947b41861b6dedf0e12429f7774ecd 100644 --- a/src/global/rpc/distributor.cpp +++ b/src/global/rpc/distributor.cpp @@ -68,5 +68,30 @@ locate_directory_metadata(const string& path) const { return {localhost_}; } +ForwarderDistributor:: +ForwarderDistributor(host_t fwhost, unsigned int hosts_size) : + fwd_host_(fwhost), + hosts_size_(hosts_size) +{} + +host_t ForwarderDistributor:: +localhost() const { + return fwd_host_; +} + +host_t ForwarderDistributor:: +locate_data(const std::string& path, const chunkid_t& chnk_id) const { + return fwd_host_; +} + +host_t ForwarderDistributor:: +locate_file_metadata(const std::string& path) const { + return str_hash(path) % hosts_size_; +} + +std::vector<host_t> ForwarderDistributor:: +locate_directory_metadata(const std::string& path) const { + return {fwd_host_}; +} } // namespace rpc -} // namespace gkfs \ No newline at end of file +} // namespace gkfs