Loading lfs/src/classes/rpc_data.cpp +9 −0 Original line number Diff line number Diff line Loading @@ -73,6 +73,14 @@ void RPCData::rpc_srv_create_id(hg_id_t rpc_srv_create_id) { RPCData::rpc_srv_create_id_ = rpc_srv_create_id; } hg_id_t RPCData::rpc_srv_attr_id() const { return rpc_srv_attr_id_; } void RPCData::rpc_srv_attr_id(hg_id_t rpc_srv_attr_id) { RPCData::rpc_srv_attr_id_ = rpc_srv_attr_id; } // Utility functions bool RPCData::get_addr_by_hostid(const uint64_t hostid, hg_addr_t& svr_addr) { Loading Loading @@ -101,3 +109,4 @@ std::string RPCData::get_dentry_hashable(const fuse_ino_t parent, const char* na lfs/src/classes/rpc_data.hpp +5 −0 Original line number Diff line number Diff line Loading @@ -33,6 +33,7 @@ private: // RPC client IDs hg_id_t rpc_minimal_id_; hg_id_t rpc_srv_create_id_; hg_id_t rpc_srv_attr_id_; public: Loading Loading @@ -81,6 +82,10 @@ public: void rpc_srv_create_id(hg_id_t rpc_srv_create_id); hg_id_t rpc_srv_attr_id() const; void rpc_srv_attr_id(hg_id_t rpc_srv_attr_id); // Utility functions bool get_addr_by_hostid(const uint64_t hostid, hg_addr_t& svr_addr); Loading lfs/src/fuse_ops/file.cpp +19 −3 Original line number Diff line number Diff line Loading @@ -35,11 +35,27 @@ using namespace std; void adafs_ll_getattr(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info* fi) { ADAFS_DATA->spdlogger()->debug("adafs_ll_getattr() enter: inode {}", ino); auto attr = make_shared<struct stat>(); auto err = get_attr(*attr, ino); struct stat attr{}; int err; if (ADAFS_DATA->host_size() > 1) { // might be remote auto recipient = RPC_DATA->get_rpc_node(fmt::FormatInt(ino).str()); if (recipient == ADAFS_DATA->host_id()) { // local err = get_attr(attr, ino); } else { // remote err = rpc_send_get_attr(recipient, ino, attr); } } else { // local err = get_attr(attr, ino); } // auto attr = make_shared<struct stat>(); if (err == 0) { // XXX take a look into timeout value later fuse_reply_attr(req, attr.get(), 1.0); fuse_reply_attr(req, &attr, 1.0); } else { fuse_reply_err(req, err); } Loading lfs/src/rpc/client/c_metadata.cpp +52 −1 Original line number Diff line number Diff line Loading @@ -145,7 +145,7 @@ int rpc_send_create(const size_t recipient, const fuse_ino_t parent, const strin /* clean up resources consumed by this rpc */ HG_Free_output(handle, &out); } else { ADAFS_DATA->spdlogger()->error("RPC NOT send (timed out)"); ADAFS_DATA->spdlogger()->error("RPC send_create (timed out)"); } Loading @@ -153,3 +153,54 @@ int rpc_send_create(const size_t recipient, const fuse_ino_t parent, const strin HG_Destroy(handle); return 0; } int rpc_send_get_attr(const size_t recipient, const fuse_ino_t inode, struct stat& attr) { hg_handle_t handle; hg_addr_t svr_addr = HG_ADDR_NULL; rpc_get_attr_in_t in; rpc_get_attr_out_t out; // fill in in.inode = inode; // TODO HG_ADDR_T is never freed atm. Need to change LRUCache if (!RPC_DATA->get_addr_by_hostid(recipient, svr_addr)) { ADAFS_DATA->spdlogger()->error("server address not resolvable for host id {}", recipient); return 1; } auto ret = HG_Create(RPC_DATA->client_hg_context(), svr_addr, RPC_DATA->rpc_srv_attr_id(), &handle); if (ret != HG_SUCCESS) { ADAFS_DATA->spdlogger()->error("creating handle FAILED"); return 1; } int send_ret = HG_FALSE; for (int i = 0; i < max_retries; ++i) { send_ret = margo_forward_timed(RPC_DATA->client_mid(), handle, &in, 15000); if (send_ret == HG_SUCCESS) { break; } } if (send_ret == HG_SUCCESS) { /* decode response */ ret = HG_Get_output(handle, &out); ADAFS_DATA->spdlogger()->debug("Got response mode {}", out.mode); attr.st_atim.tv_sec = static_cast<time_t>(out.atime); attr.st_mtim.tv_sec = static_cast<time_t>(out.mtime); attr.st_ctim.tv_sec = static_cast<time_t>(out.ctime); attr.st_mode = static_cast<mode_t>(out.mode); attr.st_uid = static_cast<uid_t>(out.uid); attr.st_gid = static_cast<gid_t>(out.gid); attr.st_nlink = static_cast<nlink_t>(out.nlink); attr.st_size = static_cast<size_t>(out.size); attr.st_blocks = static_cast<blkcnt_t>(out.blocks); /* clean up resources consumed by this rpc */ HG_Free_output(handle, &out); } else { ADAFS_DATA->spdlogger()->error("RPC send_get_attr (timed out)"); } HG_Free_input(handle, &in); HG_Destroy(handle); return 0; } No newline at end of file lfs/src/rpc/client/c_metadata.hpp +2 −0 Original line number Diff line number Diff line Loading @@ -12,4 +12,6 @@ void send_minimal_rpc(void* arg); int rpc_send_create(const size_t recipient, const fuse_ino_t parent, const std::string& name, const uid_t uid, const gid_t gid, const mode_t mode, fuse_ino_t& new_inode); int rpc_send_get_attr(const size_t recipient, const fuse_ino_t inode, struct stat& attr); #endif //LFS_C_METADATA_HPP Loading
lfs/src/classes/rpc_data.cpp +9 −0 Original line number Diff line number Diff line Loading @@ -73,6 +73,14 @@ void RPCData::rpc_srv_create_id(hg_id_t rpc_srv_create_id) { RPCData::rpc_srv_create_id_ = rpc_srv_create_id; } hg_id_t RPCData::rpc_srv_attr_id() const { return rpc_srv_attr_id_; } void RPCData::rpc_srv_attr_id(hg_id_t rpc_srv_attr_id) { RPCData::rpc_srv_attr_id_ = rpc_srv_attr_id; } // Utility functions bool RPCData::get_addr_by_hostid(const uint64_t hostid, hg_addr_t& svr_addr) { Loading Loading @@ -101,3 +109,4 @@ std::string RPCData::get_dentry_hashable(const fuse_ino_t parent, const char* na
lfs/src/classes/rpc_data.hpp +5 −0 Original line number Diff line number Diff line Loading @@ -33,6 +33,7 @@ private: // RPC client IDs hg_id_t rpc_minimal_id_; hg_id_t rpc_srv_create_id_; hg_id_t rpc_srv_attr_id_; public: Loading Loading @@ -81,6 +82,10 @@ public: void rpc_srv_create_id(hg_id_t rpc_srv_create_id); hg_id_t rpc_srv_attr_id() const; void rpc_srv_attr_id(hg_id_t rpc_srv_attr_id); // Utility functions bool get_addr_by_hostid(const uint64_t hostid, hg_addr_t& svr_addr); Loading
lfs/src/fuse_ops/file.cpp +19 −3 Original line number Diff line number Diff line Loading @@ -35,11 +35,27 @@ using namespace std; void adafs_ll_getattr(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info* fi) { ADAFS_DATA->spdlogger()->debug("adafs_ll_getattr() enter: inode {}", ino); auto attr = make_shared<struct stat>(); auto err = get_attr(*attr, ino); struct stat attr{}; int err; if (ADAFS_DATA->host_size() > 1) { // might be remote auto recipient = RPC_DATA->get_rpc_node(fmt::FormatInt(ino).str()); if (recipient == ADAFS_DATA->host_id()) { // local err = get_attr(attr, ino); } else { // remote err = rpc_send_get_attr(recipient, ino, attr); } } else { // local err = get_attr(attr, ino); } // auto attr = make_shared<struct stat>(); if (err == 0) { // XXX take a look into timeout value later fuse_reply_attr(req, attr.get(), 1.0); fuse_reply_attr(req, &attr, 1.0); } else { fuse_reply_err(req, err); } Loading
lfs/src/rpc/client/c_metadata.cpp +52 −1 Original line number Diff line number Diff line Loading @@ -145,7 +145,7 @@ int rpc_send_create(const size_t recipient, const fuse_ino_t parent, const strin /* clean up resources consumed by this rpc */ HG_Free_output(handle, &out); } else { ADAFS_DATA->spdlogger()->error("RPC NOT send (timed out)"); ADAFS_DATA->spdlogger()->error("RPC send_create (timed out)"); } Loading @@ -153,3 +153,54 @@ int rpc_send_create(const size_t recipient, const fuse_ino_t parent, const strin HG_Destroy(handle); return 0; } int rpc_send_get_attr(const size_t recipient, const fuse_ino_t inode, struct stat& attr) { hg_handle_t handle; hg_addr_t svr_addr = HG_ADDR_NULL; rpc_get_attr_in_t in; rpc_get_attr_out_t out; // fill in in.inode = inode; // TODO HG_ADDR_T is never freed atm. Need to change LRUCache if (!RPC_DATA->get_addr_by_hostid(recipient, svr_addr)) { ADAFS_DATA->spdlogger()->error("server address not resolvable for host id {}", recipient); return 1; } auto ret = HG_Create(RPC_DATA->client_hg_context(), svr_addr, RPC_DATA->rpc_srv_attr_id(), &handle); if (ret != HG_SUCCESS) { ADAFS_DATA->spdlogger()->error("creating handle FAILED"); return 1; } int send_ret = HG_FALSE; for (int i = 0; i < max_retries; ++i) { send_ret = margo_forward_timed(RPC_DATA->client_mid(), handle, &in, 15000); if (send_ret == HG_SUCCESS) { break; } } if (send_ret == HG_SUCCESS) { /* decode response */ ret = HG_Get_output(handle, &out); ADAFS_DATA->spdlogger()->debug("Got response mode {}", out.mode); attr.st_atim.tv_sec = static_cast<time_t>(out.atime); attr.st_mtim.tv_sec = static_cast<time_t>(out.mtime); attr.st_ctim.tv_sec = static_cast<time_t>(out.ctime); attr.st_mode = static_cast<mode_t>(out.mode); attr.st_uid = static_cast<uid_t>(out.uid); attr.st_gid = static_cast<gid_t>(out.gid); attr.st_nlink = static_cast<nlink_t>(out.nlink); attr.st_size = static_cast<size_t>(out.size); attr.st_blocks = static_cast<blkcnt_t>(out.blocks); /* clean up resources consumed by this rpc */ HG_Free_output(handle, &out); } else { ADAFS_DATA->spdlogger()->error("RPC send_get_attr (timed out)"); } HG_Free_input(handle, &in); HG_Destroy(handle); return 0; } No newline at end of file
lfs/src/rpc/client/c_metadata.hpp +2 −0 Original line number Diff line number Diff line Loading @@ -12,4 +12,6 @@ void send_minimal_rpc(void* arg); int rpc_send_create(const size_t recipient, const fuse_ino_t parent, const std::string& name, const uid_t uid, const gid_t gid, const mode_t mode, fuse_ino_t& new_inode); int rpc_send_get_attr(const size_t recipient, const fuse_ino_t inode, struct stat& attr); #endif //LFS_C_METADATA_HPP