From fa27234ca7171874dc063beb791401fb02106e53 Mon Sep 17 00:00:00 2001 From: Alberto Miranda Date: Sun, 11 Apr 2021 13:18:41 +0200 Subject: [PATCH 1/5] First implementation of staging API --- CMakeLists.txt | 3 + include/api/connection.hpp | 57 +++++ include/api/gekkofs.hpp | 36 +++ include/api/info.hpp | 89 ++++++++ include/api/staging.hpp | 47 ++++ src/api/CMakeLists.txt | 132 +++++++++++ src/api/connection.cpp | 424 ++++++++++++++++++++++++++++++++++++ src/api/info.cpp | 98 +++++++++ src/api/libgkfs_staging.cpp | 45 ++++ 9 files changed, 931 insertions(+) create mode 100644 include/api/connection.hpp create mode 100644 include/api/gekkofs.hpp create mode 100644 include/api/info.hpp create mode 100644 include/api/staging.hpp create mode 100644 src/api/CMakeLists.txt create mode 100644 src/api/connection.cpp create mode 100644 src/api/info.cpp create mode 100644 src/api/libgkfs_staging.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 41f0b6673..821d3bad3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -242,6 +242,9 @@ add_subdirectory(src/daemon) # Client library add_subdirectory(src/client) +# APIs +add_subdirectory(src/api) + option(GKFS_BUILD_TESTS "Build GekkoFS self tests" OFF) include(CMakeDependentOption) diff --git a/include/api/connection.hpp b/include/api/connection.hpp new file mode 100644 index 000000000..9ebcf723a --- /dev/null +++ b/include/api/connection.hpp @@ -0,0 +1,57 @@ +/* + Copyright 2018-2021, Barcelona Supercomputing Center (BSC), Spain + Copyright 2015-2021, Johannes Gutenberg Universitaet Mainz, Germany + + This software was partially supported by the + EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu). + + This software was partially supported by the + ADA-FS project under the SPPEXA project funded by the DFG. + + This file is part of GekkoFS. 
+ + GekkoFS is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + GekkoFS is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GekkoFS. If not, see . + + SPDX-License-Identifier: GPL-3.0-or-later +*/ + +#ifndef GEKKOFS_API_FS_HPP +#define GEKKOFS_API_FS_HPP + +#include + +namespace gkfs::api::fs { + +class connection { + +public: + connection(); + connection(const std::string& hostfile); + ~connection(); + + connection(connection&& rhs) noexcept = default; + connection& + operator=(connection&& rhs) noexcept = default; + connection(const connection& other) = delete; + connection& + operator=(const connection& rhs) = delete; + +private: + class impl; + std::unique_ptr pimpl_; +}; + +} // namespace gkfs::api::fs + +#endif // GEKKOFS_API_FS_HPP diff --git a/include/api/gekkofs.hpp b/include/api/gekkofs.hpp new file mode 100644 index 000000000..10f428129 --- /dev/null +++ b/include/api/gekkofs.hpp @@ -0,0 +1,36 @@ +/* + Copyright 2018-2021, Barcelona Supercomputing Center (BSC), Spain + Copyright 2015-2021, Johannes Gutenberg Universitaet Mainz, Germany + + This software was partially supported by the + EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu). + + This software was partially supported by the + ADA-FS project under the SPPEXA project funded by the DFG. + + This file is part of GekkoFS. 
+ + GekkoFS is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + GekkoFS is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GekkoFS. If not, see . + + SPDX-License-Identifier: GPL-3.0-or-later +*/ + +#ifndef GEKKOFS_API_HPP +#define GEKKOFS_API_HPP + +#include "info.hpp" +#include "connection.hpp" +#include "staging.hpp" + +#endif // GEKKOFS_API_HPP diff --git a/include/api/info.hpp b/include/api/info.hpp new file mode 100644 index 000000000..931084e1d --- /dev/null +++ b/include/api/info.hpp @@ -0,0 +1,89 @@ +/* + Copyright 2018-2021, Barcelona Supercomputing Center (BSC), Spain + Copyright 2015-2021, Johannes Gutenberg Universitaet Mainz, Germany + + This software was partially supported by the + EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu). + + This software was partially supported by the + ADA-FS project under the SPPEXA project funded by the DFG. + + This file is part of GekkoFS. + + GekkoFS is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + GekkoFS is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GekkoFS. If not, see . 
+ + SPDX-License-Identifier: GPL-3.0-or-later +*/ + +#ifndef GEKKOFS_INFO_HPP +#define GEKKOFS_INFO_HPP + +#include + +namespace gkfs::api::fs { + +// TODO: identical to preload_context.hpp FsConfig. Refactor? +struct config { + // configurable metadata + bool atime_state_; + bool mtime_state_; + bool ctime_state_; + bool link_cnt_state_; + bool blocks_state_; + + uid_t uid_; + gid_t gid_; + + std::string rootdir_; + std::string mountdir_; +}; + +class host { +public: + host(); + ~host(); + host(const std::string& hostname, const std::string& uri); + host(host&& rhs) noexcept; + host(const host& other); + + host& + operator=(host&& rhs) noexcept; + host& + operator=(const host& other); + + void + swap(host& other); + + std::string + hostname() const; + std::string + uri() const; + +private: + class impl; + std::unique_ptr pimpl_; +}; + +} // namespace gkfs::api::fs + +namespace gkfs::api::rpc { + +struct config { + std::string protocol_string_; + bool auto_sm_enabled_{false}; +}; + +} // namespace gkfs::api::rpc + +#endif // GEKKOFS_INFO_HPP diff --git a/include/api/staging.hpp b/include/api/staging.hpp new file mode 100644 index 000000000..e108ac829 --- /dev/null +++ b/include/api/staging.hpp @@ -0,0 +1,47 @@ +/* + Copyright 2018-2021, Barcelona Supercomputing Center (BSC), Spain + Copyright 2015-2021, Johannes Gutenberg Universitaet Mainz, Germany + + This software was partially supported by the + EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu). + + This software was partially supported by the + ADA-FS project under the SPPEXA project funded by the DFG. + + This file is part of GekkoFS. + + GekkoFS is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. 
+ + GekkoFS is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GekkoFS. If not, see . + + SPDX-License-Identifier: GPL-3.0-or-later +*/ + +#ifndef GEKKOFS_API_STAGING_HPP +#define GEKKOFS_API_STAGING_HPP + +#include + +namespace gkfs::api::staging { + +void +put(const std::filesystem::path& pathname, const void* buf, size_t count, + off_t offset); + +void +put(const gkfs::api::fs::connection& conn, + const std::filesystem::path& pathname, const void* buf, size_t count, + off_t offset); + +} // namespace gkfs::api::staging + +#endif // GEKKOFS_API_STAGING_HPP diff --git a/src/api/CMakeLists.txt b/src/api/CMakeLists.txt new file mode 100644 index 000000000..48b603efc --- /dev/null +++ b/src/api/CMakeLists.txt @@ -0,0 +1,132 @@ +################################################################################ +# Copyright 2018-2021, Barcelona Supercomputing Center (BSC), Spain # +# Copyright 2015-2021, Johannes Gutenberg Universitaet Mainz, Germany # +# # +# This software was partially supported by the # +# EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu). # +# # +# This software was partially supported by the # +# ADA-FS project under the SPPEXA project funded by the DFG. # +# # +# This file is part of GekkoFS' POSIX interface. # +# # +# GekkoFS' POSIX interface is free software: you can redistribute it and/or # +# modify it under the terms of the GNU Lesser General Public License as # +# published by the Free Software Foundation, either version 3 of the License, # +# or (at your option) any later version. 
# +# # +# GekkoFS' POSIX interface is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU Lesser General Public License for more details. # +# # +# You should have received a copy of the GNU Lesser General Public License # +# along with GekkoFS' POSIX interface. If not, see # +# . # +# # +# SPDX-License-Identifier: LGPL-3.0-or-later # +################################################################################ + +set(PRELOAD_SRC + gkfs_functions.cpp + hooks.cpp + intercept.cpp + logging.cpp + open_file_map.cpp + open_dir.cpp + path.cpp + preload.cpp + preload_context.cpp + preload_util.cpp + ../global/path_util.cpp + ../global/rpc/rpc_util.cpp + rpc/rpc_types.cpp + rpc/forward_data.cpp + rpc/forward_management.cpp + rpc/forward_metadata.cpp + syscalls/detail/syscall_info.c + ) +set(PRELOAD_HEADERS + ../../include/client/gkfs_functions.hpp + ../../include/config.hpp + ../../include/client/env.hpp + ../../include/client/hooks.hpp + ../../include/client/intercept.hpp + ../../include/client/logging.hpp + ../../include/client/make_array.hpp + ../../include/client/open_file_map.hpp + ../../include/client/open_dir.hpp + ../../include/client/path.hpp + ../../include/client/preload.hpp + ../../include/client/preload_context.hpp + ../../include/client/preload_util.hpp + ../../include/client/rpc/rpc_types.hpp + ../../include/client/rpc/forward_management.hpp + ../../include/client/rpc/forward_metadata.hpp + ../../include/client/rpc/forward_data.hpp + ../../include/client/syscalls/args.hpp + ../../include/client/syscalls/decoder.hpp + ../../include/client/syscalls/errno.hpp + ../../include/client/syscalls/rets.hpp + ../../include/client/syscalls/syscall.hpp + ../../include/client/syscalls/detail/syscall_info.h + ../../include/global/cmake_configure.hpp + ../../include/global/global_defs.hpp + ../../include/global/path_util.hpp + 
../../include/global/rpc/rpc_types.hpp + ../../include/global/rpc/rpc_util.hpp + ) +set(PRELOAD_LINK_LIBRARIES + # internal + metadata + distributor + env_util + # external + Syscall_intercept::Syscall_intercept + dl + mercury + hermes + fmt::fmt + Boost::boost + Threads::Threads + Date::TZ + ) +set(PRELOAD_INCLUDE_DIRS + ${ABT_INCLUDE_DIRS} + ${MARGO_INCLUDE_DIRS} + ) + +add_library(gkfs + SHARED + connection.cpp + info.cpp + libgkfs_staging.cpp + ${CMAKE_SOURCE_DIR}/src/global/rpc/rpc_util.cpp # TODO: create convenience target + ${CMAKE_SOURCE_DIR}/src/client/rpc/rpc_types.cpp # TODO: use own RPC types + ${CMAKE_SOURCE_DIR}/include/api/connection.hpp + ${CMAKE_SOURCE_DIR}/include/api/gekkofs.hpp + ${CMAKE_SOURCE_DIR}/include/api/info.hpp + ${CMAKE_SOURCE_DIR}/include/api/staging.hpp) + +target_link_libraries(gkfs + PRIVATE + distributor + env_util + PUBLIC + hermes + ) + +target_include_directories(gkfs + PRIVATE + ${CMAKE_SOURCE_DIR}/include/api/ + ${PRELOAD_INCLUDE_DIRS}) + +if(GKFS_ENABLE_CODE_COVERAGE) + target_code_coverage(gkfs AUTO) +endif() + + +install(TARGETS gkfs + LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR} + ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR} + PUBLIC_HEADER DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}/gkfs) diff --git a/src/api/connection.cpp b/src/api/connection.cpp new file mode 100644 index 000000000..70f4cf4d6 --- /dev/null +++ b/src/api/connection.cpp @@ -0,0 +1,424 @@ +/* + Copyright 2018-2021, Barcelona Supercomputing Center (BSC), Spain + Copyright 2015-2021, Johannes Gutenberg Universitaet Mainz, Germany + + This software was partially supported by the + EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu). + + This software was partially supported by the + ADA-FS project under the SPPEXA project funded by the DFG. + + This file is part of GekkoFS. 
+ + GekkoFS is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + GekkoFS is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GekkoFS. If not, see . + + SPDX-License-Identifier: GPL-3.0-or-later +*/ + +#include +#include +#include +#include + +#include +#include + +#include +#include // TODO: rpc_types.hpp might need to move +#include // TODO: independent api/env.hpp + +#include +#include +#include + + +//////////////////////////////////////////////////////////////////////////////// +// helper types and functions // +//////////////////////////////////////////////////////////////////////////////// +namespace { + +using namespace gkfs::api; + +struct peer_list { + + using endpoint_type = hermes::endpoint; + + peer_list(uint64_t localhost_id, std::vector endpoints) + : localhost_id_(localhost_id), endpoints_(std::move(endpoints)) {} + + uint64_t + localhost_id() const { + return localhost_id_; + } + + hermes::endpoint + localhost() const { + return endpoints_.at(localhost_id_); + } + + std::size_t + size() const { + return endpoints_.size(); + } + + uint64_t localhost_id_ = 0; + std::vector endpoints_; +}; + +// TODO: mostly identical to load_hostfile() in preload_util.cpp (could not +// reuse due to CTX modification in extract_protocol(), which is called from +// load_hostfile()... Consider refactoring. 
+// std::tuple>> + +std::vector +load_hostfile(const std::string_view& hostfile) { + + // LOG(DEBUG, "Loading hosts_ file: \"{}\"", hostfile); + + std::ifstream lf(std::string{hostfile}); + if(!lf) { + throw std::runtime_error(fmt::format("{}", strerror(errno))); + } + // std::vector> hosts_; + std::vector hosts; + const std::regex line_re(R"(^(\S+)\s+(\S+)$)", + std::regex::ECMAScript | std::regex::optimize); + std::string line; + std::string host; + std::string uri; + std::smatch match; + while(getline(lf, line)) { + if(!regex_match(line, match, line_re)) { + + // LOG(ERROR, "Unrecognized line format: [hostfile: '{}', + // line: '{}']", + // hostfile, line); + + throw std::runtime_error( + fmt::format("invalid line: '{}'", line)); + } + host = match[1]; + uri = match[2]; + hosts.emplace_back(host, uri); + } + if(hosts.empty()) { + throw std::runtime_error( + "hostfile exists but no addresses could be extracted"); + } + + return hosts; +} + +std::vector +find_hosts(const std::string_view& hostfile) { + try { + return ::load_hostfile(hostfile); + } catch(const std::exception& ex) { + throw std::runtime_error(fmt::format("Error loading hostfile '{}': {}", + hostfile, ex.what())); + } +} + +rpc::config +fetch_rpc_config(const std::string& uri) { + + rpc::config rpc_info; + + if(uri.rfind("://") == std::string::npos) { + // invalid format. kill client + throw std::runtime_error( + fmt::format("Invalid format for URI: '{}'", uri)); + } + std::string protocol{}; + + if(uri.find(gkfs::rpc::protocol::ofi_sockets) != std::string::npos) { + protocol = gkfs::rpc::protocol::ofi_sockets; + } else if(uri.find(gkfs::rpc::protocol::ofi_psm2) != std::string::npos) { + protocol = gkfs::rpc::protocol::ofi_psm2; + } else if(uri.find(gkfs::rpc::protocol::ofi_verbs) != std::string::npos) { + protocol = gkfs::rpc::protocol::ofi_verbs; + } + // check for shared memory protocol. 
Can be plain shared memory or real ofi + // protocol + auto_sm + if(uri.find(gkfs::rpc::protocol::na_sm) != std::string::npos) { + if(protocol.empty()) + protocol = gkfs::rpc::protocol::na_sm; + else + rpc_info.auto_sm_enabled_ = true; + } + if(protocol.empty()) { + // unsupported protocol. kill client + throw std::runtime_error(fmt::format( + "Unsupported RPC protocol found in hosts_ file with URI: '{}'", + uri)); + } + // LOG(INFO, + // "RPC protocol '{}' extracted from hosts_ file. Using auto_sm is + // '{}'", protocol, CTX->auto_sm()); + rpc_info.protocol_string_ = protocol; + + return rpc_info; +} + + +/** + * Looks up a host endpoint via Hermes + * @param uri + * @param max_retries + * @return hermes endpoint, if successful + * @throws std::runtime_error + */ +// TODO: almost identical to lookup_endpoint() in preload_util.cpp, generalize +hermes::endpoint +lookup_endpoint(const std::unique_ptr& rpc_service, + const std::string_view& uri, std::size_t max_retries = 3) { + + // LOG(DEBUG, "Looking up address \"{}\"", uri); + + std::random_device rd; // obtain a random number from hardware + std::size_t attempts = 0; + std::string error_msg; + + do { + try { + return rpc_service->lookup(std::string{uri}); + } catch(const std::exception& ex) { + error_msg = ex.what(); + + // LOG(WARNING, "Failed to lookup address '{}'. 
Attempts [{}/{}] ", + // uri, attempts + 1, max_retries); + + // Wait a random amount of time and try again + std::mt19937 g(rd()); // seed the random generator + std::uniform_int_distribution<> distr( + 50, 50 * (attempts + 2)); // define the range + std::this_thread::sleep_for(std::chrono::milliseconds(distr(g))); + continue; + } + } while(++attempts < max_retries); + + throw std::runtime_error( + fmt::format("Endpoint for address '{}' could not be found ({})", + uri, error_msg)); +} + + +// TODO: mostly identical to init_hermes_client() in preload.cpp +// TODO: consider refactoring them +std::unique_ptr +create_rpc_service(bool use_auto_sm, const std::string_view& protocol_string) { + + try { + + hermes::engine_options opts{}; + + if(use_auto_sm) { + opts |= hermes::use_auto_sm; + } + + if(gkfs::rpc::protocol::ofi_psm2 == protocol_string) { + opts |= hermes::force_no_block_progress; + } + + return std::make_unique( + hermes::get_transport_type(std::string{protocol_string}), opts); + } catch(const std::exception& ex) { + throw std::runtime_error(fmt::format( + "Failed to initialize RPC subsystem: {}\n", ex.what())); + } +} + +std::unique_ptr +start_rpc_service(const rpc::config& rpc_config) { + auto rpc_service = create_rpc_service(rpc_config.auto_sm_enabled_, + rpc_config.protocol_string_); + rpc_service->run(); + return rpc_service; +} + +// TODO: mostly identical to connec_to_hosts() in rpc_util.cpp +peer_list +resolve(const std::unique_ptr& rpc_service, + const std::vector& hosts) { + + const auto localhost_name = gkfs::rpc::get_my_hostname(true); + uint64_t localhost_id = 0; + bool localhost_found = false; + + std::vector endpoints; + endpoints.resize(hosts.size()); + + std::vector host_ids(hosts.size()); + // populate vector with [0, ..., host_size - 1] + std::iota(std::begin(host_ids), std::end(host_ids), 0); + /* + * Shuffle hosts_ to balance addr lookups to all hosts_ + * Too many concurrent lookups send to same host + * could overwhelm the server, + * 
returning error when addr lookup + */ + std::random_device rd; // obtain a random number from hardware + std::mt19937 g(rd()); // seed the random generator + std::shuffle(host_ids.begin(), host_ids.end(), g); // Shuffle hosts_ + // vector + // lookup addresses and put abstract server addresses into rpc_addresses + + for(const auto& id : host_ids) { + const auto& hostname = hosts.at(id).hostname(); + const auto& uri = hosts.at(id).uri(); + + endpoints[id] = ::lookup_endpoint(rpc_service, uri); + + if(!localhost_found && hostname == localhost_name) { + // LOG(DEBUG, "Found local host: {}", hostname); + localhost_id = id; + localhost_found = true; + } + + // LOG(DEBUG, "Found peer: {}", + // endpoints[id].to_string()); + } + + return peer_list{localhost_id, endpoints}; +} + +// TODO: mostly identical to gkfs::rpc::forward_get_fs_config() in +// TODO: forward_management.cpp, consider refactoring them +fs::config +fetch_fs_config(const std::unique_ptr& rpc_service, + const peer_list& peers) { + + const auto endp = peers.localhost(); + gkfs::rpc::fs_config::output out; + + try { + // LOG(DEBUG, "Retrieving file system configurations from + // daemon"); + // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that + // we can retry for RPC_TRIES (see old commits with margo) + // TODO(amiranda): hermes will eventually provide a post(endpoint) + // returning one result and a broadcast(endpoint_set) returning a + // result_set. 
When that happens we can remove the .at(0) :/ + out = rpc_service->post(endp).get().at(0); + } catch(const std::exception& ex) { + throw std::runtime_error(fmt::format( + "Error retrieving gekkofs_instance configurations from " + "daemon: {}", ex.what())); + } + + return {out.atime_state(), out.mtime_state(), + out.ctime_state(), out.link_cnt_state(), + out.blocks_state(), out.uid(), + out.gid(), out.rootdir(), + out.mountdir()}; +} + + +} // namespace + +namespace gkfs::api::fs { + +class connection::impl { + +public: + impl(const std::string_view& hostfile) + : hosts_(find_hosts(hostfile)), + rpc_config_(fetch_rpc_config(hosts_.at(0).uri())), + rpc_service_(start_rpc_service(rpc_config_)), + peers_(resolve(rpc_service_, hosts_)), + fs_config_(fetch_fs_config(rpc_service_, peers_)), + distributor_(std::make_unique( + peers_.localhost_id(), peers_.size())) { + + using namespace std::string_literals; + +#if 0 + // read host information + std::vector> hosts_{}; + + try { + // LOG(INFO, "Loading peer addresses..."); + std::tie(rpc_config_, hosts_) = ::find_hosts(); + } catch(const std::exception& e) { + throw std::runtime_error( + fmt::format("Failed to load hosts_ addresses:" + " {}", + e.what())); + } +#endif + + // rpc_service_ = create_rpc_service(); + // rpc_service_->run(); + +#if 0 + try { + ping(hosts_); + } catch(const std::exception& e) { + throw std::runtime_error( + fmt::format("Failed to connect to hosts_: {}\n", e.what())); + } +#endif + +#if 0 + distributor_ = std::make_unique( + local_host_id_, host_endpoints_.size()); +#endif + + // LOG(INFO, "Retrieving file system configuration..."); +#if 0 + try { + fs_config_ = fetch_fs_info(); + } catch(const std::exception& ex) { + throw std::runtime_error( + fmt::format("Unable to fetch file system " + "configurations from daemon process " + "through RPC: {}", + ex.what())); + } +#endif + + // LOG(INFO, "Environment initialization successful."); + } + + impl(const impl& other) = delete; + impl(impl&& rhs) = default; + impl& +
operator=(const impl& other) = delete; + impl& + operator=(impl&& rhs) = default; + ~impl() = default; + +private: + //////////////////////////////////////////////////////////////////////////// + // members // + //////////////////////////////////////////////////////////////////////////// + std::vector hosts_; + rpc::config rpc_config_; + std::unique_ptr rpc_service_; + peer_list peers_; + std::vector host_endpoints_; + fs::config fs_config_; + std::unique_ptr distributor_; +}; + +connection::connection() + : pimpl_(new connection::impl(gkfs::env::get_var( + gkfs::env::HOSTS_FILE, gkfs::config::hostfile_path))) {} + +connection::connection(const std::string& hostfile) + : pimpl_(new connection::impl(hostfile)) {} + +connection::~connection() = default; + +} // namespace gkfs::api::fs diff --git a/src/api/info.cpp b/src/api/info.cpp new file mode 100644 index 000000000..0534a2b8d --- /dev/null +++ b/src/api/info.cpp @@ -0,0 +1,98 @@ +/* + Copyright 2018-2021, Barcelona Supercomputing Center (BSC), Spain + Copyright 2015-2021, Johannes Gutenberg Universitaet Mainz, Germany + + This software was partially supported by the + EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu). + + This software was partially supported by the + ADA-FS project under the SPPEXA project funded by the DFG. + + This file is part of GekkoFS' POSIX interface. + + GekkoFS' POSIX interface is free software: you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public License as + published by the Free Software Foundation, either version 3 of the License, + or (at your option) any later version. + + GekkoFS' POSIX interface is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. 
+ + You should have received a copy of the GNU Lesser General Public License + along with GekkoFS' POSIX interface. If not, see + . + + SPDX-License-Identifier: LGPL-3.0-or-later +*/ + +#include +#include +#include +#include +#include + +namespace gkfs::api::fs { + +class host::impl { + friend class host; + +public: + impl() = default; + impl(std::string hostname, std::string uri) + : hostname_(std::move(hostname)), uri_(std::move(uri)) {} + +private: + std::string hostname_; + std::string uri_; + hermes::endpoint endpoint_; +}; + +host::host() : pimpl_(new host::impl()) {} +host::host(const host& other) : pimpl_(new host::impl(*other.pimpl_)) {} + +host::host(const std::string& hostname, const std::string& uri) + : pimpl_(new host::impl(hostname, uri)) {} + +host::~host() = default; +host::host(host&&) noexcept = default; + +host& +host::operator=(host&& rhs) noexcept = default; + +host& +host::operator=(const host& other) { + host tmp(other); + this->swap(tmp); + return *this; +} + +void +host::swap(host& other) { + pimpl_.swap(other.pimpl_); +} + +std::string +host::hostname() const { + return pimpl_->hostname_; +} + +std::string +host::uri() const { + return pimpl_->uri_; +} + +} // namespace gkfs::api::fs + +namespace std { + +using gkfs::api::fs::host; + +template <> +void +swap(host& a, host& b) noexcept { + a.swap(b); +} + +} // namespace std \ No newline at end of file diff --git a/src/api/libgkfs_staging.cpp b/src/api/libgkfs_staging.cpp new file mode 100644 index 000000000..1d6031cd3 --- /dev/null +++ b/src/api/libgkfs_staging.cpp @@ -0,0 +1,45 @@ +/* + Copyright 2018-2021, Barcelona Supercomputing Center (BSC), Spain + Copyright 2015-2021, Johannes Gutenberg Universitaet Mainz, Germany + + This software was partially supported by the + EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu). + + This software was partially supported by the + ADA-FS project under the SPPEXA project funded by the DFG. 
+ + This file is part of GekkoFS' POSIX interface. + + GekkoFS' POSIX interface is free software: you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public License as + published by the Free Software Foundation, either version 3 of the License, + or (at your option) any later version. + + GekkoFS' POSIX interface is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with GekkoFS' POSIX interface. If not, see + . + + SPDX-License-Identifier: LGPL-3.0-or-later +*/ + +#include +#include +#include + +namespace gkfs::api::staging { + +void +put(const std::filesystem::path& pathname, const void* buf, size_t count, + off_t offset) {} + +void +put(const gkfs::api::fs::connection& conn, + const std::filesystem::path& pathname, const void* buf, size_t count, + off_t offset) {} + +} // namespace gkfs::api::staging \ No newline at end of file -- GitLab From d00e29e80c9699c95ede58880db03122252b3850 Mon Sep 17 00:00:00 2001 From: Alberto Miranda Date: Sun, 11 Apr 2021 18:37:00 +0200 Subject: [PATCH 2/5] Add API tests --- CMake/FindFilesystem.cmake | 247 +++++++++++++++++++++++++++++++++++++ CMakeLists.txt | 3 + src/api/connection.cpp | 3 +- tests/CMakeLists.txt | 2 + tests/config.hpp.in | 38 ++++++ tests/unit/CMakeLists.txt | 10 +- tests/unit/test_api.cpp | 218 ++++++++++++++++++++++++++++++++ 7 files changed, 518 insertions(+), 3 deletions(-) create mode 100644 CMake/FindFilesystem.cmake create mode 100644 tests/config.hpp.in create mode 100644 tests/unit/test_api.cpp diff --git a/CMake/FindFilesystem.cmake b/CMake/FindFilesystem.cmake new file mode 100644 index 000000000..a152e5229 --- /dev/null +++ b/CMake/FindFilesystem.cmake @@ -0,0 +1,247 @@ +# Distributed under the OSI-approved BSD 3-Clause
License. See accompanying +# file Copyright.txt or https://cmake.org/licensing for details. + +#[=======================================================================[.rst: + +FindFilesystem +############## + +This module supports the C++17 standard library's filesystem utilities. Use the +:imp-target:`std::filesystem` imported target to link against the library. + +Options +******* + +The ``COMPONENTS`` argument to this module supports the following values: + +.. find-component:: Experimental + :name: fs.Experimental + + Allows the module to find the "experimental" Filesystem TS version of the + Filesystem library. This is the library that should be used with the + ``std::experimental::filesystem`` namespace. + +.. find-component:: Final + :name: fs.Final + + Finds the final C++17 standard version of the filesystem library. + +If no components are provided, behaves as if the +:find-component:`fs.Final` component was specified. + +If both :find-component:`fs.Experimental` and :find-component:`fs.Final` are +provided, first looks for ``Final``, and falls back to ``Experimental`` in case +of failure. If ``Final`` is found, :imp-target:`std::filesystem` and all +:ref:`variables <fs.variables>` will refer to the ``Final`` version. + + +Imported Targets +**************** + +.. imp-target:: std::filesystem + + The ``std::filesystem`` imported target is defined when any requested + version of the C++ filesystem library has been found, whether it is + *Experimental* or *Final*. + + If no version of the filesystem library is available, this target will not + be defined. + + .. note:: + This target has ``cxx_std_17`` as an ``INTERFACE`` + :ref:`compile language standard feature `. Linking + to this target will automatically enable C++17 if no later standard + version is already required on the linking target. + + +.. _fs.variables: + +Variables +********* + +.. 
variable:: CXX_FILESYSTEM_IS_EXPERIMENTAL + + Set to ``TRUE`` when the :find-component:`fs.Experimental` version of C++ + filesystem library was found, otherwise ``FALSE``. + +.. variable:: CXX_FILESYSTEM_HAVE_FS + + Set to ``TRUE`` when a filesystem header was found. + +.. variable:: CXX_FILESYSTEM_HEADER + + Set to either ``filesystem`` or ``experimental/filesystem`` depending on + whether :find-component:`fs.Final` or :find-component:`fs.Experimental` was + found. + +.. variable:: CXX_FILESYSTEM_NAMESPACE + + Set to either ``std::filesystem`` or ``std::experimental::filesystem`` + depending on whether :find-component:`fs.Final` or + :find-component:`fs.Experimental` was found. + + +Examples +******** + +Using `find_package(Filesystem)` with no component arguments: + +.. code-block:: cmake + + find_package(Filesystem REQUIRED) + + add_executable(my-program main.cpp) + target_link_libraries(my-program PRIVATE std::filesystem) + + +#]=======================================================================] + + +if(TARGET std::filesystem) + # This module has already been processed. Don't do it again. + return() +endif() + +cmake_minimum_required(VERSION 3.10) + +include(CMakePushCheckState) +include(CheckIncludeFileCXX) + +# If we're not cross-compiling, try to run test executables. +# Otherwise, assume that compile + link is a sufficient check. 
+if(CMAKE_CROSSCOMPILING) + include(CheckCXXSourceCompiles) + macro(_cmcm_check_cxx_source code var) + check_cxx_source_compiles("${code}" ${var}) + endmacro() +else() + include(CheckCXXSourceRuns) + macro(_cmcm_check_cxx_source code var) + check_cxx_source_runs("${code}" ${var}) + endmacro() +endif() + +cmake_push_check_state() + +set(CMAKE_REQUIRED_QUIET ${Filesystem_FIND_QUIETLY}) + +# All of our tests required C++17 or later +set(CMAKE_CXX_STANDARD 17) + +# Normalize and check the component list we were given +set(want_components ${Filesystem_FIND_COMPONENTS}) +if(Filesystem_FIND_COMPONENTS STREQUAL "") + set(want_components Final) +endif() + +# Warn on any unrecognized components +set(extra_components ${want_components}) +list(REMOVE_ITEM extra_components Final Experimental) +foreach(component IN LISTS extra_components) + message(WARNING "Extraneous find_package component for Filesystem: ${component}") +endforeach() + +# Detect which of Experimental and Final we should look for +set(find_experimental TRUE) +set(find_final TRUE) +if(NOT "Final" IN_LIST want_components) + set(find_final FALSE) +endif() +if(NOT "Experimental" IN_LIST want_components) + set(find_experimental FALSE) +endif() + +if(find_final) + check_include_file_cxx("filesystem" _CXX_FILESYSTEM_HAVE_HEADER) + mark_as_advanced(_CXX_FILESYSTEM_HAVE_HEADER) + if(_CXX_FILESYSTEM_HAVE_HEADER) + # We found the non-experimental header. Don't bother looking for the + # experimental one. 
+ set(find_experimental FALSE) + endif() +else() + set(_CXX_FILESYSTEM_HAVE_HEADER FALSE) +endif() + +if(find_experimental) + check_include_file_cxx("experimental/filesystem" _CXX_FILESYSTEM_HAVE_EXPERIMENTAL_HEADER) + mark_as_advanced(_CXX_FILESYSTEM_HAVE_EXPERIMENTAL_HEADER) +else() + set(_CXX_FILESYSTEM_HAVE_EXPERIMENTAL_HEADER FALSE) +endif() + +if(_CXX_FILESYSTEM_HAVE_HEADER) + set(_have_fs TRUE) + set(_fs_header filesystem) + set(_fs_namespace std::filesystem) + set(_is_experimental FALSE) +elseif(_CXX_FILESYSTEM_HAVE_EXPERIMENTAL_HEADER) + set(_have_fs TRUE) + set(_fs_header experimental/filesystem) + set(_fs_namespace std::experimental::filesystem) + set(_is_experimental TRUE) +else() + set(_have_fs FALSE) +endif() + +set(CXX_FILESYSTEM_HAVE_FS ${_have_fs} CACHE BOOL "TRUE if we have the C++ filesystem headers") +set(CXX_FILESYSTEM_HEADER ${_fs_header} CACHE STRING "The header that should be included to obtain the filesystem APIs") +set(CXX_FILESYSTEM_NAMESPACE ${_fs_namespace} CACHE STRING "The C++ namespace that contains the filesystem APIs") +set(CXX_FILESYSTEM_IS_EXPERIMENTAL ${_is_experimental} CACHE BOOL "TRUE if the C++ filesystem library is the experimental version") + +set(_found FALSE) + +if(CXX_FILESYSTEM_HAVE_FS) + # We have some filesystem library available. 
Do link checks + string(CONFIGURE [[ + #include + #include <@CXX_FILESYSTEM_HEADER@> + + int main() { + auto cwd = @CXX_FILESYSTEM_NAMESPACE@::current_path(); + printf("%s", cwd.c_str()); + return EXIT_SUCCESS; + } + ]] code @ONLY) + + # Check a simple filesystem program without any linker flags + _cmcm_check_cxx_source("${code}" CXX_FILESYSTEM_NO_LINK_NEEDED) + + set(can_link ${CXX_FILESYSTEM_NO_LINK_NEEDED}) + + if(NOT CXX_FILESYSTEM_NO_LINK_NEEDED) + set(prev_libraries ${CMAKE_REQUIRED_LIBRARIES}) + # Add the libstdc++ flag + set(CMAKE_REQUIRED_LIBRARIES ${prev_libraries} -lstdc++fs) + _cmcm_check_cxx_source("${code}" CXX_FILESYSTEM_STDCPPFS_NEEDED) + set(can_link ${CXX_FILESYSTEM_STDCPPFS_NEEDED}) + if(NOT CXX_FILESYSTEM_STDCPPFS_NEEDED) + # Try the libc++ flag + set(CMAKE_REQUIRED_LIBRARIES ${prev_libraries} -lc++fs) + _cmcm_check_cxx_source("${code}" CXX_FILESYSTEM_CPPFS_NEEDED) + set(can_link ${CXX_FILESYSTEM_CPPFS_NEEDED}) + endif() + endif() + + if(can_link) + add_library(std::filesystem INTERFACE IMPORTED) + set_property(TARGET std::filesystem APPEND PROPERTY INTERFACE_COMPILE_FEATURES cxx_std_17) + set(_found TRUE) + + if(CXX_FILESYSTEM_NO_LINK_NEEDED) + # Nothing to add... 
+ elseif(CXX_FILESYSTEM_STDCPPFS_NEEDED) + set_property(TARGET std::filesystem APPEND PROPERTY INTERFACE_LINK_LIBRARIES -lstdc++fs) + elseif(CXX_FILESYSTEM_CPPFS_NEEDED) + set_property(TARGET std::filesystem APPEND PROPERTY INTERFACE_LINK_LIBRARIES -lc++fs) + endif() + endif() +endif() + +cmake_pop_check_state() + +set(Filesystem_FOUND ${_found} CACHE BOOL "TRUE if we can run a program using std::filesystem" FORCE) + +if(Filesystem_FIND_REQUIRED AND NOT Filesystem_FOUND) + message(FATAL_ERROR "Cannot run simple program using std::filesystem") +endif() diff --git a/CMakeLists.txt b/CMakeLists.txt index 821d3bad3..fb1313d67 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -132,9 +132,12 @@ find_package(Syscall_intercept REQUIRED) find_package(Boost 1.53 REQUIRED COMPONENTS program_options + # required by Boost::process used in unit tests for APIs + filesystem ) find_package(Threads REQUIRED) +find_package(Filesystem REQUIRED) find_package(Date REQUIRED) diff --git a/src/api/connection.cpp b/src/api/connection.cpp index 70f4cf4d6..47d3b0e00 100644 --- a/src/api/connection.cpp +++ b/src/api/connection.cpp @@ -105,8 +105,7 @@ load_hostfile(const std::string_view& hostfile) { // line: '{}']", // hostfile, line); - throw std::runtime_error( - fmt::format("invalid line: '{}'", line)); + throw std::runtime_error(fmt::format("invalid line: '{}'", line)); } host = match[1]; uri = match[2]; diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index fd3ca201e..8b9ee0888 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -35,6 +35,8 @@ add_custom_target(check --output-on-failure ) +configure_file(config.hpp.in config.hpp @ONLY) + # integration tests add_subdirectory(integration) diff --git a/tests/config.hpp.in b/tests/config.hpp.in new file mode 100644 index 000000000..4ff6fc31b --- /dev/null +++ b/tests/config.hpp.in @@ -0,0 +1,38 @@ +/* + Copyright 2018-2021, Barcelona Supercomputing Center (BSC), Spain + Copyright 2015-2021, Johannes Gutenberg 
Universitaet Mainz, Germany + + This software was partially supported by the + EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu). + + This software was partially supported by the + ADA-FS project under the SPPEXA project funded by the DFG. + + This file is part of GekkoFS. + + GekkoFS is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + GekkoFS is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GekkoFS. If not, see . + + SPDX-License-Identifier: GPL-3.0-or-later +*/ + +#ifndef GKFS_TESTS_CONFIG_HPP +#define GKFS_TESTS_CONFIG_HPP + +namespace cmake { + +constexpr auto binary_dir = "@CMAKE_BINARY_DIR@"; + +} // namespace cmake + +#endif // GKFS_TESTS_CONFIG_HPP diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt index 0c9de75b9..5e904547b 100644 --- a/tests/unit/CMakeLists.txt +++ b/tests/unit/CMakeLists.txt @@ -57,18 +57,26 @@ target_link_libraries(catch2_main Catch2::Catch2 ) -# define executables for tests and make them depend on the convenience +# define executables for tests and make them depend on the convenience # library (and Catch2 transitively) and fmt add_executable(tests test_example_00.cpp test_example_01.cpp test_utils_arithmetic.cpp + test_api.cpp ) +target_include_directories(tests + PRIVATE + ${CMAKE_BINARY_DIR}) + target_link_libraries(tests catch2_main fmt::fmt + std::filesystem arithmetic + gkfs + Boost::filesystem ) # Catch2's contrib folder includes some helper functions diff --git a/tests/unit/test_api.cpp b/tests/unit/test_api.cpp new file mode 100644 index 
000000000..10c7e522e --- /dev/null +++ b/tests/unit/test_api.cpp @@ -0,0 +1,218 @@ +/* + Copyright 2018-2021, Barcelona Supercomputing Center (BSC), Spain + Copyright 2015-2021, Johannes Gutenberg Universitaet Mainz, Germany + + This software was partially supported by the + EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu). + + This software was partially supported by the + ADA-FS project under the SPPEXA project funded by the DFG. + + This file is part of GekkoFS. + + GekkoFS is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + GekkoFS is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GekkoFS. If not, see . + + SPDX-License-Identifier: GPL-3.0-or-later +*/ + +#include +#include + +#include +#include +#include +#include +#include // uuid class +#include // generators +#include // streaming operators etc. 
+ +#include + +using namespace std::literals; +namespace bp = boost::process; +namespace boost_fs = boost::filesystem; +namespace std_fs = std::filesystem; + + +// A temporary file with RAII removal +struct temporary_file { + + explicit temporary_file(std_fs::path filename) + : filename_(std::move(filename)), ofs_(filename_) {} + + temporary_file(std_fs::path filename, const std::string_view& text) + : filename_(std::move(filename)), ofs_(filename_) { + write(text); + } + + ~temporary_file() { + ofs_.close(); + std_fs::remove(filename_); + } + + void + write(const std::string_view& text) { + ofs_ << text; + ofs_.flush(); + } + + std_fs::path + filename() const { + return filename_; + } + + std_fs::path filename_; + std::ofstream ofs_; +}; + +// A temporary daemon with RAII shutdown and environment removal +struct test_daemon { + + static std::tuple + generate_environment() { + + static boost::uuids::random_generator generator; + boost::uuids::uuid uuid = generator(); + boost::random::mt19937 gen; + boost::random::uniform_int_distribution<> dist(1025, 65535); + + const auto basedir = to_string(uuid); + const long port = dist(gen); + + CAPTURE(basedir, port); + return {basedir, port}; + } + + test_daemon(const std_fs::path& rootdir, const std_fs::path& mountdir, + const std::string& interface) + : abs_rootdir(std_fs::absolute(std_fs::current_path() / rootdir)), + abs_mountdir(std_fs::absolute(std_fs::current_path() / mountdir)) { + + auto env = boost::this_process::environment(); + env["GKFS_LOG_LEVEL"] = "100"; + env["GKFS_DAEMON_LOG_PATH"] = "/dev/stderr"; + + if(!std_fs::exists(abs_rootdir)) { + std_fs::create_directories(abs_rootdir); + } + + if(!std_fs::exists(abs_mountdir)) { + std_fs::create_directories(abs_mountdir); + } + + auto search_paths = boost::this_process::path(); + search_paths.emplace_back(boost_fs::path{cmake::binary_dir} / + boost_fs::path{"src/daemon"}); + + daemon_ = bp::child(bp::search_path("gkfs_daemon", search_paths), + 
bp::args({"--rootdir"s, abs_rootdir, "--mountdir"s, + abs_mountdir, "--listen"s, interface})); + + // since we don't have a way to check whether the daemon has finished + // its startup process, wait for 500ms, otherwise any connections + // to it will fail + std::this_thread::sleep_for(500ms); + } + + ~test_daemon() { + ::kill(daemon_.native_handle(), SIGTERM); + + if(!daemon_.wait_for(5s)) { + daemon_.terminate(); + } + + std_fs::remove_all(abs_rootdir); + std_fs::remove_all(abs_mountdir); + } + + bp::child daemon_; + std_fs::path abs_rootdir; + std_fs::path abs_mountdir; +}; + +SCENARIO(" API connections to gkfs work as expected ", "[api][connection]") { + + // TODO: tests for connecting non-existing daemons are missing. The reason + // TODO: is that since we don't have timeouts yet in Hermes, if the + // TODO: daemon is missing the connection RPC hangs forever while waiting + // TODO: for a reply that will never arrive + + using namespace std::string_literals; + using Catch::Matchers::Contains; + using namespace gkfs::api; + + GIVEN(" a fs::connection ") { + + WHEN(" hostfile does not exist ") { + + THEN(" an exception is thrown ") { + REQUIRE_THROWS_WITH(fs::connection(), + Contains("No such file or directory"s)); + } + } + + WHEN(" hostfile is empty ") { + + temporary_file tmp{"empty.hostfile"}; + + THEN(" an exception is thrown ") { + REQUIRE_THROWS_WITH( + fs::connection(tmp.filename()), + Contains("hostfile exists but no addresses could be " + "extracted"s)); + } + } + + WHEN(" hostfile is invalid ") { + + temporary_file tmp{"invalid.hostfile", + "foobarbaz\nbarfoobaz\nbazbarfoo\n"}; + + THEN(" an exception is thrown ") { + REQUIRE_THROWS_WITH(fs::connection(tmp.filename()), + Contains("invalid line: 'foobarbaz'"s)); + } + } + + WHEN(" hostfile is partially invalid ") { + + temporary_file tmp{"pinvalid.hostfile", + "host.example.com " + "ofi+sockets://127.0.0.1:11586\n" + "foobarbaz\nbarfoobaz\nbazbarfoo\n"}; + + THEN(" an exception is thrown ") { +
REQUIRE_THROWS_WITH(fs::connection(tmp.filename()), + Contains("invalid line: 'foobarbaz'"s)); + } + } + + WHEN(" hostfile is valid ") { + + const auto [testdir, port] = test_daemon::generate_environment(); + + test_daemon daemon(testdir / "rootdir", testdir / "mountdir", + fmt::format("lo:{}", port)); + + temporary_file tmp{ + "valid.hostfile", + fmt::format("localhost ofi+sockets://127.0.0.1:{}\n", + port)}; + + THEN(" a connection instance is constructed ") { + REQUIRE_NOTHROW(fs::connection(tmp.filename())); + } + } + } +} -- GitLab From 0d1b7ff58894f5efe3ba0816d2bb19db4540a785 Mon Sep 17 00:00:00 2001 From: Alberto Miranda Date: Sat, 17 Apr 2021 11:51:22 +0200 Subject: [PATCH 3/5] First implementation of put() in gkfs::api::staging --- include/api/connection.hpp | 13 +- include/api/detail/connection_impl.hpp | 144 +++++ include/api/env.hpp | 58 ++ include/api/staging.hpp | 50 +- include/client/env.hpp | 10 +- include/config.hpp | 1 + src/api/CMakeLists.txt | 8 +- src/api/connection.cpp | 390 +----------- src/api/detail/connection_impl.cpp | 811 +++++++++++++++++++++++++ src/api/libgkfs_staging.cpp | 54 +- 10 files changed, 1149 insertions(+), 390 deletions(-) create mode 100644 include/api/detail/connection_impl.hpp create mode 100644 include/api/env.hpp create mode 100644 src/api/detail/connection_impl.cpp diff --git a/include/api/connection.hpp b/include/api/connection.hpp index 9ebcf723a..1c5457bfa 100644 --- a/include/api/connection.hpp +++ b/include/api/connection.hpp @@ -26,10 +26,11 @@ SPDX-License-Identifier: GPL-3.0-or-later */ -#ifndef GEKKOFS_API_FS_HPP -#define GEKKOFS_API_FS_HPP +#ifndef GEKKOFS_API_CONNECTION_HPP +#define GEKKOFS_API_CONNECTION_HPP #include +#include namespace gkfs::api::fs { @@ -37,7 +38,7 @@ class connection { public: connection(); - connection(const std::string& hostfile); + explicit connection(const std::string& hostfile); ~connection(); connection(connection&& rhs) noexcept = default; @@ -47,6 +48,10 @@ public: connection& 
operator=(const connection& rhs) = delete; + void + put(const void* buffer, size_t size, + const std::filesystem::path& gkfs_filename); + private: class impl; std::unique_ptr pimpl_; @@ -54,4 +59,4 @@ private: } // namespace gkfs::api::fs -#endif // GEKKOFS_API_FS_HPP +#endif // GEKKOFS_API_CONNECTION_HPP \ No newline at end of file diff --git a/include/api/detail/connection_impl.hpp b/include/api/detail/connection_impl.hpp new file mode 100644 index 000000000..067f60b09 --- /dev/null +++ b/include/api/detail/connection_impl.hpp @@ -0,0 +1,144 @@ +/* + Copyright 2018-2021, Barcelona Supercomputing Center (BSC), Spain + Copyright 2015-2021, Johannes Gutenberg Universitaet Mainz, Germany + + This software was partially supported by the + EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu). + + This software was partially supported by the + ADA-FS project under the SPPEXA project funded by the DFG. + + This file is part of GekkoFS. + + GekkoFS is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + GekkoFS is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GekkoFS. If not, see . 
+ + SPDX-License-Identifier: GPL-3.0-or-later +*/ + +#ifndef GEKKOFS_API_DETAIL_CONNECTION_IMPL_HPP +#define GEKKOFS_API_DETAIL_CONNECTION_IMPL_HPP + +#include +#include +#include + +// forward declarations +namespace gkfs::metadata { +class Metadata; +} + +namespace gkfs::rpc { +class Distributor; +} + +namespace gkfs::api::fs { + +namespace detail { + +struct peer_list { + + using endpoint_type = hermes::endpoint; + + peer_list(uint64_t localhost_id, std::vector endpoints) + : localhost_id_(localhost_id), endpoints_(std::move(endpoints)) {} + + uint64_t + localhost_id() const { + return localhost_id_; + } + + hermes::endpoint + localhost() const { + return endpoints_.at(localhost_id_); + } + + std::size_t + size() const { + return endpoints_.size(); + } + + hermes::endpoint + at(std::size_t n) const { + return endpoints_.at(n); + } + + uint64_t localhost_id_ = 0; + std::vector endpoints_; +}; + +} // namespace detail + +class connection::impl { + +public: + explicit impl(const std::string_view& hostfile); + impl(const impl& other) = delete; + impl(impl&& rhs) noexcept; + impl& + operator=(const impl& other) = delete; + impl& + operator=(impl&& rhs) noexcept; + ~impl(); + + void + put(const void* buffer, size_t size, + const std::filesystem::path& gkfs_filename) const; + + std::string + stat(const std::string& path) const; + + void + create(const std::string& gkfs_filename, mode_t mode) const; + + ssize_t + pwrite(const std::string& gkfs_filename, const void* buffer, size_t size, + off_t offset) const; + +private: + void + check_parent_directory(const std::string& path) const; + + std::optional + fetch_metadata(const std::string& path, bool follow_links = true) const; + + int + forward_create_rpc(const std::string& gkfs_filename, + const mode_t mode) const; + + std::tuple + forward_update_metadentry_size_rpc(const std::string& gkfs_filename, + size_t size, off_t offset, + bool is_append) const; + + std::tuple + forward_write_rpc(const std::string& gkfs_filename, 
const void* buffer, + off_t offset, size_t write_size, size_t updated_metadentry_size, + bool is_append) const; + +private: + //////////////////////////////////////////////////////////////////////////// + // members // + //////////////////////////////////////////////////////////////////////////// + std::vector hosts_; + rpc::config rpc_config_; + std::unique_ptr rpc_service_; + detail::peer_list peers_; + std::vector host_endpoints_; + fs::config fs_config_; + std::unique_ptr distributor_; +}; + +} // namespace gkfs::api::fs + +#endif // GEKKOFS_API_DETAIL_CONNECTION_IMPL_HPP \ No newline at end of file diff --git a/include/api/env.hpp b/include/api/env.hpp new file mode 100644 index 000000000..5f3d02861 --- /dev/null +++ b/include/api/env.hpp @@ -0,0 +1,58 @@ +/* + Copyright 2018-2021, Barcelona Supercomputing Center (BSC), Spain + Copyright 2015-2021, Johannes Gutenberg Universitaet Mainz, Germany + + This software was partially supported by the + EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu). + + This software was partially supported by the + ADA-FS project under the SPPEXA project funded by the DFG. + + This file is part of GekkoFS' POSIX interface. + + GekkoFS' POSIX interface is free software: you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public License as + published by the Free Software Foundation, either version 3 of the License, + or (at your option) any later version. + + GekkoFS' POSIX interface is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with GekkoFS' POSIX interface. If not, see + . 
+ + SPDX-License-Identifier: LGPL-3.0-or-later +*/ + +#ifndef GKFS_API_ENV +#define GKFS_API_ENV + +#include + +#define ADD_PREFIX(str) API_ENV_PREFIX str + +/* Environment variables for the GekkoFS client */ +namespace gkfs::env { + +[[maybe_unused]] static constexpr auto LOG = ADD_PREFIX("LOG"); + +#ifdef GKFS_DEBUG_BUILD +[[maybe_unused]] static constexpr auto LOG_DEBUG_VERBOSITY = ADD_PREFIX("LOG_DEBUG_VERBOSITY"); +#endif + +[[maybe_unused]] static constexpr auto LOG_OUTPUT = ADD_PREFIX("LOG_OUTPUT"); +[[maybe_unused]] static constexpr auto LOG_OUTPUT_TRUNC = ADD_PREFIX("LOG_OUTPUT_TRUNC"); +[[maybe_unused]] static constexpr auto CWD = ADD_PREFIX("CWD"); +static constexpr auto HOSTS_FILE = ADD_PREFIX("HOSTS_FILE"); +#ifdef GKFS_ENABLE_FORWARDING +static constexpr auto FORWARDING_MAP_FILE = ADD_PREFIX("FORWARDING_MAP_FILE"); +#endif + +} // namespace gkfs::env + +#undef ADD_PREFIX + +#endif // GKFS_API_ENV \ No newline at end of file diff --git a/include/api/staging.hpp b/include/api/staging.hpp index e108ac829..2f074d103 100644 --- a/include/api/staging.hpp +++ b/include/api/staging.hpp @@ -33,8 +33,56 @@ namespace gkfs::api::staging { +/** + * Transfer data into a GekkoFS file. + * + * This function transfers data from the provided buffer into a target + * file in GekkoFS in one operation. If the file does not exist, it will be + * created. This function creates an internal connection to the filesystem and + * destroys it after the operation completes. + * + * @param buffer The buffer with the data contents + * @param size The amount of data to transfer + * @param gkfs_filename The target filename inside GekkoFS + * @param offset The offset in the file where the data will be placed. + */ void -put(const std::filesystem::path& pathname, const void* buf, size_t count, +put(const void* buffer, size_t size, + const std::filesystem::path& gkfs_filename, off_t offset); + +/** + * Transfer data into a GekkoFS file.
+ * + * This function transfers data from the provided buffer into a target + * file in GekkoFS in one operation. If the file does not exist, it will be + * created. The function is capable of reusing an existing connection + * to the filesystem, which might be beneficial for performance reasons, but + * leaves the responsibility of creating and destroying the connection to the + * user. + * + * @param conn An existing connection for GekkoFS + * @param buffer The buffer with the data contents + * @param size The amount of data to transfer + * @param gkfs_filename The target filename inside GekkoFS + */ +void +put(const gkfs::api::fs::connection& conn, const void* buffer, size_t size, + const std::filesystem::path& gkfs_filename); + +/** + * Transfer data into a GekkoFS file. + * + * This function creates an internal connection to a running GekkoFS + * filesystem and transfers data from the provided buffer into a target + * file. + * + * @param buffer The buffer with the data contents + * @param size The amount of data to write + * @param gkfs_filename The target filename inside GekkoFS + * @param offset The offset in the file where the data will be placed. + */ +void +put(const void* buffer, size_t size, const std::filesystem::path& gkfs_filename, off_t offset); void diff --git a/include/client/env.hpp b/include/client/env.hpp index 64dd5467d..9b68d104b 100644 --- a/include/client/env.hpp +++ b/include/client/env.hpp @@ -37,16 +37,16 @@ /* Environment variables for the GekkoFS client */ namespace gkfs::env { -static constexpr auto LOG = ADD_PREFIX("LOG"); +[[maybe_unused]] [[maybe_unused]] static constexpr auto LOG = ADD_PREFIX("LOG"); #ifdef GKFS_DEBUG_BUILD -static constexpr auto LOG_DEBUG_VERBOSITY = ADD_PREFIX("LOG_DEBUG_VERBOSITY"); +[[maybe_unused]] [[maybe_unused]] static constexpr auto LOG_DEBUG_VERBOSITY = ADD_PREFIX("LOG_DEBUG_VERBOSITY"); static constexpr auto LOG_SYSCALL_FILTER = ADD_PREFIX("LOG_SYSCALL_FILTER"); #endif -static constexpr auto LOG_OUTPUT = ADD_PREFIX("LOG_OUTPUT"); -static constexpr auto
LOG_OUTPUT_TRUNC = ADD_PREFIX("LOG_OUTPUT_TRUNC"); -static constexpr auto CWD = ADD_PREFIX("CWD"); +[[maybe_unused]] static constexpr auto LOG_OUTPUT = ADD_PREFIX("LOG_OUTPUT"); +[[maybe_unused]] static constexpr auto LOG_OUTPUT_TRUNC = ADD_PREFIX("LOG_OUTPUT_TRUNC"); +[[maybe_unused]] static constexpr auto CWD = ADD_PREFIX("CWD"); static constexpr auto HOSTS_FILE = ADD_PREFIX("HOSTS_FILE"); #ifdef GKFS_ENABLE_FORWARDING static constexpr auto FORWARDING_MAP_FILE = ADD_PREFIX("FORWARDING_MAP_FILE"); diff --git a/include/config.hpp b/include/config.hpp index 094c6077e..4ff8ac156 100644 --- a/include/config.hpp +++ b/include/config.hpp @@ -34,6 +34,7 @@ // environment prefixes (are concatenated in env module at compile time) #define CLIENT_ENV_PREFIX "LIBGKFS_" #define DAEMON_ENV_PREFIX "GKFS_" +#define API_ENV_PREFIX "GKFS_API_" namespace gkfs::config { diff --git a/src/api/CMakeLists.txt b/src/api/CMakeLists.txt index 48b603efc..38248789c 100644 --- a/src/api/CMakeLists.txt +++ b/src/api/CMakeLists.txt @@ -101,17 +101,23 @@ add_library(gkfs connection.cpp info.cpp libgkfs_staging.cpp + detail/connection_impl.cpp ${CMAKE_SOURCE_DIR}/src/global/rpc/rpc_util.cpp # TODO: create convenience target ${CMAKE_SOURCE_DIR}/src/client/rpc/rpc_types.cpp # TODO: use own RPC types + ${CMAKE_SOURCE_DIR}/src/global/path_util.cpp # TODO: convenience target? 
+ ${CMAKE_SOURCE_DIR}/include/api/env.hpp ${CMAKE_SOURCE_DIR}/include/api/connection.hpp + ${CMAKE_SOURCE_DIR}/include/api/detail/connection_impl.hpp ${CMAKE_SOURCE_DIR}/include/api/gekkofs.hpp ${CMAKE_SOURCE_DIR}/include/api/info.hpp - ${CMAKE_SOURCE_DIR}/include/api/staging.hpp) + ${CMAKE_SOURCE_DIR}/include/api/staging.hpp + ) target_link_libraries(gkfs PRIVATE distributor env_util + metadata PUBLIC hermes ) diff --git a/src/api/connection.cpp b/src/api/connection.cpp index 47d3b0e00..8e01c10f6 100644 --- a/src/api/connection.cpp +++ b/src/api/connection.cpp @@ -26,391 +26,20 @@ SPDX-License-Identifier: GPL-3.0-or-later */ -#include -#include -#include -#include +#include +#include #include #include - -#include -#include // TODO: rpc_types.hpp might need to move -#include // TODO: independent api/env.hpp +#include #include -#include -#include +namespace gkfs::api::fs { //////////////////////////////////////////////////////////////////////////////// -// helper types and functions // +// implementation of the public API // //////////////////////////////////////////////////////////////////////////////// -namespace { - -using namespace gkfs::api; - -struct peer_list { - - using endpoint_type = hermes::endpoint; - - peer_list(uint64_t localhost_id, std::vector endpoints) - : localhost_id_(localhost_id), endpoints_(std::move(endpoints)) {} - - uint64_t - localhost_id() const { - return localhost_id_; - } - - hermes::endpoint - localhost() const { - return endpoints_.at(localhost_id_); - } - - std::size_t - size() const { - return endpoints_.size(); - } - - uint64_t localhost_id_ = 0; - std::vector endpoints_; -}; - -// TODO: mostly identical to load_hostfile() in preload_util.cpp (could not -// reuse due to CTX modification in extract_protocol(), which is called from -// load_hostfile()... Consider refactoring. 
-// std::tuple>> - -std::vector -load_hostfile(const std::string_view& hostfile) { - - // LOG(DEBUG, "Loading hosts_ file: \"{}\"", hostfile); - - std::ifstream lf(std::string{hostfile}); - if(!lf) { - throw std::runtime_error(fmt::format("{}", strerror(errno))); - } - // std::vector> hosts_; - std::vector hosts; - const std::regex line_re(R"(^(\S+)\s+(\S+)$)", - std::regex::ECMAScript | std::regex::optimize); - std::string line; - std::string host; - std::string uri; - std::smatch match; - while(getline(lf, line)) { - if(!regex_match(line, match, line_re)) { - - // LOG(ERROR, "Unrecognized line format: [hostfile: '{}', - // line: '{}']", - // hostfile, line); - - throw std::runtime_error(fmt::format("invalid line: '{}'", line)); - } - host = match[1]; - uri = match[2]; - hosts.emplace_back(host, uri); - } - if(hosts.empty()) { - throw std::runtime_error( - "hostfile exists but no addresses could be extracted"); - } - - return hosts; -} - -std::vector -find_hosts(const std::string_view& hostfile) { - try { - return ::load_hostfile(hostfile); - } catch(const std::exception& ex) { - throw std::runtime_error(fmt::format("Error loading hostfile '{}': {}", - hostfile, ex.what())); - } -} - -rpc::config -fetch_rpc_config(const std::string& uri) { - - rpc::config rpc_info; - - if(uri.rfind("://") == std::string::npos) { - // invalid format. kill client - throw std::runtime_error( - fmt::format("Invalid format for URI: '{}'", uri)); - } - std::string protocol{}; - - if(uri.find(gkfs::rpc::protocol::ofi_sockets) != std::string::npos) { - protocol = gkfs::rpc::protocol::ofi_sockets; - } else if(uri.find(gkfs::rpc::protocol::ofi_psm2) != std::string::npos) { - protocol = gkfs::rpc::protocol::ofi_psm2; - } else if(uri.find(gkfs::rpc::protocol::ofi_verbs) != std::string::npos) { - protocol = gkfs::rpc::protocol::ofi_verbs; - } - // check for shared memory protocol. 
Can be plain shared memory or real ofi - // protocol + auto_sm - if(uri.find(gkfs::rpc::protocol::na_sm) != std::string::npos) { - if(protocol.empty()) - protocol = gkfs::rpc::protocol::na_sm; - else - rpc_info.auto_sm_enabled_ = true; - } - if(protocol.empty()) { - // unsupported protocol. kill client - throw std::runtime_error(fmt::format( - "Unsupported RPC protocol found in hosts_ file with URI: '{}'", - uri)); - } - // LOG(INFO, - // "RPC protocol '{}' extracted from hosts_ file. Using auto_sm is - // '{}'", protocol, CTX->auto_sm()); - rpc_info.protocol_string_ = protocol; - - return rpc_info; -} - - -/** - * Looks up a host endpoint via Hermes - * @param uri - * @param max_retries - * @return hermes endpoint, if successful - * @throws std::runtime_error - */ -// TODO: almost identical to lookup_endpoint() in preload_util.cpp, generalize -hermes::endpoint -lookup_endpoint(const std::unique_ptr& rpc_service, - const std::string_view& uri, std::size_t max_retries = 3) { - - // LOG(DEBUG, "Looking up address \"{}\"", uri); - - std::random_device rd; // obtain a random number from hardware - std::size_t attempts = 0; - std::string error_msg; - - do { - try { - return rpc_service->lookup(std::string{uri}); - } catch(const std::exception& ex) { - error_msg = ex.what(); - - // LOG(WARNING, "Failed to lookup address '{}'. 
Attempts [{}/{}] ", - // uri, attempts + 1, max_retries); - - // Wait a random amount of time and try again - std::mt19937 g(rd()); // seed the random generator - std::uniform_int_distribution<> distr( - 50, 50 * (attempts + 2)); // define the range - std::this_thread::sleep_for(std::chrono::milliseconds(distr(g))); - continue; - } - } while(++attempts < max_retries); - - throw std::runtime_error( - fmt::format("Endpoint for address '{}' could not be found ({})", - uri, error_msg)); -} - - -// TODO: mostly identical to init_hermes_client() in preload.cpp -// TODO: consider refactoring them -std::unique_ptr -create_rpc_service(bool use_auto_sm, const std::string_view& protocol_string) { - - try { - - hermes::engine_options opts{}; - - if(use_auto_sm) { - opts |= hermes::use_auto_sm; - } - - if(gkfs::rpc::protocol::ofi_psm2 == protocol_string) { - opts |= hermes::force_no_block_progress; - } - - return std::make_unique( - hermes::get_transport_type(std::string{protocol_string}), opts); - } catch(const std::exception& ex) { - throw std::runtime_error(fmt::format( - "Failed to initialize RPC subsystem: {}\n", ex.what())); - } -} - -std::unique_ptr -start_rpc_service(const rpc::config& rpc_config) { - auto rpc_service = create_rpc_service(rpc_config.auto_sm_enabled_, - rpc_config.protocol_string_); - rpc_service->run(); - return rpc_service; -} - -// TODO: mostly identical to connec_to_hosts() in rpc_util.cpp -peer_list -resolve(const std::unique_ptr& rpc_service, - const std::vector& hosts) { - - const auto localhost_name = gkfs::rpc::get_my_hostname(true); - uint64_t localhost_id = 0; - bool localhost_found = false; - - std::vector endpoints; - endpoints.resize(hosts.size()); - - std::vector host_ids(hosts.size()); - // populate vector with [0, ..., host_size - 1] - std::iota(std::begin(host_ids), std::end(host_ids), 0); - /* - * Shuffle hosts_ to balance addr lookups to all hosts_ - * Too many concurrent lookups send to same host - * could overwhelm the server, - * 
returning error when addr lookup - */ - std::random_device rd; // obtain a random number from hardware - std::mt19937 g(rd()); // seed the random generator - std::shuffle(host_ids.begin(), host_ids.end(), g); // Shuffle hosts_ - // vector - // lookup addresses and put abstract server addresses into rpc_addresses - - for(const auto& id : host_ids) { - const auto& hostname = hosts.at(id).hostname(); - const auto& uri = hosts.at(id).uri(); - - endpoints[id] = ::lookup_endpoint(rpc_service, uri); - - if(!localhost_found && hostname == localhost_name) { - // LOG(DEBUG, "Found local host: {}", hostname); - localhost_id = id; - localhost_found = true; - } - - // LOG(DEBUG, "Found peer: {}", - // endpoints[id].to_string()); - } - - return peer_list{localhost_id, endpoints}; -} - -// TODO: mostly identical to gkfs::rpc::forward_get_fs_config() in -// TODO: forward_management.cpp, consider refactoring them -fs::config -fetch_fs_config(const std::unique_ptr& rpc_service, - const peer_list& peers) { - - const auto endp = peers.localhost(); - gkfs::rpc::fs_config::output out; - - try { - // LOG(DEBUG, "Retrieving file system configurations from - // daemon"); - // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that - // we can retry for RPC_TRIES (see old commits with margo) - // TODO(amiranda): hermes will eventually provide a post(endpoint) - // returning one result and a broadcast(endpoint_set) returning a - // result_set. 
When that happens we can remove the .at(0) :/ - out = rpc_service->post(endp).get().at(0); - } catch(const std::exception& ex) { - throw std::runtime_error( - "Error retrieving gekkofs_instance configurations from " - "daemon"); - } - - return {out.atime_state(), out.mtime_state(), - out.ctime_state(), out.link_cnt_state(), - out.blocks_state(), out.uid(), - out.gid(), out.rootdir(), - out.mountdir()}; -} - - -} // namespace - -namespace gkfs::api::fs { - -class connection::impl { - -public: - impl(const std::string_view& hostfile) - : hosts_(find_hosts(hostfile)), - rpc_config_(fetch_rpc_config(hosts_.at(0).uri())), - rpc_service_(start_rpc_service(rpc_config_)), - peers_(resolve(rpc_service_, hosts_)), - fs_config_(fetch_fs_config(rpc_service_, peers_)), - distributor_(std::make_unique( - peers_.localhost_id(), peers_.size())) { - - using namespace std::string_literals; - -#if 0 - // read host information - std::vector> hosts_{}; - - try { - // LOG(INFO, "Loading peer addresses..."); - std::tie(rpc_config_, hosts_) = ::find_hosts(); - } catch(const std::exception& e) { - throw std::runtime_error( - fmt::format("Failed to load hosts_ addresses:" - " {}", - e.what())); - } -#endif - - // rpc_service_ = create_rpc_service(); - // rpc_service_->run(); - -#if 0 - try { - ping(hosts_); - } catch(const std::exception& e) { - throw std::runtime_error( - fmt::format("Failed to connect to hosts_: {}\n", e.what())); - } -#endif - -#if 0 - distributor_ = std::make_unique( - local_host_id_, host_endpoints_.size()); -#endif - - // LOG(INFO, "Retrieving file system configuration..."); -#if 0 - try { - fs_config_ = fetch_fs_info(); - } catch(const std::exception& ex) { - throw std::runtime_error( - fmt::format("Unable to fetch file system " - "configurations from daemon process " - "through RPC: {}", - ex.what())); - } -#endif - - // LOG(INFO, "Environment initialization successful."); - } - - impl(const impl& other) = delete; - impl(impl&& rhs) = default; - impl& - 
operator=(const impl& other) = delete; - impl& - operator=(impl&& rhs) = default; - ~impl() = default; - -private: - //////////////////////////////////////////////////////////////////////////// - // members // - //////////////////////////////////////////////////////////////////////////// - std::vector hosts_; - rpc::config rpc_config_; - std::unique_ptr rpc_service_; - peer_list peers_; - std::vector host_endpoints_; - fs::config fs_config_; - std::unique_ptr distributor_; -}; - connection::connection() : pimpl_(new connection::impl(gkfs::env::get_var( gkfs::env::HOSTS_FILE, gkfs::config::hostfile_path))) {} @@ -420,4 +49,11 @@ connection::connection(const std::string& hostfile) connection::~connection() = default; -} // namespace gkfs::api::fs +void +connection::put(const void* buffer, size_t size, + const std::filesystem::path& gkfs_filename) { + pimpl_->put(buffer, size, gkfs_filename); +} + + +} // namespace gkfs::api::fs \ No newline at end of file diff --git a/src/api/detail/connection_impl.cpp b/src/api/detail/connection_impl.cpp new file mode 100644 index 000000000..170d2909a --- /dev/null +++ b/src/api/detail/connection_impl.cpp @@ -0,0 +1,811 @@ +/* + Copyright 2018-2021, Barcelona Supercomputing Center (BSC), Spain + Copyright 2015-2021, Johannes Gutenberg Universitaet Mainz, Germany + + This software was partially supported by the + EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu). + + This software was partially supported by the + ADA-FS project under the SPPEXA project funded by the DFG. + + This file is part of GekkoFS. + + GekkoFS is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. 
+ + GekkoFS is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GekkoFS. If not, see . + + SPDX-License-Identifier: GPL-3.0-or-later +*/ + +#include +#include +#include +#include + +#include + +#include +#include +#include + +#include // TODO: rpc_types.hpp might need to move +#include +#include + + +#include +#include +#include + + +//////////////////////////////////////////////////////////////////////////////// +// helper types and functions // +//////////////////////////////////////////////////////////////////////////////// +namespace { + +using namespace gkfs::api; + +// TODO: mostly identical to load_hostfile() in preload_util.cpp (could not +// reuse due to CTX modification in extract_protocol(), which is called from +// load_hostfile()... Consider refactoring. 
+// std::tuple>> + +std::vector +load_hostfile(const std::string_view& hostfile) { + + // LOG(DEBUG, "Loading hosts_ file: \"{}\"", hostfile); + + std::ifstream lf(std::string{hostfile}); + if(!lf) { + throw std::runtime_error(fmt::format("{}", strerror(errno))); + } + // std::vector> hosts_; + std::vector hosts; + const std::regex line_re(R"(^(\S+)\s+(\S+)$)", + std::regex::ECMAScript | std::regex::optimize); + std::string line; + std::string host; + std::string uri; + std::smatch match; + while(getline(lf, line)) { + if(!regex_match(line, match, line_re)) { + + // LOG(ERROR, "Unrecognized line format: [hostfile: '{}', + // line: '{}']", + // hostfile, line); + + throw std::runtime_error(fmt::format("invalid line: '{}'", line)); + } + host = match[1]; + uri = match[2]; + hosts.emplace_back(host, uri); + } + if(hosts.empty()) { + throw std::runtime_error( + "hostfile exists but no addresses could be extracted"); + } + + return hosts; +} + +std::vector +find_hosts(const std::string_view& hostfile) { + try { + return ::load_hostfile(hostfile); + } catch(const std::exception& ex) { + throw std::runtime_error(fmt::format("Error loading hostfile '{}': {}", + hostfile, ex.what())); + } +} + +rpc::config +fetch_rpc_config(const std::string& uri) { + + rpc::config rpc_info; + + if(uri.rfind("://") == std::string::npos) { + // invalid format. kill client + throw std::runtime_error( + fmt::format("Invalid format for URI: '{}'", uri)); + } + std::string protocol{}; + + if(uri.find(gkfs::rpc::protocol::ofi_sockets) != std::string::npos) { + protocol = gkfs::rpc::protocol::ofi_sockets; + } else if(uri.find(gkfs::rpc::protocol::ofi_psm2) != std::string::npos) { + protocol = gkfs::rpc::protocol::ofi_psm2; + } else if(uri.find(gkfs::rpc::protocol::ofi_verbs) != std::string::npos) { + protocol = gkfs::rpc::protocol::ofi_verbs; + } + // check for shared memory protocol. 
Can be plain shared memory or real ofi + // protocol + auto_sm + if(uri.find(gkfs::rpc::protocol::na_sm) != std::string::npos) { + if(protocol.empty()) + protocol = gkfs::rpc::protocol::na_sm; + else + rpc_info.auto_sm_enabled_ = true; + } + if(protocol.empty()) { + // unsupported protocol. kill client + throw std::runtime_error(fmt::format( + "Unsupported RPC protocol found in hosts_ file with URI: '{}'", + uri)); + } + // LOG(INFO, + // "RPC protocol '{}' extracted from hosts_ file. Using auto_sm is + // '{}'", protocol, CTX->auto_sm()); + rpc_info.protocol_string_ = protocol; + + return rpc_info; +} + + +/** + * Looks up a host endpoint via Hermes + * @param uri + * @param max_retries + * @return hermes endpoint, if successful + * @throws std::runtime_error + */ +// TODO: almost identical to lookup_endpoint() in preload_util.cpp, generalize +hermes::endpoint +lookup_endpoint(const std::unique_ptr& rpc_service, + const std::string_view& uri, std::size_t max_retries = 3) { + + // LOG(DEBUG, "Looking up address \"{}\"", uri); + + std::random_device rd; // obtain a random number from hardware + std::size_t attempts = 0; + std::string error_msg; + + do { + try { + return rpc_service->lookup(std::string{uri}); + } catch(const std::exception& ex) { + error_msg = ex.what(); + + // LOG(WARNING, "Failed to lookup address '{}'. 
Attempts [{}/{}] ", + // uri, attempts + 1, max_retries); + + // Wait a random amount of time and try again + std::mt19937 g(rd()); // seed the random generator + std::uniform_int_distribution<> distr( + 50, 50 * (attempts + 2)); // define the range + std::this_thread::sleep_for(std::chrono::milliseconds(distr(g))); + continue; + } + } while(++attempts < max_retries); + + throw std::runtime_error( + fmt::format("Endpoint for address '{}' could not be found ({})", + uri, error_msg)); +} + + +// TODO: mostly identical to init_hermes_client() in preload.cpp +// TODO: consider refactoring them +std::unique_ptr +create_rpc_service(bool use_auto_sm, const std::string_view& protocol_string) { + + try { + + hermes::engine_options opts{}; + + if(use_auto_sm) { + opts |= hermes::use_auto_sm; + } + + if(gkfs::rpc::protocol::ofi_psm2 == protocol_string) { + opts |= hermes::force_no_block_progress; + } + + return std::make_unique( + hermes::get_transport_type(std::string{protocol_string}), opts); + } catch(const std::exception& ex) { + throw std::runtime_error(fmt::format( + "Failed to initialize RPC subsystem: {}\n", ex.what())); + } +} + +std::unique_ptr +start_rpc_service(const rpc::config& rpc_config) { + auto rpc_service = create_rpc_service(rpc_config.auto_sm_enabled_, + rpc_config.protocol_string_); + rpc_service->run(); + return rpc_service; +} + +// TODO: mostly identical to connec_to_hosts() in rpc_util.cpp +fs::detail::peer_list +resolve(const std::unique_ptr& rpc_service, + const std::vector& hosts) { + + const auto localhost_name = gkfs::rpc::get_my_hostname(true); + uint64_t localhost_id = 0; + bool localhost_found = false; + + std::vector endpoints; + endpoints.resize(hosts.size()); + + std::vector host_ids(hosts.size()); + // populate vector with [0, ..., host_size - 1] + std::iota(std::begin(host_ids), std::end(host_ids), 0); + /* + * Shuffle hosts_ to balance addr lookups to all hosts_ + * Too many concurrent lookups send to same host + * could overwhelm the 
server, + * returning error when addr lookup + */ + std::random_device rd; // obtain a random number from hardware + std::mt19937 g(rd()); // seed the random generator + std::shuffle(host_ids.begin(), host_ids.end(), g); // Shuffle hosts_ + // vector + // lookup addresses and put abstract server addresses into rpc_addresses + + for(const auto& id : host_ids) { + const auto& hostname = hosts.at(id).hostname(); + const auto& uri = hosts.at(id).uri(); + + endpoints[id] = ::lookup_endpoint(rpc_service, uri); + + if(!localhost_found && hostname == localhost_name) { + // LOG(DEBUG, "Found local host: {}", hostname); + localhost_id = id; + localhost_found = true; + } + + // LOG(DEBUG, "Found peer: {}", + // endpoints[id].to_string()); + } + + return fs::detail::peer_list{localhost_id, endpoints}; +} + +// TODO: mostly identical to gkfs::rpc::forward_get_fs_config() in +// TODO: forward_management.cpp, consider refactoring them +fs::config +fetch_fs_config(const std::unique_ptr& rpc_service, + const fs::detail::peer_list& peers) { + + const auto endp = peers.localhost(); + gkfs::rpc::fs_config::output out; + + try { + // LOG(DEBUG, "Retrieving file system configurations from + // daemon"); + // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that + // we can retry for RPC_TRIES (see old commits with margo) + // TODO(amiranda): hermes will eventually provide a post(endpoint) + // returning one result and a broadcast(endpoint_set) returning a + // result_set. When that happens we can remove the .at(0) :/ + out = rpc_service->post(endp).get().at(0); + } catch(const std::exception& ex) { + throw std::runtime_error( + "Error retrieving gekkofs_instance configurations from " + "daemon"); + } + + return {out.atime_state(), out.mtime_state(), + out.ctime_state(), out.link_cnt_state(), + out.blocks_state(), out.uid(), + out.gid(), out.rootdir(), + out.mountdir()}; +} + + +// TODO: mostly identical to forward_create in forward_metadata.cpp; refactor? 
+ + +} // namespace + + +namespace gkfs::api::fs { + +connection::impl::impl(const std::string_view& hostfile) + : hosts_(find_hosts(hostfile)), + rpc_config_(fetch_rpc_config(hosts_.at(0).uri())), + rpc_service_(start_rpc_service(rpc_config_)), + peers_(resolve(rpc_service_, hosts_)), + fs_config_(fetch_fs_config(rpc_service_, peers_)), + distributor_(std::make_unique( + peers_.localhost_id(), peers_.size())) { + + using namespace std::string_literals; + +#if 0 + // read host information + std::vector> hosts_{}; + + try { + // LOG(INFO, "Loading peer addresses..."); + std::tie(rpc_config_, hosts_) = ::find_hosts(); + } catch(const std::exception& e) { + throw std::runtime_error( + fmt::format("Failed to load hosts_ addresses:" + " {}", + e.what())); + } +#endif + + // rpc_service_ = create_rpc_service(); + // rpc_service_->run(); + +#if 0 + try { + ping(hosts_); + } catch(const std::exception& e) { + throw std::runtime_error( + fmt::format("Failed to connect to hosts_: {}\n", e.what())); + } +#endif + +#if 0 + distributor_ = std::make_unique( + local_host_id_, host_endpoints_.size()); +#endif + + // LOG(INFO, "Retrieving file system configuration..."); +#if 0 + try { + fs_config_ = fetch_fs_info(); + } catch(const std::exception& ex) { + throw std::runtime_error( + fmt::format("Unable to fetch file system " + "configurations from daemon process " + "through RPC: {}", + ex.what())); + } +#endif + + // LOG(INFO, "Environment initialization successful."); +} + +// connection::impl::impl(const impl& other) = delete; +connection::impl::impl(impl&& rhs) noexcept = default; +// connection::impl& +// connection::impl::operator=(const impl& other) = delete; +connection::impl& +connection::impl::operator=(impl&& rhs) noexcept = default; +connection::impl::~impl() = default; + +void +connection::impl::put(const void* buffer, size_t size, + const std::filesystem::path& gkfs_filename) const { + + create(gkfs_filename, 0660 | S_IFREG); + pwrite(gkfs_filename, buffer, size, 0); 
+} + +/** + * Send an RPC for a stat request + * + * @param path + * @param attr + * @return the attributes requested + */ +std::string +connection::impl::stat(const std::string& filename) const { + + auto endp = peers_.at(distributor_->locate_file_metadata(filename)); + + // LOG(DEBUG, "Sending RPC ..."); + // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we + // can retry for RPC_TRIES (see old commits with margo) + // TODO(amiranda): hermes will eventually provide a post(endpoint) + // returning one result and a broadcast(endpoint_set) returning a + // result_set. When that happens we can remove the .at(0) :/ + auto out = rpc_service_->post(endp, filename).get().at(0); + // LOG(DEBUG, "Got response success: {}", out.err()); + + if(int error_code = out.err()) { + throw std::runtime_error(fmt::format("Failed to get metadata for " + "'{}': {}", + filename, strerror(error_code))); + } + + return out.db_val(); +} + + +// TODO: mostly identical to gkfs_create in gkfs_functions.cpp; +// refactor? +void +connection::impl::create(const std::string& gkfs_filename, mode_t mode) const { + + // file type must be set + switch(mode & S_IFMT) { + case 0: + mode |= S_IFREG; + break; + case S_IFREG: // intentionally fall-through + case S_IFDIR: + break; + case S_IFCHR: // intentionally fall-through + case S_IFBLK: + case S_IFIFO: + case S_IFSOCK: + // LOG(WARNING, "Unsupported node type"); + throw std::runtime_error(strerror(ENOTSUP)); + default: + // LOG(WARNING, "Unrecognized node type"); + throw std::runtime_error(strerror(EINVAL)); + } + + // TODO: the connection should allow the API to check if + // the daemons support CREATE_CHECK_PARENTS. + // Another option is to offer an option to disable the check + // For now we are assuming it doesn't matter + if(false) { // TODO: if(connection.options().fs_has_create_check_parents)... 
+ check_parent_directory(gkfs_filename); + } + + if(auto err = forward_create_rpc(gkfs_filename, mode)) { + throw std::runtime_error( + fmt::format("Error creating file: {}", strerror(err))); + } +} + +// TODO: mostly identical to forward_write in forward_data.cpp, refactor +ssize_t +connection::impl::pwrite(const std::string& gkfs_filename, const void* buffer, + size_t size, off_t offset) const { + + const auto [update_error, updated_size] = + forward_update_metadentry_size_rpc(gkfs_filename, size, offset, + false); + + if(update_error) { + // LOG(ERROR, "update_metadentry_size() failed with update_error + // '{}'", update_error); + throw std::runtime_error(fmt::format("Error writing to file: {}", + strerror(update_error))); + } + + const auto [write_error, written_size] = forward_write_rpc( + gkfs_filename, buffer, offset, size, updated_size, false); + + if(write_error) { + // LOG(WARNING, "gkfs::rpc::forward_write() failed with + // update_error + // '{}'", update_error); + throw std::runtime_error(fmt::format("Error writing to file: {}", + strerror(write_error))); + } + + return written_size; +} + +//////////////////////////////////////////////////////////////////////////////// +// private helpers // +//////////////////////////////////////////////////////////////////////////////// +/** + * Checks if metadata for parent directory exists (can be disabled with + * CREATE_CHECK_PARENTS). 
errno may be set + * @param path + * @return 0 on success, -1 on failure + */ +void +connection::impl::check_parent_directory(const std::string& path) const { + + auto p_comp = gkfs::path::dirname(path); + auto md = fetch_metadata(p_comp, false); + if(!md) { + if(errno == ENOENT) { + // LOG(DEBUG, "Parent component does not exist: '{}'", + // p_comp); + throw std::runtime_error( + fmt::format("Parent '{}' does not exist.", p_comp)); + } else { + // LOG(ERROR, "Failed to get metadata for parent + // component '{}': {}", + // path, strerror(errno)); + throw std::runtime_error( + fmt::format("Failed to get metadata for '{}': {}", p_comp, + strerror(errno))); + } + } + + if(!S_ISDIR(md->mode())) { + // LOG(DEBUG, "Parent component is not a directory: '{}'", + // p_comp); + throw std::runtime_error(fmt::format("Parent '{}' is not a " + "directory", + p_comp)); + } +} + +// TODO: very similar to get_metadata() from preload_util.hpp. Consider +// refactoring +/** + * Retrieve metadata from daemon and return Metadata object + * errno may be set + * @param path + * @param follow_links + * @return Metadata + */ +std::optional +connection::impl::fetch_metadata(const std::string& path, + bool follow_links) const { + std::string attr; + + attr = stat(path); + + //#ifdef HAS_SYMLINKS + if(false) { // TODO: if(connection.options().fs_has_symlinks)... + if(follow_links) { + gkfs::metadata::Metadata md{attr}; + while(md.is_link()) { + attr = stat(md.target_path()); + return gkfs::metadata::Metadata{attr}; + } + } + } + //#endif + return gkfs::metadata::Metadata{attr}; +} + +// TODO: mostly identical to forward_create in forward_metadata.cpp; refactor? 
+int +connection::impl::forward_create_rpc(const std::string& gkfs_filename, + const mode_t mode) const { + + const auto endp = + peers_.at(distributor_->locate_file_metadata(gkfs_filename)); + + try { + // LOG(DEBUG, "Sending RPC ..."); + // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we + // can retry for RPC_TRIES (see old commits with margo) + // TODO(amiranda): hermes will eventually provide a post(endpoint) + // returning one result and a broadcast(endpoint_set) returning a + // result_set. When that happens we can remove the .at(0) :/ + auto out = + rpc_service_->post(endp, gkfs_filename, mode) + .get() + .at(0); + // LOG(DEBUG, "Got response success: {}", out.err()); + + return out.err(); + } catch(const std::exception& ex) { + // LOG(ERROR, "while getting rpc output"); + throw std::runtime_error(fmt::format("Error retrieving RPC output: " + "{}", + ex.what())); + } +} + +// TODO: mostly identical to forward_update_metadentry_size_rpc, refactor +std::tuple +connection::impl::forward_update_metadentry_size_rpc( + const std::string& gkfs_filename, size_t size, off_t offset, + bool is_append) const { + + const auto endp = + peers_.at(distributor_->locate_file_metadata(gkfs_filename)); + + try { + // LOG(DEBUG, "Sending RPC ..."); + // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we + // can retry for RPC_TRIES (see old commits with margo) + // TODO(amiranda): hermes will eventually provide a post(endpoint) + // returning one result and a broadcast(endpoint_set) returning a + // result_set. 
When that happens we can remove the .at(0) :/ + auto out = rpc_service_ + ->post( + endp, gkfs_filename, size, offset, is_append) + .get() + .at(0); + + // LOG(DEBUG, "Got response success: {}", out.err()); + + if(out.err()) + return {out.err(), 0}; + else + return {0, out.ret_size()}; + } catch(const std::exception& ex) { + throw std::runtime_error(fmt::format("Error retrieving RPC output: " + "{}", + ex.what())); + } +} + +// TODO: mostly identical to forward_write in forward_data.cpp, refactor +std::tuple +connection::impl::forward_write_rpc(const std::string& gkfs_filename, + const void* buffer, off_t start_offset, + size_t write_size, + size_t updated_metadentry_size, + bool is_append) const { + + using namespace gkfs::utils::arithmetic; + + assert(write_size > 0); + + // Calculate chunkid boundaries and numbers so that daemons know in + // which interval to look for chunks + off64_t offset = is_append ? start_offset + : static_cast(updated_metadentry_size - + write_size); + + auto chnk_start = block_index(offset, gkfs::config::rpc::chunksize); + auto chnk_end = block_index((offset + write_size) - 1, + gkfs::config::rpc::chunksize); + + // Collect all chunk ids within count that have the same destination so + // that those are send in one rpc bulk transfer + std::map> target_chnks{}; + // contains the target ids, used to access the target_chnks map. 
+ // First idx is chunk with potential offset + std::vector targets{}; + + // targets for the first and last chunk as they need special treatment + uint64_t chnk_start_target = 0; + uint64_t chnk_end_target = 0; + + for(uint64_t chnk_id = chnk_start; chnk_id <= chnk_end; chnk_id++) { + auto target = distributor_->locate_data(gkfs_filename, chnk_id); + + if(target_chnks.count(target) == 0) { + target_chnks.insert( + std::make_pair(target, std::vector{chnk_id})); + targets.push_back(target); + } else { + target_chnks[target].push_back(chnk_id); + } + + // set first and last chnk targets + if(chnk_id == chnk_start) { + chnk_start_target = target; + } + + if(chnk_id == chnk_end) { + chnk_end_target = target; + } + } + + // some helper variables for async RPC + std::vector bufseq{ + hermes::mutable_buffer{const_cast(buffer), write_size}, + }; + + // expose user buffers so that they can serve as RDMA data sources + // (these are automatically "unexposed" when the destructor is called) + hermes::exposed_memory local_buffers; + + try { + local_buffers = + rpc_service_->expose(bufseq, hermes::access_mode::read_only); + + } catch(const std::exception& ex) { + // LOG(ERROR, "Failed to expose buffers for RMA"); + throw std::runtime_error(fmt::format("Error exposing buffers for " + "RDMA: {}", + ex.what())); + } + + std::vector> handles; + + // Issue non-blocking RPC requests and wait for the result later + // + // TODO(amiranda): This could be simplified by adding a vector of inputs + // to async_engine::broadcast(). 
This would allow us to avoid manually + // looping over handles as we do below + for(const auto& target : targets) { + + // total chunk_size for target + auto total_chunk_size = + target_chnks[target].size() * gkfs::config::rpc::chunksize; + + // receiver of first chunk must subtract the offset from first chunk + if(target == chnk_start_target) { + total_chunk_size -= + block_overrun(offset, gkfs::config::rpc::chunksize); + } + + // receiver of last chunk must subtract + if(target == chnk_end_target && + !is_aligned(offset + write_size, gkfs::config::rpc::chunksize)) { + total_chunk_size -= block_underrun(offset + write_size, + gkfs::config::rpc::chunksize); + } + + auto endp = peers_.at(target); + + try { + + // LOG(DEBUG, "Sending RPC ..."); + + gkfs::rpc::write_data::input in( + gkfs_filename, + // first offset in targets is the chunk with + // a potential offset + block_overrun(offset, gkfs::config::rpc::chunksize), target, + peers_.size(), + // number of chunks handled by that destination + target_chnks[target].size(), + // chunk start id of this write + chnk_start, + // chunk end id of this write + chnk_end, + // total size to write + total_chunk_size, local_buffers); + + // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that + // we can retry for RPC_TRIES (see old commits with margo) + // TODO(amiranda): hermes will eventually provide a post(endpoint) + // returning one result and a broadcast(endpoint_set) returning a + // result_set. 
When that happens we can remove the .at(0) :/ + handles.emplace_back( + rpc_service_->post(endp, in)); + + // LOG(DEBUG, + // "host: {}, path: \"{}\", chunks: {}, size: {}, + // offset: {}", target, path, in.chunk_n(), + // total_chunk_size, in.offset()); + + } catch(const std::exception& ex) { + // LOG(ERROR, + // "Unable to send non-blocking rpc for " + // "path \"{}\" [peer: {}]", + // path, target); + throw std::runtime_error(fmt::format("Error sending non-block RPC" + " for '{}' [peer: {}]: {}", + gkfs_filename, target, + ex.what())); + } + } + + // Wait for RPC responses and then get response and add it to out_size + // which is the written size All potential outputs are served to free + // resources regardless of errors, although an errorcode is set. + auto err = 0; + ssize_t out_size = 0; + std::size_t idx = 0; + + for(const auto& h : handles) { + try { + // XXX We might need a timeout here to not wait forever for an + // output that never comes? + auto out = h.get().at(0); + + if(out.err() != 0) { + // LOG(ERROR, "Daemon reported error: {}", + // out.err()); + err = out.err(); + } + + out_size += static_cast(out.io_size()); + + } catch(const std::exception& ex) { + // LOG(ERROR, "Failed to get rpc output for path \"{}\" + // [peer: {}]", + // path, targets[idx]); + throw std::runtime_error(fmt::format("Error fetching RPC " + "response for path '{}' " + "[peer: {}]: {}", + gkfs_filename, targets[idx], + ex.what())); + } + + idx++; + } + + /* + * Typically file systems return the size even if only a part of it was + * written. In our case, we do not keep track which daemon fully wrote its + * workload. Thus, we always return size 0 on error. + */ + return {err, err ? 
out_size : 0}; +} + + +} // namespace gkfs::api::fs \ No newline at end of file diff --git a/src/api/libgkfs_staging.cpp b/src/api/libgkfs_staging.cpp index 1d6031cd3..da972b397 100644 --- a/src/api/libgkfs_staging.cpp +++ b/src/api/libgkfs_staging.cpp @@ -31,7 +31,57 @@ #include #include -namespace gkfs::api { +#include + +#include + +#include +#include + +#include // TODO: api/rpc? + +namespace gkfs::api::staging { + +/** + * Transfer data into a GekkoFS file. + * + * This function transfers data from the provided buffer into a target + * file in GekkoFS in one operation. If the file does not exist, it will be + * created. This function creates an internal connection to the filesystem and + * destroys it after the operation completes. + * + * @param buffer The buffer with the data contents + * @param size The amount of data to transfer + * @param gkfs_pathname The target filename inside GekkoFS + */ +void +put(const void* buffer, size_t size, + const std::filesystem::path& gkfs_filename, off_t offset = 0) { + + fs::connection fc; + + + fc.put(buffer, size, gkfs_filename); +} + +/** + * Transfer data into a GekkoFS file. + * + * This function transfers data from the provided buffer into a target + * file in GekkoFS in one operation. If the file does not exist, it will be + * created. The function is capable of reusing an existing connection + * to the filesystem, which might be beneficial for performance reasons, but + * leaves the responsibility of creating and destroying the connection to the + * user. 
+ * + * @param conn An existing connection for GekkoFS + * @param buffer The buffer with the data contents + * @param size The amount of data to transfer + * @param gkfs_pathname The target filename inside GekkoFS + */ +void +put(const gkfs::api::fs::connection& conn, const void* buffer, size_t size, + const std::filesystem::path& gkfs_filename) {} void put(const std::filesystem::path& pathname, const void* buf, size_t count, @@ -42,4 +92,4 @@ put(const gkfs::api::fs::connection& conn, const std::filesystem::path& pathname, const void* buf, size_t count, off_t offset) {} -} // namespace gkfs::api \ No newline at end of file +} // namespace gkfs::api::staging \ No newline at end of file -- GitLab From 1631ade617cd04d1f53c0411f8e7b49300527dce Mon Sep 17 00:00:00 2001 From: Alberto Miranda Date: Tue, 20 Apr 2021 09:05:13 +0200 Subject: [PATCH 4/5] Update Hermes submodule --- external/hermes | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/external/hermes b/external/hermes index 0af45bfa6..97820ed81 160000 --- a/external/hermes +++ b/external/hermes @@ -1 +1 @@ -Subproject commit 0af45bfa667f7ff9c78167ef94d975bffbd879f0 +Subproject commit 97820ed81aba9ef7c05891ce80c53a8533b78609 -- GitLab From 32f164145204d70f93bfbcb52e4fb6406b960af5 Mon Sep 17 00:00:00 2001 From: Alberto Miranda Date: Tue, 20 Apr 2021 09:06:08 +0200 Subject: [PATCH 5/5] Add tests for gkfs::staging::put() --- .gitlab-ci.yml | 3 +- include/api/connection.hpp | 4 +- include/api/detail/connection_impl.hpp | 8 +- include/api/env.hpp | 6 +- include/api/staging.hpp | 38 +-- include/client/env.hpp | 8 +- include/config.hpp | 2 +- src/api/connection.cpp | 7 +- src/api/detail/connection_impl.cpp | 13 +- src/api/libgkfs_staging.cpp | 33 +- tests/unit/test_api.cpp | 407 +++++++++++++++++++++++-- 11 files changed, 438 insertions(+), 91 deletions(-) diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index c191dfeed..c69636d72 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -109,6 +109,7 @@ 
integration tests: --build-dir ${BUILD_PATH} --exclusions "${CI_PROJECT_DIR}/scripts/ci/.coverage-exclusions" artifacts: + when: always # when: on_failure paths: # - "${INTEGRATION_TESTS_RUN_PATH}" @@ -127,7 +128,7 @@ unit: --build-dir ${BUILD_PATH} --exclusions "${CI_PROJECT_DIR}/scripts/ci/.coverage-exclusions" artifacts: - # when: on_failure + when: always paths: - ${BUILD_PATH} # - Testing diff --git a/include/api/connection.hpp b/include/api/connection.hpp index 1c5457bfa..47a9fe3c0 100644 --- a/include/api/connection.hpp +++ b/include/api/connection.hpp @@ -48,9 +48,9 @@ public: connection& operator=(const connection& rhs) = delete; - void + std::size_t put(const void* buffer, size_t size, - const std::filesystem::path& gkfs_filename); + const std::filesystem::path& gkfs_filename, off_t start_offset) const; private: class impl; diff --git a/include/api/detail/connection_impl.hpp b/include/api/detail/connection_impl.hpp index 067f60b09..13104d37c 100644 --- a/include/api/detail/connection_impl.hpp +++ b/include/api/detail/connection_impl.hpp @@ -91,9 +91,9 @@ public: operator=(impl&& rhs) noexcept; ~impl(); - void + std::size_t put(const void* buffer, size_t size, - const std::filesystem::path& gkfs_filename) const; + const std::filesystem::path& gkfs_filename, off_t start_offset) const; std::string stat(const std::string& path) const; @@ -123,8 +123,8 @@ private: std::tuple forward_write_rpc(const std::string& gkfs_filename, const void* buffer, - off_t offset, size_t write_size, size_t updated_metadentry_size, - bool is_append) const; + off_t offset, size_t write_size, + size_t updated_metadentry_size, bool is_append) const; private: //////////////////////////////////////////////////////////////////////////// diff --git a/include/api/env.hpp b/include/api/env.hpp index 5f3d02861..f84253e6e 100644 --- a/include/api/env.hpp +++ b/include/api/env.hpp @@ -40,11 +40,13 @@ namespace gkfs::env { [[maybe_unused]] static constexpr auto LOG = ADD_PREFIX("LOG"); #ifdef 
GKFS_DEBUG_BUILD -[[maybe_unused]] static constexpr auto LOG_DEBUG_VERBOSITY = ADD_PREFIX("LOG_DEBUG_VERBOSITY"); +[[maybe_unused]] static constexpr auto LOG_DEBUG_VERBOSITY = + ADD_PREFIX("LOG_DEBUG_VERBOSITY"); #endif [[maybe_unused]] static constexpr auto LOG_OUTPUT = ADD_PREFIX("LOG_OUTPUT"); -[[maybe_unused]] static constexpr auto LOG_OUTPUT_TRUNC = ADD_PREFIX("LOG_OUTPUT_TRUNC"); +[[maybe_unused]] static constexpr auto LOG_OUTPUT_TRUNC = + ADD_PREFIX("LOG_OUTPUT_TRUNC"); [[maybe_unused]] static constexpr auto CWD = ADD_PREFIX("CWD"); static constexpr auto HOSTS_FILE = ADD_PREFIX("HOSTS_FILE"); #ifdef GKFS_ENABLE_FORWARDING diff --git a/include/api/staging.hpp b/include/api/staging.hpp index 2f074d103..4e3f5cfee 100644 --- a/include/api/staging.hpp +++ b/include/api/staging.hpp @@ -44,11 +44,13 @@ namespace gkfs::api::staging { * @param buffer The buffer with the data contents * @param size The amount of data to transfer * @param gkfs_pathname The target filename inside GekkoFS - * @param offset The offset in the file where the data will be placed. + * @param start_offset The offset at which data will be written + * @returns The number of bytes written + * @throws std::runtime_error An error occurred */ -void -put(const void* buffer, size_t size, - const std::filesystem::path& gkfs_filename, off_t offset); +std::size_t +put(const void* buffer, size_t size, const std::filesystem::path& gkfs_filename, + off_t start_offset); /** * Transfer data into a GekkoFS file. 
@@ -64,31 +66,13 @@ put(const void* buffer, size_t size, * @param buffer The buffer with the data contents * @param size The amount of data to transfer * @param gkfs_pathname The target filename inside GekkoFS + * @param start_offset The offset at which data should be written + * @returns The number of bytes written + * @throws std::runtime_error An error occurred */ -void +std::size_t put(const gkfs::api::fs::connection& conn, const void* buffer, size_t size, - const std::filesystem::path& gkfs_filename); - -/** - * Transfer data into a GekkoFS file. - * - * This function creates an internal connection to a running GekkoFS - * filesystem and transfers data from the provided buffer into a target - * file. - * - * @param buffer The buffer with the data contents - * @param size The amount of data to write - * @param gkfs_pathname The target filename inside GekkoFS - * @param offset - */ -void -put(const void* buffer, size_t size, const std::filesystem::path& gkfs_filename, - off_t offset); - -void -put(const gkfs::api::fs::connection& conn, - const std::filesystem::path& pathname, const void* buf, size_t count, - off_t offset); + const std::filesystem::path& gkfs_filename, off_t start_offset); } // namespace gkfs::api::staging diff --git a/include/client/env.hpp b/include/client/env.hpp index 9b68d104b..26f9f5ac9 100644 --- a/include/client/env.hpp +++ b/include/client/env.hpp @@ -37,15 +37,17 @@ /* Environment variables for the GekkoFS client */ namespace gkfs::env { -[[maybe_unused]] [[maybe_unused]] static constexpr auto LOG = ADD_PREFIX("LOG"); +[[maybe_unused]] static constexpr auto LOG = ADD_PREFIX("LOG"); #ifdef GKFS_DEBUG_BUILD -[[maybe_unused]] [[maybe_unused]] static constexpr auto LOG_DEBUG_VERBOSITY = ADD_PREFIX("LOG_DEBUG_VERBOSITY"); +[[maybe_unused]] static constexpr auto LOG_DEBUG_VERBOSITY = + ADD_PREFIX("LOG_DEBUG_VERBOSITY"); static constexpr auto LOG_SYSCALL_FILTER = ADD_PREFIX("LOG_SYSCALL_FILTER"); #endif [[maybe_unused]] static constexpr auto 
LOG_OUTPUT = ADD_PREFIX("LOG_OUTPUT"); -[[maybe_unused]] static constexpr auto LOG_OUTPUT_TRUNC = ADD_PREFIX("LOG_OUTPUT_TRUNC"); +[[maybe_unused]] static constexpr auto LOG_OUTPUT_TRUNC = + ADD_PREFIX("LOG_OUTPUT_TRUNC"); [[maybe_unused]] static constexpr auto CWD = ADD_PREFIX("CWD"); static constexpr auto HOSTS_FILE = ADD_PREFIX("HOSTS_FILE"); #ifdef GKFS_ENABLE_FORWARDING diff --git a/include/config.hpp b/include/config.hpp index 4ff8ac156..1c7c855d7 100644 --- a/include/config.hpp +++ b/include/config.hpp @@ -34,7 +34,7 @@ // environment prefixes (are concatenated in env module at compile time) #define CLIENT_ENV_PREFIX "LIBGKFS_" #define DAEMON_ENV_PREFIX "GKFS_" -#define API_ENV_PREFIX "GKFS_API_" +#define API_ENV_PREFIX "GKFS_API_" namespace gkfs::config { diff --git a/src/api/connection.cpp b/src/api/connection.cpp index 8e01c10f6..fff8f977d 100644 --- a/src/api/connection.cpp +++ b/src/api/connection.cpp @@ -49,10 +49,11 @@ connection::connection(const std::string& hostfile) connection::~connection() = default; -void +std::size_t connection::put(const void* buffer, size_t size, - const std::filesystem::path& gkfs_filename) { - pimpl_->put(buffer, size, gkfs_filename); + const std::filesystem::path& gkfs_filename, + off_t start_offset) const { + return pimpl_->put(buffer, size, gkfs_filename, start_offset); } diff --git a/src/api/detail/connection_impl.cpp b/src/api/detail/connection_impl.cpp index 170d2909a..4255c8822 100644 --- a/src/api/detail/connection_impl.cpp +++ b/src/api/detail/connection_impl.cpp @@ -377,12 +377,13 @@ connection::impl& connection::impl::operator=(impl&& rhs) noexcept = default; connection::impl::~impl() = default; -void +std::size_t connection::impl::put(const void* buffer, size_t size, - const std::filesystem::path& gkfs_filename) const { + const std::filesystem::path& gkfs_filename, + off_t start_offset) const { create(gkfs_filename, 0660 | S_IFREG); - pwrite(gkfs_filename, buffer, size, 0); + return pwrite(gkfs_filename, 
buffer, size, start_offset); } /** @@ -459,6 +460,10 @@ ssize_t connection::impl::pwrite(const std::string& gkfs_filename, const void* buffer, size_t size, off_t offset) const { + if(size == 0) { + return 0; + } + const auto [update_error, updated_size] = forward_update_metadentry_size_rpc(gkfs_filename, size, offset, false); @@ -804,7 +809,7 @@ connection::impl::forward_write_rpc(const std::string& gkfs_filename, * written. In our case, we do not keep track which daemon fully wrote its * workload. Thus, we always return size 0 on error. */ - return {err, err ? out_size : 0}; + return {err, err ? 0 : out_size}; } diff --git a/src/api/libgkfs_staging.cpp b/src/api/libgkfs_staging.cpp index da972b397..e4fcb2889 100644 --- a/src/api/libgkfs_staging.cpp +++ b/src/api/libgkfs_staging.cpp @@ -53,15 +53,16 @@ namespace gkfs::api::staging { * @param buffer The buffer with the data contents * @param size The amount of data to transfer * @param gkfs_pathname The target filename inside GekkoFS + * @param start_offset The offset at which data will be written + * @returns The number of bytes written + * @throws std::runtime_error An error occurred */ -void -put(const void* buffer, size_t size, - const std::filesystem::path& gkfs_filename, off_t offset = 0) { +std::size_t +put(const void* buffer, size_t size, const std::filesystem::path& gkfs_filename, + off_t start_offset = 0) { fs::connection fc; - - - fc.put(buffer, size, gkfs_filename); + return fc.put(buffer, size, gkfs_filename, start_offset); } /** @@ -78,18 +79,14 @@ put(const void* buffer, size_t size, * @param buffer The buffer with the data contents * @param size The amount of data to transfer * @param gkfs_pathname The target filename inside GekkoFS + * @param start_offset The offset at which data should be written + * @returns The number of bytes written + * @throws std::runtime_error An error occurred */ -void -put(const gkfs::api::fs::connection& conn, const void* buffer, size_t size, - const 
std::filesystem::path& gkfs_filename) {} - -void -put(const std::filesystem::path& pathname, const void* buf, size_t count, - off_t offset) {} - -void -put(const gkfs::api::fs::connection& conn, - const std::filesystem::path& pathname, const void* buf, size_t count, - off_t offset) {} +std::size_t +put(const gkfs::api::fs::connection& fc, const void* buffer, size_t size, + const std::filesystem::path& gkfs_filename, off_t start_offset) { + return fc.put(buffer, size, gkfs_filename, start_offset); +} } // namespace gkfs::api::staging \ No newline at end of file diff --git a/tests/unit/test_api.cpp b/tests/unit/test_api.cpp index 10c7e522e..223e5ffe7 100644 --- a/tests/unit/test_api.cpp +++ b/tests/unit/test_api.cpp @@ -28,22 +28,27 @@ #include #include +#include #include #include #include #include +#include #include // uuid class #include // generators #include // streaming operators etc. #include +#include using namespace std::literals; namespace bp = boost::process; namespace boost_fs = boost::filesystem; namespace std_fs = std::filesystem; +constexpr auto default_hostfile = "gkfs_hosts.txt"; + // A temporary file with RAII removal struct temporary_file { @@ -67,41 +72,80 @@ struct temporary_file { ofs_.flush(); } + void + write(const std::vector& data) { + for(const auto n : data) { + ofs_ << n; + } + ofs_.flush(); + } + std_fs::path filename() const { return filename_; } + std::size_t + size() const { + return std_fs::file_size(filename_); + }; + std_fs::path filename_; std::ofstream ofs_; }; +struct gkfs_config { + gkfs_config(std_fs::path cwd, std_fs::path hosts_file, std_fs::path rootdir, + std_fs::path mountdir, std::string interface) + : cwd_(std::move(cwd)), hosts_file_(std::move(hosts_file)), + rootdir_(std::move(rootdir)), mountdir_(std::move(mountdir)), + interface_(std::move(interface)) {} + + const std_fs::path cwd_; + const std_fs::path hosts_file_; + const std_fs::path rootdir_; + const std_fs::path mountdir_; + const std::string interface_; +}; + 
// A temporary daemon with RAII shutdown and environment removal struct test_daemon { static std::tuple - generate_environment() { + generate_environment(bool unique_port = true) { static boost::uuids::random_generator generator; boost::uuids::uuid uuid = generator(); - boost::random::mt19937 gen; - boost::random::uniform_int_distribution<> dist(1025, 65535); + const auto basedir = "test-api-" + to_string(uuid); + + // return 0 if a unique_port port was requested: this will force the + // kernel to provide a port that is unused when the daemon tries to bind + // to the interface. This helps avoid race conditions where two + // daemons are assigned the same port when creating + // several test_daemons concurrently + if(unique_port) { + return {basedir, 0}; + } - const auto basedir = to_string(uuid); - const long port = dist(gen); + // otherwise just return a randomly generated port + boost::random::mt19937 rng; + boost::random::uniform_int_distribution<> dist(1025, 65535); + const long port = dist(rng); - CAPTURE(basedir, port); return {basedir, port}; } - test_daemon(const std_fs::path& rootdir, const std_fs::path& mountdir, - const std::string& interface) - : abs_rootdir(std_fs::absolute(std_fs::current_path() / rootdir)), - abs_mountdir(std_fs::absolute(std_fs::current_path() / mountdir)) { + explicit test_daemon(const gkfs_config& cfg) + : cfg_(cfg), + abs_rootdir(std_fs::absolute(std_fs::current_path() / cfg.rootdir_)), + abs_mountdir( + std_fs::absolute(std_fs::current_path() / cfg.mountdir_)) { auto env = boost::this_process::environment(); - env["GKFS_LOG_LEVEL"] = "100"; - env["GKFS_DAEMON_LOG_PATH"] = "/dev/stderr"; + bp::environment env_ = env; + env_["GKFS_LOG_LEVEL"] = "100"; + env_["GKFS_DAEMON_LOG_PATH"] = cfg.cwd_ / "test_daemon.log"; + env_["GKFS_HOSTS_FILE"] = cfg.hosts_file_; if(!std_fs::exists(abs_rootdir)) { std_fs::create_directories(abs_rootdir); @@ -115,19 +159,42 @@ struct test_daemon { 
search_paths.emplace_back(boost_fs::path{cmake::binary_dir} / boost_fs::path{"src/daemon"}); - daemon_ = bp::child(bp::search_path("gkfs_daemon", search_paths), - bp::args({"--rootdir"s, abs_rootdir, "--mountdir"s, - abs_mountdir, "--listen"s, interface})); + //log_watcher lw; + //bp::async_pipe async_log_output = lw.create_async_pipe(); + + boost::asio::io_context ioc; + bp::async_pipe ap(ioc); + + daemon_ = + bp::child(bp::search_path("gkfs_daemon", search_paths), + bp::args({"--rootdir"s, abs_rootdir, "--mountdir"s, + abs_mountdir, "--listen"s, cfg.interface_}), + env_, /*bp::std_out > async_log_output*/bp::std_err > ap); + + std::this_thread::sleep_for(2s); - // since we don't have a way to check whether the daemon has finished - // its startup process, wait for a 100ms, otherwise any connections - // to it will fail - std::this_thread::sleep_for(500ms); +#if 0 + const std::regex re( + R"(^.*?\[I\]\[main\] Startup successful\. Daemon is ready\.$)", + std::regex::ECMAScript | std::regex::optimize); + + std::ofstream log_file(cfg.cwd_ / "test_daemon.log"); + log_file.setf(std::ios::unitbuf); + + if(!lw.search_for(async_log_output, log_file, re, 5s)) { + terminate(); + throw std::runtime_error("gkfs_daemon failed to start"); + }; +#endif } - ~test_daemon() { + void + terminate() { + + // attempt to terminate the daemon gracefully... 
::kill(daemon_.native_handle(), SIGTERM); + // terminate forcibly if(!daemon_.wait_for(5s)) { daemon_.terminate(); } @@ -136,16 +203,133 @@ struct test_daemon { std_fs::remove_all(abs_mountdir); } + ~test_daemon() { + terminate(); + } + + int + port() const { + // parse the generated hostfile to see what port the kernel gave us + std::ifstream ifs{cfg_.hosts_file_}; + + if(!ifs) { + throw std::runtime_error(fmt::format("Error opening hostfile " + "'{}': {}", + cfg_.hosts_file_.string(), + strerror(errno))); + } + + const std::regex re(R"(^(\S+)\s+(\S+):(\d+)$)", + std::regex::ECMAScript | std::regex::optimize); + + std::string line; + std::smatch match; + + while(getline(ifs, line)) { + if(!regex_match(line, match, re)) { + throw std::runtime_error( + fmt::format("unrecognized line format: '{}'", line)); + } + + return std::stoi(match[3].str()); + } + + throw std::runtime_error("Could not extract port from hostfile"); + } + bp::child daemon_; + gkfs_config cfg_; std_fs::path abs_rootdir; std_fs::path abs_mountdir; }; +struct md5sum { + + explicit md5sum(const std::filesystem::path& filename, + const std::optional& gkfs_config = {}) { + + auto env = boost::this_process::environment(); + bp::environment env_ = env; + + if(gkfs_config) { + env_["LD_PRELOAD"] = std_fs::path{cmake::binary_dir} / + "src/client/libgkfs_intercept.so"; + env_["LIBGKFS_HOSTS_FILE"] = gkfs_config->cwd_ / default_hostfile; + env_["LIBGKFS_LOG_OUTPUT"] = gkfs_config->cwd_ / "test_client.log"; + } + + bp::ipstream is; + proc_ = bp::child(bp::search_path("md5sum"), filename.string(), + bp::std_err > stdout, bp::std_out > is, env_); + + std::string line; + + while(proc_.running() && std::getline(is, line) && !line.empty()) { + output_.push_back(line); + } + + proc_.wait(); + } + + std::string + operator()() const { + + if(proc_.exit_code()) { + throw std::runtime_error( + fmt::format("ERROR: md5sum: {}", fmt::join(output_, "\n"))); + } + + if(output_.size() != 1) { + throw 
std::runtime_error(fmt::format( + "ERROR: md5sum: program called with more than one file : {}", + fmt::join(output_, "\n"))); + } + + const std::regex regex(R"(^(\S+)\s+(\S+)$)", + std::regex::ECMAScript | std::regex::optimize); + std::string host; + std::string uri; + std::smatch match; + if(!regex_match(output_[0], match, regex)) { + throw std::runtime_error(fmt::format("ERROR: md5sum: unknown " + "output format: {}", + output_[0])); + } + + return match[1]; + } + + bp::child proc_; + std::vector output_; +}; + + +std::vector +generate_random_data(int n) { + + const std::string chars{ + "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"}; + + boost::random::mt19937 rng; + boost::random::uniform_int_distribution<> dist( + 0, static_cast(chars.size() - 1)); + std::vector data; + data.reserve(n); + + for(int i = 0; i < n; ++i) { + data.push_back(chars[dist(rng)]); + } + + return data; +} + SCENARIO(" API connections to gkfs work as expected ", "[api][connection]") { - // TODO: tests for connecting non-existing daemons are missing. The reason + // TODO: tests for connecting non-existing daemons are missing. 
The + // reason // TODO: is that since we don't have timeouts yet in Hermes, if the - // TODO: daemon is missing the connection RPC hangs forever while waiting + // TODO: daemon is missing the connection RPC hangs forever while + // waiting // TODO: for a reply that will never arrive using namespace std::string_literals; @@ -201,18 +385,189 @@ SCENARIO(" API connections to gkfs work as expected ", "[api][connection]") { WHEN(" hostfile is valid ") { const auto [testdir, port] = test_daemon::generate_environment(); + gkfs_config cfg{testdir, testdir / default_hostfile, + testdir / "rootdir", testdir / "mountdir", + fmt::format("lo:{}", port)}; - test_daemon daemon(testdir / "rootdir", testdir / "mountdir", - fmt::format("lo:{}", port)); + test_daemon daemon(cfg); temporary_file tmp{ "valid.hostfile", fmt::format("localhost ofi+sockets://127.0.0.1:{}\n", - port)}; + daemon.port())}; THEN(" a connection instance is constructed ") { - REQUIRE_NOTHROW(fs::connection(tmp.filename())); + REQUIRE_NOTHROW(fs::connection(testdir / default_hostfile)); + } + } + } +} + +SCENARIO(" Staging files with a standalone connection as expected ", + "[api][staging][put][standalone]") { + + using namespace gkfs::api; + + GIVEN(" an active gkfs daemon ") { + + const auto [testdir, port] = test_daemon::generate_environment(); + + gkfs_config cfg{testdir, testdir / default_hostfile, + testdir / "rootdir", testdir / "mountdir", + fmt::format("lo:{}", port)}; + test_daemon daemon(cfg); + + AND_GIVEN(" no previously established connection to the daemon ") { + WHEN(" staging a file into gkfs ") { + AND_WHEN(" the file size equals 0 ") { + + int file_size = 0; + + CAPTURE(file_size); + + // generate a file with random data + temporary_file f{testdir / "vfs_testfile"}; + std::vector data = generate_random_data(file_size); + f.write(data); + + // make sure the API connects to our test_daemon + auto env = boost::this_process::environment(); + env["GKFS_API_HOSTS_FILE"] = testdir / default_hostfile; 
+ + // upload the file + auto bytes_written = + staging::put(static_cast(data.data()), + data.size(), "/gkfs_testfile", 0); + + THEN(" no data is written ") { + REQUIRE(bytes_written == 0); + } + } + + AND_WHEN(" the file size is not 0 ") { + auto cs = gkfs::config::rpc::chunksize; + + int file_size = + GENERATE_COPY(5, 100, cs, std::floor(cs * 1.5), + cs * 2 - 1, cs * 2, 500000); + CAPTURE(file_size); + + // generate a file with random data + temporary_file f{testdir / "vfs_testfile"}; + std::vector data = generate_random_data(file_size); + f.write(data); + + // make sure the API connects to our test_daemon + auto env = boost::this_process::environment(); + env["GKFS_API_HOSTS_FILE"] = testdir / default_hostfile; + + // upload the file + auto bytes_written = + staging::put(static_cast(data.data()), + data.size(), "/gkfs_testfile", 0); + + THEN(" data contents are identical ") { + const auto vfs_digest = md5sum(f.filename())(); + const auto gkfs_digest = + md5sum(cfg.mountdir_ / "gkfs_testfile", cfg)(); + REQUIRE(vfs_digest == gkfs_digest); + } + + AND_THEN( + " the number of written bytes reported is correct") { + REQUIRE(bytes_written == data.size()); + } + } } } } } + +SCENARIO(" Staging files reusing a connection works as expected ", + "[api][staging][put][reusable]") { + + using namespace gkfs::api; + + GIVEN(" an active gkfs daemon ") { + + const auto [testdir, port] = test_daemon::generate_environment(); + + gkfs_config cfg{testdir, testdir / default_hostfile, + testdir / "rootdir", testdir / "mountdir", + fmt::format("lo:{}", port)}; + test_daemon daemon(cfg); + + AND_GIVEN(" an already established connection to the daemon ") { + + temporary_file tmp{ + "hostfile", + fmt::format("localhost ofi+sockets://127.0.0.1:{}\n", + daemon.port())}; + + fs::connection fsc{tmp.filename()}; + + WHEN(" staging a file into gkfs ") { + + AND_WHEN(" the file size equals 0 ") { + + int file_size = 0; + + CAPTURE(file_size); + + // generate a file with random data + 
temporary_file f{testdir / "vfs_testfile"}; + std::vector data = generate_random_data(file_size); + f.write(data); + + // make sure the API connects to our test_daemon + auto env = boost::this_process::environment(); + env["GKFS_API_HOSTS_FILE"] = testdir / default_hostfile; + + // upload the file + auto bytes_written = staging::put( + fsc, static_cast(data.data()), + data.size(), "/gkfs_testfile", 0); + + THEN(" no data is written ") { + REQUIRE(bytes_written == 0); + } + } + + AND_WHEN(" the file size is not 0 ") { + auto cs = gkfs::config::rpc::chunksize; + + int file_size = + GENERATE_COPY(5, 100, cs, std::floor(cs * 1.5), + cs * 2 - 1, cs * 2, 500000); + CAPTURE(file_size); + + // generate a file with random data + temporary_file f{testdir / "vfs_testfile"}; + std::vector data = generate_random_data(file_size); + f.write(data); + + // make sure the API connects to our test_daemon + auto env = boost::this_process::environment(); + env["GKFS_API_HOSTS_FILE"] = testdir / default_hostfile; + + // upload the file + auto bytes_written = staging::put( + fsc, static_cast(data.data()), + data.size(), "/gkfs_testfile", 0); + + THEN(" data contents are identical ") { + const auto vfs_digest = md5sum(f.filename())(); + const auto gkfs_digest = + md5sum(cfg.mountdir_ / "gkfs_testfile", cfg)(); + REQUIRE(vfs_digest == gkfs_digest); + } + + AND_THEN( + " the number of written bytes reported is correct") { + REQUIRE(bytes_written == data.size()); + } + } + } + } + } +} \ No newline at end of file -- GitLab