Loading ifs/configure.hpp +0 −3 Original line number Diff line number Diff line Loading @@ -46,7 +46,4 @@ // Debug configurations //#define RPC_TEST //unused // Using Margo for IPC or raw sockets #define MARGOIPC #endif //FS_CONFIGURE_H ifs/include/daemon/adafs_daemon.hpp +0 −3 Original line number Diff line number Diff line Loading @@ -7,9 +7,6 @@ #include "../../main.hpp" void daemon_loop(void* arg); void run_daemon(); void init_environment(); void destroy_enviroment(); Loading ifs/main.cpp +1 −4 Original line number Diff line number Diff line Loading @@ -115,9 +115,6 @@ int main(int argc, const char* argv[]) { bfs::create_directories(ADAFS_DATA->mgmt_path()); init_environment(); #ifndef MARGOIPC run_daemon(); // blocks here until application loop is exited TODO don't know yet how it'll be closed :D #else signal(SIGINT, shutdown_handler); signal(SIGTERM, shutdown_handler); Loading @@ -129,7 +126,7 @@ int main(int argc, const char* argv[]) { ADAFS_DATA->spdlogger()->info("Shutting done signal encountered. Shutting down ..."); #endif destroy_enviroment(); return 0; Loading ifs/src/daemon/adafs_daemon.cpp +0 −54 Original line number Diff line number Diff line Loading @@ -8,60 +8,6 @@ #include <rpc/rpc_defs.hpp> #include <preload/ipc_types.hpp> void daemon_loop(void* arg) { ADAFS_DATA->spdlogger()->info("Starting application loop ..."); while (true) { ADAFS_DATA->spdlogger()->info("sleeping"); sleep(10); /* TODO for Nafiseh * Connect to the IPC socket with the looping thread and listed for messages from the preload lib * When new message is received spawn a new thread that will trigger the operation and respond to preload lib * Ensure that messages from the lib are not lost. XXX */ // connect to the ipc socket and a separate thread retrieves the message from the preload lib. in ADAFS_DATA->spdlogger()->info("done sleeping. exiting ..."); break; } } void run_daemon() { ABT_xstream xstream; ABT_pool pool; ABT_thread loop_thread; auto argo_ret = ABT_xstream_self( &xstream); // get the current execution stream (1 core) to use (started with ABT_init()) if (argo_ret != 0) { ADAFS_DATA->spdlogger()->error("Error getting the execution stream when starting the daemon."); return; } argo_ret = ABT_xstream_get_main_pools(xstream, 1, &pool); // get the thread pool if (argo_ret != 0) { ADAFS_DATA->spdlogger()->error("Error getting the thread pool when starting the daemon."); return; } argo_ret = ABT_thread_create(pool, daemon_loop, nullptr, ABT_THREAD_ATTR_NULL, &loop_thread); if (argo_ret != 0) { ADAFS_DATA->spdlogger()->error("Error creating loop thread"); return; } // wait for the daemon to be closed and free the loop thread ABT_thread_yield_to(loop_thread); argo_ret = ABT_thread_join(loop_thread); if (argo_ret != 0) { ADAFS_DATA->spdlogger()->error("Error joining loop thread"); return; } argo_ret = ABT_thread_free(&loop_thread); if (argo_ret != 0) { ADAFS_DATA->spdlogger()->error("Error freeing loop thread."); return; } } void init_environment() { // Initialize rocksdb auto err = init_rocksdb(); Loading ifs/src/preload/preload.cpp +2 −54 Original line number Diff line number Diff line Loading @@ -115,9 +115,6 @@ int open(const char* path, int flags, ...) { if (is_env_initialized && is_fs_path(path)) { auto err = 1; auto fd = file_map.add(path, (flags & O_APPEND) != 0); #ifndef MARGOIPC #else if (flags & O_CREAT) { // do file create TODO handle all other flags if (fs_config->host_size > 1) { // multiple node operation auto recipient = get_rpc_node(path); Loading @@ -134,7 +131,6 @@ int open(const char* path, int flags, ...) { // TODO look up if file exists err = 0; } #endif if (err == 0) return fd; else { Loading Loading @@ -192,8 +188,6 @@ int unlink(const char* path) __THROW { int err; // LD_LOG_DEBUG(debug_fd, "unlink called with path %s\n", path); if (is_env_initialized && is_fs_path(path)) { #ifndef MARGOIPC #else if (fs_config->host_size > 1) { // multiple node operation auto recipient = get_rpc_node(path); if (is_local_op(recipient)) { // local Loading @@ -206,8 +200,6 @@ int unlink(const char* path) __THROW { } return err; #endif } return (reinterpret_cast<decltype(&unlink)>(libc_unlink))(path); } Loading Loading @@ -268,11 +260,7 @@ int stat(const char* path, struct stat* buf) __THROW { LD_LOG_DEBUG(debug_fd, "stat called with path %s\n", path); if (is_env_initialized && is_fs_path(path)) { // TODO call daemon and return #ifndef MARGOIPC #else return adafs_stat(path, buf); #endif } return (reinterpret_cast<decltype(&stat)>(libc_stat))(path, buf); } Loading @@ -283,11 +271,7 @@ int fstat(int fd, struct stat* buf) __THROW { if (is_env_initialized && file_map.exist(fd)) { auto path = file_map.get(fd)->path(); // TODO use this to send to the daemon (call directly) // TODO call daemon and return #ifndef MARGOIPC #else return adafs_stat(path, buf); #endif } return (reinterpret_cast<decltype(&fstat)>(libc_fstat))(fd, buf); } Loading @@ -297,11 +281,7 @@ int __xstat(int ver, const char* path, struct stat* buf) __THROW { LD_LOG_DEBUG(debug_fd, "__xstat called with path %s\n", path); if (is_env_initialized && is_fs_path(path)) { // TODO call stat #ifndef MARGOIPC #else return adafs_stat(path, buf); #endif } return (reinterpret_cast<decltype(&__xstat)>(libc___xstat))(ver, path, buf); } Loading @@ -310,11 +290,7 @@ int __xstat64(int ver, const char* path, struct stat64* buf) __THROW { init_passthrough_if_needed(); LD_LOG_DEBUG(debug_fd, "__xstat64 called with path %s\n", path); if (is_env_initialized && is_fs_path(path)) { #ifndef MARGOIPC #else return adafs_stat64(path, buf); #endif // // Not implemented // return -1; } Loading @@ -327,11 +303,7 @@ int __fxstat(int ver, int fd, struct stat* buf) __THROW { if (is_env_initialized && file_map.exist(fd)) { // TODO call fstat auto path = file_map.get(fd)->path(); #ifndef MARGOIPC #else return adafs_stat(path, buf); #endif } return (reinterpret_cast<decltype(&__fxstat)>(libc___fxstat))(ver, fd, buf); } Loading @@ -342,11 +314,7 @@ int __fxstat64(int ver, int fd, struct stat64* buf) __THROW { if (is_env_initialized && file_map.exist(fd)) { // TODO call fstat64 auto path = file_map.get(fd)->path(); #ifndef MARGOIPC #else return adafs_stat64(path, buf); #endif } return (reinterpret_cast<decltype(&__fxstat64)>(libc___fxstat64))(ver, fd, buf); } Loading @@ -372,11 +340,7 @@ extern int __lxstat64(int ver, const char* path, struct stat64* buf) __THROW { int access(const char* path, int mode) __THROW { init_passthrough_if_needed(); if (is_env_initialized && is_fs_path(path)) { #ifndef MARGOIPC #else #endif // TODO } return (reinterpret_cast<decltype(&access)>(libc_access))(path, mode); } Loading Loading @@ -404,9 +368,6 @@ ssize_t pwrite(int fd, const void* buf, size_t count, off_t offset) { size_t write_size; int err = 0; long updated_size = 0; #ifndef MARGOIPC #else /* * Update the metadentry size first to prevent two processes to write to the same offset when O_APPEND is given. * The metadentry size update is atomic XXX actually not yet. see metadentry.cpp Loading @@ -424,7 +385,6 @@ ssize_t pwrite(int fd, const void* buf, size_t count, off_t offset) { LD_LOG_ERROR0(debug_fd, "pwrite: write failed\n"); return 0; } #endif return write_size; } return (reinterpret_cast<decltype(&pwrite)>(libc_pwrite))(fd, buf, count, offset); Loading @@ -445,11 +405,7 @@ ssize_t pread(int fd, void* buf, size_t count, off_t offset) { auto path = adafs_fd->path(); size_t read_size = 0; int err; #ifndef MARGOIPC #else err = rpc_send_read(ipc_read_data_id, rpc_read_data_id, path, count, offset, buf, read_size); #endif // TODO check how much we need to deal with the read_size return err == 0 ? read_size : 0; } Loading @@ -461,11 +417,6 @@ ssize_t pread64(int fd, void* buf, size_t nbyte, __off64_t offset) { if (is_env_initialized && file_map.exist(fd)) { // auto path = file_map.get(fd)->path(); // TODO use this to send to the daemon (call directly) // TODO call daemon and return size written #ifndef MARGOIPC #else #endif return 0; // TODO } return (reinterpret_cast<decltype(&pread64)>(libc_pread64))(fd, buf, nbyte, offset); Loading Loading @@ -716,7 +667,6 @@ hg_addr_t daemon_addr() { * This function is only called in the preload constructor */ void init_environment() { #ifdef MARGOIPC // init margo client for IPC auto err = init_ld_argobots(); assert(err); Loading @@ -726,7 +676,6 @@ void init_environment() { assert(err); err = init_rpc_client(); assert(err); #endif is_env_initialized = true; LD_LOG_DEBUG0(debug_fd, "Environment initialized.\n"); } Loading Loading @@ -792,7 +741,6 @@ void init_preload(void) { */ void destroy_preload(void) { #ifdef MARGOIPC LD_LOG_DEBUG0(debug_fd, "Freeing Mercury daemon addr ...\n"); HG_Addr_free(margo_get_class(margo_ipc_id_), daemon_svr_addr_); LD_LOG_DEBUG0(debug_fd, "Finalizing Margo IPC client ...\n"); Loading Loading @@ -821,6 +769,6 @@ void destroy_preload(void) { HG_Finalize(mercury_ipc_class); HG_Finalize(mercury_rpc_class); LD_LOG_DEBUG0(debug_fd, "Preload library shut down.\n"); #endif fclose(debug_fd); } No newline at end of file Loading
ifs/configure.hpp +0 −3 Original line number Diff line number Diff line Loading @@ -46,7 +46,4 @@ // Debug configurations //#define RPC_TEST //unused // Using Margo for IPC or raw sockets #define MARGOIPC #endif //FS_CONFIGURE_H
ifs/include/daemon/adafs_daemon.hpp +0 −3 Original line number Diff line number Diff line Loading @@ -7,9 +7,6 @@ #include "../../main.hpp" void daemon_loop(void* arg); void run_daemon(); void init_environment(); void destroy_enviroment(); Loading
ifs/main.cpp +1 −4 Original line number Diff line number Diff line Loading @@ -115,9 +115,6 @@ int main(int argc, const char* argv[]) { bfs::create_directories(ADAFS_DATA->mgmt_path()); init_environment(); #ifndef MARGOIPC run_daemon(); // blocks here until application loop is exited TODO don't know yet how it'll be closed :D #else signal(SIGINT, shutdown_handler); signal(SIGTERM, shutdown_handler); Loading @@ -129,7 +126,7 @@ int main(int argc, const char* argv[]) { ADAFS_DATA->spdlogger()->info("Shutting done signal encountered. Shutting down ..."); #endif destroy_enviroment(); return 0; Loading
ifs/src/daemon/adafs_daemon.cpp +0 −54 Original line number Diff line number Diff line Loading @@ -8,60 +8,6 @@ #include <rpc/rpc_defs.hpp> #include <preload/ipc_types.hpp> void daemon_loop(void* arg) { ADAFS_DATA->spdlogger()->info("Starting application loop ..."); while (true) { ADAFS_DATA->spdlogger()->info("sleeping"); sleep(10); /* TODO for Nafiseh * Connect to the IPC socket with the looping thread and listed for messages from the preload lib * When new message is received spawn a new thread that will trigger the operation and respond to preload lib * Ensure that messages from the lib are not lost. XXX */ // connect to the ipc socket and a separate thread retrieves the message from the preload lib. in ADAFS_DATA->spdlogger()->info("done sleeping. exiting ..."); break; } } void run_daemon() { ABT_xstream xstream; ABT_pool pool; ABT_thread loop_thread; auto argo_ret = ABT_xstream_self( &xstream); // get the current execution stream (1 core) to use (started with ABT_init()) if (argo_ret != 0) { ADAFS_DATA->spdlogger()->error("Error getting the execution stream when starting the daemon."); return; } argo_ret = ABT_xstream_get_main_pools(xstream, 1, &pool); // get the thread pool if (argo_ret != 0) { ADAFS_DATA->spdlogger()->error("Error getting the thread pool when starting the daemon."); return; } argo_ret = ABT_thread_create(pool, daemon_loop, nullptr, ABT_THREAD_ATTR_NULL, &loop_thread); if (argo_ret != 0) { ADAFS_DATA->spdlogger()->error("Error creating loop thread"); return; } // wait for the daemon to be closed and free the loop thread ABT_thread_yield_to(loop_thread); argo_ret = ABT_thread_join(loop_thread); if (argo_ret != 0) { ADAFS_DATA->spdlogger()->error("Error joining loop thread"); return; } argo_ret = ABT_thread_free(&loop_thread); if (argo_ret != 0) { ADAFS_DATA->spdlogger()->error("Error freeing loop thread."); return; } } void init_environment() { // Initialize rocksdb auto err = init_rocksdb(); Loading
ifs/src/preload/preload.cpp +2 −54 Original line number Diff line number Diff line Loading @@ -115,9 +115,6 @@ int open(const char* path, int flags, ...) { if (is_env_initialized && is_fs_path(path)) { auto err = 1; auto fd = file_map.add(path, (flags & O_APPEND) != 0); #ifndef MARGOIPC #else if (flags & O_CREAT) { // do file create TODO handle all other flags if (fs_config->host_size > 1) { // multiple node operation auto recipient = get_rpc_node(path); Loading @@ -134,7 +131,6 @@ int open(const char* path, int flags, ...) { // TODO look up if file exists err = 0; } #endif if (err == 0) return fd; else { Loading Loading @@ -192,8 +188,6 @@ int unlink(const char* path) __THROW { int err; // LD_LOG_DEBUG(debug_fd, "unlink called with path %s\n", path); if (is_env_initialized && is_fs_path(path)) { #ifndef MARGOIPC #else if (fs_config->host_size > 1) { // multiple node operation auto recipient = get_rpc_node(path); if (is_local_op(recipient)) { // local Loading @@ -206,8 +200,6 @@ int unlink(const char* path) __THROW { } return err; #endif } return (reinterpret_cast<decltype(&unlink)>(libc_unlink))(path); } Loading Loading @@ -268,11 +260,7 @@ int stat(const char* path, struct stat* buf) __THROW { LD_LOG_DEBUG(debug_fd, "stat called with path %s\n", path); if (is_env_initialized && is_fs_path(path)) { // TODO call daemon and return #ifndef MARGOIPC #else return adafs_stat(path, buf); #endif } return (reinterpret_cast<decltype(&stat)>(libc_stat))(path, buf); } Loading @@ -283,11 +271,7 @@ int fstat(int fd, struct stat* buf) __THROW { if (is_env_initialized && file_map.exist(fd)) { auto path = file_map.get(fd)->path(); // TODO use this to send to the daemon (call directly) // TODO call daemon and return #ifndef MARGOIPC #else return adafs_stat(path, buf); #endif } return (reinterpret_cast<decltype(&fstat)>(libc_fstat))(fd, buf); } Loading @@ -297,11 +281,7 @@ int __xstat(int ver, const char* path, struct stat* buf) __THROW { LD_LOG_DEBUG(debug_fd, "__xstat called with path %s\n", path); if (is_env_initialized && is_fs_path(path)) { // TODO call stat #ifndef MARGOIPC #else return adafs_stat(path, buf); #endif } return (reinterpret_cast<decltype(&__xstat)>(libc___xstat))(ver, path, buf); } Loading @@ -310,11 +290,7 @@ int __xstat64(int ver, const char* path, struct stat64* buf) __THROW { init_passthrough_if_needed(); LD_LOG_DEBUG(debug_fd, "__xstat64 called with path %s\n", path); if (is_env_initialized && is_fs_path(path)) { #ifndef MARGOIPC #else return adafs_stat64(path, buf); #endif // // Not implemented // return -1; } Loading @@ -327,11 +303,7 @@ int __fxstat(int ver, int fd, struct stat* buf) __THROW { if (is_env_initialized && file_map.exist(fd)) { // TODO call fstat auto path = file_map.get(fd)->path(); #ifndef MARGOIPC #else return adafs_stat(path, buf); #endif } return (reinterpret_cast<decltype(&__fxstat)>(libc___fxstat))(ver, fd, buf); } Loading @@ -342,11 +314,7 @@ int __fxstat64(int ver, int fd, struct stat64* buf) __THROW { if (is_env_initialized && file_map.exist(fd)) { // TODO call fstat64 auto path = file_map.get(fd)->path(); #ifndef MARGOIPC #else return adafs_stat64(path, buf); #endif } return (reinterpret_cast<decltype(&__fxstat64)>(libc___fxstat64))(ver, fd, buf); } Loading @@ -372,11 +340,7 @@ extern int __lxstat64(int ver, const char* path, struct stat64* buf) __THROW { int access(const char* path, int mode) __THROW { init_passthrough_if_needed(); if (is_env_initialized && is_fs_path(path)) { #ifndef MARGOIPC #else #endif // TODO } return (reinterpret_cast<decltype(&access)>(libc_access))(path, mode); } Loading Loading @@ -404,9 +368,6 @@ ssize_t pwrite(int fd, const void* buf, size_t count, off_t offset) { size_t write_size; int err = 0; long updated_size = 0; #ifndef MARGOIPC #else /* * Update the metadentry size first to prevent two processes to write to the same offset when O_APPEND is given. * The metadentry size update is atomic XXX actually not yet. see metadentry.cpp Loading @@ -424,7 +385,6 @@ ssize_t pwrite(int fd, const void* buf, size_t count, off_t offset) { LD_LOG_ERROR0(debug_fd, "pwrite: write failed\n"); return 0; } #endif return write_size; } return (reinterpret_cast<decltype(&pwrite)>(libc_pwrite))(fd, buf, count, offset); Loading @@ -445,11 +405,7 @@ ssize_t pread(int fd, void* buf, size_t count, off_t offset) { auto path = adafs_fd->path(); size_t read_size = 0; int err; #ifndef MARGOIPC #else err = rpc_send_read(ipc_read_data_id, rpc_read_data_id, path, count, offset, buf, read_size); #endif // TODO check how much we need to deal with the read_size return err == 0 ? read_size : 0; } Loading @@ -461,11 +417,6 @@ ssize_t pread64(int fd, void* buf, size_t nbyte, __off64_t offset) { if (is_env_initialized && file_map.exist(fd)) { // auto path = file_map.get(fd)->path(); // TODO use this to send to the daemon (call directly) // TODO call daemon and return size written #ifndef MARGOIPC #else #endif return 0; // TODO } return (reinterpret_cast<decltype(&pread64)>(libc_pread64))(fd, buf, nbyte, offset); Loading Loading @@ -716,7 +667,6 @@ hg_addr_t daemon_addr() { * This function is only called in the preload constructor */ void init_environment() { #ifdef MARGOIPC // init margo client for IPC auto err = init_ld_argobots(); assert(err); Loading @@ -726,7 +676,6 @@ void init_environment() { assert(err); err = init_rpc_client(); assert(err); #endif is_env_initialized = true; LD_LOG_DEBUG0(debug_fd, "Environment initialized.\n"); } Loading Loading @@ -792,7 +741,6 @@ void init_preload(void) { */ void destroy_preload(void) { #ifdef MARGOIPC LD_LOG_DEBUG0(debug_fd, "Freeing Mercury daemon addr ...\n"); HG_Addr_free(margo_get_class(margo_ipc_id_), daemon_svr_addr_); LD_LOG_DEBUG0(debug_fd, "Finalizing Margo IPC client ...\n"); Loading Loading @@ -821,6 +769,6 @@ void destroy_preload(void) { HG_Finalize(mercury_ipc_class); HG_Finalize(mercury_rpc_class); LD_LOG_DEBUG0(debug_fd, "Preload library shut down.\n"); #endif fclose(debug_fd); } No newline at end of file