Loading CMakeLists.txt +1 −0 Original line number Diff line number Diff line Loading @@ -309,6 +309,7 @@ set(INCLUDE_DIR "${CMAKE_SOURCE_DIR}/include") include_directories( ${INCLUDE_DIR} ${CMAKE_BINARY_DIR}/include /usr/include ) add_subdirectory(src) Loading include/client/preload_context.hpp +10 −0 Original line number Diff line number Diff line Loading @@ -33,6 +33,7 @@ #include <hermes.hpp> #include <map> #include <mercury.h> #include <common/msgpack_util.hpp> #include <memory> #include <vector> #include <string> Loading Loading @@ -107,6 +108,9 @@ private: std::string hostname; int replicas_; gkfs::messagepack::ClientMetrics write_metrics_{}; gkfs::messagepack::ClientMetrics read_metrics_{}; public: static PreloadContext* getInstance() { Loading Loading @@ -223,6 +227,12 @@ public: int get_replicas(); messagepack::ClientMetrics& write_metrics(); messagepack::ClientMetrics& read_metrics(); }; } // namespace preload Loading include/common/msgpack_util.hpp +38 −13 Original line number Diff line number Diff line Loading @@ -32,27 +32,52 @@ #include <msgpack/msgpack.hpp> #include <string> #include <vector> #include <chrono> #include <csignal> #include <thread> namespace gkfs::msgpack { namespace gkfs::messagepack { struct client_metrics { uint64_t total_bytes_read; uint64_t total_bytes_written; enum class client_metric_type { write, read }; class ClientMetrics { std::string name; int age; std::vector<std::string> aliases; public: // std::thread thread_{}; std::chrono::time_point<std::chrono::system_clock> init_t_; std::string hostname_; int pid_; // in milliseconds std::vector<double> start_t_{}; std::vector<double> end_t_{}; // in bytes per second std::vector<double> avg_{}; uint64_t total_bytes_{}; int total_iops_{0}; // public: ClientMetrics(); void add_event(size_t size, std::chrono::time_point<std::chrono::system_clock> start); void flush_msgpack(std::string path); template <class T> void msgpack(T &pack) { pack(name, age, aliases); void pack(T& pack) { pack(init_t_, hostname_, pid_, start_t_, end_t_, avg_, total_iops_, total_bytes_); } }; void test_msgpack(){} } } // namespace gkfs::messagepack #endif // GKFS_COMMON_MSGPACK_HPP src/client/gkfs_functions.cpp +4 −2 Original line number Diff line number Diff line Loading @@ -37,7 +37,6 @@ #include <client/open_dir.hpp> #include <common/path_util.hpp> #include <common/msgpack_util.hpp> extern "C" { #include <dirent.h> // used for file types in the getdents{,64}() functions Loading Loading @@ -873,7 +872,7 @@ gkfs_dup2(const int oldfd, const int newfd) { ssize_t gkfs_pwrite(std::shared_ptr<gkfs::filemap::OpenFile> file, const char* buf, size_t count, off64_t offset, bool update_pos) { gkfs::msgpack::test_msgpack(); auto start_t = std::chrono::high_resolution_clock::now(); if(file->type() != gkfs::filemap::FileType::regular) { assert(file->type() == gkfs::filemap::FileType::directory); LOG(WARNING, "Cannot write to directory"); Loading Loading @@ -938,6 +937,7 @@ gkfs_pwrite(std::shared_ptr<gkfs::filemap::OpenFile> file, const char* buf, "gkfs::rpc::forward_write() wrote '{}' bytes instead of '{}'", write_size, count); } CTX->write_metrics().add_event(write_size, start_t); return write_size; // return written size } Loading Loading @@ -1046,6 +1046,7 @@ gkfs_writev(int fd, const struct iovec* iov, int iovcnt) { ssize_t gkfs_pread(std::shared_ptr<gkfs::filemap::OpenFile> file, char* buf, size_t count, off64_t offset) { auto start_t = std::chrono::high_resolution_clock::now(); if(file->type() != gkfs::filemap::FileType::regular) { assert(file->type() == gkfs::filemap::FileType::directory); LOG(WARNING, "Cannot read from directory"); Loading Loading @@ -1082,6 +1083,7 @@ gkfs_pread(std::shared_ptr<gkfs::filemap::OpenFile> file, char* buf, errno = err; return -1; } CTX->read_metrics().add_event(ret.second, start_t); // XXX check that we don't try to read past end of the file return ret.second; // return read size } Loading src/client/preload.cpp +8 −0 Original line number Diff line number Diff line Loading @@ -310,6 +310,14 @@ destroy_preload() { if(!forwarding_map_file.empty()) { destroy_forwarding_mapper(); } if(CTX->write_metrics().total_iops_ > 0) { CTX->write_metrics().flush_msgpack( "/tmp/gkfs_client_write_metrics.msgpack"); } if(CTX->read_metrics().total_iops_ > 0) { CTX->read_metrics().flush_msgpack( "/tmp/gkfs_client_read_metrics.msgpack"); } CTX->clear_hosts(); LOG(DEBUG, "Peer information deleted"); Loading Loading
CMakeLists.txt +1 −0 Original line number Diff line number Diff line Loading @@ -309,6 +309,7 @@ set(INCLUDE_DIR "${CMAKE_SOURCE_DIR}/include") include_directories( ${INCLUDE_DIR} ${CMAKE_BINARY_DIR}/include /usr/include ) add_subdirectory(src) Loading
include/client/preload_context.hpp +10 −0 Original line number Diff line number Diff line Loading @@ -33,6 +33,7 @@ #include <hermes.hpp> #include <map> #include <mercury.h> #include <common/msgpack_util.hpp> #include <memory> #include <vector> #include <string> Loading Loading @@ -107,6 +108,9 @@ private: std::string hostname; int replicas_; gkfs::messagepack::ClientMetrics write_metrics_{}; gkfs::messagepack::ClientMetrics read_metrics_{}; public: static PreloadContext* getInstance() { Loading Loading @@ -223,6 +227,12 @@ public: int get_replicas(); messagepack::ClientMetrics& write_metrics(); messagepack::ClientMetrics& read_metrics(); }; } // namespace preload Loading
include/common/msgpack_util.hpp +38 −13 Original line number Diff line number Diff line Loading @@ -32,27 +32,52 @@ #include <msgpack/msgpack.hpp> #include <string> #include <vector> #include <chrono> #include <csignal> #include <thread> namespace gkfs::msgpack { namespace gkfs::messagepack { struct client_metrics { uint64_t total_bytes_read; uint64_t total_bytes_written; enum class client_metric_type { write, read }; class ClientMetrics { std::string name; int age; std::vector<std::string> aliases; public: // std::thread thread_{}; std::chrono::time_point<std::chrono::system_clock> init_t_; std::string hostname_; int pid_; // in milliseconds std::vector<double> start_t_{}; std::vector<double> end_t_{}; // in bytes per second std::vector<double> avg_{}; uint64_t total_bytes_{}; int total_iops_{0}; // public: ClientMetrics(); void add_event(size_t size, std::chrono::time_point<std::chrono::system_clock> start); void flush_msgpack(std::string path); template <class T> void msgpack(T &pack) { pack(name, age, aliases); void pack(T& pack) { pack(init_t_, hostname_, pid_, start_t_, end_t_, avg_, total_iops_, total_bytes_); } }; void test_msgpack(){} } } // namespace gkfs::messagepack #endif // GKFS_COMMON_MSGPACK_HPP
src/client/gkfs_functions.cpp +4 −2 Original line number Diff line number Diff line Loading @@ -37,7 +37,6 @@ #include <client/open_dir.hpp> #include <common/path_util.hpp> #include <common/msgpack_util.hpp> extern "C" { #include <dirent.h> // used for file types in the getdents{,64}() functions Loading Loading @@ -873,7 +872,7 @@ gkfs_dup2(const int oldfd, const int newfd) { ssize_t gkfs_pwrite(std::shared_ptr<gkfs::filemap::OpenFile> file, const char* buf, size_t count, off64_t offset, bool update_pos) { gkfs::msgpack::test_msgpack(); auto start_t = std::chrono::high_resolution_clock::now(); if(file->type() != gkfs::filemap::FileType::regular) { assert(file->type() == gkfs::filemap::FileType::directory); LOG(WARNING, "Cannot write to directory"); Loading Loading @@ -938,6 +937,7 @@ gkfs_pwrite(std::shared_ptr<gkfs::filemap::OpenFile> file, const char* buf, "gkfs::rpc::forward_write() wrote '{}' bytes instead of '{}'", write_size, count); } CTX->write_metrics().add_event(write_size, start_t); return write_size; // return written size } Loading Loading @@ -1046,6 +1046,7 @@ gkfs_writev(int fd, const struct iovec* iov, int iovcnt) { ssize_t gkfs_pread(std::shared_ptr<gkfs::filemap::OpenFile> file, char* buf, size_t count, off64_t offset) { auto start_t = std::chrono::high_resolution_clock::now(); if(file->type() != gkfs::filemap::FileType::regular) { assert(file->type() == gkfs::filemap::FileType::directory); LOG(WARNING, "Cannot read from directory"); Loading Loading @@ -1082,6 +1083,7 @@ gkfs_pread(std::shared_ptr<gkfs::filemap::OpenFile> file, char* buf, errno = err; return -1; } CTX->read_metrics().add_event(ret.second, start_t); // XXX check that we don't try to read past end of the file return ret.second; // return read size } Loading
src/client/preload.cpp +8 −0 Original line number Diff line number Diff line Loading @@ -310,6 +310,14 @@ destroy_preload() { if(!forwarding_map_file.empty()) { destroy_forwarding_mapper(); } if(CTX->write_metrics().total_iops_ > 0) { CTX->write_metrics().flush_msgpack( "/tmp/gkfs_client_write_metrics.msgpack"); } if(CTX->read_metrics().total_iops_ > 0) { CTX->read_metrics().flush_msgpack( "/tmp/gkfs_client_read_metrics.msgpack"); } CTX->clear_hosts(); LOG(DEBUG, "Peer information deleted"); Loading