Loading include/client/rpc/hg_rpcs.hpp +112 −0 Original line number Diff line number Diff line Loading @@ -217,4 +217,116 @@ struct fs_config { uint32_t m_gid; }; }; //============================================================================== // definitions for create struct create { // forward declarations of public input/output types for this RPC class input; class output; // traits used so that the engine knows what to do with the RPC using self_type = create; using handle_type = hermes::rpc_handle<self_type>; using input_type = input; using output_type = output; using mercury_input_type = rpc_mk_node_in_t; using mercury_output_type = rpc_err_out_t; // RPC public identifier constexpr static const uint64_t public_id = 796590080; // RPC internal Mercury identifier constexpr static const hg_id_t mercury_id = public_id; // RPC name constexpr static const auto name = hg_tag::create; // requires response? constexpr static const auto requires_response = true; // Mercury callback to serialize input arguments constexpr static const auto mercury_in_proc_cb = HG_GEN_PROC_NAME(rpc_mk_node_in_t); // Mercury callback to serialize output arguments constexpr static const auto mercury_out_proc_cb = HG_GEN_PROC_NAME(rpc_err_out_t); class input { template <typename ExecutionContext> friend hg_return_t hermes::detail::post_to_mercury(ExecutionContext*); public: input(const std::string& path, uint32_t mode) : m_path(path), m_mode(mode) { } input(input&& rhs) = default; input(const input& other) = default; input& operator=(input&& rhs) = default; input& operator=(const input& other) = default; std::string path() const { return m_path; } uint32_t mode() const { return m_mode; } explicit input(const rpc_mk_node_in_t& other) : m_path(other.path), m_mode(other.mode) { } explicit operator rpc_mk_node_in_t() { return {m_path.c_str(), m_mode}; } private: std::string m_path; uint32_t m_mode; }; class output { template <typename ExecutionContext> friend hg_return_t hermes::detail::post_to_mercury(ExecutionContext*); public: output() : m_err() {} output(int32_t err) : m_err(err) {} output(output&& rhs) = default; output(const output& other) = default; output& operator=(output&& rhs) = default; output& operator=(const output& other) = default; explicit output(const rpc_err_out_t& out) { m_err = out.err; } int32_t err() const { return m_err; } private: int32_t m_err; }; }; #endif // GKFS_RPCS_HPP src/client/preload.cpp +1 −0 Original line number Diff line number Diff line Loading @@ -191,6 +191,7 @@ bool init_hermes_client(const std::string& transport_prefix) { } rpc_config_id = fs_config::public_id; rpc_mk_node_id = create::public_id; return true; } Loading src/client/rpc/hg_rpcs.cpp +1 −0 Original line number Diff line number Diff line Loading @@ -22,6 +22,7 @@ namespace hermes { namespace detail { void register_user_request_types() { (void) registered_requests().add<fs_config>(); (void) registered_requests().add<create>(); } }} // namespace hermes::detail src/client/rpc/ld_rpc_metadentry.cpp +20 −32 Original line number Diff line number Diff line Loading @@ -19,6 +19,7 @@ #include <global/rpc/rpc_utils.hpp> #include <global/rpc/distributor.hpp> #include <global/rpc/rpc_types.hpp> #include <client/rpc/hg_rpcs.hpp> namespace rpc_send { Loading @@ -30,42 +31,29 @@ margo_forward_timed_wrap(const hg_handle_t& handle, void* in_struct) { } int mk_node(const std::string& path, const mode_t mode) { hg_handle_t handle; rpc_mk_node_in_t in{}; rpc_err_out_t out{}; int err = EUNKNOWN; // fill in in.path = path.c_str(); in.mode = mode; // Create handle CTX->log()->debug("{}() Creating Mercury handle ...", __func__); auto ret = margo_create_wrap(rpc_mk_node_id, path, handle); if (ret != HG_SUCCESS) { errno = EBUSY; return -1; } // Send rpc CTX->log()->debug("{}() About to send RPC ...", __func__); ret = margo_forward_timed_wrap(handle, &in); // Get response if (ret == HG_SUCCESS) { CTX->log()->trace("{}() Waiting for response", __func__); ret = margo_get_output(handle, &out); if (ret == HG_SUCCESS) { CTX->log()->debug("{}() Got response success: {}", __func__, out.err); err = out.err; } else { // something is wrong errno = EBUSY; auto endp = CTX->hosts2().at( CTX->distributor()->locate_file_metadata(path)); try { CTX->log()->debug("{}() Sending RPC ...", __func__); // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we can // retry for RPC_TRIES (see old commits with margo) // TODO(amiranda): hermes will eventually provide a post(endpoint) // returning one result and a broadcast(endpoint_set) returning a // result_set. When that happens we can remove the .at(0) :/ auto out = ld_network_service->post<create>(endp, path, mode).get().at(0); err = out.err(); CTX->log()->debug("{}() Got response success: {}", __func__, err); } catch(const std::exception& ex) { CTX->log()->error("{}() while getting rpc output", __func__); } /* clean up resources consumed by this rpc */ margo_free_output(handle, &out); } else { CTX->log()->warn("{}() timed out", __func__); errno = EBUSY; return -1; } margo_destroy(handle); return err; } Loading Loading
include/client/rpc/hg_rpcs.hpp +112 −0 Original line number Diff line number Diff line Loading @@ -217,4 +217,116 @@ struct fs_config { uint32_t m_gid; }; }; //============================================================================== // definitions for create struct create { // forward declarations of public input/output types for this RPC class input; class output; // traits used so that the engine knows what to do with the RPC using self_type = create; using handle_type = hermes::rpc_handle<self_type>; using input_type = input; using output_type = output; using mercury_input_type = rpc_mk_node_in_t; using mercury_output_type = rpc_err_out_t; // RPC public identifier constexpr static const uint64_t public_id = 796590080; // RPC internal Mercury identifier constexpr static const hg_id_t mercury_id = public_id; // RPC name constexpr static const auto name = hg_tag::create; // requires response? constexpr static const auto requires_response = true; // Mercury callback to serialize input arguments constexpr static const auto mercury_in_proc_cb = HG_GEN_PROC_NAME(rpc_mk_node_in_t); // Mercury callback to serialize output arguments constexpr static const auto mercury_out_proc_cb = HG_GEN_PROC_NAME(rpc_err_out_t); class input { template <typename ExecutionContext> friend hg_return_t hermes::detail::post_to_mercury(ExecutionContext*); public: input(const std::string& path, uint32_t mode) : m_path(path), m_mode(mode) { } input(input&& rhs) = default; input(const input& other) = default; input& operator=(input&& rhs) = default; input& operator=(const input& other) = default; std::string path() const { return m_path; } uint32_t mode() const { return m_mode; } explicit input(const rpc_mk_node_in_t& other) : m_path(other.path), m_mode(other.mode) { } explicit operator rpc_mk_node_in_t() { return {m_path.c_str(), m_mode}; } private: std::string m_path; uint32_t m_mode; }; class output { template <typename ExecutionContext> friend hg_return_t hermes::detail::post_to_mercury(ExecutionContext*); public: output() : m_err() {} output(int32_t err) : m_err(err) {} output(output&& rhs) = default; output(const output& other) = default; output& operator=(output&& rhs) = default; output& operator=(const output& other) = default; explicit output(const rpc_err_out_t& out) { m_err = out.err; } int32_t err() const { return m_err; } private: int32_t m_err; }; }; #endif // GKFS_RPCS_HPP
src/client/preload.cpp +1 −0 Original line number Diff line number Diff line Loading @@ -191,6 +191,7 @@ bool init_hermes_client(const std::string& transport_prefix) { } rpc_config_id = fs_config::public_id; rpc_mk_node_id = create::public_id; return true; } Loading
src/client/rpc/hg_rpcs.cpp +1 −0 Original line number Diff line number Diff line Loading @@ -22,6 +22,7 @@ namespace hermes { namespace detail { void register_user_request_types() { (void) registered_requests().add<fs_config>(); (void) registered_requests().add<create>(); } }} // namespace hermes::detail
src/client/rpc/ld_rpc_metadentry.cpp +20 −32 Original line number Diff line number Diff line Loading @@ -19,6 +19,7 @@ #include <global/rpc/rpc_utils.hpp> #include <global/rpc/distributor.hpp> #include <global/rpc/rpc_types.hpp> #include <client/rpc/hg_rpcs.hpp> namespace rpc_send { Loading @@ -30,42 +31,29 @@ margo_forward_timed_wrap(const hg_handle_t& handle, void* in_struct) { } int mk_node(const std::string& path, const mode_t mode) { hg_handle_t handle; rpc_mk_node_in_t in{}; rpc_err_out_t out{}; int err = EUNKNOWN; // fill in in.path = path.c_str(); in.mode = mode; // Create handle CTX->log()->debug("{}() Creating Mercury handle ...", __func__); auto ret = margo_create_wrap(rpc_mk_node_id, path, handle); if (ret != HG_SUCCESS) { errno = EBUSY; return -1; } // Send rpc CTX->log()->debug("{}() About to send RPC ...", __func__); ret = margo_forward_timed_wrap(handle, &in); // Get response if (ret == HG_SUCCESS) { CTX->log()->trace("{}() Waiting for response", __func__); ret = margo_get_output(handle, &out); if (ret == HG_SUCCESS) { CTX->log()->debug("{}() Got response success: {}", __func__, out.err); err = out.err; } else { // something is wrong errno = EBUSY; auto endp = CTX->hosts2().at( CTX->distributor()->locate_file_metadata(path)); try { CTX->log()->debug("{}() Sending RPC ...", __func__); // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we can // retry for RPC_TRIES (see old commits with margo) // TODO(amiranda): hermes will eventually provide a post(endpoint) // returning one result and a broadcast(endpoint_set) returning a // result_set. When that happens we can remove the .at(0) :/ auto out = ld_network_service->post<create>(endp, path, mode).get().at(0); err = out.err(); CTX->log()->debug("{}() Got response success: {}", __func__, err); } catch(const std::exception& ex) { CTX->log()->error("{}() while getting rpc output", __func__); } /* clean up resources consumed by this rpc */ margo_free_output(handle, &out); } else { CTX->log()->warn("{}() timed out", __func__); errno = EBUSY; return -1; } margo_destroy(handle); return err; } Loading