Loading include/client/rpc/hg_rpcs.hpp +260 −0 Original line number Diff line number Diff line Loading @@ -658,6 +658,266 @@ struct decr_size { }; }; //============================================================================== // definitions for update_metadentry struct update_metadentry { // forward declarations of public input/output types for this RPC class input; class output; // traits used so that the engine knows what to do with the RPC using self_type = update_metadentry; using handle_type = hermes::rpc_handle<self_type>; using input_type = input; using output_type = output; using mercury_input_type = rpc_update_metadentry_in_t; using mercury_output_type = rpc_err_out_t; // RPC public identifier constexpr static const uint64_t public_id = 99483648; // RPC internal Mercury identifier constexpr static const hg_id_t mercury_id = public_id; // RPC name constexpr static const auto name = hg_tag::update_metadentry; // requires response? constexpr static const auto requires_response = true; // Mercury callback to serialize input arguments constexpr static const auto mercury_in_proc_cb = HG_GEN_PROC_NAME(rpc_update_metadentry_in_t); // Mercury callback to serialize output arguments constexpr static const auto mercury_out_proc_cb = HG_GEN_PROC_NAME(rpc_err_out_t); class input { template <typename ExecutionContext> friend hg_return_t hermes::detail::post_to_mercury(ExecutionContext*); public: input(const std::string& path, uint64_t nlink, uint32_t mode, uint32_t uid, uint32_t gid, int64_t size, int64_t blocks, int64_t atime, int64_t mtime, int64_t ctime, bool nlink_flag, bool mode_flag, bool size_flag, bool block_flag, bool atime_flag, bool mtime_flag, bool ctime_flag) : m_path(path), m_nlink(nlink), m_mode(mode), m_uid(uid), m_gid(gid), m_size(size), m_blocks(blocks), m_atime(atime), m_mtime(mtime), m_ctime(ctime), m_nlink_flag(nlink_flag), m_mode_flag(mode_flag), m_size_flag(size_flag), m_block_flag(block_flag), m_atime_flag(atime_flag), m_mtime_flag(mtime_flag), m_ctime_flag(ctime_flag) { } input(input&& rhs) = default; input(const input& other) = default; input& operator=(input&& rhs) = default; input& operator=(const input& other) = default; std::string path() const { return m_path; } uint64_t nlink() const { return m_nlink; } uint32_t mode() const { return m_mode; } uint32_t uid() const { return m_uid; } uint32_t gid() const { return m_gid; } int64_t size() const { return m_size; } int64_t blocks() const { return m_blocks; } int64_t atime() const { return m_atime; } int64_t mtime() const { return m_mtime; } int64_t ctime() const { return m_ctime; } bool nlink_flag() const { return m_nlink_flag; } bool mode_flag() const { return m_mode_flag; } bool size_flag() const { return m_size_flag; } bool block_flag() const { return m_block_flag; } bool atime_flag() const { return m_atime_flag; } bool mtime_flag() const { return m_mtime_flag; } bool ctime_flag() const { return m_ctime_flag; } explicit input(const rpc_update_metadentry_in_t& other) : m_path(other.path), m_nlink(other.nlink), m_mode(other.mode), m_uid(other.uid), m_gid(other.gid), m_size(other.size), m_blocks(other.blocks), m_atime(other.atime), m_mtime(other.mtime), m_ctime(other.ctime), m_nlink_flag(other.nlink_flag), m_mode_flag(other.mode_flag), m_size_flag(other.size_flag), m_block_flag(other.block_flag), m_atime_flag(other.atime_flag), m_mtime_flag(other.mtime_flag), m_ctime_flag(other.ctime_flag) { } explicit operator rpc_update_metadentry_in_t() { return {m_path.c_str(), m_nlink, m_mode, m_uid, m_gid, m_size, m_blocks, m_atime, m_mtime, m_ctime, m_nlink_flag, m_mode_flag, m_size_flag, m_block_flag, m_atime_flag, m_mtime_flag, m_ctime_flag}; } private: std::string m_path; uint64_t m_nlink; uint32_t m_mode; uint32_t m_uid; uint32_t m_gid; int64_t m_size; int64_t m_blocks; int64_t m_atime; int64_t m_mtime; int64_t m_ctime; bool m_nlink_flag; bool m_mode_flag; bool m_size_flag; bool m_block_flag; bool m_atime_flag; bool m_mtime_flag; bool m_ctime_flag; }; class output { template <typename ExecutionContext> friend hg_return_t hermes::detail::post_to_mercury(ExecutionContext*); public: output() : m_err() {} output(int32_t err) : m_err(err) {} output(output&& rhs) = default; output(const output& other) = default; output& operator=(output&& rhs) = default; output& operator=(const output& other) = default; explicit output(const rpc_err_out_t& out) { m_err = out.err; } int32_t err() const { return m_err; } private: int32_t m_err; }; }; } // namespace rpc } // namespace gkfs Loading src/client/preload.cpp +2 −0 Original line number Diff line number Diff line Loading @@ -194,6 +194,8 @@ bool init_hermes_client(const std::string& transport_prefix) { rpc_mk_node_id = gkfs::rpc::create::public_id; rpc_stat_id = gkfs::rpc::stat::public_id; rpc_rm_node_id = gkfs::rpc::remove::public_id; rpc_decr_size_id = gkfs::rpc::decr_size::public_id; rpc_update_metadentry_id = gkfs::rpc::update_metadentry::public_id; return true; } Loading src/client/rpc/hg_rpcs.cpp +1 −0 Original line number Diff line number Diff line Loading @@ -26,6 +26,7 @@ register_user_request_types() { (void) registered_requests().add<gkfs::rpc::stat>(); (void) registered_requests().add<gkfs::rpc::remove>(); (void) registered_requests().add<gkfs::rpc::decr_size>(); (void) registered_requests().add<gkfs::rpc::update_metadentry>(); } }} // namespace hermes::detail src/client/rpc/ld_rpc_metadentry.cpp +43 −47 Original line number Diff line number Diff line Loading @@ -162,7 +162,6 @@ int rm_node(const std::string& path, const bool remove_metadentry_only) { return 0; } std::size_t rpc_target_size2 = CTX->hosts2().size(); std::vector<hermes::rpc_handle<gkfs::rpc::remove>> handles; hermes::endpoint_set endps; Loading Loading @@ -197,56 +196,53 @@ int rm_node(const std::string& path, const bool remove_metadentry_only) { int update_metadentry(const string& path, const Metadata& md, const MetadentryUpdateFlags& md_flags) { hg_handle_t handle; rpc_update_metadentry_in_t in{}; rpc_err_out_t out{}; int err = EUNKNOWN; // fill in // add data in.path = path.c_str(); in.size = md_flags.size ? md.size() : 0; in.nlink = md_flags.link_count ? md.link_count() : 0; in.blocks = md_flags.blocks ? md.blocks() : 0; in.atime = md_flags.atime ? md.atime() : 0; in.mtime = md_flags.mtime ? md.mtime() : 0; in.ctime = md_flags.ctime ? md.ctime() : 0; // add data flags in.size_flag = bool_to_merc_bool(md_flags.size); in.nlink_flag = bool_to_merc_bool(md_flags.link_count); in.block_flag = bool_to_merc_bool(md_flags.blocks); in.atime_flag = bool_to_merc_bool(md_flags.atime); in.mtime_flag = bool_to_merc_bool(md_flags.mtime); in.ctime_flag = bool_to_merc_bool(md_flags.ctime); CTX->log()->debug("{}() Creating Mercury handle ...", __func__); auto ret = margo_create_wrap(rpc_update_metadentry_id, path, handle); if (ret != HG_SUCCESS) { errno = EBUSY; auto endp = CTX->hosts2().at( CTX->distributor()->locate_file_metadata(path)); try { CTX->log()->debug("{}() Sending RPC ...", __func__); // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we can // retry for RPC_TRIES (see old commits with margo) // TODO(amiranda): hermes will eventually provide a post(endpoint) // returning one result and a broadcast(endpoint_set) returning a // result_set. When that happens we can remove the .at(0) :/ auto out = ld_network_service->post<gkfs::rpc::update_metadentry>( endp, path, (md_flags.link_count ? md.link_count() : 0), /* mode */ 0, /* uid */ 0, /* gid */ 0, (md_flags.size ? md.size() : 0), (md_flags.blocks ? md.blocks() : 0), (md_flags.atime ? md.atime() : 0), (md_flags.mtime ? md.mtime() : 0), (md_flags.ctime ? md.ctime() : 0), bool_to_merc_bool(md_flags.link_count), /* mode_flag */ false, bool_to_merc_bool(md_flags.size), bool_to_merc_bool(md_flags.blocks), bool_to_merc_bool(md_flags.atime), bool_to_merc_bool(md_flags.mtime), bool_to_merc_bool(md_flags.ctime)).get().at(0); CTX->log()->debug("{}() Got response success: {}", __func__, out.err()); if(out.err() != 0) { errno = out.err(); return -1; } // Send rpc ret = margo_forward_timed_wrap(handle, &in); // Get response if (ret == HG_SUCCESS) { CTX->log()->trace("{}() Waiting for response", __func__); ret = margo_get_output(handle, &out); if (ret == HG_SUCCESS) { CTX->log()->debug("{}() Got response success: {}", __func__, out.err); err = out.err; } else { // something is wrong errno = EBUSY; return 0; } catch(const std::exception& ex) { CTX->log()->error("{}() while getting rpc output", __func__); } /* clean up resources consumed by this rpc */ margo_free_output(handle, &out); } else { CTX->log()->warn("{}() timed out", __func__); errno = EBUSY; return -1; } margo_destroy(handle); return err; } int update_metadentry_size(const string& path, const size_t size, const off64_t offset, const bool append_flag, Loading Loading
include/client/rpc/hg_rpcs.hpp +260 −0 Original line number Diff line number Diff line Loading @@ -658,6 +658,266 @@ struct decr_size { }; }; //============================================================================== // definitions for update_metadentry struct update_metadentry { // forward declarations of public input/output types for this RPC class input; class output; // traits used so that the engine knows what to do with the RPC using self_type = update_metadentry; using handle_type = hermes::rpc_handle<self_type>; using input_type = input; using output_type = output; using mercury_input_type = rpc_update_metadentry_in_t; using mercury_output_type = rpc_err_out_t; // RPC public identifier constexpr static const uint64_t public_id = 99483648; // RPC internal Mercury identifier constexpr static const hg_id_t mercury_id = public_id; // RPC name constexpr static const auto name = hg_tag::update_metadentry; // requires response? constexpr static const auto requires_response = true; // Mercury callback to serialize input arguments constexpr static const auto mercury_in_proc_cb = HG_GEN_PROC_NAME(rpc_update_metadentry_in_t); // Mercury callback to serialize output arguments constexpr static const auto mercury_out_proc_cb = HG_GEN_PROC_NAME(rpc_err_out_t); class input { template <typename ExecutionContext> friend hg_return_t hermes::detail::post_to_mercury(ExecutionContext*); public: input(const std::string& path, uint64_t nlink, uint32_t mode, uint32_t uid, uint32_t gid, int64_t size, int64_t blocks, int64_t atime, int64_t mtime, int64_t ctime, bool nlink_flag, bool mode_flag, bool size_flag, bool block_flag, bool atime_flag, bool mtime_flag, bool ctime_flag) : m_path(path), m_nlink(nlink), m_mode(mode), m_uid(uid), m_gid(gid), m_size(size), m_blocks(blocks), m_atime(atime), m_mtime(mtime), m_ctime(ctime), m_nlink_flag(nlink_flag), m_mode_flag(mode_flag), m_size_flag(size_flag), m_block_flag(block_flag), m_atime_flag(atime_flag), m_mtime_flag(mtime_flag), m_ctime_flag(ctime_flag) { } input(input&& rhs) = default; input(const input& other) = default; input& operator=(input&& rhs) = default; input& operator=(const input& other) = default; std::string path() const { return m_path; } uint64_t nlink() const { return m_nlink; } uint32_t mode() const { return m_mode; } uint32_t uid() const { return m_uid; } uint32_t gid() const { return m_gid; } int64_t size() const { return m_size; } int64_t blocks() const { return m_blocks; } int64_t atime() const { return m_atime; } int64_t mtime() const { return m_mtime; } int64_t ctime() const { return m_ctime; } bool nlink_flag() const { return m_nlink_flag; } bool mode_flag() const { return m_mode_flag; } bool size_flag() const { return m_size_flag; } bool block_flag() const { return m_block_flag; } bool atime_flag() const { return m_atime_flag; } bool mtime_flag() const { return m_mtime_flag; } bool ctime_flag() const { return m_ctime_flag; } explicit input(const rpc_update_metadentry_in_t& other) : m_path(other.path), m_nlink(other.nlink), m_mode(other.mode), m_uid(other.uid), m_gid(other.gid), m_size(other.size), m_blocks(other.blocks), m_atime(other.atime), m_mtime(other.mtime), m_ctime(other.ctime), m_nlink_flag(other.nlink_flag), m_mode_flag(other.mode_flag), m_size_flag(other.size_flag), m_block_flag(other.block_flag), m_atime_flag(other.atime_flag), m_mtime_flag(other.mtime_flag), m_ctime_flag(other.ctime_flag) { } explicit operator rpc_update_metadentry_in_t() { return {m_path.c_str(), m_nlink, m_mode, m_uid, m_gid, m_size, m_blocks, m_atime, m_mtime, m_ctime, m_nlink_flag, m_mode_flag, m_size_flag, m_block_flag, m_atime_flag, m_mtime_flag, m_ctime_flag}; } private: std::string m_path; uint64_t m_nlink; uint32_t m_mode; uint32_t m_uid; uint32_t m_gid; int64_t m_size; int64_t m_blocks; int64_t m_atime; int64_t m_mtime; int64_t m_ctime; bool m_nlink_flag; bool m_mode_flag; bool m_size_flag; bool m_block_flag; bool m_atime_flag; bool m_mtime_flag; bool m_ctime_flag; }; class output { template <typename ExecutionContext> friend hg_return_t hermes::detail::post_to_mercury(ExecutionContext*); public: output() : m_err() {} output(int32_t err) : m_err(err) {} output(output&& rhs) = default; output(const output& other) = default; output& operator=(output&& rhs) = default; output& operator=(const output& other) = default; explicit output(const rpc_err_out_t& out) { m_err = out.err; } int32_t err() const { return m_err; } private: int32_t m_err; }; }; } // namespace rpc } // namespace gkfs Loading
src/client/preload.cpp +2 −0 Original line number Diff line number Diff line Loading @@ -194,6 +194,8 @@ bool init_hermes_client(const std::string& transport_prefix) { rpc_mk_node_id = gkfs::rpc::create::public_id; rpc_stat_id = gkfs::rpc::stat::public_id; rpc_rm_node_id = gkfs::rpc::remove::public_id; rpc_decr_size_id = gkfs::rpc::decr_size::public_id; rpc_update_metadentry_id = gkfs::rpc::update_metadentry::public_id; return true; } Loading
src/client/rpc/hg_rpcs.cpp +1 −0 Original line number Diff line number Diff line Loading @@ -26,6 +26,7 @@ register_user_request_types() { (void) registered_requests().add<gkfs::rpc::stat>(); (void) registered_requests().add<gkfs::rpc::remove>(); (void) registered_requests().add<gkfs::rpc::decr_size>(); (void) registered_requests().add<gkfs::rpc::update_metadentry>(); } }} // namespace hermes::detail
src/client/rpc/ld_rpc_metadentry.cpp +43 −47 Original line number Diff line number Diff line Loading @@ -162,7 +162,6 @@ int rm_node(const std::string& path, const bool remove_metadentry_only) { return 0; } std::size_t rpc_target_size2 = CTX->hosts2().size(); std::vector<hermes::rpc_handle<gkfs::rpc::remove>> handles; hermes::endpoint_set endps; Loading Loading @@ -197,56 +196,53 @@ int rm_node(const std::string& path, const bool remove_metadentry_only) { int update_metadentry(const string& path, const Metadata& md, const MetadentryUpdateFlags& md_flags) { hg_handle_t handle; rpc_update_metadentry_in_t in{}; rpc_err_out_t out{}; int err = EUNKNOWN; // fill in // add data in.path = path.c_str(); in.size = md_flags.size ? md.size() : 0; in.nlink = md_flags.link_count ? md.link_count() : 0; in.blocks = md_flags.blocks ? md.blocks() : 0; in.atime = md_flags.atime ? md.atime() : 0; in.mtime = md_flags.mtime ? md.mtime() : 0; in.ctime = md_flags.ctime ? md.ctime() : 0; // add data flags in.size_flag = bool_to_merc_bool(md_flags.size); in.nlink_flag = bool_to_merc_bool(md_flags.link_count); in.block_flag = bool_to_merc_bool(md_flags.blocks); in.atime_flag = bool_to_merc_bool(md_flags.atime); in.mtime_flag = bool_to_merc_bool(md_flags.mtime); in.ctime_flag = bool_to_merc_bool(md_flags.ctime); CTX->log()->debug("{}() Creating Mercury handle ...", __func__); auto ret = margo_create_wrap(rpc_update_metadentry_id, path, handle); if (ret != HG_SUCCESS) { errno = EBUSY; auto endp = CTX->hosts2().at( CTX->distributor()->locate_file_metadata(path)); try { CTX->log()->debug("{}() Sending RPC ...", __func__); // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we can // retry for RPC_TRIES (see old commits with margo) // TODO(amiranda): hermes will eventually provide a post(endpoint) // returning one result and a broadcast(endpoint_set) returning a // result_set. When that happens we can remove the .at(0) :/ auto out = ld_network_service->post<gkfs::rpc::update_metadentry>( endp, path, (md_flags.link_count ? md.link_count() : 0), /* mode */ 0, /* uid */ 0, /* gid */ 0, (md_flags.size ? md.size() : 0), (md_flags.blocks ? md.blocks() : 0), (md_flags.atime ? md.atime() : 0), (md_flags.mtime ? md.mtime() : 0), (md_flags.ctime ? md.ctime() : 0), bool_to_merc_bool(md_flags.link_count), /* mode_flag */ false, bool_to_merc_bool(md_flags.size), bool_to_merc_bool(md_flags.blocks), bool_to_merc_bool(md_flags.atime), bool_to_merc_bool(md_flags.mtime), bool_to_merc_bool(md_flags.ctime)).get().at(0); CTX->log()->debug("{}() Got response success: {}", __func__, out.err()); if(out.err() != 0) { errno = out.err(); return -1; } // Send rpc ret = margo_forward_timed_wrap(handle, &in); // Get response if (ret == HG_SUCCESS) { CTX->log()->trace("{}() Waiting for response", __func__); ret = margo_get_output(handle, &out); if (ret == HG_SUCCESS) { CTX->log()->debug("{}() Got response success: {}", __func__, out.err); err = out.err; } else { // something is wrong errno = EBUSY; return 0; } catch(const std::exception& ex) { CTX->log()->error("{}() while getting rpc output", __func__); } /* clean up resources consumed by this rpc */ margo_free_output(handle, &out); } else { CTX->log()->warn("{}() timed out", __func__); errno = EBUSY; return -1; } margo_destroy(handle); return err; } int update_metadentry_size(const string& path, const size_t size, const off64_t offset, const bool append_flag, Loading