Loading lfs/src/classes/fs_data.cpp +2 −2 Original line number Diff line number Diff line Loading @@ -166,11 +166,11 @@ void FsData::host_size(size_t host_size) { FsData::host_size_ = host_size; } int FsData::rpc_port() const { std::string FsData::rpc_port() const { return rpc_port_; } void FsData::rpc_port(int rpc_port) { void FsData::rpc_port(std::string rpc_port) { FsData::rpc_port_ = rpc_port; } Loading lfs/src/classes/fs_data.hpp +3 −3 Original line number Diff line number Diff line Loading @@ -38,7 +38,7 @@ private: std::map<uint64_t, std::string> hosts_; uint64_t host_id_; // my host number size_t host_size_; int rpc_port_; std::string rpc_port_; // rocksdb std::shared_ptr<rocksdb::DB> rdb_; Loading Loading @@ -145,9 +145,9 @@ public: void host_size(size_t host_size); int rpc_port() const; std::string rpc_port() const; void rpc_port(int rpc_port); void rpc_port(std::string rpc_port); // Utility member functions Loading lfs/src/classes/rpc_data.cpp +20 −1 Original line number Diff line number Diff line Loading @@ -65,6 +65,14 @@ lru11::Cache<uint64_t, hg_addr_t>& RPCData::address_cache() { return address_cache_; } hg_id_t RPCData::rpc_srv_create_id() const { return rpc_srv_create_id_; } void RPCData::rpc_srv_create_id(hg_id_t rpc_srv_create_id) { RPCData::rpc_srv_create_id_ = rpc_srv_create_id; } // Utility functions bool RPCData::get_addr_by_hostid(const uint64_t hostid, hg_addr_t& svr_addr) { Loading @@ -73,7 +81,8 @@ bool RPCData::get_addr_by_hostid(const uint64_t hostid, hg_addr_t& svr_addr) { return true; } else { // not found, manual lookup and add address mapping to LRU cache auto hostname = ADAFS_DATA->hosts().at(hostid); // convert hostid to hostname auto hostname = "cci+tcp://" + ADAFS_DATA->hosts().at(hostid) + ":" + ADAFS_DATA->rpc_port(); // convert hostid to hostname and port margo_addr_lookup(RPC_DATA->client_mid(), hostname.c_str(), &svr_addr); if (svr_addr == HG_ADDR_NULL) return false; Loading @@ -82,3 +91,13 @@ bool RPCData::get_addr_by_hostid(const uint64_t hostid, hg_addr_t& svr_addr) { } } size_t RPCData::get_rpc_node(std::string to_hash) { return ADAFS_DATA->hashf()(to_hash) % ADAFS_DATA->host_size(); } std::string RPCData::get_dentry_hashable(const fuse_ino_t parent, const char* name) { return fmt::FormatInt(parent).str() + "_" + name; } lfs/src/classes/rpc_data.hpp +9 −0 Original line number Diff line number Diff line Loading @@ -32,6 +32,7 @@ private: // TODO RPC client IDs // RPC client IDs hg_id_t rpc_minimal_id_; hg_id_t rpc_srv_create_id_; public: Loading Loading @@ -76,9 +77,17 @@ public: lru11::Cache<uint64_t, hg_addr_t>& address_cache(); hg_id_t rpc_srv_create_id() const; void rpc_srv_create_id(hg_id_t rpc_srv_create_id); // Utility functions bool get_addr_by_hostid(const uint64_t hostid, hg_addr_t& svr_addr); size_t get_rpc_node(std::string to_hash); std::string get_dentry_hashable(const fuse_ino_t parent, const char* name); }; Loading lfs/src/fuse_ops/file.cpp +50 −15 Original line number Diff line number Diff line Loading @@ -118,12 +118,24 @@ void adafs_ll_setattr(fuse_req_t req, fuse_ino_t ino, struct stat* attr, int to_ // TODO } auto buf = make_shared<struct stat>(); auto err = get_attr(*buf, ino); struct stat buf{}; auto err = get_attr(buf, ino); // TODO I think we need a cache to cache metadata on a node. Otherwise we have to get the metadata remotely all the time // int err = 0; // buf.st_ino = ino; // buf.st_size = attr->st_size; // buf.st_nlink = attr->st_nlink; // buf.st_gid = fuse_req_ctx(req)->gid; // buf.st_blocks = attr->st_blocks; // buf.st_blksize = attr->st_blksize; // buf.st_mode = S_IFREG | 477; // buf.st_uid = fuse_req_ctx(req)->gid; // buf.st_atim = attr->st_atim; // buf.st_mtim = attr->st_mtim; // buf.st_ctim = attr->st_ctim; if (err == 0) { fuse_reply_attr(req, buf.get(), 1.0); fuse_reply_attr(req, &buf, 1.0); } else { fuse_reply_err(req, err); } Loading Loading @@ -177,21 +189,44 @@ void adafs_ll_create(fuse_req_t req, fuse_ino_t parent, const char* name, mode_t // auto diff_count = chrono::duration_cast<ns>(diff).count(); // ADAFS_DATA->spdlogger()->info("TIME SPENT: {} microseconds", (diff_count / 1000)); // send_rpc... auto fep = make_shared<fuse_entry_param>(); // XXX check if file exists (how can we omit this? Let's just try to create it and see if it fails) // XXX check permissions (omittable) // XXX all this below stuff needs to be atomic. reroll if error happens fuse_entry_param fep{}; int err; auto uid = fuse_req_ctx(req)->uid; auto gid = fuse_req_ctx(req)->gid; if (ADAFS_DATA->host_size() > 1) { auto recipient = RPC_DATA->get_rpc_node(RPC_DATA->get_dentry_hashable(parent, name)); if (recipient == ADAFS_DATA->host_id()) { // local // XXX check permissions (omittable), should create node be atomic? err = create_node(fep, parent, string(name), uid, gid, S_IFREG | mode); } else { // remote fuse_ino_t new_inode; err = rpc_send_create(recipient, parent, name, uid, gid, S_IFREG | mode, new_inode); // rpc_send_create only returns the new inode. The other values are set to default values which are set // during create operation on the other node. Since the values are expected rpc doesn't need to return it if (err == 0) { fep.ino = new_inode; fep.attr.st_ino = new_inode; fep.attr.st_mode = mode; fep.attr.st_blocks = 0; fep.attr.st_gid = gid; fep.attr.st_uid = uid; fep.attr.st_nlink = 0; fep.attr.st_size = 0; fep.entry_timeout = 1.0; fep.attr_timeout = 1.0; // times are ignored here } } } else { // local // XXX check permissions (omittable), should create node be atomic? err = create_node(fep, parent, string(name), uid, gid, S_IFREG | mode); } auto err = create_node(*fep, parent, string(name), fuse_req_ctx(req)->uid, fuse_req_ctx(req)->gid, S_IFREG | mode); // XXX create chunk space if (err == 0) fuse_reply_create(req, fep.get(), fi); fuse_reply_create(req, &fep, fi); else fuse_reply_err(req, err); } Loading Loading
lfs/src/classes/fs_data.cpp +2 −2 Original line number Diff line number Diff line Loading @@ -166,11 +166,11 @@ void FsData::host_size(size_t host_size) { FsData::host_size_ = host_size; } int FsData::rpc_port() const { std::string FsData::rpc_port() const { return rpc_port_; } void FsData::rpc_port(int rpc_port) { void FsData::rpc_port(std::string rpc_port) { FsData::rpc_port_ = rpc_port; } Loading
lfs/src/classes/fs_data.hpp +3 −3 Original line number Diff line number Diff line Loading @@ -38,7 +38,7 @@ private: std::map<uint64_t, std::string> hosts_; uint64_t host_id_; // my host number size_t host_size_; int rpc_port_; std::string rpc_port_; // rocksdb std::shared_ptr<rocksdb::DB> rdb_; Loading Loading @@ -145,9 +145,9 @@ public: void host_size(size_t host_size); int rpc_port() const; std::string rpc_port() const; void rpc_port(int rpc_port); void rpc_port(std::string rpc_port); // Utility member functions Loading
lfs/src/classes/rpc_data.cpp +20 −1 Original line number Diff line number Diff line Loading @@ -65,6 +65,14 @@ lru11::Cache<uint64_t, hg_addr_t>& RPCData::address_cache() { return address_cache_; } hg_id_t RPCData::rpc_srv_create_id() const { return rpc_srv_create_id_; } void RPCData::rpc_srv_create_id(hg_id_t rpc_srv_create_id) { RPCData::rpc_srv_create_id_ = rpc_srv_create_id; } // Utility functions bool RPCData::get_addr_by_hostid(const uint64_t hostid, hg_addr_t& svr_addr) { Loading @@ -73,7 +81,8 @@ bool RPCData::get_addr_by_hostid(const uint64_t hostid, hg_addr_t& svr_addr) { return true; } else { // not found, manual lookup and add address mapping to LRU cache auto hostname = ADAFS_DATA->hosts().at(hostid); // convert hostid to hostname auto hostname = "cci+tcp://" + ADAFS_DATA->hosts().at(hostid) + ":" + ADAFS_DATA->rpc_port(); // convert hostid to hostname and port margo_addr_lookup(RPC_DATA->client_mid(), hostname.c_str(), &svr_addr); if (svr_addr == HG_ADDR_NULL) return false; Loading @@ -82,3 +91,13 @@ bool RPCData::get_addr_by_hostid(const uint64_t hostid, hg_addr_t& svr_addr) { } } size_t RPCData::get_rpc_node(std::string to_hash) { return ADAFS_DATA->hashf()(to_hash) % ADAFS_DATA->host_size(); } std::string RPCData::get_dentry_hashable(const fuse_ino_t parent, const char* name) { return fmt::FormatInt(parent).str() + "_" + name; }
lfs/src/classes/rpc_data.hpp +9 −0 Original line number Diff line number Diff line Loading @@ -32,6 +32,7 @@ private: // TODO RPC client IDs // RPC client IDs hg_id_t rpc_minimal_id_; hg_id_t rpc_srv_create_id_; public: Loading Loading @@ -76,9 +77,17 @@ public: lru11::Cache<uint64_t, hg_addr_t>& address_cache(); hg_id_t rpc_srv_create_id() const; void rpc_srv_create_id(hg_id_t rpc_srv_create_id); // Utility functions bool get_addr_by_hostid(const uint64_t hostid, hg_addr_t& svr_addr); size_t get_rpc_node(std::string to_hash); std::string get_dentry_hashable(const fuse_ino_t parent, const char* name); }; Loading
lfs/src/fuse_ops/file.cpp +50 −15 Original line number Diff line number Diff line Loading @@ -118,12 +118,24 @@ void adafs_ll_setattr(fuse_req_t req, fuse_ino_t ino, struct stat* attr, int to_ // TODO } auto buf = make_shared<struct stat>(); auto err = get_attr(*buf, ino); struct stat buf{}; auto err = get_attr(buf, ino); // TODO I think we need a cache to cache metadata on a node. Otherwise we have to get the metadata remotely all the time // int err = 0; // buf.st_ino = ino; // buf.st_size = attr->st_size; // buf.st_nlink = attr->st_nlink; // buf.st_gid = fuse_req_ctx(req)->gid; // buf.st_blocks = attr->st_blocks; // buf.st_blksize = attr->st_blksize; // buf.st_mode = S_IFREG | 477; // buf.st_uid = fuse_req_ctx(req)->gid; // buf.st_atim = attr->st_atim; // buf.st_mtim = attr->st_mtim; // buf.st_ctim = attr->st_ctim; if (err == 0) { fuse_reply_attr(req, buf.get(), 1.0); fuse_reply_attr(req, &buf, 1.0); } else { fuse_reply_err(req, err); } Loading Loading @@ -177,21 +189,44 @@ void adafs_ll_create(fuse_req_t req, fuse_ino_t parent, const char* name, mode_t // auto diff_count = chrono::duration_cast<ns>(diff).count(); // ADAFS_DATA->spdlogger()->info("TIME SPENT: {} microseconds", (diff_count / 1000)); // send_rpc... auto fep = make_shared<fuse_entry_param>(); // XXX check if file exists (how can we omit this? Let's just try to create it and see if it fails) // XXX check permissions (omittable) // XXX all this below stuff needs to be atomic. reroll if error happens fuse_entry_param fep{}; int err; auto uid = fuse_req_ctx(req)->uid; auto gid = fuse_req_ctx(req)->gid; if (ADAFS_DATA->host_size() > 1) { auto recipient = RPC_DATA->get_rpc_node(RPC_DATA->get_dentry_hashable(parent, name)); if (recipient == ADAFS_DATA->host_id()) { // local // XXX check permissions (omittable), should create node be atomic? err = create_node(fep, parent, string(name), uid, gid, S_IFREG | mode); } else { // remote fuse_ino_t new_inode; err = rpc_send_create(recipient, parent, name, uid, gid, S_IFREG | mode, new_inode); // rpc_send_create only returns the new inode. The other values are set to default values which are set // during create operation on the other node. Since the values are expected rpc doesn't need to return it if (err == 0) { fep.ino = new_inode; fep.attr.st_ino = new_inode; fep.attr.st_mode = mode; fep.attr.st_blocks = 0; fep.attr.st_gid = gid; fep.attr.st_uid = uid; fep.attr.st_nlink = 0; fep.attr.st_size = 0; fep.entry_timeout = 1.0; fep.attr_timeout = 1.0; // times are ignored here } } } else { // local // XXX check permissions (omittable), should create node be atomic? err = create_node(fep, parent, string(name), uid, gid, S_IFREG | mode); } auto err = create_node(*fep, parent, string(name), fuse_req_ctx(req)->uid, fuse_req_ctx(req)->gid, S_IFREG | mode); // XXX create chunk space if (err == 0) fuse_reply_create(req, fep.get(), fi); fuse_reply_create(req, &fep, fi); else fuse_reply_err(req, err); } Loading