Loading include/client/rpc/hg_rpcs.hpp +137 −0 Original line number Diff line number Diff line Loading @@ -1026,6 +1026,143 @@ struct get_metadentry_size { int64_t m_ret_size; }; }; //============================================================================== // definitions for update_metadentry_size struct update_metadentry_size { // forward declarations of public input/output types for this RPC class input; class output; // traits used so that the engine knows what to do with the RPC using self_type = update_metadentry_size; using handle_type = hermes::rpc_handle<self_type>; using input_type = input; using output_type = output; using mercury_input_type = rpc_update_metadentry_size_in_t; using mercury_output_type = rpc_update_metadentry_size_out_t; // RPC public identifier constexpr static const uint64_t public_id = 2760900608; // RPC internal Mercury identifier constexpr static const hg_id_t mercury_id = public_id; // RPC name constexpr static const auto name = hg_tag::update_metadentry_size; // requires response? constexpr static const auto requires_response = true; // Mercury callback to serialize input arguments constexpr static const auto mercury_in_proc_cb = HG_GEN_PROC_NAME(rpc_update_metadentry_size_in_t); // Mercury callback to serialize output arguments constexpr static const auto mercury_out_proc_cb = HG_GEN_PROC_NAME(rpc_update_metadentry_size_out_t); class input { template <typename ExecutionContext> friend hg_return_t hermes::detail::post_to_mercury(ExecutionContext*); public: input(const std::string& path, uint64_t size, int64_t offset, bool append) : m_path(path), m_size(size), m_offset(offset), m_append(append) { } input(input&& rhs) = default; input(const input& other) = default; input& operator=(input&& rhs) = default; input& operator=(const input& other) = default; std::string path() const { return m_path; } uint64_t size() const { return m_size; } int64_t offset() const { return m_offset; } bool append() const { return m_append; } explicit input(const rpc_update_metadentry_size_in_t& other) : m_path(other.path), m_size(other.size), m_offset(other.offset), m_append(other.append) { } explicit operator rpc_update_metadentry_size_in_t() { return {m_path.c_str(), m_size, m_offset, m_append}; } private: std::string m_path; uint64_t m_size; int64_t m_offset; bool m_append; }; class output { template <typename ExecutionContext> friend hg_return_t hermes::detail::post_to_mercury(ExecutionContext*); public: output() : m_err(), m_ret_size() {} output(int32_t err, int64_t ret_size) : m_err(err), m_ret_size(ret_size) {} output(output&& rhs) = default; output(const output& other) = default; output& operator=(output&& rhs) = default; output& operator=(const output& other) = default; explicit output(const rpc_update_metadentry_size_out_t& out) { m_err = out.err; m_ret_size = out.ret_size; } int32_t err() const { return m_err; } int64_t ret_size() const { return m_ret_size; } private: int32_t m_err; int64_t m_ret_size; }; }; } // namespace rpc } // namespace gkfs Loading src/client/preload.cpp +1 −0 Original line number Diff line number Diff line Loading @@ -197,6 +197,7 @@ bool init_hermes_client(const std::string& transport_prefix) { rpc_decr_size_id = gkfs::rpc::decr_size::public_id; rpc_update_metadentry_id = gkfs::rpc::update_metadentry::public_id; rpc_get_metadentry_size_id = gkfs::rpc::get_metadentry_size::public_id; rpc_update_metadentry_size_id = gkfs::rpc::update_metadentry::public_id; return true; } Loading src/client/rpc/hg_rpcs.cpp +1 −0 Original line number Diff line number Diff line Loading @@ -28,6 +28,7 @@ register_user_request_types() { (void) registered_requests().add<gkfs::rpc::decr_size>(); (void) registered_requests().add<gkfs::rpc::update_metadentry>(); (void) registered_requests().add<gkfs::rpc::get_metadentry_size>(); (void) registered_requests().add<gkfs::rpc::update_metadentry_size>(); } }} // namespace hermes::detail src/client/rpc/ld_rpc_metadentry.cpp +31 −43 Original line number Diff line number Diff line Loading @@ -247,53 +247,41 @@ int update_metadentry(const string& path, const Metadata& md, const MetadentryUp int update_metadentry_size(const string& path, const size_t size, const off64_t offset, const bool append_flag, off64_t& ret_size) { hg_handle_t handle; rpc_update_metadentry_size_in_t in{}; rpc_update_metadentry_size_out_t out{}; // add data in.path = path.c_str(); in.size = size; in.offset = offset; if (append_flag) in.append = HG_TRUE; else in.append = HG_FALSE; int err = EUNKNOWN; CTX->log()->debug("{}() Creating Mercury handle ...", __func__); auto ret = margo_create_wrap(rpc_update_metadentry_size_id, path, handle); if (ret != HG_SUCCESS) { ret_size = 0; errno = EBUSY; margo_destroy(handle); return -1; } // Send rpc ret = margo_forward_timed_wrap(handle, &in); if (ret != HG_SUCCESS) { CTX->log()->error("{}() margo forward failed: {}", __func__, HG_Error_to_string(ret)); ret_size = 0; errno = EBUSY; margo_destroy(handle); auto endp = CTX->hosts2().at( CTX->distributor()->locate_file_metadata(path)); try { CTX->log()->debug("{}() Sending RPC ...", __func__); // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we can // retry for RPC_TRIES (see old commits with margo) // TODO(amiranda): hermes will eventually provide a post(endpoint) // returning one result and a broadcast(endpoint_set) returning a // result_set. When that happens we can remove the .at(0) :/ auto out = ld_network_service->post<gkfs::rpc::update_metadentry_size>( endp, path, size, offset, bool_to_merc_bool(append_flag)).get().at(0); CTX->log()->debug("{}() Got response success: {}", __func__, out.err()); if(out.err() != 0) { errno = out.err(); return -1; } ret = margo_get_output(handle, &out); if (ret != HG_SUCCESS) { CTX->log()->error("{}() failed to get rpc ouptut: {}", __func__, HG_Error_to_string(ret)); ret_size = 0; errno = EBUSY; margo_free_output(handle, &out); margo_destroy(handle); } ret_size = out.ret_size(); return out.err(); CTX->log()->debug("{}() Got response: {}", __func__, out.err); err = out.err; ret_size = out.ret_size; return 0; margo_free_output(handle, &out); margo_destroy(handle); return err; } catch(const std::exception& ex) { CTX->log()->error("{}() while getting rpc output", __func__); errno = EBUSY; ret_size = 0; return EUNKNOWN; } } int get_metadentry_size(const std::string& path, off64_t& ret_size) { Loading Loading
include/client/rpc/hg_rpcs.hpp +137 −0 Original line number Diff line number Diff line Loading @@ -1026,6 +1026,143 @@ struct get_metadentry_size { int64_t m_ret_size; }; }; //============================================================================== // definitions for update_metadentry_size struct update_metadentry_size { // forward declarations of public input/output types for this RPC class input; class output; // traits used so that the engine knows what to do with the RPC using self_type = update_metadentry_size; using handle_type = hermes::rpc_handle<self_type>; using input_type = input; using output_type = output; using mercury_input_type = rpc_update_metadentry_size_in_t; using mercury_output_type = rpc_update_metadentry_size_out_t; // RPC public identifier constexpr static const uint64_t public_id = 2760900608; // RPC internal Mercury identifier constexpr static const hg_id_t mercury_id = public_id; // RPC name constexpr static const auto name = hg_tag::update_metadentry_size; // requires response? constexpr static const auto requires_response = true; // Mercury callback to serialize input arguments constexpr static const auto mercury_in_proc_cb = HG_GEN_PROC_NAME(rpc_update_metadentry_size_in_t); // Mercury callback to serialize output arguments constexpr static const auto mercury_out_proc_cb = HG_GEN_PROC_NAME(rpc_update_metadentry_size_out_t); class input { template <typename ExecutionContext> friend hg_return_t hermes::detail::post_to_mercury(ExecutionContext*); public: input(const std::string& path, uint64_t size, int64_t offset, bool append) : m_path(path), m_size(size), m_offset(offset), m_append(append) { } input(input&& rhs) = default; input(const input& other) = default; input& operator=(input&& rhs) = default; input& operator=(const input& other) = default; std::string path() const { return m_path; } uint64_t size() const { return m_size; } int64_t offset() const { return m_offset; } bool append() const { return m_append; } explicit input(const rpc_update_metadentry_size_in_t& other) : m_path(other.path), m_size(other.size), m_offset(other.offset), m_append(other.append) { } explicit operator rpc_update_metadentry_size_in_t() { return {m_path.c_str(), m_size, m_offset, m_append}; } private: std::string m_path; uint64_t m_size; int64_t m_offset; bool m_append; }; class output { template <typename ExecutionContext> friend hg_return_t hermes::detail::post_to_mercury(ExecutionContext*); public: output() : m_err(), m_ret_size() {} output(int32_t err, int64_t ret_size) : m_err(err), m_ret_size(ret_size) {} output(output&& rhs) = default; output(const output& other) = default; output& operator=(output&& rhs) = default; output& operator=(const output& other) = default; explicit output(const rpc_update_metadentry_size_out_t& out) { m_err = out.err; m_ret_size = out.ret_size; } int32_t err() const { return m_err; } int64_t ret_size() const { return m_ret_size; } private: int32_t m_err; int64_t m_ret_size; }; }; } // namespace rpc } // namespace gkfs Loading
src/client/preload.cpp +1 −0 Original line number Diff line number Diff line Loading @@ -197,6 +197,7 @@ bool init_hermes_client(const std::string& transport_prefix) { rpc_decr_size_id = gkfs::rpc::decr_size::public_id; rpc_update_metadentry_id = gkfs::rpc::update_metadentry::public_id; rpc_get_metadentry_size_id = gkfs::rpc::get_metadentry_size::public_id; rpc_update_metadentry_size_id = gkfs::rpc::update_metadentry::public_id; return true; } Loading
src/client/rpc/hg_rpcs.cpp +1 −0 Original line number Diff line number Diff line Loading @@ -28,6 +28,7 @@ register_user_request_types() { (void) registered_requests().add<gkfs::rpc::decr_size>(); (void) registered_requests().add<gkfs::rpc::update_metadentry>(); (void) registered_requests().add<gkfs::rpc::get_metadentry_size>(); (void) registered_requests().add<gkfs::rpc::update_metadentry_size>(); } }} // namespace hermes::detail
src/client/rpc/ld_rpc_metadentry.cpp +31 −43 Original line number Diff line number Diff line Loading @@ -247,53 +247,41 @@ int update_metadentry(const string& path, const Metadata& md, const MetadentryUp int update_metadentry_size(const string& path, const size_t size, const off64_t offset, const bool append_flag, off64_t& ret_size) { hg_handle_t handle; rpc_update_metadentry_size_in_t in{}; rpc_update_metadentry_size_out_t out{}; // add data in.path = path.c_str(); in.size = size; in.offset = offset; if (append_flag) in.append = HG_TRUE; else in.append = HG_FALSE; int err = EUNKNOWN; CTX->log()->debug("{}() Creating Mercury handle ...", __func__); auto ret = margo_create_wrap(rpc_update_metadentry_size_id, path, handle); if (ret != HG_SUCCESS) { ret_size = 0; errno = EBUSY; margo_destroy(handle); return -1; } // Send rpc ret = margo_forward_timed_wrap(handle, &in); if (ret != HG_SUCCESS) { CTX->log()->error("{}() margo forward failed: {}", __func__, HG_Error_to_string(ret)); ret_size = 0; errno = EBUSY; margo_destroy(handle); auto endp = CTX->hosts2().at( CTX->distributor()->locate_file_metadata(path)); try { CTX->log()->debug("{}() Sending RPC ...", __func__); // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we can // retry for RPC_TRIES (see old commits with margo) // TODO(amiranda): hermes will eventually provide a post(endpoint) // returning one result and a broadcast(endpoint_set) returning a // result_set. When that happens we can remove the .at(0) :/ auto out = ld_network_service->post<gkfs::rpc::update_metadentry_size>( endp, path, size, offset, bool_to_merc_bool(append_flag)).get().at(0); CTX->log()->debug("{}() Got response success: {}", __func__, out.err()); if(out.err() != 0) { errno = out.err(); return -1; } ret = margo_get_output(handle, &out); if (ret != HG_SUCCESS) { CTX->log()->error("{}() failed to get rpc ouptut: {}", __func__, HG_Error_to_string(ret)); ret_size = 0; errno = EBUSY; margo_free_output(handle, &out); margo_destroy(handle); } ret_size = out.ret_size(); return out.err(); CTX->log()->debug("{}() Got response: {}", __func__, out.err); err = out.err; ret_size = out.ret_size; return 0; margo_free_output(handle, &out); margo_destroy(handle); return err; } catch(const std::exception& ex) { CTX->log()->error("{}() while getting rpc output", __func__); errno = EBUSY; ret_size = 0; return EUNKNOWN; } } int get_metadentry_size(const std::string& path, off64_t& ret_size) { Loading