Newer
Older
#include <dirent.h>
#include <fstream>
using namespace std;
static const std::string dentry_val_delim = ","s;
bool is_fs_path(const char* path) {
return strstr(path, fs_config->mountdir.c_str()) == path;
// TODO merge the two stat functions
/**
* Converts the dentry db value into a stat struct, which is needed by Linux
* @param path
* @param db_val
* @param attr
* @return
*/
int db_val_to_stat(const std::string path, std::string db_val, struct stat& attr) {
auto pos = db_val.find(dentry_val_delim);
if (pos == std::string::npos) { // no delimiter found => no metadata enabled. fill with dummy values
attr.st_ino = std::hash<std::string>{}(path);
attr.st_mode = static_cast<unsigned int>(stoul(db_val));
attr.st_nlink = 1;
attr.st_uid = fs_config->uid;
attr.st_gid = fs_config->gid;
attr.st_size = 0;
attr.st_blksize = BLOCKSIZE;
attr.st_blocks = 0;
attr.st_atim.tv_sec = 0;
attr.st_mtim.tv_sec = 0;
attr.st_ctim.tv_sec = 0;
return 0;
}
// some metadata is enabled: mode is always there
attr.st_mode = static_cast<unsigned int>(stoul(db_val.substr(0, pos)));
// size is also there XXX
pos = db_val.find(dentry_val_delim);
if (pos != std::string::npos) { // delimiter found. more metadata is coming
attr.st_size = stol(db_val.substr(0, pos));
db_val.erase(0, pos + 1);
} else {
attr.st_size = stol(db_val);
return 0;
}
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
// The order is important. don't change.
if (fs_config->atime_state) {
pos = db_val.find(dentry_val_delim);
attr.st_atim.tv_sec = static_cast<time_t>(stol(db_val.substr(0, pos)));
db_val.erase(0, pos + 1);
}
if (fs_config->mtime_state) {
pos = db_val.find(dentry_val_delim);
attr.st_mtim.tv_sec = static_cast<time_t>(stol(db_val.substr(0, pos)));
db_val.erase(0, pos + 1);
}
if (fs_config->ctime_state) {
pos = db_val.find(dentry_val_delim);
attr.st_ctim.tv_sec = static_cast<time_t>(stol(db_val.substr(0, pos)));
db_val.erase(0, pos + 1);
}
if (fs_config->uid_state) {
pos = db_val.find(dentry_val_delim);
attr.st_uid = static_cast<uid_t>(stoul(db_val.substr(0, pos)));
db_val.erase(0, pos + 1);
}
if (fs_config->gid_state) {
pos = db_val.find(dentry_val_delim);
attr.st_gid = static_cast<uid_t>(stoul(db_val.substr(0, pos)));
db_val.erase(0, pos + 1);
}
if (fs_config->inode_no_state) {
pos = db_val.find(dentry_val_delim);
attr.st_ino = static_cast<ino_t>(stoul(db_val.substr(0, pos)));
db_val.erase(0, pos + 1);
}
if (fs_config->link_cnt_state) {
pos = db_val.find(dentry_val_delim);
attr.st_nlink = static_cast<nlink_t>(stoul(db_val.substr(0, pos)));
db_val.erase(0, pos + 1);
}
if (fs_config->blocks_state) { // last one will not encounter a delimiter anymore
attr.st_blocks = static_cast<blkcnt_t>(stoul(db_val));
}
return 0;
}
/**
* Converts the dentry db value into a stat64 struct, which is needed by Linux
* @param path
* @param db_val
* @param attr
* @return
*/
int db_val_to_stat64(const std::string path, std::string db_val, struct stat64& attr) {
auto pos = db_val.find(dentry_val_delim);
if (pos == std::string::npos) { // no delimiter found => no metadata enabled. fill with dummy values
attr.st_ino = std::hash<std::string>{}(path);
attr.st_mode = static_cast<unsigned int>(stoul(db_val));
attr.st_nlink = 1;
attr.st_uid = fs_config->uid;
attr.st_gid = fs_config->gid;
attr.st_size = 0;
attr.st_blksize = BLOCKSIZE;
attr.st_blocks = 0;
attr.st_atim.tv_sec = 0;
attr.st_mtim.tv_sec = 0;
attr.st_ctim.tv_sec = 0;
return 0;
}
// some metadata is enabled: mode is always there
attr.st_mode = static_cast<unsigned int>(stoul(db_val.substr(0, pos)));
db_val.erase(0, pos + 1);
// size is also there XXX
pos = db_val.find(dentry_val_delim);
if (pos != std::string::npos) { // delimiter found. more metadata is coming
attr.st_size = stol(db_val.substr(0, pos));
db_val.erase(0, pos + 1);
} else {
attr.st_size = stol(db_val);
return 0;
}
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
// The order is important. don't change.
if (fs_config->atime_state) {
pos = db_val.find(dentry_val_delim);
attr.st_atim.tv_sec = static_cast<time_t>(stol(db_val.substr(0, pos)));
db_val.erase(0, pos + 1);
}
if (fs_config->mtime_state) {
pos = db_val.find(dentry_val_delim);
attr.st_mtim.tv_sec = static_cast<time_t>(stol(db_val.substr(0, pos)));
db_val.erase(0, pos + 1);
}
if (fs_config->ctime_state) {
pos = db_val.find(dentry_val_delim);
attr.st_ctim.tv_sec = static_cast<time_t>(stol(db_val.substr(0, pos)));
db_val.erase(0, pos + 1);
}
if (fs_config->uid_state) {
pos = db_val.find(dentry_val_delim);
attr.st_uid = static_cast<uid_t>(stoul(db_val.substr(0, pos)));
db_val.erase(0, pos + 1);
}
if (fs_config->gid_state) {
pos = db_val.find(dentry_val_delim);
attr.st_gid = static_cast<uid_t>(stoul(db_val.substr(0, pos)));
db_val.erase(0, pos + 1);
}
if (fs_config->inode_no_state) {
pos = db_val.find(dentry_val_delim);
attr.st_ino = static_cast<ino_t>(stoul(db_val.substr(0, pos)));
db_val.erase(0, pos + 1);
}
if (fs_config->link_cnt_state) {
pos = db_val.find(dentry_val_delim);
attr.st_nlink = static_cast<nlink_t>(stoul(db_val.substr(0, pos)));
db_val.erase(0, pos + 1);
}
if (fs_config->blocks_state) { // last one will not encounter a delimiter anymore
attr.st_blocks = static_cast<blkcnt_t>(stoul(db_val));
}
return 0;
/**
* Returns the process id for a process name
* @param procName
* @return
*/
int getProcIdByName(string procName) {
int pid = -1;
// Open the /proc directory
DIR* dp = opendir("/proc");
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
// Enumerate all entries in directory until process found
struct dirent* dirp;
while (pid < 0 && (dirp = readdir(dp))) {
// Skip non-numeric entries
int id = atoi(dirp->d_name);
if (id > 0) {
// Read contents of virtual /proc/{pid}/cmdline file
string cmdPath = string("/proc/") + dirp->d_name + "/cmdline";
ifstream cmdFile(cmdPath.c_str());
string cmdLine;
getline(cmdFile, cmdLine);
if (!cmdLine.empty()) {
// Keep first cmdline item which contains the program path
size_t pos = cmdLine.find('\0');
if (pos != string::npos)
cmdLine = cmdLine.substr(0, pos);
// Keep program name only, removing the path
pos = cmdLine.rfind('/');
if (pos != string::npos)
cmdLine = cmdLine.substr(pos + 1);
// Compare against requested process name
if (procName == cmdLine)
pid = id;
}
}
}
}
closedir(dp);
return pid;
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
/**
* Returns the path where daemon process writes information for the running clients
* @return string
*/
string daemon_register_path(int pid) {
return (DAEMON_AUX_PATH + "/daemon_"s + to_string(pid) + ".run"s);
}
bool get_daemon_auxiliaries() {
auto ret = false;
auto adafs_daemon_pid = getProcIdByName("adafs_daemon"s);
if (adafs_daemon_pid == -1) {
ld_logger->error("{}() ADA-FS daemon not started. Exiting ...", __func__);
perror("ADA-FS daemon not started. Exiting ...");
return false;
}
ifstream ifs(daemon_register_path(adafs_daemon_pid).c_str(), ::ifstream::in);
string mountdir;
if (ifs) {
getline(ifs, mountdir);
if (!mountdir.empty()) {
ret = true;
fs_config->mountdir = mountdir;
} else {
ld_logger->error("{}() Error reading daemon auxiliary file", __func__);
}
}
if (ifs.bad()) {
perror("Error opening file to register daemon process");
ld_logger->error("{}() Error opening file to daemon auxiliary file", __func__);
return false;
}
ifs.close();
return ret;
}
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
/**
* Read /etc/hosts and put hostname - ip association into a map in fs config.
* We are working with hostnames but some network layers (such as Omnipath) does not look into /etc/hosts.
* Hence, we have to store the mapping ourselves.
* @return success
*/
bool read_system_hostfile() {
ifstream hostfile("/etc/hosts");
if (!hostfile.is_open())
return false;
string line;
map<string, string> sys_hostfile;
while (getline(hostfile, line)) {
if (line.empty() || line == "\n" || line.at(0) == '#')
continue;
std::istringstream iss(line);
std::vector<string> tmp_list((istream_iterator<string>(iss)), istream_iterator<string>());
for (unsigned int i = 1; i < tmp_list.size(); i++) {
if (tmp_list[i].find(HOSTNAME_SUFFIX) != string::npos)
sys_hostfile.insert(make_pair(tmp_list[i], tmp_list[0]));
}
}
fs_config->sys_hostfile = sys_hostfile;
ld_logger->info("{}() /etc/hosts successfully mapped into ADA-FS", __func__);
return true;
}
/**
* Creates an abstract rpc address for a given hostid and puts it into an address cache map
* @param hostid
* @param svr_addr
* @return
*/
bool get_addr_by_hostid(const uint64_t hostid, hg_addr_t& svr_addr) {
if (rpc_address_cache.tryGet(hostid, svr_addr)) {
ld_logger->trace("tryGet successful and put in svr_addr");
//found
return true;
} else {
ld_logger->trace("not found in lrucache");
// not found, manual lookup and add address mapping to LRU cache
// ofi+psm2 requires an IP. An remote_addr, even if put in /etc/hosts, does not suffice
if (string(RPC_PROTOCOL) ==
"ofi+psm2") { // XXX dangerous. redo. The whole definition insanity needs to be const expr anyways
// first get the hostname with the hostid
auto hostname = fs_config->hosts.at(hostid) + HOSTNAME_SUFFIX;
// then get the ip address from /etc/hosts which is mapped to the sys_hostfile map
if (fs_config->sys_hostfile.count(hostname) == 1) {
auto remote_ip = fs_config->sys_hostfile.at(hostname);
remote_addr = RPC_PROTOCOL + "://"s + remote_ip + ":"s + fs_config->rpc_port;
}
}
if (remote_addr.empty()) {
remote_addr = RPC_PROTOCOL + "://"s + fs_config->hosts.at(hostid) + HOSTNAME_SUFFIX + ":"s +
fs_config->rpc_port; // convert hostid to remote_addr and port
}
ld_logger->trace("generated remote_addr {} with rpc_port {}", remote_addr, fs_config->rpc_port);
// try to look up 3 times before erroring out
hg_return_t ret;
// TODO If this is solution is somewhat helpful, write a more versatile solution
for (unsigned int i = 0; i < 3; i++) {
ret = margo_addr_lookup(ld_margo_rpc_id, remote_addr.c_str(), &svr_addr);
if (ret != HG_SUCCESS) {
// still not working after 5 tries.
if (i == 4) {
ld_logger->error("{}() Unable to lookup address {} from host {}", __func__,
remote_addr, fs_config->hosts.at(fs_config->host_id));
return false;
}
// Wait a second then try again
sleep(1 * (i + 1));
} else {
break;
}
}
if (svr_addr == HG_ADDR_NULL) {
ld_logger->error("{}() looked up address is NULL for address {} from host {}", __func__,
remote_addr, fs_config->hosts.at(fs_config->host_id));
}
rpc_address_cache.insert(hostid, svr_addr);
return true;
}
}
/**
* Determines the node id for a given path
* @param to_hash
* @return
*/
size_t get_rpc_node(const string& to_hash) {
return std::hash<string>{}(to_hash) % fs_config->host_size;
}
/**
* Determines if the recipient id in an RPC is refering to the local or an remote node
* @param recipient
* @return
*/
bool is_local_op(const size_t recipient) {
return recipient == fs_config->host_id;
}
inline hg_return
margo_create_wrap_helper(const hg_id_t ipc_id, const hg_id_t rpc_id, const size_t recipient, hg_handle_t& handle,
hg_addr_t& svr_addr, bool force_rpc) {
if (is_local_op(recipient) && !force_rpc) { // local
ret = margo_create(ld_margo_ipc_id, daemon_svr_addr, ipc_id, &handle);
ld_logger->debug("{}() to local daemon (IPC)", __func__);
} else { // remote
// TODO HG_ADDR_T is never freed atm. Need to change LRUCache
if (!get_addr_by_hostid(recipient, svr_addr)) {
ld_logger->error("{}() server address not resolvable for host id {}", __func__, recipient);
return HG_OTHER_ERROR;
}
ret = margo_create(ld_margo_rpc_id, svr_addr, rpc_id, &handle);
ld_logger->debug("{}() to remote daemon (RPC)", __func__);
}
if (ret != HG_SUCCESS) {
ld_logger->error("{}() creating handle FAILED", __func__);
return HG_OTHER_ERROR;
}
return ret;
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
}
/**
* Wraps certain margo functions to create a Mercury handle
* @param ipc_id
* @param rpc_id
* @param path
* @param handle
* @param svr_addr
* @return
*/
template<>
hg_return margo_create_wrap(const hg_id_t ipc_id, const hg_id_t rpc_id, const std::string& path, hg_handle_t& handle,
hg_addr_t& svr_addr, bool force_rpc) {
return margo_create_wrap_helper(ipc_id, rpc_id, get_rpc_node(path), handle, svr_addr, force_rpc);
}
/**
* Wraps certain margo functions to create a Mercury handle
* @param ipc_id
* @param rpc_id
* @param recipient
* @param handle
* @param svr_addr
* @return
*/
template<>
hg_return margo_create_wrap(const hg_id_t ipc_id, const hg_id_t rpc_id, const size_t& recipient, hg_handle_t& handle,
hg_addr_t& svr_addr, bool force_rpc) {
return margo_create_wrap_helper(ipc_id, rpc_id, recipient, handle, svr_addr, force_rpc);