Skip to content
This diff is collapsed.
...@@ -134,11 +134,12 @@ hook_stat(const char* path, struct stat* buf) { ...@@ -134,11 +134,12 @@ hook_stat(const char* path, struct stat* buf) {
int int
hook_statx(int dirfd, const char* path, int flags, unsigned int mask, hook_statx(int dirfd, const char* path, int flags, unsigned int mask,
struct ::statx* buf) { struct ::statx* buf) {
bool follow = (flags & AT_SYMLINK_NOFOLLOW) == 0;
LOG(DEBUG, LOG(DEBUG,
"{}() called with dirfd: '{}', path: \"{}\", flags: '{}', mask: '{}', buf: '{}'", "{}() called with dirfd: '{}', path: \"{}\", flags: '{}', mask: '{}', buf: '{}'",
__func__, dirfd, path, flags, mask, fmt::ptr(buf)); __func__, dirfd, path, flags, mask, fmt::ptr(buf));
std::string resolved; std::string resolved;
auto rstatus = CTX->relativize_fd_path(dirfd, path, resolved); auto rstatus = CTX->relativize_fd_path(dirfd, path, resolved);
switch(rstatus) { switch(rstatus) {
...@@ -154,8 +155,8 @@ hook_statx(int dirfd, const char* path, int flags, unsigned int mask, ...@@ -154,8 +155,8 @@ hook_statx(int dirfd, const char* path, int flags, unsigned int mask,
return -ENOTDIR; return -ENOTDIR;
case gkfs::preload::RelativizeStatus::internal: case gkfs::preload::RelativizeStatus::internal:
return with_errno(gkfs::syscall::gkfs_statx(dirfd, resolved.c_str(), return with_errno(gkfs::syscall::gkfs_statx(
flags, mask, buf)); dirfd, resolved.c_str(), flags, mask, buf, follow));
default: default:
LOG(ERROR, "{}() relativize status unknown: {}", __func__); LOG(ERROR, "{}() relativize status unknown: {}", __func__);
...@@ -174,7 +175,7 @@ hook_lstat(const char* path, struct stat* buf) { ...@@ -174,7 +175,7 @@ hook_lstat(const char* path, struct stat* buf) {
std::string rel_path; std::string rel_path;
if(CTX->relativize_path(path, rel_path)) { if(CTX->relativize_path(path, rel_path)) {
return with_errno(gkfs::syscall::gkfs_stat(rel_path, buf)); return with_errno(gkfs::syscall::gkfs_stat(rel_path, buf, false));
} }
return gsl::narrow_cast<int>( return gsl::narrow_cast<int>(
syscall_no_intercept_wrapper(SYS_lstat, rel_path.c_str(), buf)); syscall_no_intercept_wrapper(SYS_lstat, rel_path.c_str(), buf));
...@@ -193,7 +194,7 @@ hook_fstat(unsigned int fd, struct stat* buf) { ...@@ -193,7 +194,7 @@ hook_fstat(unsigned int fd, struct stat* buf) {
// We can change file_map and recall // We can change file_map and recall
auto md = gkfs::utils::get_metadata(path, false); auto md = gkfs::utils::get_metadata(path, false);
if(md.has_value() && md.value().blocks() == -1) { if(md.has_value() && md.value().blocks() == -1) {
path = md.value().rename_path(); path = md.value().target_path();
} }
#endif #endif
return with_errno(gkfs::syscall::gkfs_stat(path, buf)); return with_errno(gkfs::syscall::gkfs_stat(path, buf));
...@@ -204,10 +205,11 @@ hook_fstat(unsigned int fd, struct stat* buf) { ...@@ -204,10 +205,11 @@ hook_fstat(unsigned int fd, struct stat* buf) {
int int
hook_fstatat(int dirfd, const char* cpath, struct stat* buf, int flags) { hook_fstatat(int dirfd, const char* cpath, struct stat* buf, int flags) {
bool follow = (flags & AT_SYMLINK_NOFOLLOW) == 0;
LOG(DEBUG, "{}() called with path: \"{}\", fd: {}, buf: {}, flags: {}", LOG(DEBUG, "{}() called with path: \"{}\", fd: {}, buf: {}, flags: {}",
__func__, cpath, dirfd, fmt::ptr(buf), flags); __func__, cpath, dirfd, fmt::ptr(buf), flags);
std::string resolved; std::string resolved;
auto rstatus = CTX->relativize_fd_path(dirfd, cpath, resolved, flags); auto rstatus = CTX->relativize_fd_path(dirfd, cpath, resolved, flags);
switch(rstatus) { switch(rstatus) {
...@@ -223,7 +225,7 @@ hook_fstatat(int dirfd, const char* cpath, struct stat* buf, int flags) { ...@@ -223,7 +225,7 @@ hook_fstatat(int dirfd, const char* cpath, struct stat* buf, int flags) {
return -ENOTDIR; return -ENOTDIR;
case gkfs::preload::RelativizeStatus::internal: case gkfs::preload::RelativizeStatus::internal:
return with_errno(gkfs::syscall::gkfs_stat(resolved, buf)); return with_errno(gkfs::syscall::gkfs_stat(resolved, buf, follow));
default: default:
LOG(ERROR, "{}() relativize status unknown: {}", __func__); LOG(ERROR, "{}() relativize status unknown: {}", __func__);
...@@ -383,10 +385,17 @@ hook_symlinkat(const char* oldname, int newdfd, const char* newname) { ...@@ -383,10 +385,17 @@ hook_symlinkat(const char* oldname, int newdfd, const char* newname) {
LOG(DEBUG, "{}() called with oldname: \"{}\", newfd: {}, newname: \"{}\"", LOG(DEBUG, "{}() called with oldname: \"{}\", newfd: {}, newname: \"{}\"",
__func__, oldname, newdfd, newname); __func__, oldname, newdfd, newname);
#ifdef HAS_SYMLINKS
bool internal1 = false;
#endif
std::string oldname_resolved; std::string oldname_resolved;
if(CTX->relativize_path(oldname, oldname_resolved)) { if(CTX->relativize_path(oldname, oldname_resolved)) {
#ifdef HAS_SYMLINKS
internal1 = true;
#else
LOG(WARNING, "{}() operation not supported", __func__); LOG(WARNING, "{}() operation not supported", __func__);
return -ENOTSUP; return -ENOTSUP;
#endif
} }
std::string newname_resolved; std::string newname_resolved;
...@@ -405,8 +414,17 @@ hook_symlinkat(const char* oldname, int newdfd, const char* newname) { ...@@ -405,8 +414,17 @@ hook_symlinkat(const char* oldname, int newdfd, const char* newname) {
return -ENOTDIR; return -ENOTDIR;
case gkfs::preload::RelativizeStatus::internal: case gkfs::preload::RelativizeStatus::internal:
#ifdef HAS_SYMLINKS
if(internal1) { // Parameters are inverted
return with_errno(gkfs::syscall::gkfs_mk_symlink(
newname_resolved, oldname_resolved));
}
LOG(WARNING, "{}() operation not supported", __func__);
return -ENOTSUP;
#else
LOG(WARNING, "{}() operation not supported", __func__); LOG(WARNING, "{}() operation not supported", __func__);
return -ENOTSUP; return -ENOTSUP;
#endif
default: default:
LOG(ERROR, "{}() relativize status unknown", __func__); LOG(ERROR, "{}() relativize status unknown", __func__);
...@@ -704,7 +722,8 @@ hook_chdir(const char* path) { ...@@ -704,7 +722,8 @@ hook_chdir(const char* path) {
// path falls in our namespace // path falls in our namespace
auto md = gkfs::utils::get_metadata(rel_path); auto md = gkfs::utils::get_metadata(rel_path);
if(!md) { if(!md) {
LOG(ERROR, "{}() path {} errno {}", __func__, path, errno); LOG(ERROR, "{}() path {} / {} errno {}", __func__, path, rel_path,
errno);
return -errno; return -errno;
} }
...@@ -802,6 +821,11 @@ hook_readlinkat(int dirfd, const char* cpath, char* buf, int bufsiz) { ...@@ -802,6 +821,11 @@ hook_readlinkat(int dirfd, const char* cpath, char* buf, int bufsiz) {
return -ENOTDIR; return -ENOTDIR;
case gkfs::preload::RelativizeStatus::internal: case gkfs::preload::RelativizeStatus::internal:
#ifdef HAS_SYMLINKS
return with_errno(
gkfs::syscall::gkfs_readlink(resolved, buf, bufsiz));
#endif
return -EINVAL; return -EINVAL;
default: default:
...@@ -861,22 +885,78 @@ hook_fcntl(unsigned int fd, unsigned int cmd, unsigned long arg) { ...@@ -861,22 +885,78 @@ hook_fcntl(unsigned int fd, unsigned int cmd, unsigned long arg) {
ret |= O_RDWR; ret |= O_RDWR;
} }
return ret; return ret;
case F_SETFL:
LOG(DEBUG, "{}() F_SETFL on fd {}", __func__, fd);
// get flags from arg and setup
if(arg & O_RDONLY) {
CTX->file_map()->get(fd)->set_flag(
gkfs::filemap::OpenFile_flags::rdonly, true);
}
if(arg & O_WRONLY) {
CTX->file_map()->get(fd)->set_flag(
gkfs::filemap::OpenFile_flags::wronly, true);
}
if(arg & O_RDWR) {
CTX->file_map()->get(fd)->set_flag(
gkfs::filemap::OpenFile_flags::rdwr, true);
}
if(arg & O_APPEND) {
CTX->file_map()->get(fd)->set_flag(
gkfs::filemap::OpenFile_flags::append, true);
}
if(arg & O_NONBLOCK) {
LOG(DEBUG, "[GKFS] F_SETFL {} NONBLOCK", fd);
}
if(arg & O_ASYNC) {
LOG(DEBUG, "[GKFS] F_SETFL {} ASYNC", fd);
}
if(arg & O_CLOEXEC) {
LOG(DEBUG, "[GKFS] F_SETFL {} CLOEXEC", fd);
CTX->file_map()->get(fd)->set_flag(
gkfs::filemap::OpenFile_flags::cloexec, true);
}
return 0;
case F_SETFD: case F_SETFD:
LOG(DEBUG, "{}() [fd: {}, cmd: F_SETFD, FD_CLOEXEC: {}]", __func__, LOG(DEBUG, "{}() [fd: {}, cmd: F_SETFD, FD_CLOEXEC: {}]", __func__,
fd, (arg & FD_CLOEXEC)); fd, (arg & FD_CLOEXEC));
CTX->file_map()->get(fd)->set_flag( CTX->file_map()->get(fd)->set_flag(
gkfs::filemap::OpenFile_flags::cloexec, (arg & FD_CLOEXEC)); gkfs::filemap::OpenFile_flags::cloexec, (arg & FD_CLOEXEC));
return 0; return 0;
#ifdef ENABLE_LOCKING
case F_GETLK: case F_GETLK:
LOG(ERROR, "{}() F_GETLK on fd (Not Supported) {}", __func__, fd); LOG(ERROR, "{}() F_GETLK on fd (on underlying fd) {}", __func__,
return 0; fd);
return gsl::narrow_cast<int>(
syscall_no_intercept_wrapper(SYS_fcntl, fd, cmd, arg));
case F_SETLK: case F_SETLK:
LOG(ERROR, "{}() F_SETLK on fd (Not Supported) {}", __func__, fd); LOG(ERROR, "{}() F_SETLK on fd (on underlying fd) {}", __func__,
return 0; fd);
return gsl::narrow_cast<int>(
syscall_no_intercept_wrapper(SYS_fcntl, fd, cmd, arg));
case F_SETLKW:
LOG(ERROR, "{}() F_SETLKW on fd (on underlying fd) {}", __func__,
fd);
return gsl::narrow_cast<int>(
syscall_no_intercept_wrapper(SYS_fcntl, fd, cmd, arg));
case F_GETOWN:
case __F_GETOWN_EX:
LOG(ERROR, "{}() F_GETOWN on fd (on underlying fd) {}", __func__,
fd);
return gsl::narrow_cast<int>(
syscall_no_intercept_wrapper(SYS_fcntl, fd, cmd, arg));
case F_SETOWN:
case __F_SETOWN_EX:
LOG(ERROR, "{}() F_SETOWN on fd (on underlying fd) {}", __func__,
fd);
return gsl::narrow_cast<int>(
syscall_no_intercept_wrapper(SYS_fcntl, fd, cmd, arg));
#endif
default: default:
LOG(ERROR, "{}() unrecognized command {} on fd {}", __func__, cmd, LOG(ERROR, "{}() unrecognized command {} on fd {}", __func__, cmd,
fd); fd);
......
...@@ -92,6 +92,62 @@ get_current_syscall_info() { ...@@ -92,6 +92,62 @@ get_current_syscall_info() {
return saved_syscall_info; return saved_syscall_info;
} }
std::vector<int>
get_open_fds() {
std::vector<int> fds;
const int buffer_size = 4096;
char buffer[buffer_size];
// Open /proc/self/fd directory using raw syscall
int dir_fd = syscall_no_intercept_wrapper(SYS_open, "/proc/self/fd",
O_RDONLY | O_DIRECTORY, 0);
if(dir_fd < 0) {
return fds;
}
while(true) {
// Read directory entries using getdents64 syscall
int nread = syscall_no_intercept_wrapper(SYS_getdents64, dir_fd, buffer,
buffer_size);
if(nread <= 0)
break;
for(int bpos = 0; bpos < nread;) {
auto* dent = reinterpret_cast<linux_dirent64*>(buffer + bpos);
// Skip . and .. entries
const std::string d_name(dent->d_name);
if(d_name == "." || d_name == "..") {
bpos += dent->d_reclen;
continue;
}
try {
int fd = std::stoi(d_name);
// Skip the directory FD itself
if(fd != dir_fd) {
fds.push_back(fd);
}
} catch(const std::exception&) {
// Ignore non-integer entries
}
bpos += dent->d_reclen;
}
}
// Close directory using raw syscall
syscall_no_intercept_wrapper(SYS_close, dir_fd);
// Filter out non-open FDs (optional safety check)
fds.erase(std::remove_if(fds.begin(), fds.end(),
[](int fd) {
return syscall(SYS_fcntl, fd, F_GETFD) < 0;
}),
fds.end());
return fds;
}
/* /*
* hook_internal -- interception hook for internal syscalls * hook_internal -- interception hook for internal syscalls
...@@ -512,15 +568,18 @@ hook(long syscall_number, long arg0, long arg1, long arg2, long arg3, long arg4, ...@@ -512,15 +568,18 @@ hook(long syscall_number, long arg0, long arg1, long arg2, long arg3, long arg4,
*result = gkfs::hook::hook_close(static_cast<int>(arg0)); *result = gkfs::hook::hook_close(static_cast<int>(arg0));
break; break;
#ifdef SYS_close_range #ifdef SYS_close_range
case SYS_close_range: case SYS_close_range: {
for(auto i = arg0; i <= arg1; i++) { auto fds = get_open_fds();
if(i >= GKFS_MAX_OPEN_FDS) for(auto fd : fds) {
break; if(fd < static_cast<int>(arg0) || fd > static_cast<int>(arg1))
if(CTX->file_map()->exist(i)) { continue;
gkfs::syscall::gkfs_close(i); if(CTX->file_map()->exist(fd)) {
} gkfs::syscall::gkfs_close(fd);
} else
close(fd);
*result = 0; *result = 0;
} }
}
*result = 0; *result = 0;
break; break;
#endif // SYS_close_range #endif // SYS_close_range
......
...@@ -58,6 +58,7 @@ ...@@ -58,6 +58,7 @@
extern "C" { extern "C" {
#include <sys/stat.h> #include <sys/stat.h>
#include <dlfcn.h>
} }
using namespace std; using namespace std;
...@@ -111,16 +112,25 @@ match_components(const string& path, unsigned int& path_components, ...@@ -111,16 +112,25 @@ match_components(const string& path, unsigned int& path_components,
return matched; return matched;
} }
static char* (*real_realpath)(const char* path, char* resolved_path) = nullptr;
string string
follow_symlinks(const string& path) { follow_symlinks(const string& path) {
struct stat st{}; struct stat st{};
if(lstat(path.c_str(), &st) < 0) { auto res = syscall_no_intercept(SYS_lstat, path.c_str(), &st);
if(res < 0) {
LOG(DEBUG, "path \"{}\" does not exist", path); LOG(DEBUG, "path \"{}\" does not exist", path);
return path; return path;
} }
if(S_ISLNK(st.st_mode)) { if(S_ISLNK(st.st_mode)) {
auto link_resolved = ::unique_ptr<char[]>(new char[PATH_MAX]); auto link_resolved = ::unique_ptr<char[]>(new char[PATH_MAX]);
if(realpath(path.c_str(), link_resolved.get()) == nullptr) { if(real_realpath == nullptr) {
real_realpath = reinterpret_cast<char* (*) (const char* path,
char* resolved_path)>(
dlsym(((void*) -1l), "realpath"));
}
if(real_realpath(path.c_str(), link_resolved.get()) == nullptr) {
LOG(ERROR, LOG(ERROR,
"Failed to get realpath for link \"{}\". " "Failed to get realpath for link \"{}\". "
...@@ -198,7 +208,6 @@ resolve_new(const string& path, bool resolve_last_link) { ...@@ -198,7 +208,6 @@ resolve_new(const string& path, bool resolve_last_link) {
} }
#endif #endif
} }
if(resolved.substr(0, CTX->mountdir().size()) == CTX->mountdir()) { if(resolved.substr(0, CTX->mountdir().size()) == CTX->mountdir()) {
resolved.erase(1, CTX->mountdir().size()); resolved.erase(1, CTX->mountdir().size());
LOG(DEBUG, "internal: \"{}\"", resolved); LOG(DEBUG, "internal: \"{}\"", resolved);
......
...@@ -239,6 +239,16 @@ init_environment() { ...@@ -239,6 +239,16 @@ init_environment() {
"Failed to connect to hosts: "s + e.what()); "Failed to connect to hosts: "s + e.what());
} }
LOG(INFO, "Lock-Files : Generator = {} / Consumer = {}",
(bool) gkfs::env::get_var(gkfs::env::PROTECT_FILES_GENERATOR, 0),
(bool) gkfs::env::get_var(gkfs::env::PROTECT_FILES_CONSUMER, 0));
CTX->protect_files_generator(
gkfs::env::get_var(gkfs::env::PROTECT_FILES_GENERATOR, 0));
CTX->protect_files_consumer(
gkfs::env::get_var(gkfs::env::PROTECT_FILES_CONSUMER, 0));
/* Setup distributor */ /* Setup distributor */
auto forwarding_map_file = gkfs::env::get_var( auto forwarding_map_file = gkfs::env::get_var(
gkfs::env::FORWARDING_MAP_FILE, gkfs::config::forwarding_file_path); gkfs::env::FORWARDING_MAP_FILE, gkfs::config::forwarding_file_path);
...@@ -329,13 +339,14 @@ init_environment() { ...@@ -329,13 +339,14 @@ init_environment() {
} }
} }
LOG(INFO, "Retrieving file system configuration..."); // LOG(INFO, "Retrieving file system configuration...");
if(!gkfs::rpc::forward_get_fs_config()) { // if(!gkfs::rpc::forward_get_fs_config()) {
exit_error_msg( // exit_error_msg(
EXIT_FAILURE, // EXIT_FAILURE,
"Unable to fetch file system configurations from daemon process through RPC."); // "Unable to fetch file system configurations from daemon
} // process through RPC.");
// }
// Initialize random number generator and seed for replica selection // Initialize random number generator and seed for replica selection
// in case of failure, a new replica will be selected // in case of failure, a new replica will be selected
if(CTX->get_replicas() > 0) { if(CTX->get_replicas() > 0) {
...@@ -359,14 +370,12 @@ init_preload() { ...@@ -359,14 +370,12 @@ init_preload() {
// The original errno value will be restored after initialization to not // The original errno value will be restored after initialization to not
// leak internal error codes // leak internal error codes
auto oerrno = errno; auto oerrno = errno;
if(atomic_exchange(&init, 1) == 0) {
pthread_atfork(&at_fork_syscall, &at_parent_syscall, &at_child_syscall);
}
CTX->enable_interception(); CTX->enable_interception();
gkfs::preload::start_self_interception(); gkfs::preload::start_self_interception();
if(!init) {
init = true;
pthread_atfork(&at_fork_syscall, &at_parent_syscall, &at_child_syscall);
}
CTX->init_logging(); CTX->init_logging();
// from here ownwards it is safe to print messages // from here ownwards it is safe to print messages
LOG(DEBUG, "Logging subsystem initialized"); LOG(DEBUG, "Logging subsystem initialized");
...@@ -384,9 +393,6 @@ init_preload() { ...@@ -384,9 +393,6 @@ init_preload() {
if(gkfs::env::var_is_set(gkfs::env::PROTECT_FD)) { if(gkfs::env::var_is_set(gkfs::env::PROTECT_FD)) {
CTX->protect_fds(true); CTX->protect_fds(true);
LOG(INFO, "Protecting user fds"); LOG(INFO, "Protecting user fds");
} else {
CTX->protect_fds(false);
LOG(INFO, "Not protecting user fds");
} }
if(CTX->protect_fds()) { if(CTX->protect_fds()) {
...@@ -466,12 +472,22 @@ destroy_preload() { ...@@ -466,12 +472,22 @@ destroy_preload() {
extern "C" int extern "C" int
gkfs_init() { gkfs_init() {
CTX->init_logging(); CTX->init_logging();
// from here ownwards it is safe to print messages // from here ownwards it is safe to print messages
LOG(DEBUG, "Logging subsystem initialized"); LOG(DEBUG, "Logging subsystem initialized");
if(gkfs::env::var_is_set(gkfs::env::PROTECT_FD)) {
CTX->protect_fds(true);
LOG(INFO, "Protecting user fds");
}
if(CTX->protect_fds()) {
CTX->protect_user_fds();
}
gkfs::preload::init_environment(); gkfs::preload::init_environment();
if(CTX->protect_fds())
CTX->unprotect_user_fds();
return 0; return 0;
} }
...@@ -489,6 +505,76 @@ gkfs_end() { ...@@ -489,6 +505,76 @@ gkfs_end() {
return 0; return 0;
} }
/**
* @brief Automatically launch init/destroy
* NOTES: this is not called, in the child of a fork
*/
void
init_libc() {
CTX->init_logging();
// from here ownwards it is safe to print messages
LOG(DEBUG, "Logging subsystem initialized");
if(gkfs::env::var_is_set(gkfs::env::PROTECT_FD)) {
CTX->protect_fds(true);
LOG(INFO, "Protecting user fds");
}
if(CTX->protect_fds()) {
CTX->protect_user_fds();
}
if(atomic_exchange(&init, 1) == 0) {
pthread_atfork(&at_fork, &at_parent, &at_child);
}
gkfs::preload::init_environment();
if(CTX->protect_fds()) {
CTX->unprotect_user_fds();
}
CTX->enable_interception();
if(!CTX->init_metrics()) {
exit_error_msg(EXIT_FAILURE,
"Unable to initialize client metrics. Exiting...");
}
}
void
destroy_libc() {
CTX->disable_interception();
#ifdef GKFS_ENABLE_CLIENT_METRICS
LOG(INFO, "Flushing final metrics...");
CTX->write_metrics()->flush_msgpack();
CTX->read_metrics()->flush_msgpack();
LOG(INFO, "Metrics flushed. Total flush operations: {}",
CTX->write_metrics()->flush_count());
CTX->destroy_metrics();
#endif
CTX->clear_hosts();
LOG(DEBUG, "Peer information deleted");
ld_network_service.reset();
LOG(DEBUG, "RPC subsystem shut down");
LOG(INFO, "All subsystems shut down. Client shutdown complete.");
}
void
at_fork() {
destroy_libc();
}
void
at_parent() {
init_libc();
}
void
at_child() {
init_libc();
}
void void
at_fork_syscall() { at_fork_syscall() {
destroy_preload(); destroy_preload();
......
...@@ -85,6 +85,7 @@ PreloadContext::PreloadContext() ...@@ -85,6 +85,7 @@ PreloadContext::PreloadContext()
char host[255]; char host[255];
gethostname(host, 255); gethostname(host, 255);
hostname = host; hostname = host;
cwd_ = gkfs::path::get_sys_cwd();
PreloadContext::set_replicas( PreloadContext::set_replicas(
std::stoi(gkfs::env::get_var(gkfs::env::NUM_REPL, "0"))); std::stoi(gkfs::env::get_var(gkfs::env::NUM_REPL, "0")));
} }
...@@ -659,6 +660,28 @@ PreloadContext::get_replicas() { ...@@ -659,6 +660,28 @@ PreloadContext::get_replicas() {
return replicas_; return replicas_;
} }
bool
PreloadContext::protect_files_generator() const {
return protect_files_generator_;
}
void
PreloadContext::protect_files_generator(bool protect) {
protect_files_generator_ = protect;
}
bool
PreloadContext::protect_files_consumer() const {
return protect_files_consumer_;
}
void
PreloadContext::protect_files_consumer(bool protect) {
protect_files_consumer_ = protect;
}
const std::shared_ptr<messagepack::ClientMetrics> const std::shared_ptr<messagepack::ClientMetrics>
PreloadContext::write_metrics() { PreloadContext::write_metrics() {
return write_metrics_; return write_metrics_;
......
...@@ -166,8 +166,10 @@ load_hostfile(const std::string& path) { ...@@ -166,8 +166,10 @@ load_hostfile(const std::string& path) {
path, strerror(errno))); path, strerror(errno)));
} }
vector<pair<string, string>> hosts; vector<pair<string, string>> hosts;
const regex line_re("^(\\S+)\\s+(\\S+)\\s*(\\S*)$", const regex line_re(
"^(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)$",
regex::ECMAScript | regex::optimize); regex::ECMAScript | regex::optimize);
string line; string line;
string host; string host;
string uri; string uri;
...@@ -185,9 +187,25 @@ load_hostfile(const std::string& path) { ...@@ -185,9 +187,25 @@ load_hostfile(const std::string& path) {
throw runtime_error( throw runtime_error(
fmt::format("unrecognized line format: '{}'", line)); fmt::format("unrecognized line format: '{}'", line));
} }
host = match[1]; host = match[1];
uri = match[2]; uri = match[2];
// match[3] that is the proxy (not used here)
hosts.emplace_back(host, uri); hosts.emplace_back(host, uri);
// info will be repeated line per line:
CTX->mountdir(match[4]);
LOG(INFO, "Mountdir: '{}'", CTX->mountdir());
CTX->fs_conf()->rootdir = match[5];
CTX->fs_conf()->atime_state = match[6] == '1';
CTX->fs_conf()->mtime_state = match[7] == '1';
CTX->fs_conf()->ctime_state = match[8] == '1';
CTX->fs_conf()->link_cnt_state = match[9] == '1';
CTX->fs_conf()->blocks_state = match[10] == '1';
// convert match[11] and match[12] to unsigned integers.
CTX->fs_conf()->uid = std::stoi(match[11]);
CTX->fs_conf()->gid = std::stoi(match[12]);
} }
if(hosts.empty()) { if(hosts.empty()) {
throw runtime_error( throw runtime_error(
...@@ -264,6 +282,7 @@ get_metadata(const string& path, bool follow_links) { ...@@ -264,6 +282,7 @@ get_metadata(const string& path, bool follow_links) {
#ifdef HAS_SYMLINKS #ifdef HAS_SYMLINKS
if(follow_links) { if(follow_links) {
gkfs::metadata::Metadata md{attr}; gkfs::metadata::Metadata md{attr};
auto original = md;
while(md.is_link()) { while(md.is_link()) {
if(gkfs::config::proxy::fwd_stat && CTX->use_proxy()) { if(gkfs::config::proxy::fwd_stat && CTX->use_proxy()) {
err = gkfs::rpc::forward_stat_proxy(md.target_path(), attr); err = gkfs::rpc::forward_stat_proxy(md.target_path(), attr);
......
...@@ -406,9 +406,34 @@ int ...@@ -406,9 +406,34 @@ int
forward_rename(const string& oldpath, const string& newpath, forward_rename(const string& oldpath, const string& newpath,
const gkfs::metadata::Metadata& md) { const gkfs::metadata::Metadata& md) {
auto endp = CTX->hosts().at( auto endp = CTX->hosts().at(
CTX->distributor()->locate_file_metadata(oldpath, 0)); CTX->distributor()->locate_file_metadata(oldpath, 0));
if(newpath == "") { // Just cleanup rename status
try {
LOG(DEBUG, "Sending RPC ...");
// TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that
// we can retry for RPC_TRIES (see old commits with margo)
// TODO(amiranda): hermes will eventually provide a post(endpoint)
// returning one result and a broadcast(endpoint_set) returning a
// result_set. When that happens we can remove the .at(0) :/
auto out = ld_network_service
->post<gkfs::rpc::rename>(endp, oldpath, newpath)
.get()
.at(0);
LOG(DEBUG, "Got response success: {}", out.err());
// return out.err() ? out.err() : 0;
return 0;
} catch(const std::exception& ex) {
LOG(ERROR, "while getting rpc output");
return EBUSY;
}
}
try { try {
LOG(DEBUG, "Sending RPC ..."); LOG(DEBUG, "Sending RPC ...");
// TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we
...@@ -484,9 +509,8 @@ forward_rename(const string& oldpath, const string& newpath, ...@@ -484,9 +509,8 @@ forward_rename(const string& oldpath, const string& newpath,
// returning one result and a broadcast(endpoint_set) returning a // returning one result and a broadcast(endpoint_set) returning a
// result_set. When that happens we can remove the .at(0) :/ // result_set. When that happens we can remove the .at(0) :/
// Update new file with target link = oldpath // Update new file with target link = oldpath
auto out = auto out = ld_network_service
ld_network_service ->post<gkfs::rpc::rename>(endp2, newpath, oldpath)
->post<gkfs::rpc::mk_symlink>(endp2, newpath, oldpath)
.get() .get()
.at(0); .at(0);
...@@ -508,7 +532,7 @@ forward_rename(const string& oldpath, const string& newpath, ...@@ -508,7 +532,7 @@ forward_rename(const string& oldpath, const string& newpath,
// returning one result and a broadcast(endpoint_set) returning a // returning one result and a broadcast(endpoint_set) returning a
// result_set. When that happens we can remove the .at(0) :/ // result_set. When that happens we can remove the .at(0) :/
auto out = ld_network_service auto out = ld_network_service
->post<gkfs::rpc::mk_symlink>(endp, oldpath, newpath) ->post<gkfs::rpc::rename>(endp, oldpath, newpath)
.get() .get()
.at(0); .at(0);
......
...@@ -62,6 +62,9 @@ hermes::detail::register_user_request_types(uint32_t provider_id) { ...@@ -62,6 +62,9 @@ hermes::detail::register_user_request_types(uint32_t provider_id) {
#ifdef HAS_SYMLINKS #ifdef HAS_SYMLINKS
(void) registered_requests().add<gkfs::rpc::mk_symlink>(provider_id); (void) registered_requests().add<gkfs::rpc::mk_symlink>(provider_id);
#endif // HAS_SYMLINKS #endif // HAS_SYMLINKS
#ifdef HAS_RENAME
(void) registered_requests().add<gkfs::rpc::rename>(provider_id);
#endif // HAS_RENAME
(void) registered_requests().add<gkfs::rpc::remove_data>(provider_id); (void) registered_requests().add<gkfs::rpc::remove_data>(provider_id);
(void) registered_requests().add<gkfs::rpc::write_data>(provider_id); (void) registered_requests().add<gkfs::rpc::write_data>(provider_id);
(void) registered_requests().add<gkfs::rpc::read_data>(provider_id); (void) registered_requests().add<gkfs::rpc::read_data>(provider_id);
......
...@@ -163,9 +163,10 @@ Stats::output_map(std::ofstream& output) { ...@@ -163,9 +163,10 @@ Stats::output_map(std::ofstream& output) {
} }
auto chunkMap = auto chunkMap =
[](std::string caption, [](const std::string& caption,
map<unsigned int, const map<unsigned int,
std::set<pair<std::string, unsigned long long>>>& order, std::set<pair<std::string, unsigned long long>>>&
order,
std::ofstream& output) { std::ofstream& output) {
output << caption << std::endl; output << caption << std::endl;
for(auto k : order) { for(auto k : order) {
......
...@@ -178,6 +178,10 @@ register_server_rpcs(margo_instance_id mid) { ...@@ -178,6 +178,10 @@ register_server_rpcs(margo_instance_id mid) {
#ifdef HAS_SYMLINKS #ifdef HAS_SYMLINKS
MARGO_REGISTER(mid, gkfs::rpc::tag::mk_symlink, rpc_mk_symlink_in_t, MARGO_REGISTER(mid, gkfs::rpc::tag::mk_symlink, rpc_mk_symlink_in_t,
rpc_err_out_t, rpc_srv_mk_symlink); rpc_err_out_t, rpc_srv_mk_symlink);
#endif
#ifdef HAS_RENAME
MARGO_REGISTER(mid, gkfs::rpc::tag::rename, rpc_rename_in_t, rpc_err_out_t,
rpc_srv_rename);
#endif #endif
MARGO_REGISTER(mid, gkfs::rpc::tag::write, rpc_write_data_in_t, MARGO_REGISTER(mid, gkfs::rpc::tag::write, rpc_write_data_in_t,
rpc_data_out_t, rpc_srv_write); rpc_data_out_t, rpc_srv_write);
...@@ -584,7 +588,7 @@ agios_initialize() { ...@@ -584,7 +588,7 @@ agios_initialize() {
agios_exit(); agios_exit();
throw; std::terminate();
} }
} }
#endif #endif
...@@ -885,10 +889,9 @@ parse_input(const cli_options& opts, const CLI::App& desc) { ...@@ -885,10 +889,9 @@ parse_input(const cli_options& opts, const CLI::App& desc) {
metadir_path.native()); metadir_path.native());
} else { } else {
// use rootdir as metadata dir // use rootdir as metadata dir
auto metadir = opts.rootdir;
if(GKFS_DATA->enable_forwarding()) { if(GKFS_DATA->enable_forwarding()) {
auto metadir = opts.rootdir;
// As we store normally he metadata to the pfs, we need to put each // As we store normally he metadata to the pfs, we need to put each
// daemon in a separate directory. // daemon in a separate directory.
auto metadir_path = auto metadir_path =
......
...@@ -847,7 +847,66 @@ rpc_srv_get_dirents_extended(hg_handle_t handle) { ...@@ -847,7 +847,66 @@ rpc_srv_get_dirents_extended(hg_handle_t handle) {
return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle); return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle);
} }
#if defined(HAS_SYMLINKS) || defined(HAS_RENAME) #if defined(HAS_SYMLINKS)
/**
* @brief Serves a request create a symbolic link
* @internal
* The state of this function is unclear and requires a complete refactor.
*
* All exceptions must be caught here and dealt with accordingly. Any errors are
* placed in the response.
* @endinternal
* @param handle Mercury RPC handle (path is the symbolic link)
* @return Mercury error code to Mercury
*/
hg_return_t
rpc_srv_mk_symlink(hg_handle_t handle) {
rpc_mk_symlink_in_t in{};
rpc_err_out_t out{};
auto ret = margo_get_input(handle, &in);
if(ret != HG_SUCCESS) {
GKFS_DATA->spdlogger()->error(
"{}() Failed to retrieve input from handle", __func__);
}
GKFS_DATA->spdlogger()->debug(
"{}() Got RPC with path '{}' and target path '{}'", __func__,
in.path, in.target_path);
// do update
try {
gkfs::metadata::Metadata md = gkfs::metadata::get(in.path);
md.target_path(in.target_path);
md.mode(S_IFLNK);
md.blocks(0);
GKFS_DATA->spdlogger()->debug(
"{}() Updating path '{}' with metadata '{}'", __func__, in.path,
md.serialize());
gkfs::metadata::update(in.path, md);
out.err = 0;
} catch(const std::exception& e) {
// TODO handle NotFoundException
GKFS_DATA->spdlogger()->error("{}() Failed to update entry", __func__);
out.err = 1;
}
GKFS_DATA->spdlogger()->debug("{}() Sending output err '{}'", __func__,
out.err);
auto hret = margo_respond(handle, &out);
if(hret != HG_SUCCESS) {
GKFS_DATA->spdlogger()->error("{}() Failed to respond", __func__);
}
// Destroy handle when finished
margo_free_input(handle, &in);
margo_destroy(handle);
return HG_SUCCESS;
}
#endif // HAS_SYMLINKS
#if defined(HAS_RENAME)
/** /**
* @brief Serves a request create a symbolic link and supports rename * @brief Serves a request create a symbolic link and supports rename
* @internal * @internal
...@@ -856,11 +915,11 @@ rpc_srv_get_dirents_extended(hg_handle_t handle) { ...@@ -856,11 +915,11 @@ rpc_srv_get_dirents_extended(hg_handle_t handle) {
* All exceptions must be caught here and dealt with accordingly. Any errors are * All exceptions must be caught here and dealt with accordingly. Any errors are
* placed in the response. * placed in the response.
* @endinteral * @endinteral
* @param handle Mercury RPC handle * @param handle Mercury RPC handle (target_path is the symbolic link)
* @return Mercury error code to Mercury * @return Mercury error code to Mercury
*/ */
hg_return_t hg_return_t
rpc_srv_mk_symlink(hg_handle_t handle) { rpc_srv_rename(hg_handle_t handle) {
rpc_mk_symlink_in_t in{}; rpc_mk_symlink_in_t in{};
rpc_err_out_t out{}; rpc_err_out_t out{};
...@@ -875,17 +934,14 @@ rpc_srv_mk_symlink(hg_handle_t handle) { ...@@ -875,17 +934,14 @@ rpc_srv_mk_symlink(hg_handle_t handle) {
// do update // do update
try { try {
gkfs::metadata::Metadata md = gkfs::metadata::get(in.path); gkfs::metadata::Metadata md = gkfs::metadata::get(in.path);
#ifdef HAS_RENAME
if(md.blocks() == -1) {
// We need to fill the rename path as this is an inverse path
// old -> new
md.rename_path(in.target_path);
} else {
#endif // HAS_RENAME
md.target_path(in.target_path); md.target_path(in.target_path);
#ifdef HAS_RENAME // We are reverting the rename so we clean up the target_path
if(strcmp(in.target_path, "") == 0) {
md.blocks(0);
} }
#endif // HAS_RENAME
GKFS_DATA->spdlogger()->debug( GKFS_DATA->spdlogger()->debug(
"{}() Updating path '{}' with metadata '{}'", __func__, in.path, "{}() Updating path '{}' with metadata '{}'", __func__, in.path,
md.serialize()); md.serialize());
...@@ -910,7 +966,7 @@ rpc_srv_mk_symlink(hg_handle_t handle) { ...@@ -910,7 +966,7 @@ rpc_srv_mk_symlink(hg_handle_t handle) {
return HG_SUCCESS; return HG_SUCCESS;
} }
#endif // HAS_SYMLINKS || HAS_RENAME #endif // HAS_RENAME
} // namespace } // namespace
...@@ -938,3 +994,6 @@ DEFINE_MARGO_RPC_HANDLER(rpc_srv_get_dirents_extended) ...@@ -938,3 +994,6 @@ DEFINE_MARGO_RPC_HANDLER(rpc_srv_get_dirents_extended)
DEFINE_MARGO_RPC_HANDLER(rpc_srv_mk_symlink) DEFINE_MARGO_RPC_HANDLER(rpc_srv_mk_symlink)
#endif #endif
#ifdef HAS_RENAME
DEFINE_MARGO_RPC_HANDLER(rpc_srv_rename)
#endif
...@@ -75,8 +75,10 @@ MalleableManager::load_hostfile(const std::string& path) { ...@@ -75,8 +75,10 @@ MalleableManager::load_hostfile(const std::string& path) {
path, strerror(errno))); path, strerror(errno)));
} }
vector<pair<string, string>> hosts; vector<pair<string, string>> hosts;
const regex line_re("^(\\S+)\\s+(\\S+)\\s*(\\S*)$", const regex line_re(
"^(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)$",
regex::ECMAScript | regex::optimize); regex::ECMAScript | regex::optimize);
string line; string line;
string host; string host;
string uri; string uri;
......
...@@ -47,6 +47,7 @@ ...@@ -47,6 +47,7 @@
#include <iostream> #include <iostream>
#include <limits> #include <limits>
#include <thread> #include <thread>
#include <unistd.h>
using namespace std; using namespace std;
...@@ -114,6 +115,16 @@ populate_hosts_file() { ...@@ -114,6 +115,16 @@ populate_hosts_file() {
auto line_out = fmt::format("{} {}", hostname, daemon_addr); auto line_out = fmt::format("{} {}", hostname, daemon_addr);
if(!proxy_addr.empty()) if(!proxy_addr.empty())
line_out = fmt::format("{} {}", line_out, proxy_addr); line_out = fmt::format("{} {}", line_out, proxy_addr);
if(proxy_addr.empty())
line_out = fmt::format("{} {}", line_out, "NOPROXY");
line_out = fmt::format(
"{} {} {} {} {} {} {} {} {} {}", line_out, GKFS_DATA->mountdir(),
GKFS_DATA->rootdir(), (int) GKFS_DATA->atime_state(),
(int) GKFS_DATA->mtime_state(), (int) GKFS_DATA->ctime_state(),
(int) GKFS_DATA->link_cnt_state(), (int) GKFS_DATA->blocks_state(),
getuid(), getgid());
// Constants for retry mechanism // Constants for retry mechanism
const int MAX_RETRIES = 5; // Maximum number of retry attempts const int MAX_RETRIES = 5; // Maximum number of retry attempts
const std::chrono::milliseconds RETRY_DELAY( const std::chrono::milliseconds RETRY_DELAY(
......
...@@ -56,8 +56,10 @@ load_hostfile(const std::string& lfpath) { ...@@ -56,8 +56,10 @@ load_hostfile(const std::string& lfpath) {
lfpath, strerror(errno))); lfpath, strerror(errno)));
} }
vector<pair<string, string>> hosts; vector<pair<string, string>> hosts;
const regex line_re("^(\\S+)\\s+(\\S+)\\s*(\\S*)$", const regex line_re(
"^(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)$",
regex::ECMAScript | regex::optimize); regex::ECMAScript | regex::optimize);
string line; string line;
string host; string host;
string uri; string uri;
...@@ -213,8 +215,8 @@ connect_to_hosts(const vector<pair<string, string>>& hosts) { ...@@ -213,8 +215,8 @@ connect_to_hosts(const vector<pair<string, string>>& hosts) {
ret = margo_addr_lookup(PROXY_DATA->client_rpc_mid(), uri.c_str(), ret = margo_addr_lookup(PROXY_DATA->client_rpc_mid(), uri.c_str(),
&svr_addr); &svr_addr);
if(ret != HG_SUCCESS) { if(ret != HG_SUCCESS) {
// still not working after 5 tries. // still not working after 4 tries.
if(i == 4) { if(i == 3) {
auto err_msg = auto err_msg =
fmt::format("{}() Unable to lookup address '{}'", fmt::format("{}() Unable to lookup address '{}'",
__func__, uri); __func__, uri);
......
...@@ -57,7 +57,7 @@ main(int argc, char* argv[]) { ...@@ -57,7 +57,7 @@ main(int argc, char* argv[]) {
const std::string link_ext = dir_ext + "/tmp/link"; const std::string link_ext = dir_ext + "/tmp/link";
char buffIn[] = "oops."; char buffIn[] = "oops.";
char* buffOut = new char[strlen(buffIn) + 1]; char buffOut[strlen(buffIn) + 1];
struct stat st; struct stat st;
int ret; int ret;
...@@ -170,7 +170,7 @@ main(int argc, char* argv[]) { ...@@ -170,7 +170,7 @@ main(int argc, char* argv[]) {
// Check readlink // Check readlink
char* target_path = new char[target_int.size() + 1]; char target_path[target_int.size() + 1];
ret = readlink(link_int.c_str(), target_path, target_int.size() + 1); ret = readlink(link_int.c_str(), target_path, target_int.size() + 1);
if(ret <= 0) { if(ret <= 0) {
std::cerr << "ERROR: Failed to retrieve link path: " << strerror(errno) std::cerr << "ERROR: Failed to retrieve link path: " << strerror(errno)
......
...@@ -61,7 +61,7 @@ main(int argc, char* argv[]) { ...@@ -61,7 +61,7 @@ main(int argc, char* argv[]) {
string mountdir = "/tmp/mountdir"; string mountdir = "/tmp/mountdir";
string p = mountdir + "/file"; string p = mountdir + "/file";
char buffIn[] = "oops."; char buffIn[] = "oops.";
char* buffOut = new char[strlen(buffIn) + 1 + 20]; char buffOut[strlen(buffIn) + 1 + 20];
int fd; int fd;
int ret; int ret;
struct stat st; struct stat st;
......
...@@ -34,7 +34,7 @@ from pathlib import Path ...@@ -34,7 +34,7 @@ from pathlib import Path
from harness.logger import logger, initialize_logging, finalize_logging from harness.logger import logger, initialize_logging, finalize_logging
from harness.cli import add_cli_options, set_default_log_formatter from harness.cli import add_cli_options, set_default_log_formatter
from harness.workspace import Workspace, FileCreator from harness.workspace import Workspace, FileCreator
from harness.gkfs import Daemon, Client, Proxy, ShellClient, FwdDaemon, FwdClient, ShellFwdClient, FwdDaemonCreator, FwdClientCreator from harness.gkfs import Daemon, Client, ClientLibc, Proxy, ShellClient, ShellClientLibc, FwdDaemon, FwdClient, ShellFwdClient, FwdDaemonCreator, FwdClientCreator
from harness.reporter import report_test_status, report_test_headline, report_assertion_pass from harness.reporter import report_test_status, report_test_headline, report_assertion_pass
def pytest_configure(config): def pytest_configure(config):
...@@ -159,6 +159,16 @@ def gkfs_client_proxy(test_workspace): ...@@ -159,6 +159,16 @@ def gkfs_client_proxy(test_workspace):
return Client(test_workspace, True) return Client(test_workspace, True)
@pytest.fixture
def gkfs_clientLibc(test_workspace):
"""
Sets up a gekkofs client environment so that
operations (system calls, library calls, ...) can
be requested from a co-running daemon.
"""
return ClientLibc(test_workspace)
@pytest.fixture @pytest.fixture
def gkfs_shell(test_workspace): def gkfs_shell(test_workspace):
""" """
...@@ -177,6 +187,15 @@ def gkfs_shell_proxy(test_workspace): ...@@ -177,6 +187,15 @@ def gkfs_shell_proxy(test_workspace):
return ShellClient(test_workspace,True) return ShellClient(test_workspace,True)
@pytest.fixture
def gkfs_shellLibc(test_workspace):
"""
Sets up a gekkofs environment so that shell commands
(stat, ls, mkdir, etc.) can be issued to a co-running daemon.
"""
return ShellClientLibc(test_workspace)
@pytest.fixture @pytest.fixture
def file_factory(test_workspace): def file_factory(test_workspace):
""" """
......
This diff is collapsed.
This diff is collapsed.