From 97c53483e13e5400355a3cc9e60ec993e5a516bd Mon Sep 17 00:00:00 2001 From: Marc Vef Date: Mon, 3 Aug 2020 18:22:30 +0200 Subject: [PATCH 1/4] Refactoring readdir - more error handling - removed runtime exception from client - Streamlining error handling in client and making it consistent. Adding comments - Improved readdir logging --- include/client/rpc/forward_data.hpp | 8 +- include/client/rpc/forward_metadata.hpp | 8 +- src/client/gkfs_functions.cpp | 121 ++++++++---- src/client/preload_util.cpp | 2 + src/client/rpc/forward_data.cpp | 138 +++++++------ src/client/rpc/forward_metadata.cpp | 251 ++++++++++++------------ src/daemon/handler/srv_metadata.cpp | 87 +++++--- 7 files changed, 357 insertions(+), 258 deletions(-) diff --git a/include/client/rpc/forward_data.hpp b/include/client/rpc/forward_data.hpp index a82e73cf9..378914c88 100644 --- a/include/client/rpc/forward_data.hpp +++ b/include/client/rpc/forward_data.hpp @@ -24,14 +24,14 @@ struct ChunkStat { unsigned long chunk_free; }; -ssize_t forward_write(const std::string& path, const void* buf, bool append_flag, off64_t in_offset, - size_t write_size, int64_t updated_metadentry_size); +std::pair forward_write(const std::string& path, const void* buf, bool append_flag, off64_t in_offset, + size_t write_size, int64_t updated_metadentry_size); -ssize_t forward_read(const std::string& path, void* buf, off64_t offset, size_t read_size); +std::pair forward_read(const std::string& path, void* buf, off64_t offset, size_t read_size); int forward_truncate(const std::string& path, size_t current_size, size_t new_size); -ChunkStat forward_get_chunk_stat(); +std::pair forward_get_chunk_stat(); } // namespace rpc } // namespace gkfs diff --git a/include/client/rpc/forward_metadata.hpp b/include/client/rpc/forward_metadata.hpp index 9475aaa4e..6e3a899b6 100644 --- a/include/client/rpc/forward_metadata.hpp +++ b/include/client/rpc/forward_metadata.hpp @@ -40,12 +40,12 @@ int forward_decr_size(const std::string& path, size_t length); int forward_update_metadentry(const std::string& path, const gkfs::metadata::Metadata& md, const gkfs::metadata::MetadentryUpdateFlags& md_flags); -int forward_update_metadentry_size(const std::string& path, size_t size, off64_t offset, bool append_flag, - off64_t& ret_size); +std::pair +forward_update_metadentry_size(const std::string& path, size_t size, off64_t offset, bool append_flag); -int forward_get_metadentry_size(const std::string& path, off64_t& ret_size); +std::pair forward_get_metadentry_size(const std::string& path); -void forward_get_dirents(gkfs::filemap::OpenDir& open_dir); +int forward_get_dirents(gkfs::filemap::OpenDir& open_dir); #ifdef HAS_SYMLINKS diff --git a/src/client/gkfs_functions.cpp b/src/client/gkfs_functions.cpp index fbef20130..917055775 100644 --- a/src/client/gkfs_functions.cpp +++ b/src/client/gkfs_functions.cpp @@ -197,7 +197,12 @@ int gkfs_create(const std::string& path, mode_t mode) { if (check_parent_dir(path)) { return -1; } - return gkfs::rpc::forward_create(path, mode); + auto err = gkfs::rpc::forward_create(path, mode); + if (err) { + errno = err; + return -1; + } + return 0; } /** @@ -211,7 +216,12 @@ int gkfs_remove(const std::string& path) { return -1; } bool has_data = S_ISREG(md->mode()) && (md->size() != 0); - return gkfs::rpc::forward_remove(path, !has_data, md->size()); + auto err = gkfs::rpc::forward_remove(path, !has_data, md->size()); + if (err) { + errno = err; + return -1; + } + return 0; } int gkfs_access(const std::string& path, const int mask, bool follow_links) { @@ -271,13 +281,15 @@ int gkfs_statx(int dirfs, const std::string& path, int flags, unsigned int mask, #endif int gkfs_statfs(struct statfs* buf) { - gkfs::rpc::ChunkStat blk_stat{}; - try { - blk_stat = gkfs::rpc::forward_get_chunk_stat(); - } catch (const std::exception& e) { - LOG(ERROR, "{}() Failure with error: '{}'", e.what()); + + auto ret = gkfs::rpc::forward_get_chunk_stat(); + auto err = ret.first; + if (err) { + LOG(ERROR, "{}() Failure with error: '{}'", err); + errno = err; return -1; } + auto blk_stat = ret.second; buf->f_type = 0; buf->f_bsize = blk_stat.chunk_size; buf->f_blocks = blk_stat.chunk_total; @@ -294,13 +306,14 @@ int gkfs_statfs(struct statfs* buf) { } int gkfs_statvfs(struct statvfs* buf) { - gkfs::rpc::ChunkStat blk_stat{}; - try { - blk_stat = gkfs::rpc::forward_get_chunk_stat(); - } catch (const std::exception& e) { - LOG(ERROR, "{}() Failure with error: '{}'", e.what()); + auto ret = gkfs::rpc::forward_get_chunk_stat(); + auto err = ret.first; + if (err) { + LOG(ERROR, "{}() Failure with error: '{}'", err); + errno = err; return -1; } + auto blk_stat = ret.second; buf->f_bsize = blk_stat.chunk_size; buf->f_blocks = blk_stat.chunk_total; buf->f_bfree = blk_stat.chunk_free; @@ -329,15 +342,15 @@ off_t gkfs_lseek(shared_ptr gkfs_fd, off_t offset, unsi gkfs_fd->pos(gkfs_fd->pos() + offset); break; case SEEK_END: { - off64_t file_size; - auto err = gkfs::rpc::forward_get_metadentry_size(gkfs_fd->path(), file_size); - - if (err < 0) { - errno = err; // Negative numbers are explicitly for error codes + auto ret = gkfs::rpc::forward_get_metadentry_size(gkfs_fd->path()); + auto err = ret.first; + if (err) { + errno = err; return -1; } - - if (offset < 0 and file_size < -offset) { + + auto file_size = ret.second; + if (offset < 0 && file_size < -offset) { errno = EINVAL; return -1; } @@ -369,14 +382,17 @@ int gkfs_truncate(const std::string& path, off_t old_size, off_t new_size) { if (new_size == old_size) { return 0; } - - if (gkfs::rpc::forward_decr_size(path, new_size)) { + auto err = gkfs::rpc::forward_decr_size(path, new_size); + if (err) { LOG(DEBUG, "Failed to decrease size"); + errno = err; return -1; } - if (gkfs::rpc::forward_truncate(path, old_size, new_size)) { + err = gkfs::rpc::forward_truncate(path, old_size, new_size); + if (err) { LOG(DEBUG, "Failed to truncate data"); + errno = err; return -1; } return 0; @@ -427,19 +443,24 @@ ssize_t gkfs_pwrite(std::shared_ptr file, const char* b } auto path = make_shared(file->path()); auto append_flag = file->get_flag(gkfs::filemap::OpenFile_flags::append); - ssize_t ret = 0; - long updated_size = 0; - ret = gkfs::rpc::forward_update_metadentry_size(*path, count, offset, append_flag, updated_size); - if (ret != 0) { - LOG(ERROR, "update_metadentry_size() failed with ret {}", ret); - return ret; // ERR + auto ret_update_size = gkfs::rpc::forward_update_metadentry_size(*path, count, offset, append_flag); + auto err = ret_update_size.first; + if (err) { + LOG(ERROR, "update_metadentry_size() failed with err '{}'", err); + errno = err; + return -1; } - ret = gkfs::rpc::forward_write(*path, buf, append_flag, offset, count, updated_size); - if (ret < 0) { - LOG(WARNING, "gkfs::rpc::forward_write() failed with ret {}", ret); + auto updated_size = ret_update_size.second; + + auto ret_write = gkfs::rpc::forward_write(*path, buf, append_flag, offset, count, updated_size); + err = ret_write.first; + if (err) { + LOG(WARNING, "gkfs::rpc::forward_write() failed with err '{}'", err); + errno = err; + return -1; } - return ret; // return written size or -1 as error + return ret_write.second; // return written size } ssize_t gkfs_pwrite_ws(int fd, const void* buf, size_t count, off64_t offset) { @@ -521,11 +542,14 @@ ssize_t gkfs_pread(std::shared_ptr file, char* buf, siz memset(buf, 0, sizeof(char) * count); } auto ret = gkfs::rpc::forward_read(file->path(), buf, offset, count); - if (ret < 0) { - LOG(WARNING, "gkfs::rpc::forward_read() failed with ret {}", ret); + auto err = ret.first; + if (err) { + LOG(WARNING, "gkfs::rpc::forward_read() failed with ret '{}'", err); + errno = err; + return -1; } // XXX check that we don't try to read past end of the file - return ret; // return read size or -1 as error + return ret.second; // return read size } ssize_t gkfs_read(int fd, void* buf, size_t count) { @@ -600,7 +624,11 @@ int gkfs_opendir(const std::string& path) { } auto open_dir = std::make_shared(path); - gkfs::rpc::forward_get_dirents(*open_dir); + auto err = gkfs::rpc::forward_get_dirents(*open_dir); + if (err) { + errno = err; + return -1; + } return CTX->file_map()->add(open_dir); } @@ -618,12 +646,21 @@ int gkfs_rmdir(const std::string& path) { } auto open_dir = std::make_shared(path); - gkfs::rpc::forward_get_dirents(*open_dir); + auto err = gkfs::rpc::forward_get_dirents(*open_dir); + if (err) { + errno = err; + return -1; + } if (open_dir->size() != 0) { errno = ENOTEMPTY; return -1; } - return gkfs::rpc::forward_remove(path, true, 0); + err = gkfs::rpc::forward_remove(path, true, 0); + if (err) { + errno = err; + return -1; + } + return 0; } int gkfs_getdents(unsigned int fd, @@ -777,8 +814,12 @@ int gkfs_mk_symlink(const std::string& path, const std::string& target_path) { errno = EEXIST; return -1; } - - return gkfs::rpc::forward_mk_symlink(path, target_path); + auto err = gkfs::rpc::forward_mk_symlink(path, target_path); + if (err) { + errno = err; + return -1; + } + return 0; } int gkfs_readlink(const std::string& path, char* buf, int bufsize) { diff --git a/src/client/preload_util.cpp b/src/client/preload_util.cpp index 12c943572..e4cbcf8a3 100644 --- a/src/client/preload_util.cpp +++ b/src/client/preload_util.cpp @@ -77,6 +77,7 @@ std::shared_ptr get_metadata(const string& path, bool std::string attr; auto err = gkfs::rpc::forward_stat(path, attr); if (err) { + errno = err; return nullptr; } #ifdef HAS_SYMLINKS @@ -85,6 +86,7 @@ std::shared_ptr get_metadata(const string& path, bool while (md.is_link()) { err = gkfs::rpc::forward_stat(md.target_path(), attr); if (err) { + errno = err; return nullptr; } md = gkfs::metadata::Metadata{attr}; diff --git a/src/client/rpc/forward_data.cpp b/src/client/rpc/forward_data.cpp index 3c850a218..0821b1b63 100644 --- a/src/client/rpc/forward_data.cpp +++ b/src/client/rpc/forward_data.cpp @@ -26,13 +26,25 @@ using namespace std; namespace gkfs { namespace rpc { +/* + * This file includes all metadata RPC calls. + * NOTE: No errno is defined here! + */ + // TODO If we decide to keep this functionality with one segment, the function can be merged mostly. // Code is mostly redundant /** - * Sends an RPC request to a specific node to pull all chunks that belong to him + * Send an RPC request to write from a buffer. + * @param path + * @param buf + * @param append_flag + * @param in_offset + * @param write_size + * @param updated_metadentry_size + * @return pair */ -ssize_t forward_write(const string& path, const void* buf, const bool append_flag, +pair forward_write(const string& path, const void* buf, const bool append_flag, const off64_t in_offset, const size_t write_size, const int64_t updated_metadentry_size) { @@ -90,8 +102,7 @@ ssize_t forward_write(const string& path, const void* buf, const bool append_fla } catch (const std::exception& ex) { LOG(ERROR, "Failed to expose buffers for RMA"); - errno = EBUSY; - return -1; + return make_pair(EBUSY, 0); } std::vector> handles; @@ -152,15 +163,14 @@ ssize_t forward_write(const string& path, const void* buf, const bool append_fla } catch (const std::exception& ex) { LOG(ERROR, "Unable to send non-blocking rpc for " "path \"{}\" [peer: {}]", path, target); - errno = EBUSY; - return -1; + return make_pair(EBUSY, 0); } } // Wait for RPC responses and then get response and add it to out_size // which is the written size All potential outputs are served to free // resources regardless of errors, although an errorcode is set. - bool error = false; + auto err = 0; ssize_t out_size = 0; std::size_t idx = 0; @@ -172,8 +182,7 @@ ssize_t forward_write(const string& path, const void* buf, const bool append_fla if (out.err() != 0) { LOG(ERROR, "Daemon reported error: {}", out.err()); - error = true; - errno = out.err(); + err = out.err(); } out_size += static_cast(out.io_size()); @@ -181,20 +190,30 @@ ssize_t forward_write(const string& path, const void* buf, const bool append_fla } catch (const std::exception& ex) { LOG(ERROR, "Failed to get rpc output for path \"{}\" [peer: {}]", path, targets[idx]); - error = true; - errno = EIO; + err = EIO; } - ++idx; + idx++; } - - return error ? -1 : out_size; + /* + * Typically file systems return the size even if only a part of it was written. + * In our case, we do not keep track which daemon fully wrote its workload. Thus, we always return size 0 on error. + */ + if (err) + return make_pair(err, 0); + else + return make_pair(0, out_size); } /** - * Sends an RPC request to a specific node to push all chunks that belong to him + * Send an RPC request to read to a buffer. + * @param path + * @param buf + * @param offset + * @param read_size + * @return pair */ -ssize_t forward_read(const string& path, void* buf, const off64_t offset, const size_t read_size) { +pair forward_read(const string& path, void* buf, const off64_t offset, const size_t read_size) { // Calculate chunkid boundaries and numbers so that daemons know in which // interval to look for chunks @@ -246,8 +265,7 @@ ssize_t forward_read(const string& path, void* buf, const off64_t offset, const } catch (const std::exception& ex) { LOG(ERROR, "Failed to expose buffers for RMA"); - errno = EBUSY; - return -1; + return make_pair(EBUSY, 0); } std::vector> handles; @@ -309,15 +327,14 @@ ssize_t forward_read(const string& path, void* buf, const off64_t offset, const } catch (const std::exception& ex) { LOG(ERROR, "Unable to send non-blocking rpc for path \"{}\" " "[peer: {}]", path, target); - errno = EBUSY; - return -1; + return make_pair(EBUSY, 0); } } // Wait for RPC responses and then get response and add it to out_size // which is the read size. All potential outputs are served to free // resources regardless of errors, although an errorcode is set. - bool error = false; + auto err = 0; ssize_t out_size = 0; std::size_t idx = 0; @@ -329,8 +346,7 @@ ssize_t forward_read(const string& path, void* buf, const off64_t offset, const if (out.err() != 0) { LOG(ERROR, "Daemon reported error: {}", out.err()); - error = true; - errno = out.err(); + err = out.err(); } out_size += static_cast(out.io_size()); @@ -338,20 +354,30 @@ ssize_t forward_read(const string& path, void* buf, const off64_t offset, const } catch (const std::exception& ex) { LOG(ERROR, "Failed to get rpc output for path \"{}\" [peer: {}]", path, targets[idx]); - error = true; - errno = EIO; + err = EIO; } - - ++idx; + idx++; } - - return error ? -1 : out_size; + /* + * Typically file systems return the size even if only a part of it was read. + * In our case, we do not keep track which daemon fully read its workload. Thus, we always return size 0 on error. + */ + if (err) + return make_pair(err, 0); + else + return make_pair(0, out_size); } +/** + * Send an RPC request to truncate a file to given new size + * @param path + * @param current_size + * @param new_size + * @return error code + */ int forward_truncate(const std::string& path, size_t current_size, size_t new_size) { assert(current_size > new_size); - bool error = false; // Find out which data servers need to delete data chunks in order to // contact only them @@ -366,6 +392,8 @@ int forward_truncate(const std::string& path, size_t current_size, size_t new_si std::vector> handles; + auto err = 0; + for (const auto& host: hosts) { auto endp = CTX->hosts().at(host); @@ -386,44 +414,41 @@ int forward_truncate(const std::string& path, size_t current_size, size_t new_si // TODO(amiranda): we should cancel all previously posted requests // here, unfortunately, Hermes does not support it yet :/ LOG(ERROR, "Failed to send request to host: {}", host); - errno = EIO; - return -1; + err = EIO; + break; // We need to gather all responses so we can't return here } } // Wait for RPC responses and then get response for (const auto& h : handles) { - try { // XXX We might need a timeout here to not wait forever for an // output that never comes? auto out = h.get().at(0); - if (out.err() != 0) { + if (out.err()) { LOG(ERROR, "received error response: {}", out.err()); - error = true; - errno = EIO; + err = EIO; } } catch (const std::exception& ex) { LOG(ERROR, "while getting rpc output"); - error = true; - errno = EIO; + err = EIO; } } - - return error ? -1 : 0; + return err ? err : 0; } /** - * Performs a chunk stat RPC to all hosts - * @return rpc::ChunkStat - * @throws std::runtime_error + * Send an RPC request to chunk stat all hosts + * @return pair */ -ChunkStat forward_get_chunk_stat() { +pair forward_get_chunk_stat() { std::vector> handles; + auto err = 0; + for (const auto& endp : CTX->hosts()) { try { LOG(DEBUG, "Sending RPC to host: {}", endp.to_string()); @@ -441,7 +466,8 @@ ChunkStat forward_get_chunk_stat() { // TODO(amiranda): we should cancel all previously posted requests // here, unfortunately, Hermes does not support it yet :/ LOG(ERROR, "Failed to send request to host: {}", endp.to_string()); - throw std::runtime_error("Failed to forward non-blocking rpc request"); + err = EBUSY; + break; // We need to gather all responses so we can't return here } } @@ -449,8 +475,6 @@ ChunkStat forward_get_chunk_stat() { unsigned long chunk_total = 0; unsigned long chunk_free = 0; - int error = 0; - // wait for RPC responses for (std::size_t i = 0; i < handles.size(); ++i) { @@ -461,10 +485,10 @@ ChunkStat forward_get_chunk_stat() { // output that never comes? out = handles[i].get().at(0); - if (out.err() != 0) { - error = out.err(); + if (out.err()) { + err = out.err(); LOG(ERROR, "Host '{}' reported err code '{}' during stat chunk.", CTX->hosts().at(i).to_string(), - error); + err); // we don't break here to ensure all responses are processed continue; } @@ -472,16 +496,14 @@ ChunkStat forward_get_chunk_stat() { chunk_total += out.chunk_total(); chunk_free += out.chunk_free(); } catch (const std::exception& ex) { - errno = EBUSY; - throw std::runtime_error(fmt::format("Failed to get RPC output from host: {}", i)); + LOG(ERROR, "Failed to get RPC output from host: {}", i); + err = EBUSY; } } - if (error != 0) { - errno = error; - throw std::runtime_error("chunk stat failed on one host"); - } - - return {chunk_size, chunk_total, chunk_free}; + if (err) + return make_pair(err, ChunkStat{}); + else + return make_pair(0, ChunkStat{chunk_size, chunk_total, chunk_free}); } } // namespace rpc diff --git a/src/client/rpc/forward_metadata.cpp b/src/client/rpc/forward_metadata.cpp index 884e7ee14..217d845d6 100644 --- a/src/client/rpc/forward_metadata.cpp +++ b/src/client/rpc/forward_metadata.cpp @@ -27,9 +27,19 @@ using namespace std; namespace gkfs { namespace rpc { +/* + * This file includes all metadata RPC calls. + * NOTE: No errno is defined here! + */ + +/** + * Send an RPC for a create request + * @param path + * @param mode + * @return error code + */ int forward_create(const std::string& path, const mode_t mode) { - int err = EUNKNOWN; auto endp = CTX->hosts().at(CTX->distributor()->locate_file_metadata(path)); try { @@ -40,23 +50,21 @@ int forward_create(const std::string& path, const mode_t mode) { // returning one result and a broadcast(endpoint_set) returning a // result_set. When that happens we can remove the .at(0) :/ auto out = ld_network_service->post(endp, path, mode).get().at(0); - err = out.err(); - LOG(DEBUG, "Got response success: {}", err); - - if (out.err()) { - errno = out.err(); - return -1; - } + LOG(DEBUG, "Got response success: {}", out.err()); + return out.err() ? out.err() : 0; } catch (const std::exception& ex) { LOG(ERROR, "while getting rpc output"); - errno = EBUSY; - return -1; + return EBUSY; } - - return err; } +/** + * Send an RPC for a stat request + * @param path + * @param attr + * @return error code + */ int forward_stat(const std::string& path, string& attr) { auto endp = CTX->hosts().at(CTX->distributor()->locate_file_metadata(path)); @@ -71,30 +79,32 @@ int forward_stat(const std::string& path, string& attr) { auto out = ld_network_service->post(endp, path).get().at(0); LOG(DEBUG, "Got response success: {}", out.err()); - if (out.err() != 0) { - errno = out.err(); - return -1; - } + if (out.err()) + return out.err(); attr = out.db_val(); - return 0; - } catch (const std::exception& ex) { LOG(ERROR, "while getting rpc output"); - errno = EBUSY; - return -1; + return EBUSY; } - return 0; } +/** + * Send an RPC for a remove request. This removes metadata and all data chunks possible distributed across many daemons. + * Optimizations are in place for small files (file_size / chunk_size) < number_of_daemons where no broadcast to all + * daemons is used to remove all chunks. Otherwise, a broadcast to all daemons is used. + * @param path + * @param remove_metadentry_only + * @param size + * @return error code + */ int forward_remove(const std::string& path, const bool remove_metadentry_only, const ssize_t size) { // if only the metadentry should be removed, send one rpc to the // metadentry's responsible node to remove the metadata // else, send an rpc to all hosts and thus broadcast chunk_removal. if (remove_metadentry_only) { - auto endp = CTX->hosts().at(CTX->distributor()->locate_file_metadata(path)); try { @@ -109,20 +119,11 @@ int forward_remove(const std::string& path, const bool remove_metadentry_only, c LOG(DEBUG, "Got response success: {}", out.err()); - if (out.err() != 0) { - errno = out.err(); - return -1; - } - - return 0; - + return out.err() ? out.err() : 0; } catch (const std::exception& ex) { LOG(ERROR, "while getting rpc output"); - errno = EBUSY; - return -1; + return EBUSY; } - - return 0; } std::vector> handles; @@ -155,9 +156,8 @@ int forward_remove(const std::string& path, const bool remove_metadentry_only, c handles.emplace_back(ld_network_service->post(endp_chnk, in)); } } catch (const std::exception& ex) { - LOG(ERROR, "Failed to send reduced remove requests"); - throw std::runtime_error( - "Failed to forward non-blocking rpc request"); + LOG(ERROR, "Failed to forward non-blocking rpc request reduced remove requests"); + return EBUSY; } } else { // "Big" files for (const auto& endp : CTX->hosts()) { @@ -179,18 +179,15 @@ int forward_remove(const std::string& path, const bool remove_metadentry_only, c } catch (const std::exception& ex) { // TODO(amiranda): we should cancel all previously posted requests // here, unfortunately, Hermes does not support it yet :/ - LOG(ERROR, "Failed to send request to host: {}", + LOG(ERROR, "Failed to forward non-blocking rpc request to host: {}", endp.to_string()); - throw std::runtime_error( - "Failed to forward non-blocking rpc request"); + return EBUSY; } } } // wait for RPC responses - bool got_error = false; - + auto err = 0; for (const auto& h : handles) { - try { // XXX We might need a timeout here to not wait forever for an // output that never comes? @@ -198,25 +195,27 @@ int forward_remove(const std::string& path, const bool remove_metadentry_only, c if (out.err() != 0) { LOG(ERROR, "received error response: {}", out.err()); - got_error = true; - errno = out.err(); + err = out.err(); } } catch (const std::exception& ex) { LOG(ERROR, "while getting rpc output"); - got_error = true; - errno = EBUSY; + err = EBUSY; } } - - return got_error ? -1 : 0; + return err; } +/** + * Send an RPC for a decrement file size request. This is for example used during a truncate() call. + * @param path + * @param length + * @return error code + */ int forward_decr_size(const std::string& path, size_t length) { auto endp = CTX->hosts().at(CTX->distributor()->locate_file_metadata(path)); try { - LOG(DEBUG, "Sending RPC ..."); // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we can // retry for RPC_TRIES (see old commits with margo) @@ -227,27 +226,28 @@ int forward_decr_size(const std::string& path, size_t length) { LOG(DEBUG, "Got response success: {}", out.err()); - if (out.err() != 0) { - errno = out.err(); - return -1; - } - - return 0; - + return out.err() ? out.err() : 0; } catch (const std::exception& ex) { LOG(ERROR, "while getting rpc output"); - errno = EBUSY; - return -1; + return EBUSY; } } + +/** + * Send an RPC for an update metadentry request. + * NOTE: Currently unused. + * @param path + * @param md + * @param md_flags + * @return error code + */ int forward_update_metadentry(const string& path, const gkfs::metadata::Metadata& md, const gkfs::metadata::MetadentryUpdateFlags& md_flags) { auto endp = CTX->hosts().at(CTX->distributor()->locate_file_metadata(path)); try { - LOG(DEBUG, "Sending RPC ..."); // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we can // retry for RPC_TRIES (see old commits with margo) @@ -276,28 +276,27 @@ int forward_update_metadentry(const string& path, const gkfs::metadata::Metadata LOG(DEBUG, "Got response success: {}", out.err()); - if (out.err() != 0) { - errno = out.err(); - return -1; - } - - return 0; - + return out.err() ? out.err() : 0; } catch (const std::exception& ex) { LOG(ERROR, "while getting rpc output"); - errno = EBUSY; - return -1; + return EBUSY; } } -int -forward_update_metadentry_size(const string& path, const size_t size, const off64_t offset, const bool append_flag, - off64_t& ret_size) { +/** + * Send an RPC request for an update to the file size. + * This is called during a write() call or similar + * @param path + * @param size + * @param offset + * @param append_flag + * @return pair + */ +pair +forward_update_metadentry_size(const string& path, const size_t size, const off64_t offset, const bool append_flag) { auto endp = CTX->hosts().at(CTX->distributor()->locate_file_metadata(path)); - try { - LOG(DEBUG, "Sending RPC ..."); // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we can // retry for RPC_TRIES (see old commits with margo) @@ -310,30 +309,27 @@ forward_update_metadentry_size(const string& path, const size_t size, const off6 LOG(DEBUG, "Got response success: {}", out.err()); - if (out.err() != 0) { - errno = out.err(); - return -1; - } - - ret_size = out.ret_size(); - return out.err(); - - return 0; - + if (out.err()) + return make_pair(out.err(), 0); + else + return make_pair(0, out.ret_size()); } catch (const std::exception& ex) { LOG(ERROR, "while getting rpc output"); - errno = EBUSY; - ret_size = 0; - return EUNKNOWN; + return make_pair(EBUSY, 0); } } -int forward_get_metadentry_size(const std::string& path, off64_t& ret_size) { +/** + * Send an RPC request to get the current file size. + * This is called during a lseek() call + * @param path + * @return pair + */ +pair forward_get_metadentry_size(const std::string& path) { auto endp = CTX->hosts().at(CTX->distributor()->locate_file_metadata(path)); try { - LOG(DEBUG, "Sending RPC ..."); // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we can // retry for RPC_TRIES (see old commits with margo) @@ -344,21 +340,24 @@ int forward_get_metadentry_size(const std::string& path, off64_t& ret_size) { LOG(DEBUG, "Got response success: {}", out.err()); - ret_size = out.ret_size(); - return out.err(); - + if (out.err()) + return make_pair(out.err(), 0); + else + return make_pair(0, out.ret_size()); } catch (const std::exception& ex) { LOG(ERROR, "while getting rpc output"); - errno = EBUSY; - ret_size = 0; - return EUNKNOWN; + return make_pair(EBUSY, 0); } } /** - * Sends an RPC request to a specific node to push all chunks that belong to him + * Send an RPC request to receive all entries of a directory. + * @param open_dir + * @return error code */ -void forward_get_dirents(gkfs::filemap::OpenDir& open_dir) { +int forward_get_dirents(gkfs::filemap::OpenDir& open_dir) { + + LOG(DEBUG, "{}() enter for path '{}'", __func__, open_dir.path()) auto const root_dir = open_dir.path(); auto const targets = CTX->distributor()->locate_directory_metadata(root_dir); @@ -389,7 +388,8 @@ void forward_get_dirents(gkfs::filemap::OpenDir& open_dir) { }, hermes::access_mode::write_only)); } catch (const std::exception& ex) { - throw std::runtime_error("Failed to expose buffers for RMA"); + LOG(ERROR, "{}() Failed to expose buffers for RMA. err '{}'", __func__, ex.what()); + return EBUSY; } } @@ -398,24 +398,26 @@ void forward_get_dirents(gkfs::filemap::OpenDir& open_dir) { for (std::size_t i = 0; i < targets.size(); ++i) { - LOG(DEBUG, "target_host: {}", targets[i]); - // Setup rpc input parameters for each host auto endp = CTX->hosts().at(targets[i]); gkfs::rpc::get_dirents::input in(root_dir, exposed_buffers[i]); try { - - LOG(DEBUG, "Sending RPC to host: {}", targets[i]); + LOG(DEBUG, "{}() Sending RPC to host: '{}'", __func__, targets[i]); handles.emplace_back(ld_network_service->post(endp, in)); } catch (const std::exception& ex) { - LOG(ERROR, "Unable to send non-blocking get_dirents() " - "on {} [peer: {}]", root_dir, targets[i]); - throw std::runtime_error("Failed to post non-blocking RPC request"); + LOG(ERROR, "{}() Unable to send non-blocking get_dirents() on {} [peer: {}] err '{}'", __func__, root_dir, + targets[i], ex.what()); + return EBUSY; } } + LOG(INFO, + "{}() path '{}' send rpc_srv_get_dirents() rpc to '{}' targets. per_host_buff_size '{}' Waiting on reply next and deserialize", + __func__, open_dir.path(), targets.size(), per_host_buff_size); + + auto err = 0; // wait for RPC responses for (std::size_t i = 0; i < handles.size(); ++i) { @@ -427,15 +429,19 @@ void forward_get_dirents(gkfs::filemap::OpenDir& open_dir) { out = handles[i].get().at(0); if (out.err() != 0) { - throw std::runtime_error( - fmt::format("Failed to retrieve dir entries from " - "host '{}'. Error '{}', path '{}'", - targets[i], strerror(out.err()), root_dir)); + LOG(ERROR, "{}() Failed to retrieve dir entries from host '{}'. Error '{}', path '{}'", __func__, + targets[i], + strerror(out.err()), root_dir); + err = out.err(); + // We need to gather all responses before exiting + continue; } } catch (const std::exception& ex) { - throw std::runtime_error( - fmt::format("Failed to get rpc output.. [path: {}, " - "target host: {}]", root_dir, targets[i])); + LOG(ERROR, "{}() Failed to get rpc output.. [path: {}, target host: {}] err '{}'", __func__, root_dir, + targets[i], ex.what()); + err = EBUSY; + // We need to gather all responses before exiting + continue; } // each server wrote information to its pre-defined region in @@ -445,8 +451,7 @@ void forward_get_dirents(gkfs::filemap::OpenDir& open_dir) { void* base_ptr = exposed_buffers[i].begin()->data(); bool* bool_ptr = reinterpret_cast(base_ptr); - char* names_ptr = reinterpret_cast(base_ptr) + - (out.dirents_size() * sizeof(bool)); + char* names_ptr = reinterpret_cast(base_ptr) + (out.dirents_size() * sizeof(bool)); for (std::size_t j = 0; j < out.dirents_size(); j++) { @@ -459,21 +464,28 @@ void forward_get_dirents(gkfs::filemap::OpenDir& open_dir) { assert(static_cast(names_ptr - reinterpret_cast(base_ptr)) < per_host_buff_size); auto name = std::string(names_ptr); + // number of characters in entry + \0 terminator names_ptr += name.size() + 1; open_dir.add(name, ftype); } } + return err; } #ifdef HAS_SYMLINKS +/** + * Send an RPC request to create a symlink. + * @param path + * @param target_path + * @return error code + */ int forward_mk_symlink(const std::string& path, const std::string& target_path) { auto endp = CTX->hosts().at(CTX->distributor()->locate_file_metadata(path)); try { - LOG(DEBUG, "Sending RPC ..."); // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we can // retry for RPC_TRIES (see old commits with margo) @@ -484,17 +496,10 @@ int forward_mk_symlink(const std::string& path, const std::string& target_path) LOG(DEBUG, "Got response success: {}", out.err()); - if (out.err() != 0) { - errno = out.err(); - return -1; - } - - return 0; - + return out.err() ? out.err() : 0; } catch (const std::exception& ex) { LOG(ERROR, "while getting rpc output"); - errno = EBUSY; - return -1; + return EBUSY; } } diff --git a/src/daemon/handler/srv_metadata.cpp b/src/daemon/handler/srv_metadata.cpp index 2e19ac3d1..43e88b8d5 100644 --- a/src/daemon/handler/srv_metadata.cpp +++ b/src/daemon/handler/srv_metadata.cpp @@ -293,6 +293,8 @@ DEFINE_MARGO_RPC_HANDLER(rpc_srv_get_metadentry_size) static hg_return_t rpc_srv_get_dirents(hg_handle_t handle) { rpc_get_dirents_in_t in{}; rpc_get_dirents_out_t out{}; + out.err = EIO; + out.dirents_size = 0; hg_bulk_t bulk_handle = nullptr; // Get input parmeters @@ -300,24 +302,31 @@ static hg_return_t rpc_srv_get_dirents(hg_handle_t handle) { if (ret != HG_SUCCESS) { GKFS_DATA->spdlogger()->error( "{}() Could not get RPC input data with err '{}'", __func__, ret); - return ret; + out.err = EBUSY; + return gkfs::rpc::cleanup_respond(&handle, &in, &out); } // Retrieve size of source buffer auto hgi = margo_get_info(handle); auto mid = margo_hg_info_get_instance(hgi); - GKFS_DATA->spdlogger()->debug( - "{}() Got dirents RPC with path '{}'", __func__, in.path); auto bulk_size = margo_bulk_get_size(in.bulk_handle); + GKFS_DATA->spdlogger()->debug("{}() Got RPC: path '{}' bulk_size '{}' ", __func__, in.path, bulk_size); //Get directory entries from local DB - std::vector> entries = gkfs::metadata::get_dirents(in.path); + vector> entries{}; + try { + entries = gkfs::metadata::get_dirents(in.path); + } catch (const ::exception& e) { + GKFS_DATA->spdlogger()->error("{}() Error during get_dirents(): '{}'", __func__, e.what()); + return gkfs::rpc::cleanup_respond(&handle, &in, &out); + } - out.dirents_size = entries.size(); + GKFS_DATA->spdlogger()->trace("{}() path '{}' Read database with '{}' entries", __func__, in.path, + entries.size()); if (entries.empty()) { out.err = 0; - return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle); + return gkfs::rpc::cleanup_respond(&handle, &in, &out); } //Calculate total output size @@ -327,53 +336,73 @@ static hg_return_t rpc_srv_get_dirents(hg_handle_t handle) { tot_names_size += e.first.size(); } + // tot_names_size (# characters in entry) + # entries * (bool size + char size for \0 character) size_t out_size = tot_names_size + entries.size() * (sizeof(bool) + sizeof(char)); if (bulk_size < out_size) { //Source buffer is smaller than total output size - GKFS_DATA->spdlogger()->error("{}() Entries do not fit source buffer", __func__); + GKFS_DATA->spdlogger()->error( + "{}() Entries do not fit source buffer. bulk_size '{}' < out_size '{}' must be satisfied!", __func__, + bulk_size, out_size); out.err = ENOBUFS; + return gkfs::rpc::cleanup_respond(&handle, &in, &out); + } + + void* bulk_buf; //buffer for bulk transfer + // create bulk handle and allocated memory for buffer with out_size information + ret = margo_bulk_create(mid, 1, nullptr, &out_size, HG_BULK_READ_ONLY, &bulk_handle); + if (ret != HG_SUCCESS) { + GKFS_DATA->spdlogger()->error("{}() Failed to create bulk handle", __func__); + return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle); + } + // access the internally allocated memory buffer and put it into bulk_buf + uint32_t actual_count; // number of segments. we use one here because we push the whole buffer at once + ret = margo_bulk_access(bulk_handle, 0, out_size, HG_BULK_READ_ONLY, 1, &bulk_buf, + &out_size, &actual_count); + if (ret != HG_SUCCESS || actual_count != 1) { + GKFS_DATA->spdlogger()->error("{}() Failed to access allocated buffer from bulk handle", __func__); return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle); } - //Serialize output data on local buffer - auto out_buff = std::make_unique(out_size); - char* out_buff_ptr = out_buff.get(); + GKFS_DATA->spdlogger()->trace( + "{}() path '{}' entries '{}' out_size '{}'. Set up local read only bulk handle and allocated buffer with size '{}'", + __func__, in.path, entries.size(), out_size, out_size); + //Serialize output data on local buffer + auto out_buff_ptr = static_cast(bulk_buf); auto bool_ptr = reinterpret_cast(out_buff_ptr); - char* names_ptr = out_buff_ptr + entries.size(); + auto names_ptr = out_buff_ptr + entries.size(); + for (auto const& e: entries) { + if (e.first.empty()) { + GKFS_DATA->spdlogger()->warn("{}() Entry in readdir() empty. If this shows up, something else is very wrong.", __func__); + } *bool_ptr = e.second; bool_ptr++; - const char* name = e.first.c_str(); - std::strcpy(names_ptr, name); + const auto name = e.first.c_str(); + ::strcpy(names_ptr, name); + // number of characters + \0 terminator names_ptr += e.first.size() + 1; } - ret = margo_bulk_create(mid, 1, reinterpret_cast(&out_buff_ptr), &out_size, HG_BULK_READ_ONLY, - &bulk_handle); - if (ret != HG_SUCCESS) { - GKFS_DATA->spdlogger()->error("{}() Failed to create bulk handle", __func__); - out.err = EBUSY; - return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle); - } + GKFS_DATA->spdlogger()->trace( + "{}() path '{}' entries '{}' out_size '{}'. Copied data to bulk_buffer. NEXT bulk_transfer", __func__, + in.path, entries.size(), out_size); - ret = margo_bulk_transfer(mid, HG_BULK_PUSH, hgi->addr, - in.bulk_handle, 0, - bulk_handle, 0, - out_size); + ret = margo_bulk_transfer(mid, HG_BULK_PUSH, hgi->addr, in.bulk_handle, 0, bulk_handle, 0, out_size); if (ret != HG_SUCCESS) { GKFS_DATA->spdlogger()->error( - "{}() Failed push dirents on path '{}' to client", - __func__, in.path - ); + "{}() Failed push '{}' dirents on path '{}' to client with bulk size '{}' and out_size '{}'", __func__, + entries.size(), in.path, bulk_size, out_size); + out.err = EBUSY; return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle); } out.dirents_size = entries.size(); out.err = 0; - GKFS_DATA->spdlogger()->debug( - "{}() Sending output response", __func__); + GKFS_DATA->spdlogger()->debug("{}() Sending output response err '{}' dirents_size '{}'. DONE", __func__, + out.err, + out.dirents_size); return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle); } -- GitLab From 79d6fa4da07d3267b2536b516bc0afc4c6c56d9b Mon Sep 17 00:00:00 2001 From: Marc Vef Date: Mon, 3 Aug 2020 17:32:56 +0200 Subject: [PATCH 2/4] Removing static keywords from handler functions --- src/daemon/handler/srv_data.cpp | 27 ++++++++---- src/daemon/handler/srv_management.cpp | 14 ++++-- src/daemon/handler/srv_metadata.cpp | 61 ++++++++++++++++++--------- 3 files changed, 70 insertions(+), 32 deletions(-) diff --git a/src/daemon/handler/srv_data.cpp b/src/daemon/handler/srv_data.cpp index 27863fd4b..63ff3c53a 100644 --- a/src/daemon/handler/srv_data.cpp +++ b/src/daemon/handler/srv_data.cpp @@ -32,12 +32,18 @@ using namespace std; +/* + * This file contains all Margo RPC handlers that are concerning management operations + */ + +namespace { + /** * RPC handler for an incoming write RPC * @param handle * @return */ -static hg_return_t rpc_srv_write(hg_handle_t handle) { +hg_return_t rpc_srv_write(hg_handle_t handle) { /* * 1. Setup */ @@ -182,7 +188,7 @@ static hg_return_t rpc_srv_write(hg_handle_t handle) { // origin offset of a chunk is dependent on a given offset in a write operation if (in.offset > 0) origin_offset = (gkfs::config::rpc::chunksize - in.offset) + - ((chnk_id_file - in.chunk_start) - 1) * gkfs::config::rpc::chunksize; + ((chnk_id_file - in.chunk_start) - 1) * gkfs::config::rpc::chunksize; else origin_offset = (chnk_id_file - in.chunk_start) * gkfs::config::rpc::chunksize; // last chunk might have different transfer_size @@ -245,14 +251,12 @@ static hg_return_t rpc_srv_write(hg_handle_t handle) { return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle); } -DEFINE_MARGO_RPC_HANDLER(rpc_srv_write) - /** * RPC handler for an incoming read RPC * @param handle * @return */ -static hg_return_t rpc_srv_read(hg_handle_t handle) { +hg_return_t rpc_srv_read(hg_handle_t handle) { /* * 1. Setup */ @@ -438,14 +442,13 @@ static hg_return_t rpc_srv_read(hg_handle_t handle) { return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle); } -DEFINE_MARGO_RPC_HANDLER(rpc_srv_read) /** * RPC handler for an incoming truncate RPC * @param handle * @return */ -static hg_return_t rpc_srv_truncate(hg_handle_t handle) { +hg_return_t rpc_srv_truncate(hg_handle_t handle) { rpc_trunc_in_t in{}; rpc_err_out_t out{}; out.err = EIO; @@ -474,14 +477,13 @@ static hg_return_t rpc_srv_truncate(hg_handle_t handle) { return gkfs::rpc::cleanup_respond(&handle, &in, &out); } -DEFINE_MARGO_RPC_HANDLER(rpc_srv_truncate) /** * RPC handler for an incoming chunk stat RPC * @param handle * @return */ -static hg_return_t rpc_srv_get_chunk_stat(hg_handle_t handle) { +hg_return_t rpc_srv_get_chunk_stat(hg_handle_t handle) { GKFS_DATA->spdlogger()->debug("{}() enter", __func__); rpc_chunk_stat_out_t out{}; out.err = EIO; @@ -503,6 +505,13 @@ static hg_return_t rpc_srv_get_chunk_stat(hg_handle_t handle) { return gkfs::rpc::cleanup_respond(&handle, &out); } +} + +DEFINE_MARGO_RPC_HANDLER(rpc_srv_write) + +DEFINE_MARGO_RPC_HANDLER(rpc_srv_read) + +DEFINE_MARGO_RPC_HANDLER(rpc_srv_truncate) DEFINE_MARGO_RPC_HANDLER(rpc_srv_get_chunk_stat) #ifdef GKFS_ENABLE_AGIOS diff --git a/src/daemon/handler/srv_management.cpp b/src/daemon/handler/srv_management.cpp index 79d0cf5ee..42752f83d 100644 --- a/src/daemon/handler/srv_management.cpp +++ b/src/daemon/handler/srv_management.cpp @@ -17,10 +17,15 @@ #include - using namespace std; -static hg_return_t rpc_srv_get_fs_config(hg_handle_t handle) { +/* + * This file contains all Margo RPC handlers that are concerning data operations + */ + +namespace { + +hg_return_t rpc_srv_get_fs_config(hg_handle_t handle) { rpc_config_out_t out{}; GKFS_DATA->spdlogger()->debug("{}() Got config RPC", __func__); @@ -47,4 +52,7 @@ static hg_return_t rpc_srv_get_fs_config(hg_handle_t handle) { return HG_SUCCESS; } -DEFINE_MARGO_RPC_HANDLER(rpc_srv_get_fs_config) \ No newline at end of file +} + +DEFINE_MARGO_RPC_HANDLER(rpc_srv_get_fs_config) + diff --git a/src/daemon/handler/srv_metadata.cpp b/src/daemon/handler/srv_metadata.cpp index 43e88b8d5..2a9a188fb 100644 --- a/src/daemon/handler/srv_metadata.cpp +++ b/src/daemon/handler/srv_metadata.cpp @@ -22,7 +22,13 @@ using namespace std; -static hg_return_t rpc_srv_create(hg_handle_t handle) { +/* + * This file contains all Margo RPC handlers that are concerning metadata operations + */ + +namespace { + +hg_return_t rpc_srv_create(hg_handle_t handle) { rpc_mk_node_in_t in; rpc_err_out_t out; @@ -52,9 +58,8 @@ static hg_return_t rpc_srv_create(hg_handle_t handle) { return HG_SUCCESS; } -DEFINE_MARGO_RPC_HANDLER(rpc_srv_create) -static hg_return_t rpc_srv_stat(hg_handle_t handle) { +hg_return_t rpc_srv_stat(hg_handle_t handle) { rpc_path_only_in_t in{}; rpc_stat_out_t out{}; auto ret = margo_get_input(handle, &in); @@ -89,9 +94,8 @@ static hg_return_t rpc_srv_stat(hg_handle_t handle) { return HG_SUCCESS; } -DEFINE_MARGO_RPC_HANDLER(rpc_srv_stat) -static hg_return_t rpc_srv_decr_size(hg_handle_t handle) { +hg_return_t rpc_srv_decr_size(hg_handle_t handle) { rpc_trunc_in_t in{}; rpc_err_out_t out{}; @@ -123,9 +127,8 @@ static hg_return_t rpc_srv_decr_size(hg_handle_t handle) { return HG_SUCCESS; } -DEFINE_MARGO_RPC_HANDLER(rpc_srv_decr_size) -static hg_return_t rpc_srv_remove(hg_handle_t handle) { +hg_return_t rpc_srv_remove(hg_handle_t handle) { rpc_rm_node_in_t in{}; rpc_err_out_t out{}; @@ -162,10 +165,8 @@ static hg_return_t rpc_srv_remove(hg_handle_t handle) { return HG_SUCCESS; } -DEFINE_MARGO_RPC_HANDLER(rpc_srv_remove) - -static hg_return_t rpc_srv_update_metadentry(hg_handle_t handle) { +hg_return_t rpc_srv_update_metadentry(hg_handle_t handle) { rpc_update_metadentry_in_t in{}; rpc_err_out_t out{}; @@ -211,9 +212,8 @@ static hg_return_t rpc_srv_update_metadentry(hg_handle_t handle) { return HG_SUCCESS; } -DEFINE_MARGO_RPC_HANDLER(rpc_srv_update_metadentry) -static hg_return_t rpc_srv_update_metadentry_size(hg_handle_t handle) { +hg_return_t rpc_srv_update_metadentry_size(hg_handle_t handle) { rpc_update_metadentry_size_in_t in{}; rpc_update_metadentry_size_out_t out{}; @@ -251,9 +251,8 @@ static hg_return_t rpc_srv_update_metadentry_size(hg_handle_t handle) { return HG_SUCCESS; } -DEFINE_MARGO_RPC_HANDLER(rpc_srv_update_metadentry_size) -static hg_return_t rpc_srv_get_metadentry_size(hg_handle_t handle) { +hg_return_t rpc_srv_get_metadentry_size(hg_handle_t handle) { rpc_path_only_in_t in{}; rpc_get_metadentry_size_out_t out{}; @@ -288,9 +287,8 @@ static hg_return_t rpc_srv_get_metadentry_size(hg_handle_t handle) { return HG_SUCCESS; } -DEFINE_MARGO_RPC_HANDLER(rpc_srv_get_metadentry_size) -static hg_return_t rpc_srv_get_dirents(hg_handle_t handle) { +hg_return_t rpc_srv_get_dirents(hg_handle_t handle) { rpc_get_dirents_in_t in{}; rpc_get_dirents_out_t out{}; out.err = EIO; @@ -392,7 +390,8 @@ static hg_return_t rpc_srv_get_dirents(hg_handle_t handle) { ret = margo_bulk_transfer(mid, HG_BULK_PUSH, hgi->addr, in.bulk_handle, 0, bulk_handle, 0, out_size); if (ret != HG_SUCCESS) { GKFS_DATA->spdlogger()->error( - "{}() Failed push '{}' dirents on path '{}' to client with bulk size '{}' and out_size '{}'", __func__, + "{}() Failed to push '{}' dirents on path '{}' to client with bulk size '{}' and out_size '{}'", + __func__, entries.size(), in.path, bulk_size, out_size); out.err = EBUSY; return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle); @@ -406,11 +405,11 @@ static hg_return_t rpc_srv_get_dirents(hg_handle_t handle) { return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle); } -DEFINE_MARGO_RPC_HANDLER(rpc_srv_get_dirents) + #ifdef HAS_SYMLINKS -static hg_return_t rpc_srv_mk_symlink(hg_handle_t handle) { +hg_return_t rpc_srv_mk_symlink(hg_handle_t handle) { rpc_mk_symlink_in_t in{}; rpc_err_out_t out{}; @@ -441,6 +440,28 @@ static hg_return_t rpc_srv_mk_symlink(hg_handle_t handle) { return HG_SUCCESS; } +#endif + +} + +DEFINE_MARGO_RPC_HANDLER(rpc_srv_create) + +DEFINE_MARGO_RPC_HANDLER(rpc_srv_stat) + +DEFINE_MARGO_RPC_HANDLER(rpc_srv_decr_size) + +DEFINE_MARGO_RPC_HANDLER(rpc_srv_remove) + +DEFINE_MARGO_RPC_HANDLER(rpc_srv_update_metadentry) + +DEFINE_MARGO_RPC_HANDLER(rpc_srv_update_metadentry_size) + +DEFINE_MARGO_RPC_HANDLER(rpc_srv_get_metadentry_size) + +DEFINE_MARGO_RPC_HANDLER(rpc_srv_get_dirents) + +#ifdef HAS_SYMLINKS + DEFINE_MARGO_RPC_HANDLER(rpc_srv_mk_symlink) -#endif +#endif \ No newline at end of file -- GitLab From a7d12ebf608c93a94994d1bd6936332442bb23bf Mon Sep 17 00:00:00 2001 From: Marc Vef Date: Mon, 3 Aug 2020 17:33:15 +0200 Subject: [PATCH 3/4] Adding doxygen comments to gkfs_functions --- src/client/gkfs_functions.cpp | 257 ++++++++++++++++++++++++++-- src/client/preload_util.cpp | 8 +- src/client/rpc/forward_metadata.cpp | 2 - 3 files changed, 254 insertions(+), 13 deletions(-) diff --git a/src/client/gkfs_functions.cpp b/src/client/gkfs_functions.cpp index 917055775..963b45f5f 100644 --- a/src/client/gkfs_functions.cpp +++ b/src/client/gkfs_functions.cpp @@ -61,6 +61,12 @@ struct linux_dirent64 { namespace { +/** + * Checks if metadata for parent directory exists (can be disabled with CREATE_CHECK_PARENTS). + * errno may be set + * @param path + * @return 0 on success, -1 on failure + */ int check_parent_dir(const std::string& path) { #if CREATE_CHECK_PARENTS auto p_comp = gkfs::path::dirname(path); @@ -86,6 +92,14 @@ int check_parent_dir(const std::string& path) { namespace gkfs { namespace syscall { +/** + * gkfs wrapper for open() system calls + * errno may be set + * @param path + * @param mode + * @param flags + * @return 0 on success, -1 on failure + */ int gkfs_open(const std::string& path, mode_t mode, int flags) { if (flags & O_PATH) { @@ -171,6 +185,13 @@ int gkfs_open(const std::string& path, mode_t mode, int flags) { return CTX->file_map()->add(std::make_shared(path, flags)); } +/** + * Wrapper function for file/directory creation + * errno may be set + * @param path + * @param mode + * @return 0 on success, -1 on failure + */ int gkfs_create(const std::string& path, mode_t mode) { //file type must be set @@ -206,9 +227,10 @@ int gkfs_create(const std::string& path, mode_t mode) { } /** - * This sends internally a broadcast (i.e. n RPCs) to clean their chunk folders for that path + * gkfs wrapper for unlink() system calls + * errno may be set * @param path - * @return + * @return 0 on success, -1 on failure */ int gkfs_remove(const std::string& path) { auto md = gkfs::util::get_metadata(path); @@ -224,6 +246,14 @@ int gkfs_remove(const std::string& path) { return 0; } +/** + * gkfs wrapper for access() system calls + * errno may be set + * @param path + * @param mask + * @param follow_links + * @return 0 on success, -1 on failure + */ int gkfs_access(const std::string& path, const int mask, bool follow_links) { auto md = gkfs::util::get_metadata(path, follow_links); if (!md) { @@ -233,6 +263,14 @@ int gkfs_access(const std::string& path, const int mask, bool follow_links) { return 0; } +/** + * gkfs wrapper for stat() system calls + * errno may be set + * @param path + * @param buf + * @param follow_links + * @return 0 on success, -1 on failure + */ int gkfs_stat(const string& path, struct stat* buf, bool follow_links) { auto md = gkfs::util::get_metadata(path, follow_links); if (!md) { @@ -243,6 +281,18 @@ int gkfs_stat(const string& path, struct stat* buf, bool follow_links) { } #ifdef STATX_TYPE + +/** + * gkfs wrapper for statx() system calls + * errno may be set + * @param dirfs + * @param path + * @param flags + * @param mask + * @param buf + * @param follow_links + * @return 0 on success, -1 on failure + */ int gkfs_statx(int dirfs, const std::string& path, int flags, unsigned int mask, struct statx* buf, bool follow_links) { auto md = gkfs::util::get_metadata(path, follow_links); if (!md) { @@ -270,16 +320,22 @@ int gkfs_statx(int dirfs, const std::string& path, int flags, unsigned int mask, buf->stx_mtime.tv_sec = tmp.st_mtim.tv_sec; buf->stx_mtime.tv_nsec = tmp.st_mtim.tv_nsec; - + buf->stx_ctime.tv_sec = tmp.st_ctim.tv_sec; buf->stx_ctime.tv_nsec = tmp.st_ctim.tv_nsec; buf->stx_btime = buf->stx_atime; - + return 0; } #endif +/** + * gkfs wrapper for statfs() system calls + * errno may be set + * @param buf + * @return 0 on success, -1 on failure + */ int gkfs_statfs(struct statfs* buf) { auto ret = gkfs::rpc::forward_get_chunk_stat(); @@ -305,6 +361,15 @@ int gkfs_statfs(struct statfs* buf) { return 0; } +/** + * gkfs wrapper for statvfs() system calls + * errno may be set + * + * NOTE: Currently unused. + * + * @param buf + * @return 0 on success, -1 on failure + */ int gkfs_statvfs(struct statvfs* buf) { auto ret = gkfs::rpc::forward_get_chunk_stat(); auto err = ret.first; @@ -329,10 +394,26 @@ int gkfs_statvfs(struct statvfs* buf) { return 0; } +/** + * gkfs wrapper for lseek() system calls with available file descriptor + * errno may be set + * @param fd + * @param offset + * @param whence + * @return 0 on success, -1 on failure + */ off_t gkfs_lseek(unsigned int fd, off_t offset, unsigned int whence) { return gkfs_lseek(CTX->file_map()->get(fd), offset, whence); } +/** + * gkfs wrapper for lseek() system calls with available shared ptr to gkfs FileMap + * errno may be set + * @param gkfs_fd + * @param offset + * @param whence + * @return 0 on success, -1 on failure + */ off_t gkfs_lseek(shared_ptr gkfs_fd, off_t offset, unsigned int whence) { switch (whence) { case SEEK_SET: @@ -375,6 +456,14 @@ off_t gkfs_lseek(shared_ptr gkfs_fd, off_t offset, unsi return gkfs_fd->pos(); } +/** + * wrapper function for gkfs_truncate + * errno may be set + * @param path + * @param old_size + * @param new_size + * @return 0 on success, -1 on failure + */ int gkfs_truncate(const std::string& path, off_t old_size, off_t new_size) { assert(new_size >= 0); assert(new_size <= old_size); @@ -398,6 +487,13 @@ int gkfs_truncate(const std::string& path, off_t old_size, off_t new_size) { return 0; } +/** + * gkfs wrapper for truncate() system calls + * errno may be set + * @param path + * @param length + * @return 0 on success, -1 on failure + */ int gkfs_truncate(const std::string& path, off_t length) { /* TODO CONCURRENCY: * At the moment we first ask the length to the metadata-server in order to @@ -426,14 +522,36 @@ int gkfs_truncate(const std::string& path, off_t length) { return gkfs_truncate(path, size, length); } +/** + * gkfs wrapper for dup() system calls + * errno may be set + * @param oldfd + * @return file descriptor int or -1 on error + */ int gkfs_dup(const int oldfd) { return CTX->file_map()->dup(oldfd); } +/** + * gkfs wrapper for dup2() system calls + * errno may be set + * @param oldfd + * @param newfd + * @return file descriptor int or -1 on error + */ int gkfs_dup2(const int oldfd, const int newfd) { return CTX->file_map()->dup2(oldfd, newfd); } +/** + * Wrapper function for all gkfs write operations + * errno may be set + * @param file + * @param buf + * @param count + * @param offset + * @return written size or -1 on error + */ ssize_t gkfs_pwrite(std::shared_ptr file, const char* buf, size_t count, off64_t offset) { if (file->type() != gkfs::filemap::FileType::regular) { assert(file->type() == gkfs::filemap::FileType::directory); @@ -463,16 +581,28 @@ ssize_t gkfs_pwrite(std::shared_ptr file, const char* b return ret_write.second; // return written size } +/** + * gkfs wrapper for pwrite() system calls + * errno may be set + * @param fd + * @param buf + * @param count + * @param offset + * @return written size or -1 on error + */ ssize_t gkfs_pwrite_ws(int fd, const void* buf, size_t count, off64_t offset) { auto file = CTX->file_map()->get(fd); return gkfs_pwrite(file, reinterpret_cast(buf), count, offset); } -/* Write counts bytes starting from current file position - * It also update the file position accordingly - * - * Same as write syscall. -*/ +/** + * gkfs wrapper for write() system calls + * errno may be set + * @param fd + * @param buf + * @param count + * @return written size or -1 on error + */ ssize_t gkfs_write(int fd, const void* buf, size_t count) { auto gkfs_fd = CTX->file_map()->get(fd); auto pos = gkfs_fd->pos(); //retrieve the current offset @@ -486,6 +616,15 @@ ssize_t gkfs_write(int fd, const void* buf, size_t count) { return ret; } +/** + * gkfs wrapper for pwritev() system calls + * errno may be set + * @param fd + * @param iov + * @param iovcnt + * @param offset + * @return written size or -1 on error + */ ssize_t gkfs_pwritev(int fd, const struct iovec* iov, int iovcnt, off_t offset) { auto file = CTX->file_map()->get(fd); @@ -516,6 +655,14 @@ ssize_t gkfs_pwritev(int fd, const struct iovec* iov, int iovcnt, off_t offset) return written; } +/** + * gkfs wrapper for writev() system calls + * errno may be set + * @param fd + * @param iov + * @param iovcnt + * @return written size or -1 on error + */ ssize_t gkfs_writev(int fd, const struct iovec* iov, int iovcnt) { auto gkfs_fd = CTX->file_map()->get(fd); @@ -529,6 +676,14 @@ ssize_t gkfs_writev(int fd, const struct iovec* iov, int iovcnt) { return ret; } +/** + * Wrapper function for all gkfs read operations + * @param file + * @param buf + * @param count + * @param offset + * @return read size or -1 on error + */ ssize_t gkfs_pread(std::shared_ptr file, char* buf, size_t count, off64_t offset) { if (file->type() != gkfs::filemap::FileType::regular) { assert(file->type() == gkfs::filemap::FileType::directory); @@ -552,6 +707,14 @@ ssize_t gkfs_pread(std::shared_ptr file, char* buf, siz return ret.second; // return read size } +/** + * gkfs wrapper for read() system calls + * errno may be set + * @param fd + * @param buf + * @param count + * @return read size or -1 on error + */ ssize_t gkfs_read(int fd, void* buf, size_t count) { auto gkfs_fd = CTX->file_map()->get(fd); auto pos = gkfs_fd->pos(); //retrieve the current offset @@ -563,6 +726,15 @@ ssize_t gkfs_read(int fd, void* buf, size_t count) { return ret; } +/** + * gkfs wrapper for preadv() system calls + * errno may be set + * @param fd + * @param iov + * @param iovcnt + * @param offset + * @return read size or -1 on error + */ ssize_t gkfs_preadv(int fd, const struct iovec* iov, int iovcnt, off_t offset) { auto file = CTX->file_map()->get(fd); @@ -593,6 +765,14 @@ ssize_t gkfs_preadv(int fd, const struct iovec* iov, int iovcnt, off_t offset) { return read; } +/** + * gkfs wrapper for readv() system calls + * errno may be set + * @param fd + * @param iov + * @param iovcnt + * @return read size or -1 on error + */ ssize_t gkfs_readv(int fd, const struct iovec* iov, int iovcnt) { auto gkfs_fd = CTX->file_map()->get(fd); @@ -606,11 +786,26 @@ ssize_t gkfs_readv(int fd, const struct iovec* iov, int iovcnt) { return ret; } +/** + * gkfs wrapper for pread() system calls + * errno may be set + * @param fd + * @param buf + * @param count + * @param offset + * @return read size or -1 on error + */ ssize_t gkfs_pread_ws(int fd, void* buf, size_t count, off64_t offset) { auto gkfs_fd = CTX->file_map()->get(fd); return gkfs_pread(gkfs_fd, reinterpret_cast(buf), count, offset); } +/** + * wrapper function for opening directories + * errno may be set + * @param path + * @return 0 on success or -1 on error + */ int gkfs_opendir(const std::string& path) { auto md = gkfs::util::get_metadata(path); @@ -632,6 +827,12 @@ int gkfs_opendir(const std::string& path) { return CTX->file_map()->add(open_dir); } +/** + * gkfs wrapper for rmdir() system calls + * errno may be set + * @param path + * @return 0 on success or -1 on error + */ int gkfs_rmdir(const std::string& path) { auto md = gkfs::util::get_metadata(path); if (!md) { @@ -663,6 +864,14 @@ int gkfs_rmdir(const std::string& path) { return 0; } +/** + * gkfs wrapper for getdents() system calls + * errno may be set + * @param fd + * @param dirp + * @param count + * @return 0 on success or -1 on error + */ int gkfs_getdents(unsigned int fd, struct linux_dirent* dirp, unsigned int count) { @@ -725,7 +934,14 @@ int gkfs_getdents(unsigned int fd, return written; } - +/** + * gkfs wrapper for getdents64() system calls + * errno may be set + * @param fd + * @param dirp + * @param count + * @return 0 on success or -1 on error + */ int gkfs_getdents64(unsigned int fd, struct linux_dirent64* dirp, unsigned int count) { @@ -786,6 +1002,16 @@ int gkfs_getdents64(unsigned int fd, #ifdef HAS_SYMLINKS +/** + * gkfs wrapper for make symlink() system calls + * errno may be set + * + * * NOTE: Currently unused + * + * @param path + * @param target_path + * @return 0 on success or -1 on error + */ int gkfs_mk_symlink(const std::string& path, const std::string& target_path) { gkfs::preload::init_ld_env_if_needed(); /* The following check is not POSIX compliant. @@ -822,6 +1048,17 @@ int gkfs_mk_symlink(const std::string& path, const std::string& target_path) { return 0; } +/** + * gkfs wrapper for reading symlinks + * errno may be set + * + * NOTE: Currently unused + * + * @param path + * @param buf + * @param bufsize + * @return 0 on success or -1 on error + */ int gkfs_readlink(const std::string& path, char* buf, int bufsize) { gkfs::preload::init_ld_env_if_needed(); auto md = gkfs::util::get_metadata(path, false); diff --git a/src/client/preload_util.cpp b/src/client/preload_util.cpp index e4cbcf8a3..9bc721f01 100644 --- a/src/client/preload_util.cpp +++ b/src/client/preload_util.cpp @@ -72,7 +72,13 @@ hermes::endpoint lookup_endpoint(const std::string& uri, namespace gkfs { namespace util { - +/** + * Retrieve metadata from daemon + * errno may be set + * @param path + * @param follow_links + * @return shared_ptr for metadata, nullptr else + */ std::shared_ptr get_metadata(const string& path, bool follow_links) { std::string attr; auto err = gkfs::rpc::forward_stat(path, attr); diff --git a/src/client/rpc/forward_metadata.cpp b/src/client/rpc/forward_metadata.cpp index 217d845d6..cf8e927c8 100644 --- a/src/client/rpc/forward_metadata.cpp +++ b/src/client/rpc/forward_metadata.cpp @@ -171,8 +171,6 @@ int forward_remove(const std::string& path, const bool remove_metadentry_only, c // TODO(amiranda): hermes will eventually provide a post(endpoint) // returning one result and a broadcast(endpoint_set) returning a // result_set. When that happens we can remove the .at(0) :/ - // - // handles.emplace_back(ld_network_service->post(endp, in)); -- GitLab From 4629e00c1510dcc0bdc00ea28cc35bdfaf6b4a49 Mon Sep 17 00:00:00 2001 From: Marc Vef Date: Wed, 5 Aug 2020 12:02:56 +0200 Subject: [PATCH 4/4] forward_get_dirents() now returns a shared_ptr instead of it being given --- include/client/rpc/forward_data.hpp | 2 ++ include/client/rpc/forward_metadata.hpp | 5 +++- src/client/gkfs_functions.cpp | 13 ++++++---- src/client/rpc/forward_metadata.cpp | 34 +++++++++++++++---------- 4 files changed, 34 insertions(+), 20 deletions(-) diff --git a/include/client/rpc/forward_data.hpp b/include/client/rpc/forward_data.hpp index 378914c88..253f16647 100644 --- a/include/client/rpc/forward_data.hpp +++ b/include/client/rpc/forward_data.hpp @@ -24,6 +24,8 @@ struct ChunkStat { unsigned long chunk_free; }; +// TODO once we have LEAF, remove all the error code returns and throw them as an exception. + std::pair forward_write(const std::string& path, const void* buf, bool append_flag, off64_t in_offset, size_t write_size, int64_t updated_metadentry_size); diff --git a/include/client/rpc/forward_metadata.hpp b/include/client/rpc/forward_metadata.hpp index 6e3a899b6..a8ab600ff 100644 --- a/include/client/rpc/forward_metadata.hpp +++ b/include/client/rpc/forward_metadata.hpp @@ -16,6 +16,7 @@ #define GEKKOFS_CLIENT_FORWARD_METADATA_HPP #include +#include /* Forward declaration */ namespace gkfs { @@ -27,6 +28,8 @@ struct MetadentryUpdateFlags; class Metadata; } +// TODO once we have LEAF, remove all the error code returns and throw them as an exception. + namespace rpc { int forward_create(const std::string& path, mode_t mode); @@ -45,7 +48,7 @@ forward_update_metadentry_size(const std::string& path, size_t size, off64_t off std::pair forward_get_metadentry_size(const std::string& path); -int forward_get_dirents(gkfs::filemap::OpenDir& open_dir); +std::pair> forward_get_dirents(const std::string& path); #ifdef HAS_SYMLINKS diff --git a/src/client/gkfs_functions.cpp b/src/client/gkfs_functions.cpp index 963b45f5f..aa399c209 100644 --- a/src/client/gkfs_functions.cpp +++ b/src/client/gkfs_functions.cpp @@ -818,13 +818,14 @@ int gkfs_opendir(const std::string& path) { return -1; } - auto open_dir = std::make_shared(path); - auto err = gkfs::rpc::forward_get_dirents(*open_dir); + auto ret = gkfs::rpc::forward_get_dirents(path); + auto err = ret.first; if (err) { errno = err; return -1; } - return CTX->file_map()->add(open_dir); + assert(ret.second); + return CTX->file_map()->add(ret.second); } /** @@ -846,12 +847,14 @@ int gkfs_rmdir(const std::string& path) { return -1; } - auto open_dir = std::make_shared(path); - auto err = gkfs::rpc::forward_get_dirents(*open_dir); + auto ret = gkfs::rpc::forward_get_dirents(path); + auto err = ret.first; if (err) { errno = err; return -1; } + assert(ret.second); + auto open_dir = ret.second; if (open_dir->size() != 0) { errno = ENOTEMPTY; return -1; diff --git a/src/client/rpc/forward_metadata.cpp b/src/client/rpc/forward_metadata.cpp index cf8e927c8..664e8a15f 100644 --- a/src/client/rpc/forward_metadata.cpp +++ b/src/client/rpc/forward_metadata.cpp @@ -353,12 +353,11 @@ pair forward_get_metadentry_size(const std::string& path) { * @param open_dir * @return error code */ -int forward_get_dirents(gkfs::filemap::OpenDir& open_dir) { +pair> forward_get_dirents(const string& path) { - LOG(DEBUG, "{}() enter for path '{}'", __func__, open_dir.path()) + LOG(DEBUG, "{}() enter for path '{}'", __func__, path) - auto const root_dir = open_dir.path(); - auto const targets = CTX->distributor()->locate_directory_metadata(root_dir); + auto const targets = CTX->distributor()->locate_directory_metadata(path); /* preallocate receiving buffer. The actual size is not known yet. * @@ -387,10 +386,11 @@ int forward_get_dirents(gkfs::filemap::OpenDir& open_dir) { hermes::access_mode::write_only)); } catch (const std::exception& ex) { LOG(ERROR, "{}() Failed to expose buffers for RMA. err '{}'", __func__, ex.what()); - return EBUSY; + return make_pair(EBUSY, nullptr); } } + auto err = 0; // send RPCs std::vector> handles; @@ -399,23 +399,25 @@ int forward_get_dirents(gkfs::filemap::OpenDir& open_dir) { // Setup rpc input parameters for each host auto endp = CTX->hosts().at(targets[i]); - gkfs::rpc::get_dirents::input in(root_dir, exposed_buffers[i]); + gkfs::rpc::get_dirents::input in(path, exposed_buffers[i]); try { LOG(DEBUG, "{}() Sending RPC to host: '{}'", __func__, targets[i]); handles.emplace_back(ld_network_service->post(endp, in)); } catch (const std::exception& ex) { - LOG(ERROR, "{}() Unable to send non-blocking get_dirents() on {} [peer: {}] err '{}'", __func__, root_dir, + LOG(ERROR, "{}() Unable to send non-blocking get_dirents() on {} [peer: {}] err '{}'", __func__, path, targets[i], ex.what()); - return EBUSY; + err = EBUSY; + break; // we need to gather responses from already sent RPCS } } LOG(INFO, "{}() path '{}' send rpc_srv_get_dirents() rpc to '{}' targets. per_host_buff_size '{}' Waiting on reply next and deserialize", - __func__, open_dir.path(), targets.size(), per_host_buff_size); + __func__, path, targets.size(), per_host_buff_size); - auto err = 0; + auto send_error = err != 0; + auto open_dir = make_shared(path); // wait for RPC responses for (std::size_t i = 0; i < handles.size(); ++i) { @@ -425,17 +427,21 @@ int forward_get_dirents(gkfs::filemap::OpenDir& open_dir) { // XXX We might need a timeout here to not wait forever for an // output that never comes? out = handles[i].get().at(0); + // skip processing dirent data if there was an error during send + // In this case all responses are gathered but their contents skipped + if (send_error) + continue; if (out.err() != 0) { LOG(ERROR, "{}() Failed to retrieve dir entries from host '{}'. Error '{}', path '{}'", __func__, targets[i], - strerror(out.err()), root_dir); + strerror(out.err()), path); err = out.err(); // We need to gather all responses before exiting continue; } } catch (const std::exception& ex) { - LOG(ERROR, "{}() Failed to get rpc output.. [path: {}, target host: {}] err '{}'", __func__, root_dir, + LOG(ERROR, "{}() Failed to get rpc output.. [path: {}, target host: {}] err '{}'", __func__, path, targets[i], ex.what()); err = EBUSY; // We need to gather all responses before exiting @@ -465,10 +471,10 @@ int forward_get_dirents(gkfs::filemap::OpenDir& open_dir) { // number of characters in entry + \0 terminator names_ptr += name.size() + 1; - open_dir.add(name, ftype); + open_dir->add(name, ftype); } } - return err; + return make_pair(err, open_dir); } #ifdef HAS_SYMLINKS -- GitLab