Newer
Older
#include <preload/adafs_functions.hpp>
#include <preload/rpc/ld_rpc_metadentry.hpp>
#include <preload/rpc/ld_rpc_data_ws.hpp>
#include <global/rpc/rpc_utils.hpp>
using namespace std;
int adafs_open(const std::string& path, mode_t mode, int flags) {
init_ld_env_if_needed();
auto fd = file_map.add(path, flags);
// TODO the open flags should not be in the map just set the pos accordingly
// TODO look up if file exists configurable
if (flags & O_CREAT)
// no access check required here. If one is using our FS they have the permissions.
err = rpc_send_mk_node(path, mode);
else {
auto mask = F_OK; // F_OK == 0
#if defined(CHECK_ACCESS_DURING_OPEN)
if ((mode & S_IRUSR) || (mode & S_IRGRP) || (mode & S_IROTH))
mask = mask & R_OK;
if ((mode & S_IWUSR) || (mode & S_IWGRP) || (mode & S_IWOTH))
mask = mask & W_OK;
if ((mode & S_IXUSR) || (mode & S_IXGRP) || (mode & S_IXOTH))
mask = mask & X_OK;
#endif
#if defined(DO_LOOKUP)
// check if file exists
err = rpc_send_access(path, mask);
#else
// file is assumed to be existing, even though it might not
err = 0;
#endif
}
if (err == 0)
return fd;
else {
file_map.remove(fd);
return -1;
}
}
int adafs_mk_node(const std::string& path, const mode_t mode) {
init_ld_env_if_needed();
return rpc_send_mk_node(path, mode);
}
int adafs_rm_node(const std::string& path) {
init_ld_env_if_needed();
return rpc_send_rm_node(path);
}
int adafs_access(const std::string& path, const int mask) {
init_ld_env_if_needed();
#if !defined(DO_LOOKUP)
// object is assumed to be existing, even though it might not
return 0;
#endif
#else
return rpc_send_access(path, F_OK); // Only check for file exists
#endif
// TODO combine adafs_stat and adafs_stat64
int adafs_stat(const string& path, struct stat* buf) {
init_ld_env_if_needed();
string attr = ""s;
auto err = rpc_send_stat(path, attr);
if (err == 0)
db_val_to_stat(path, attr, *buf);
int adafs_stat64(const string& path, struct stat64* buf) {
init_ld_env_if_needed();
string attr = ""s;
auto err = rpc_send_stat(path, attr);
if (err == 0)
db_val_to_stat64(path, attr, *buf);
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
int adafs_statfs(const string& path, struct statfs* adafs_buf, struct statfs& realfs_buf) {
// Check that file path exists
auto ret = rpc_send_access(path, F_OK);
// Valid fs error
if (ret > 0) {
errno = ret;
return -1;
}
// RPC error (errno has been set)
if (ret < 0)
return -1;
// fs object exists. Let's make up some fs values
adafs_buf->f_type = 0; // fs is not know to VFS. Therefore, no valid specifier possible
adafs_buf->f_bsize = static_cast<int>(CHUNKSIZE);
// some rough estimations
adafs_buf->f_blocks = realfs_buf.f_blocks * fs_config->host_size;
adafs_buf->f_bfree = realfs_buf.f_bfree * fs_config->host_size;
adafs_buf->f_bavail = realfs_buf.f_bavail * fs_config->host_size;
adafs_buf->f_files = realfs_buf.f_files * fs_config->host_size;
adafs_buf->f_ffree = realfs_buf.f_ffree * fs_config->host_size;
adafs_buf->f_fsid = realfs_buf.f_fsid; // "Nobody knows what f_fsid is supposed to contain"
adafs_buf->f_namelen = realfs_buf.f_namelen;
adafs_buf->f_frsize = realfs_buf.f_frsize;
adafs_buf->f_spare[0] = 0;
adafs_buf->f_spare[1] = 0;
adafs_buf->f_spare[2] = 0;
adafs_buf->f_spare[3] = 0;
adafs_buf->f_flags = ST_NOATIME | ST_NOSUID | ST_NODEV | ST_SYNCHRONOUS;
if (!fs_config->atime_state)
adafs_buf->f_flags = adafs_buf->f_flags | ST_NOATIME | ST_NODIRATIME;
return 0;
}
off64_t adafs_lseek(int fd, off64_t offset, int whence) {
init_ld_env_if_needed();
return adafs_lseek(file_map.get(fd), offset, whence);
}
off64_t adafs_lseek(shared_ptr<OpenFile> adafs_fd, off64_t offset, int whence) {
init_ld_env_if_needed();
switch (whence) {
case SEEK_SET:
adafs_fd->pos(offset);
break;
case SEEK_CUR:
adafs_fd->pos(adafs_fd->pos() + offset);
break;
case SEEK_END: {
auto err = rpc_send_get_metadentry_size(adafs_fd->path(), file_size);
if (err < 0) {
errno = err; // Negative numbers are explicitly for error codes
return -1;
}
adafs_fd->pos(file_size + offset);
break;
}
case SEEK_DATA:
// We do not support this whence yet
errno = EINVAL;
return -1;
case SEEK_HOLE:
// We do not support this whence yet
errno = EINVAL;
return -1;
default:
errno = EINVAL;
return -1;
}
return adafs_fd->pos();
}
int adafs_dup(const int oldfd) {
return file_map.dup(oldfd);
}
int adafs_dup2(const int oldfd, const int newfd) {
return file_map.dup2(oldfd, newfd);
}
ssize_t adafs_pwrite_ws(int fd, const void* buf, size_t count, off64_t offset) {
init_ld_env_if_needed();
auto adafs_fd = file_map.get(fd);
auto path = make_shared<string>(adafs_fd->path());
auto append_flag = adafs_fd->get_flag(OpenFile_flags::append);
int err = 0;
long updated_size = 0;
auto write_size = static_cast<size_t>(0);
err = rpc_send_update_metadentry_size(*path, count, offset, append_flag, updated_size);
ld_logger->error("{}() update_metadentry_size failed with err {}", __func__, err);
if (append_flag)
offset = updated_size - count;
auto chnk_start = static_cast<uint64_t>(offset) / CHUNKSIZE; // first chunk number
auto chnk_end = (offset + count) / CHUNKSIZE + 1; // last chunk number (right-open) [chnk_start,chnk_end)
if ((offset + count) % CHUNKSIZE == 0)
chnk_end--;
// Collect all chunk ids within count that have the same destination so that those are send in one rpc bulk transfer
// contains the recipient ids, used to access the dest_ids map. First idx is chunk with potential offset
for (uint64_t i = chnk_start; i < chnk_end; i++) {
auto recipient = adafs_hash_path_chunk(*path, i, fs_config->host_size);
if (dest_ids.count(recipient) == 0) {
dest_ids.insert(make_pair(recipient, vector<uint64_t>{i}));
dest_idx.push_back(recipient);
} else
dest_ids[recipient].push_back(i);
}
// Create an Argobots thread per destination, fill an appropriate struct with its destination chunk ids
vector<ABT_eventual> eventuals(dest_n);
vector<unique_ptr<struct write_args>> thread_args(dest_n);
ABT_eventual_create(sizeof(size_t), &eventuals[i]);
auto total_chunk_size = dest_ids[dest_idx[i]].size() * CHUNKSIZE;
if (i == 0) // receiver of first chunk must subtract the offset from first chunk
if (i == dest_n - 1 && ((offset + count) % CHUNKSIZE) != 0) // receiver of last chunk must subtract
total_chunk_size -= (CHUNKSIZE - ((offset + count) % CHUNKSIZE));
auto args = make_unique<write_args>();
// DEL BEGIN
string ids = ""s;
for (auto&& id : dest_ids[dest_idx[i]]) {
ids += fmt::FormatInt(id).str() + " "s;
}
ld_logger->info(
"{}() destination {} chnk_offset {} size {} total_chnksize {} div {} mod {} chnkids\n{}",
__func__, dest_idx[i], offset % CHUNKSIZE, count, total_chunk_size, total_chunk_size / CHUNKSIZE,
total_chunk_size % CHUNKSIZE, ids);
// DEL END
args->path = path; // path
args->total_chunk_size = total_chunk_size; // total size to write
args->in_size = count;
args->in_offset = offset % CHUNKSIZE;// first offset in dest_idx is the chunk with a potential offset
args->buf = buf;// pointer to write buffer
args->chnk_start = chnk_start;// append flag when file was opened
args->chnk_end = chnk_end;// append flag when file was opened
args->chnk_ids = &dest_ids[dest_idx[i]];// pointer to list of chunk ids that all go to the same destination
args->recipient = dest_idx[i];// recipient
args->eventual = eventuals[i];// pointer to an eventual which has allocated memory for storing the written size
thread_args[i] = std::move(args);
ABT_thread_create(io_pool, rpc_send_write_abt, &(*thread_args[i]), ABT_THREAD_ATTR_NULL, nullptr);
// Sum written sizes
size_t* thread_ret_size;
ABT_eventual_wait(eventuals[i], (void**) &thread_ret_size);
if (thread_ret_size == nullptr || *thread_ret_size == 0) {
// TODO error handling if write of a thread failed. all data needs to be deleted and size update reverted
ld_logger->error("{}() Writing thread {} did not write anything. NO ACTION WAS DONE", __func__, i);
} else
write_size += *thread_ret_size;
ABT_eventual_free(&eventuals[i]);
}
return write_size;
}
ssize_t adafs_pread_ws(int fd, void* buf, size_t count, off64_t offset) {
init_ld_env_if_needed();
auto adafs_fd = file_map.get(fd);
auto path = make_shared<string>(adafs_fd->path());
auto read_size = static_cast<size_t>(0);
auto err = 0;
// Collect all chunk ids within count that have the same destination so that those are send in one rpc bulk transfer
auto chnk_start = static_cast<uint64_t>(offset) / CHUNKSIZE; // first chunk number
auto chnk_end = (offset + count) / CHUNKSIZE + 1; // last chunk number (right-open) [chnk_start,chnk_end)
if ((offset + count) % CHUNKSIZE == 0)
chnk_end--;
// Collect all chunk ids within count that have the same destination so that those are send in one rpc bulk transfer
map<uint64_t, vector<uint64_t>> dest_ids{};
// contains the recipient ids, used to access the dest_ids map. First idx is chunk with potential offset
vector<uint64_t> dest_idx{};
for (uint64_t i = chnk_start; i < chnk_end; i++) {
auto recipient = adafs_hash_path_chunk(*path, i, fs_config->host_size);
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
if (dest_ids.count(recipient) == 0) {
dest_ids.insert(make_pair(recipient, vector<uint64_t>{i}));
dest_idx.push_back(recipient);
} else
dest_ids[recipient].push_back(i);
}
// Create an Argobots thread per destination, fill an appropriate struct with its destination chunk ids
auto dest_n = dest_idx.size();
vector<ABT_eventual> eventuals(dest_n);
vector<unique_ptr<struct read_args>> thread_args(dest_n);
for (uint64_t i = 0; i < dest_n; i++) {
ABT_eventual_create(sizeof(size_t), &eventuals[i]);
auto total_chunk_size = dest_ids[dest_idx[i]].size() * CHUNKSIZE;
if (i == 0) // receiver of first chunk must subtract the offset from first chunk
total_chunk_size -= (offset % CHUNKSIZE);
if (i == dest_n - 1 && ((offset + count) % CHUNKSIZE) != 0) // receiver of last chunk must subtract
total_chunk_size -= (CHUNKSIZE - ((offset + count) % CHUNKSIZE));
auto args = make_unique<read_args>();
args->path = path;
args->total_chunk_size = total_chunk_size;
args->in_size = count;// total size to read
args->in_offset = offset % CHUNKSIZE;// reading offset only for the first chunk
args->buf = buf;
args->chnk_start = chnk_start;
args->chnk_end = chnk_end;
args->chnk_ids = &dest_ids[dest_idx[i]]; // pointer to list of chunk ids that all go to the same destination
args->recipient = dest_idx[i];// recipient
args->eventual = eventuals[i];// pointer to an eventual which has allocated memory for storing the written size
thread_args[i] = std::move(args);
// Threads are implicitly released once calling function finishes
ABT_thread_create(io_pool, rpc_send_read_abt, &(*thread_args[i]), ABT_THREAD_ATTR_NULL, nullptr);
}
// Sum read sizes
for (uint64_t i = 0; i < dest_n; i++) {
size_t* thread_ret_size;
ABT_eventual_wait(eventuals[i], (void**) &thread_ret_size);
if (thread_ret_size == nullptr || *thread_ret_size == 0) {
err = -1;
ld_logger->error("{}() Reading thread {} did not read anything. NO ACTION WAS DONE", __func__, i);
} else
read_size += *thread_ret_size;
ABT_eventual_free(&eventuals[i]);
}
// XXX check how much we need to deal with the read_size
// XXX check that we don't try to read past end of the file
return err == 0 ? read_size : 0;