Newer
Older
#include <csignal>
#include <exception>
#include <fstream>
#include <global/rpc/rpc_utils.hpp>
#include <global/global_func.hpp>
using namespace std;
// Separator that db_val_to_stat() and db_val_to_stat64() search for to split a dentry db value into its fields
static const std::string dentry_val_delim = ","s;
/*
 * TODO: Setting our file descriptor index to a specific value is dangerous because we might clash with the kernel.
 * E.g., if we pass a call through without intercepting it and the kernel assigns a file descriptor whose value we
 * later hand out ourselves, we will intercept calls that were supposed to go to the kernel. This works the other way around too.
 * To mitigate this issue, we set the initial fd number to a high value. We "hope" that we do not clash, but this is not a permanent solution.
 * Note: This solution will probably work well for many cases already because kernel fd values are reused, unlike ours.
 * The only case where we will clash with the kernel is if one process has more than 100000 files open at the same time.
 */
// Next fd value handed out by generate_fd_idx(); starts at 100000 (see the TODO above for the reasoning)
static int fd_idx = 100000;
// Held by generate_fd_idx() and get_fd_idx() while they access fd_idx
static mutex fd_idx_mutex;
// Set to true by generate_fd_idx() once fd_idx has been reset, i.e. fd values may repeat from then on
std::atomic<bool> fd_validation_needed(false);
/**
 * Reserves the next file descriptor value that ADA-FS uses for a file within this process.
 * Once the counter has reached the largest int value it restarts at 100000 and fd_validation_needed is set,
 * because from then on a value may be handed out a second time.
 * @return the fd value reserved for the caller
 */
int generate_fd_idx() {
    // The counter is shared between threads, so it is only touched while holding its mutex
    std::lock_guard<std::mutex> guard(fd_idx_mutex);
    const bool counter_exhausted = (fd_idx == std::numeric_limits<int>::max());
    if (counter_exhausted) {
        ld_logger->info("{}() File descriptor index exceeded ints max value. Setting it back to 100000", __func__);
        /*
         * After the restart an fd value could be handed out for two different paths.
         * This must not happen, so the flag below signals that a value has to be
         * checked for being safe to use before it is accepted.
         */
        fd_idx = 100000;
        fd_validation_needed = true;
    }
    const int reserved_fd = fd_idx;
    ++fd_idx;
    return reserved_fd;
}
/**
 * Reads the fd counter without advancing it.
 * @return current value of fd_idx
 */
int get_fd_idx() {
    // Same mutex as in generate_fd_idx() so the read does not race with an update
    std::lock_guard<std::mutex> guard(fd_idx_mutex);
    const int current_idx = fd_idx;
    return current_idx;
}
/**
 * Checks whether a path starts with the mount directory stored in fs_config.
 * NOTE(review): this is a plain string-prefix test, it does not check for a path separator after the mount directory.
 * @param path null-terminated path to test
 * @return true if fs_config->mountdir is a prefix of path
 */
bool is_fs_path(const char* path) {
    // strstr() returns `path` itself only when the mount directory is found at offset 0
    return strstr(path, fs_config->mountdir.c_str()) == path;
}
// TODO merge the two stat functions
/**
* Converts the dentry db value into a stat struct, which is needed by Linux
* @param path
* @param db_val
* @param attr
* @return
*/
int db_val_to_stat(const std::string path, std::string db_val, struct stat& attr) {
auto pos = db_val.find(dentry_val_delim);
if (pos == std::string::npos) { // no delimiter found => no metadata enabled. fill with dummy values
attr.st_ino = std::hash<std::string>{}(path);
attr.st_mode = static_cast<unsigned int>(stoul(db_val));
attr.st_nlink = 1;
attr.st_uid = fs_config->uid;
attr.st_gid = fs_config->gid;
attr.st_size = 0;
attr.st_blksize = BLOCKSIZE;
attr.st_blocks = 0;
attr.st_atim.tv_sec = 0;
attr.st_mtim.tv_sec = 0;
attr.st_ctim.tv_sec = 0;
return 0;
}
// some metadata is enabled: mode is always there
attr.st_mode = static_cast<unsigned int>(stoul(db_val.substr(0, pos)));
// size is also there XXX
pos = db_val.find(dentry_val_delim);
if (pos != std::string::npos) { // delimiter found. more metadata is coming
attr.st_size = stol(db_val.substr(0, pos));
db_val.erase(0, pos + 1);
} else {
attr.st_size = stol(db_val);
return 0;
}
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
// The order is important. don't change.
if (fs_config->atime_state) {
pos = db_val.find(dentry_val_delim);
attr.st_atim.tv_sec = static_cast<time_t>(stol(db_val.substr(0, pos)));
db_val.erase(0, pos + 1);
}
if (fs_config->mtime_state) {
pos = db_val.find(dentry_val_delim);
attr.st_mtim.tv_sec = static_cast<time_t>(stol(db_val.substr(0, pos)));
db_val.erase(0, pos + 1);
}
if (fs_config->ctime_state) {
pos = db_val.find(dentry_val_delim);
attr.st_ctim.tv_sec = static_cast<time_t>(stol(db_val.substr(0, pos)));
db_val.erase(0, pos + 1);
}
if (fs_config->uid_state) {
pos = db_val.find(dentry_val_delim);
attr.st_uid = static_cast<uid_t>(stoul(db_val.substr(0, pos)));
db_val.erase(0, pos + 1);
}
if (fs_config->gid_state) {
pos = db_val.find(dentry_val_delim);
attr.st_gid = static_cast<uid_t>(stoul(db_val.substr(0, pos)));
db_val.erase(0, pos + 1);
}
if (fs_config->inode_no_state) {
pos = db_val.find(dentry_val_delim);
attr.st_ino = static_cast<ino_t>(stoul(db_val.substr(0, pos)));
db_val.erase(0, pos + 1);
}
if (fs_config->link_cnt_state) {
pos = db_val.find(dentry_val_delim);
attr.st_nlink = static_cast<nlink_t>(stoul(db_val.substr(0, pos)));
db_val.erase(0, pos + 1);
}
if (fs_config->blocks_state) { // last one will not encounter a delimiter anymore
attr.st_blocks = static_cast<blkcnt_t>(stoul(db_val));
}
return 0;
}
/**
* Converts the dentry db value into a stat64 struct, which is needed by Linux
* @param path
* @param db_val
* @param attr
* @return
*/
int db_val_to_stat64(const std::string path, std::string db_val, struct stat64& attr) {
auto pos = db_val.find(dentry_val_delim);
if (pos == std::string::npos) { // no delimiter found => no metadata enabled. fill with dummy values
attr.st_ino = std::hash<std::string>{}(path);
attr.st_mode = static_cast<unsigned int>(stoul(db_val));
attr.st_nlink = 1;
attr.st_uid = fs_config->uid;
attr.st_gid = fs_config->gid;
attr.st_size = 0;
attr.st_blksize = BLOCKSIZE;
attr.st_blocks = 0;
attr.st_atim.tv_sec = 0;
attr.st_mtim.tv_sec = 0;
attr.st_ctim.tv_sec = 0;
return 0;
}
// some metadata is enabled: mode is always there
attr.st_mode = static_cast<unsigned int>(stoul(db_val.substr(0, pos)));
db_val.erase(0, pos + 1);
// size is also there XXX
pos = db_val.find(dentry_val_delim);
if (pos != std::string::npos) { // delimiter found. more metadata is coming
attr.st_size = stol(db_val.substr(0, pos));
db_val.erase(0, pos + 1);
} else {
attr.st_size = stol(db_val);
return 0;
}
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
// The order is important. don't change.
if (fs_config->atime_state) {
pos = db_val.find(dentry_val_delim);
attr.st_atim.tv_sec = static_cast<time_t>(stol(db_val.substr(0, pos)));
db_val.erase(0, pos + 1);
}
if (fs_config->mtime_state) {
pos = db_val.find(dentry_val_delim);
attr.st_mtim.tv_sec = static_cast<time_t>(stol(db_val.substr(0, pos)));
db_val.erase(0, pos + 1);
}
if (fs_config->ctime_state) {
pos = db_val.find(dentry_val_delim);
attr.st_ctim.tv_sec = static_cast<time_t>(stol(db_val.substr(0, pos)));
db_val.erase(0, pos + 1);
}
if (fs_config->uid_state) {
pos = db_val.find(dentry_val_delim);
attr.st_uid = static_cast<uid_t>(stoul(db_val.substr(0, pos)));
db_val.erase(0, pos + 1);
}
if (fs_config->gid_state) {
pos = db_val.find(dentry_val_delim);
attr.st_gid = static_cast<uid_t>(stoul(db_val.substr(0, pos)));
db_val.erase(0, pos + 1);
}
if (fs_config->inode_no_state) {
pos = db_val.find(dentry_val_delim);
attr.st_ino = static_cast<ino_t>(stoul(db_val.substr(0, pos)));
db_val.erase(0, pos + 1);
}
if (fs_config->link_cnt_state) {
pos = db_val.find(dentry_val_delim);
attr.st_nlink = static_cast<nlink_t>(stoul(db_val.substr(0, pos)));
db_val.erase(0, pos + 1);
}
if (fs_config->blocks_state) { // last one will not encounter a delimiter anymore
attr.st_blocks = static_cast<blkcnt_t>(stoul(db_val));
}
return 0;
* @return daemon pid. If not running @return -1.
* Loads set deamon mountdir set in daemon.pid file
int get_daemon_pid() {
ifstream ifs(daemon_pid_path(), ::ifstream::in);
int adafs_daemon_pid = -1;
string mountdir;
if (ifs) {
string adafs_daemon_pid_s;
// first line is pid
if (getline(ifs, adafs_daemon_pid_s) && !adafs_daemon_pid_s.empty())
adafs_daemon_pid = ::stoi(adafs_daemon_pid_s);
else {
cerr << "ADA-FS daemon pid not found. Daemon not running?" << endl;
ld_logger->error("{}() Unable to read daemon pid from pid file", __func__);
ifs.close();
return -1;
}
// check that daemon is running
if (kill(adafs_daemon_pid, 0) != 0) {
cerr << "ADA-FS daemon process with pid " << adafs_daemon_pid << " not found. Daemon not running?" << endl;
ld_logger->error("{}() ADA-FS daemon pid {} not found. Daemon not running?", __func__, adafs_daemon_pid);
ifs.close();
return -1;
}
// second line is mountdir
if (getline(ifs, mountdir) && !mountdir.empty()) {
fs_config->mountdir = mountdir;
} else {
ld_logger->error("{}() ADA-FS daemon pid file contains no mountdir path. Exiting ...", __func__);
ifs.close();
return -1;
}
} else {
cerr << "No permission to open pid file at " << daemon_pid_path()
<< " or ADA-FS daemon pid file not found. Daemon not running?" << endl;
ld_logger->error(
"{}() No permission to open pid file at {} or ADA-FS daemon pid file not found. Daemon not running?",
__func__, daemon_pid_path());
}
ifs.close();
return adafs_daemon_pid;
}
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
/**
 * Parses /etc/hosts into a hostname -> ip table and stores it in fs_config->sys_hostfile.
 * Hostnames are used throughout, but some network layers (such as Omnipath) do not consult /etc/hosts,
 * so the mapping is kept here. Only names that contain HOSTNAME_SUFFIX are recorded.
 * @return false if /etc/hosts could not be opened, true otherwise
 */
bool read_system_hostfile() {
    std::ifstream hosts_stream("/etc/hosts");
    if (!hosts_stream.is_open()) {
        return false;
    }
    std::map<std::string, std::string> ip_by_hostname;
    for (std::string entry; std::getline(hosts_stream, entry);) {
        // blank lines and comment lines carry no mapping
        if (entry.empty() || entry.front() == '#') {
            continue;
        }
        // first whitespace-separated token is the ip, every following token is a name for it
        std::istringstream fields(entry);
        std::string ip;
        if (!(fields >> ip)) {
            continue;
        }
        for (std::string name; fields >> name;) {
            if (name.find(HOSTNAME_SUFFIX) != std::string::npos) {
                // insert() keeps the first ip seen for a name
                ip_by_hostname.insert(std::make_pair(name, ip));
            }
        }
    }
    fs_config->sys_hostfile = ip_by_hostname;
    ld_logger->info("{}() /etc/hosts successfully mapped into ADA-FS", __func__);
    return true;
}
/**
* Creates an abstract rpc address for a given hostid and puts it into an address cache map
* @param hostid
* @param svr_addr
* @return
*/
bool get_addr_by_hostid(const uint64_t hostid, hg_addr_t& svr_addr) {
/*
* This function might get called from within an Argobots thread.
* A std::mutex would lead to a deadlock as it does not yield the thread by sending it to BLOCKING state
*/
ABT_mutex_lock(rpc_address_cache_mutex);
auto address_lookup = rpc_address_cache.find(hostid);
if (address_lookup != rpc_address_cache.end()) {
svr_addr = address_lookup->second;
ld_logger->trace("RPC address lookup success with hostid {}", address_lookup->first);
ABT_mutex_unlock(rpc_address_cache_mutex);
Marc Vef
committed
// not found, manual lookup and add address mapping to LRU cache
ld_logger->trace("not found in lrucache");
Marc Vef
committed
// Try to get the ip of remote addr. If it cannot be found, use hostname
// first get the hostname with the hostid
auto hostname = fs_config->hosts.at(hostid) + HOSTNAME_SUFFIX;
// then get the ip address from /etc/hosts which is mapped to the sys_hostfile map
if (fs_config->sys_hostfile.count(hostname) == 1) {
auto remote_ip = fs_config->sys_hostfile.at(hostname);
remote_addr = RPC_PROTOCOL + "://"s + remote_ip + ":"s + fs_config->rpc_port;
Marc Vef
committed
// fallback hostname to use for lookup
Marc Vef
committed
remote_addr = RPC_PROTOCOL + "://"s + hostname + ":"s +
fs_config->rpc_port; // convert hostid to remote_addr and port
}
ld_logger->trace("generated remote_addr {} for hostname {} with rpc_port {}",
remote_addr, hostname, fs_config->rpc_port);
// try to look up 3 times before erroring out
hg_return_t ret;
// TODO If this is solution is somewhat helpful, write a more versatile solution
for (unsigned int i = 0; i < 3; i++) {
ret = margo_addr_lookup(ld_margo_rpc_id, remote_addr.c_str(), &svr_addr);
if (ret != HG_SUCCESS) {
// still not working after 5 tries.
if (i == 4) {
ld_logger->error("{}() Unable to lookup address {} from host {}", __func__,
remote_addr, fs_config->hosts.at(fs_config->host_id));
ABT_mutex_unlock(rpc_address_cache_mutex);
return false;
}
// Wait a second then try again
// TODO fix that terrible solution
sleep(1 * (i + 1));
} else {
break;
}
}
if (svr_addr == HG_ADDR_NULL) {
ld_logger->error("{}() looked up address is NULL for address {} from host {}", __func__,
remote_addr, fs_config->hosts.at(fs_config->host_id));
ABT_mutex_unlock(rpc_address_cache_mutex);
}
rpc_address_cache.insert(make_pair(hostid, svr_addr));
ABT_mutex_unlock(rpc_address_cache_mutex);
return true;
}
}
/**
 * Tells whether the recipient id of an RPC refers to this node or to a remote node
 * @param recipient host id the operation is addressed to
 * @return true if recipient equals fs_config->host_id
 */
bool is_local_op(const size_t recipient) {
    const bool addressed_to_this_host = (recipient == fs_config->host_id);
    return addressed_to_this_host;
}
/**
 * Creates a Mercury handle for the given recipient.
 * If the recipient is this host and force_rpc is not set, the handle is created on ld_margo_ipc_id with
 * daemon_svr_addr and ipc_id. Otherwise the recipient's address is resolved with get_addr_by_hostid() and the handle
 * is created on ld_margo_rpc_id with rpc_id.
 * @param ipc_id id used for the local (IPC) handle
 * @param rpc_id id used for the remote (RPC) handle
 * @param recipient host id the handle is created for
 * @param handle receives the created handle
 * @param force_rpc if true, the RPC path is taken even if the recipient is this host
 * @return result of margo_create on success, HG_OTHER_ERROR if the address cannot be resolved or the handle
 *         cannot be created
 */
inline hg_return
margo_create_wrap_helper(const hg_id_t ipc_id, const hg_id_t rpc_id, const size_t recipient, hg_handle_t& handle,
                         bool force_rpc) {
    hg_return_t ret;
    if (is_local_op(recipient) && !force_rpc) { // local
        ret = margo_create(ld_margo_ipc_id, daemon_svr_addr, ipc_id, &handle);
        ld_logger->debug("{}() to local daemon (IPC)", __func__);
    } else { // remote
        hg_addr_t svr_addr = HG_ADDR_NULL;
        if (!get_addr_by_hostid(recipient, svr_addr)) {
            ld_logger->error("{}() server address not resolvable for host id {}", __func__, recipient);
            return HG_OTHER_ERROR;
        }
        ret = margo_create(ld_margo_rpc_id, svr_addr, rpc_id, &handle);
        ld_logger->debug("{}() to remote daemon (RPC)", __func__);
    }
    if (ret != HG_SUCCESS) {
        ld_logger->error("{}() creating handle FAILED", __func__);
        return HG_OTHER_ERROR;
    }
    return ret;
}
/**
 * Creates a Mercury handle for the host that adafs_hash_path() selects for a path
 * @param ipc_id id forwarded to margo_create_wrap_helper for the local case
 * @param rpc_id id forwarded to margo_create_wrap_helper for the remote case
 * @param path path that is hashed over fs_config->host_size to pick the recipient
 * @param handle receives the created handle
 * @param force_rpc forwarded unchanged to margo_create_wrap_helper
 * @return result of margo_create_wrap_helper
 */
template<>
hg_return margo_create_wrap(const hg_id_t ipc_id, const hg_id_t rpc_id, const std::string& path, hg_handle_t& handle,
                            bool force_rpc) {
    // the hash of the path decides which host is responsible for it
    const auto responsible_host = adafs_hash_path(path, fs_config->host_size);
    return margo_create_wrap_helper(ipc_id, rpc_id, responsible_host, handle, force_rpc);
}
/**
 * Wraps certain margo functions to create a Mercury handle
 * @param ipc_id
 * @param rpc_id
 * @param recipient
 * @param handle
 * @param force_rpc
 * @return
 */
template<>
hg_return margo_create_wrap(const hg_id_t ipc_id, const hg_id_t rpc_id, const size_t& recipient, hg_handle_t& handle,
bool force_rpc) {
return margo_create_wrap_helper(ipc_id, rpc_id, recipient, handle, force_rpc);