Commits on Source (2)
......@@ -309,8 +309,7 @@ include_directories(
add_subdirectory(src)
add_subdirectory(include)
add_subdirectory(src/proxy)
add_subdirectory(marc)
#add_subdirectory(marc)
### Mark any CMake variables imported from {fmt} and spdlog as advanced, so
### that they don't appear in cmake-gui or ccmake. Similarly for FETCHCONTENT
......
Subproject commit d8755039c82501323cdc9a4f5a2b922b9b3bcac7
Subproject commit 7a234d854ae24bb33b46319f6735e44ab5dc33c7
......@@ -32,8 +32,11 @@ add_subdirectory(common)
add_subdirectory(daemon)
# Client library
add_subdirectory(client)
# Proxy
add_subdirectory(proxy)
target_sources(gkfs_daemon PUBLIC config.hpp version.hpp.in)
target_sources(gkfs_proxy PUBLIC config.hpp version.hpp.in)
if(GKFS_ENABLE_FORWARDING)
target_sources(gkfwd_daemon PUBLIC config.hpp version.hpp.in)
......
......@@ -16,22 +16,19 @@
#include <common/common_defs.hpp>
namespace gkfs {
namespace rpc {
namespace gkfs::rpc {
ssize_t
forward_write_proxy(const std::string& path, const void* buf, bool append_flag,
off64_t in_offset, size_t write_size,
int64_t updated_metadentry_size);
std::pair<int, ssize_t>
forward_write_proxy(const std::string& path, const void* buf, off64_t offset,
size_t write_size);
ssize_t
std::pair<int, ssize_t>
forward_read_proxy(const std::string& path, void* buf, off64_t offset,
size_t read_size);
std::pair<int, ChunkStat>
forward_get_chunk_stat_proxy();
} // namespace rpc
} // namespace gkfs
} // namespace gkfs::rpc
#endif // GEKKOFS_FORWARD_DATA_PROXY_HPP
......@@ -3153,7 +3153,7 @@ struct update_metadentry_size_proxy {
explicit output(const rpc_update_metadentry_size_out_t& out) {
m_err = out.err;
m_ret_size = out.ret_size;
m_ret_size = out.ret_offset;
}
int32_t
......
......@@ -31,6 +31,12 @@ target_sources(
rpc/rpc_util.hpp
)
target_sources(gkfs_proxy
PUBLIC
common_defs.hpp
rpc/rpc_types.hpp
rpc/rpc_util.hpp)
if(GKFS_ENABLE_FORWARDING)
target_sources(
gkfwd_daemon PUBLIC cmake_configure.hpp.in common_defs.hpp
......
################################################################################
# Copyright 2018-2024, Barcelona Supercomputing Center (BSC), Spain #
# Copyright 2015-2024, Johannes Gutenberg Universitaet Mainz, Germany #
# #
# This software was partially supported by the #
# EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu). #
# #
# This software was partially supported by the #
# ADA-FS project under the SPPEXA project funded by the DFG. #
# #
# This file is part of GekkoFS. #
# #
# GekkoFS is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# GekkoFS is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with GekkoFS. If not, see <https://www.gnu.org/licenses/>. #
# #
# SPDX-License-Identifier: GPL-3.0-or-later #
################################################################################
target_sources(
gkfs_proxy
PUBLIC proxy.hpp
env.hpp
proxy.hpp
proxy_data.hpp
util.hpp
rpc/forward_data.hpp
rpc/forward_metadata.hpp
rpc/rpc_defs.hpp
rpc/rpc_util.hpp
)
\ No newline at end of file
......@@ -32,4 +32,6 @@ add_subdirectory(common)
add_subdirectory(daemon)
# Client library
add_subdirectory(client)
# Proxy
add_subdirectory(proxy)
......@@ -884,7 +884,7 @@ gkfs_pwrite(std::shared_ptr<gkfs::filemap::OpenFile> file, const char* buf,
pair<int, long> ret_offset;
if(gkfs::config::proxy::fwd_update_size && CTX->use_proxy()) {
ret_offset = gkfs::rpc::forward_update_metadentry_size_proxy(
*path, count, offset, append_flag);
*path, count, offset, is_append);
} else {
ret_offset = gkfs::rpc::forward_update_metadentry_size(
*path, count, offset, is_append);
......@@ -913,8 +913,7 @@ gkfs_pwrite(std::shared_ptr<gkfs::filemap::OpenFile> file, const char* buf,
pair<int, long> ret_write;
if(gkfs::config::proxy::fwd_io && CTX->use_proxy() &&
count > gkfs::config::proxy::fwd_io_count_threshold) {
ret_write = gkfs::rpc::forward_write_proxy(*path, buf, append_flag,
offset, count, updated_size);
ret_write = gkfs::rpc::forward_write_proxy(*path, buf, offset, count);
} else {
ret_write = gkfs::rpc::forward_write(*path, buf, offset, count);
}
......
......@@ -30,24 +30,21 @@ namespace gkfs::rpc {
* @param path
* @param buf
* @param append_flag
* @param in_offset
* @param offset
* @param write_size
* @param updated_metadentry_size
* @return
*/
ssize_t
forward_write_proxy(const string& path, const void* buf, bool append_flag,
off64_t in_offset, size_t write_size,
int64_t updated_metadentry_size) {
pair<int, ssize_t>
forward_write_proxy(const string& path, const void* buf, off64_t offset,
size_t write_size) {
LOG(DEBUG, "Using write proxy for path '{}' offset '{}' size '{}' ...",
path, in_offset, write_size);
path, offset, write_size);
// TODO mostly copy pasta from forward_data.
assert(write_size > 0);
// Calculate chunkid boundaries and numbers so that daemons know in
// which interval to look for chunks
off64_t offset =
append_flag ? in_offset : (updated_metadentry_size - write_size);
// some helper variables for async RPC
std::vector<hermes::mutable_buffer> bufseq{
......@@ -64,12 +61,11 @@ forward_write_proxy(const string& path, const void* buf, bool append_flag,
} catch(const std::exception& ex) {
LOG(ERROR, "Failed to expose buffers for RMA");
errno = EBUSY;
return -1;
return make_pair(EBUSY, 0);
}
auto endp = CTX->proxy_host();
bool error = false;
auto err = 0;
ssize_t out_size = 0;
try {
LOG(DEBUG, "Sending RPC ...");
......@@ -90,22 +86,22 @@ forward_write_proxy(const string& path, const void* buf, bool append_flag,
if(out.err()) {
LOG(ERROR, "Daemon reported error: {}", out.err());
errno = out.err();
error = true;
err = out.err();
}
out_size = out.io_size();
} catch(const std::exception& ex) {
LOG(ERROR, "While RPC send or getting RPC output. Err: '{}'",
ex.what());
errno = EBUSY;
error = true;
err = EBUSY;
}
return error ? -1 : out_size;
if(err)
return make_pair(err, 0);
else
return make_pair(0, out_size);
}
ssize_t
pair<int, ssize_t>
forward_read_proxy(const string& path, void* buf, const off64_t offset,
const size_t read_size) {
LOG(DEBUG, "Using read proxy for path '{}' offset '{}' size '{}' ...", path,
......@@ -127,11 +123,11 @@ forward_read_proxy(const string& path, void* buf, const off64_t offset,
} catch(const std::exception& ex) {
LOG(ERROR, "Failed to expose buffers for RMA");
errno = EBUSY;
return -1;
return make_pair(EBUSY, 0);
}
auto endp = CTX->proxy_host();
bool error = false;
auto err = 0;
ssize_t out_size = 0;
try {
......@@ -153,19 +149,20 @@ forward_read_proxy(const string& path, void* buf, const off64_t offset,
if(out.err()) {
LOG(ERROR, "Daemon reported error: {}", out.err());
errno = out.err();
error = true;
err = out.err();
}
out_size = out.io_size();
} catch(const std::exception& ex) {
LOG(ERROR, "While RPC send or getting RPC output. Err: '{}'",
ex.what());
errno = EBUSY;
error = true;
err = EBUSY;
}
return error ? -1 : out_size;
if(err)
return make_pair(err, 0);
else
return make_pair(0, out_size);
}
pair<int, ChunkStat>
......
......@@ -436,8 +436,9 @@ rpc_srv_update_metadentry_size(hg_handle_t handle) {
out.err = EBUSY;
}
GKFS_DATA->spdlogger()->debug("{}() Sending output err '{}' ret_size '{}'",
__func__, out.err, out.ret_size);
GKFS_DATA->spdlogger()->debug(
"{}() Sending output err '{}' ret_offset '{}'", __func__, out.err,
out.ret_offset);
auto hret = margo_respond(handle, &out);
if(hret != HG_SUCCESS) {
GKFS_DATA->spdlogger()->error("{}() Failed to respond", __func__);
......
set(PROXY_SRC
../common/rpc/rpc_util.cpp
################################################################################
# Copyright 2018-2024, Barcelona Supercomputing Center (BSC), Spain #
# Copyright 2015-2024, Johannes Gutenberg Universitaet Mainz, Germany #
# #
# This software was partially supported by the #
# EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu). #
# #
# This software was partially supported by the #
# ADA-FS project under the SPPEXA project funded by the DFG. #
# #
# This file is part of GekkoFS. #
# #
# GekkoFS is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# GekkoFS is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with GekkoFS. If not, see <https://www.gnu.org/licenses/>. #
# #
# SPDX-License-Identifier: GPL-3.0-or-later #
################################################################################
# ##############################################################################
# This builds the `gkfs_proxy` executable: the main GekkoFS client that forwards requests to daemons.
# ##############################################################################
add_executable(gkfs_proxy)
target_sources(gkfs_proxy
PRIVATE
env.cpp
proxy.cpp
proxy_data.cpp
......@@ -8,48 +41,39 @@ set(PROXY_SRC
rpc/srv_metadata.cpp
rpc/forward_data.cpp
rpc/forward_metadata.cpp
)
set(PROXY_HEADERS
../../include/config.hpp
../../include/common/common_defs.hpp
../../include/common/rpc/rpc_types.hpp
../../include/common/rpc/rpc_util.hpp
../../include/proxy/env.hpp
../../include/proxy/proxy.hpp
../../include/proxy/proxy_data.hpp
../../include/proxy/util.hpp
../../include/proxy/rpc/forward_data.hpp
../../include/proxy/rpc/forward_metadata.hpp
../../include/proxy/rpc/rpc_defs.hpp
../../include/proxy/rpc/rpc_util.hpp
)
add_executable(gkfs_proxy ${PROXY_SRC} ${PROXY_HEADERS})
target_link_libraries(gkfs_proxy
PUBLIC
# internal libs
../common/rpc/rpc_util.cpp
PUBLIC ${CMAKE_SOURCE_DIR}/include/config.hpp
${CMAKE_SOURCE_DIR}/include/version.hpp.in
)
target_link_libraries(
gkfs_proxy
PUBLIC # internal libs
distributor
log_util
env_util
spdlog
# external libs
CLI11::CLI11
fmt::fmt
# margo libs
${ABT_LIBRARIES}
mercury
${MARGO_LIBRARIES}
Mercury::Mercury
Argobots::Argobots
Margo::Margo
# others
CLI11
Threads::Threads
PRIVATE
# open issue for std::filesystem https://gitlab.kitware.com/cmake/cmake/-/issues/17834
stdc++fs
)
)
target_include_directories(gkfs_proxy
PRIVATE
${ABT_INCLUDE_DIRS}
${MARGO_INCLUDE_DIRS}
)
#set(PROXY_HEADERS
# ../../include/config.hpp
# ../../include/common/common_defs.hpp
# ../../include/common/rpc/rpc_types.hpp
# ../../include/common/rpc/rpc_util.hpp
# ../../include/proxy/env.hpp
# ../../include/proxy/proxy.hpp
# ../../include/proxy/proxy_data.hpp
# ../../include/proxy/util.hpp
# ../../include/proxy/rpc/forward_data.hpp
# ../../include/proxy/rpc/forward_metadata.hpp
# ../../include/proxy/rpc/rpc_defs.hpp
# ../../include/proxy/rpc/rpc_util.hpp
# )
install(TARGETS gkfs_proxy
RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR}
)
install(TARGETS gkfs_proxy RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR})
......@@ -20,7 +20,7 @@
#include <common/rpc/distributor.hpp>
#include <common/rpc/rpc_util.hpp>
#include <CLI11/CLI11.hpp>
#include <CLI/CLI.hpp>
#include <iostream>
#include <csignal>
......
......@@ -253,7 +253,7 @@ forward_update_metadentry_size(const string& path, const size_t size,
rpc_update_metadentry_size_in_t daemon_in{};
rpc_update_metadentry_size_out_t daemon_out{};
int err = 0;
off64_t ret_size = 0;
off64_t ret_offset = 0;
// fill in
daemon_in.path = path.c_str();
daemon_in.size = size;
......@@ -278,10 +278,10 @@ forward_update_metadentry_size(const string& path, const size_t size,
ret = margo_get_output(rpc_handle, &daemon_out);
if(ret == HG_SUCCESS) {
PROXY_DATA->log()->debug(
"{}() Got response success: err {} ret_size {}", __func__,
daemon_out.err, daemon_out.ret_size);
"{}() Got response success: err {} ret_offset {}", __func__,
daemon_out.err, daemon_out.ret_offset);
err = daemon_out.err;
ret_size = daemon_out.ret_size;
ret_offset = daemon_out.ret_offset;
margo_free_output(rpc_handle, &daemon_out);
} else {
// something is wrong
......@@ -296,7 +296,7 @@ forward_update_metadentry_size(const string& path, const size_t size,
/* clean up resources consumed by this rpc */
margo_destroy(rpc_handle);
return make_pair(err, ret_size);
return make_pair(err, ret_offset);
}
pair<int, size_t>
......
......@@ -106,12 +106,12 @@ proxy_rpc_srv_update_metadentry_size(hg_handle_t handle) {
client_in.path, client_in.size, client_in.offset, client_in.append);
try {
auto [err, ret_size] = gkfs::rpc::forward_update_metadentry_size(
auto [err, ret_offset] = gkfs::rpc::forward_update_metadentry_size(
client_in.path, client_in.size, client_in.offset,
client_in.append);
client_out.err = 0;
client_out.ret_size = ret_size;
client_out.ret_offset = ret_offset;
} catch(const std::exception& e) {
PROXY_DATA->log()->error(
"{}() Failed to update metadentry size RPC: '{}'", __func__,
......@@ -119,8 +119,8 @@ proxy_rpc_srv_update_metadentry_size(hg_handle_t handle) {
client_out.err = EBUSY;
}
PROXY_DATA->log()->debug("{}() Sending output err '{}' size '{}'", __func__,
client_out.err, client_out.ret_size);
PROXY_DATA->log()->debug("{}() Sending output err '{}' ret_offset '{}'",
__func__, client_out.err, client_out.ret_offset);
return gkfs::rpc::cleanup_respond(&handle, &client_in, &client_out);
}
......