Loading hermes @ 38b6bbfe Compare 5c72fe06 to 38b6bbfe Original line number Diff line number Diff line Subproject commit 5c72fe06d390868bf1345a8767033c249997d2b7 Subproject commit 38b6bbfe77806b527f4e4b4157feea658f555be6 include/client/hooks.hpp +1 −0 Original line number Diff line number Diff line Loading @@ -14,6 +14,7 @@ #ifndef IFS_HOOKS_HPP #define IFS_HOOKS_HPP #include <sys/types.h> #include <fcntl.h> Loading src/client/preload_util.cpp +4 −7 Original line number Diff line number Diff line Loading @@ -170,7 +170,7 @@ void load_hosts() { bool local_host_found = false; std::vector<hermes::endpoint> addrs; addrs.reserve(hosts.size()); addrs.resize(hosts.size()); vector<uint64_t> host_ids(hosts.size()); // populate vector with [0, ..., host_size - 1] Loading @@ -190,10 +190,7 @@ void load_hosts() { const auto& hostname = hosts.at(id).first; const auto& uri = hosts.at(id).second; auto endp = ::lookup_endpoint(uri); auto it = std::next(addrs.begin(), id); addrs.emplace(it, endp); addrs[id] = ::lookup_endpoint(uri); if (!local_host_found && hostname == local_hostname) { CTX->log()->debug("{}() Found local host: {}", __func__, hostname); Loading src/client/rpc/ld_rpc_metadentry.cpp +38 −17 Original line number Diff line number Diff line Loading @@ -159,36 +159,57 @@ int rm_node(const std::string& path, const bool remove_metadentry_only) { std::vector<hermes::rpc_handle<gkfs::rpc::remove>> handles; hermes::endpoint_set endps; std::copy(CTX->hosts().begin(), CTX->hosts().end(), std::back_inserter(endps)); for (const auto& endp : CTX->hosts()) { try { CTX->log()->trace("{}() Sending RPC to host: {}", __func__, endp.to_string()); auto output_set = ld_network_service->broadcast<gkfs::rpc::remove>(endps, path).get(); gkfs::rpc::remove::input in(path); // Wait for RPC responses and then get response for (const auto& out : output_set) { CTX->log()->debug("{}() Got response success: {}", __func__, out.err()); // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that // we can retry for RPC_TRIES (see old commits with margo) // TODO(amiranda): hermes will eventually provide a post(endpoint) // returning one result and a broadcast(endpoint_set) returning a // result_set. When that happens we can remove the .at(0) :/ handles.emplace_back( ld_network_service->post<gkfs::rpc::remove>(endp, in)); if(out.err() != 0) { errno = out.err(); return -1; } catch (const std::exception& ex) { // TODO(amiranda): we should cancel all previously posted requests // here, unfortunately, Hermes does not support it yet :/ CTX->log()->error("{}() Failed to send request to host: {}", __func__, endp.to_string()); throw std::runtime_error("Failed to forward non-blocking rpc request"); } } return 0; // wait for RPC responses bool got_error = false; for(const auto& h : handles) { try { // XXX We might need a timeout here to not wait forever for an // output that never comes? auto out = h.get().at(0); if(out.err() != 0) { CTX->log()->error("{}() received error response: {}", __func__, out.err()); got_error = true; errno = out.err(); } } catch(const std::exception& ex) { CTX->log()->error("{}() while getting rpc output", __func__); got_error = true; errno = EBUSY; return -1; } } return got_error ? -1 : 0; } int update_metadentry(const string& path, const Metadata& md, const MetadentryUpdateFlags& md_flags) { Loading src/daemon/handler/h_metadentry.cpp +1 −1 Original line number Diff line number Diff line Loading @@ -132,7 +132,7 @@ static hg_return_t rpc_srv_rm_node(hg_handle_t handle) { if (ret != HG_SUCCESS) ADAFS_DATA->spdlogger()->error("{}() Failed to retrieve input from handle", __func__); assert(ret == HG_SUCCESS); ADAFS_DATA->spdlogger()->debug("Got remove node RPC with path {}", in.path); ADAFS_DATA->spdlogger()->debug("Got remove node RPC with path '{}'", in.path); try { // Remove metadentry if exists on the node Loading Loading
hermes @ 38b6bbfe Compare 5c72fe06 to 38b6bbfe Original line number Diff line number Diff line Subproject commit 5c72fe06d390868bf1345a8767033c249997d2b7 Subproject commit 38b6bbfe77806b527f4e4b4157feea658f555be6
include/client/hooks.hpp +1 −0 Original line number Diff line number Diff line Loading @@ -14,6 +14,7 @@ #ifndef IFS_HOOKS_HPP #define IFS_HOOKS_HPP #include <sys/types.h> #include <fcntl.h> Loading
src/client/preload_util.cpp +4 −7 Original line number Diff line number Diff line Loading @@ -170,7 +170,7 @@ void load_hosts() { bool local_host_found = false; std::vector<hermes::endpoint> addrs; addrs.reserve(hosts.size()); addrs.resize(hosts.size()); vector<uint64_t> host_ids(hosts.size()); // populate vector with [0, ..., host_size - 1] Loading @@ -190,10 +190,7 @@ void load_hosts() { const auto& hostname = hosts.at(id).first; const auto& uri = hosts.at(id).second; auto endp = ::lookup_endpoint(uri); auto it = std::next(addrs.begin(), id); addrs.emplace(it, endp); addrs[id] = ::lookup_endpoint(uri); if (!local_host_found && hostname == local_hostname) { CTX->log()->debug("{}() Found local host: {}", __func__, hostname); Loading
src/client/rpc/ld_rpc_metadentry.cpp +38 −17 Original line number Diff line number Diff line Loading @@ -159,36 +159,57 @@ int rm_node(const std::string& path, const bool remove_metadentry_only) { std::vector<hermes::rpc_handle<gkfs::rpc::remove>> handles; hermes::endpoint_set endps; std::copy(CTX->hosts().begin(), CTX->hosts().end(), std::back_inserter(endps)); for (const auto& endp : CTX->hosts()) { try { CTX->log()->trace("{}() Sending RPC to host: {}", __func__, endp.to_string()); auto output_set = ld_network_service->broadcast<gkfs::rpc::remove>(endps, path).get(); gkfs::rpc::remove::input in(path); // Wait for RPC responses and then get response for (const auto& out : output_set) { CTX->log()->debug("{}() Got response success: {}", __func__, out.err()); // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that // we can retry for RPC_TRIES (see old commits with margo) // TODO(amiranda): hermes will eventually provide a post(endpoint) // returning one result and a broadcast(endpoint_set) returning a // result_set. When that happens we can remove the .at(0) :/ handles.emplace_back( ld_network_service->post<gkfs::rpc::remove>(endp, in)); if(out.err() != 0) { errno = out.err(); return -1; } catch (const std::exception& ex) { // TODO(amiranda): we should cancel all previously posted requests // here, unfortunately, Hermes does not support it yet :/ CTX->log()->error("{}() Failed to send request to host: {}", __func__, endp.to_string()); throw std::runtime_error("Failed to forward non-blocking rpc request"); } } return 0; // wait for RPC responses bool got_error = false; for(const auto& h : handles) { try { // XXX We might need a timeout here to not wait forever for an // output that never comes? auto out = h.get().at(0); if(out.err() != 0) { CTX->log()->error("{}() received error response: {}", __func__, out.err()); got_error = true; errno = out.err(); } } catch(const std::exception& ex) { CTX->log()->error("{}() while getting rpc output", __func__); got_error = true; errno = EBUSY; return -1; } } return got_error ? -1 : 0; } int update_metadentry(const string& path, const Metadata& md, const MetadentryUpdateFlags& md_flags) { Loading
src/daemon/handler/h_metadentry.cpp +1 −1 Original line number Diff line number Diff line Loading @@ -132,7 +132,7 @@ static hg_return_t rpc_srv_rm_node(hg_handle_t handle) { if (ret != HG_SUCCESS) ADAFS_DATA->spdlogger()->error("{}() Failed to retrieve input from handle", __func__); assert(ret == HG_SUCCESS); ADAFS_DATA->spdlogger()->debug("Got remove node RPC with path {}", in.path); ADAFS_DATA->spdlogger()->debug("Got remove node RPC with path '{}'", in.path); try { // Remove metadentry if exists on the node Loading