Newer
Older
#include <preload/adafs_functions.hpp>
#include <preload/rpc/ld_rpc_metadentry.hpp>
#include <preload/rpc/ld_rpc_data_ws.hpp>
using namespace std;
/**
 * Opens `path` for the preload layer and hands out a file descriptor from file_map.
 *
 * With O_CREAT the node is created through rpc_send_mk_node(). Without it, the
 * existence of the file is only verified when DO_LOOKUP is defined; otherwise the
 * file is assumed to exist. If the remote operation fails, the descriptor that was
 * just added to file_map is removed again.
 *
 * @param path  path of the file to open
 * @param mode  mode bits; forwarded to rpc_send_mk_node() on creation and used to
 *              derive the access mask when CHECK_ACCESS_DURING_OPEN is defined
 * @param flags open flags; stored together with the path in file_map
 * @return the new file descriptor on success, -1 otherwise
 */
int adafs_open(const std::string& path, mode_t mode, int flags) {
    init_ld_env_if_needed();
    int err = -1; // every branch below overwrites this; -1 keeps the failure path the default
    auto fd = file_map.add(path, flags);
    // TODO the open flags should not be in the map just set the pos accordingly
    // TODO look up if file exists configurable
    if (flags & O_CREAT) {
        // no access check required here. If one is using our FS they have the permissions.
        err = rpc_send_mk_node(path, mode);
    } else {
        auto mask = F_OK; // F_OK == 0
#if defined(CHECK_ACCESS_DURING_OPEN)
        // F_OK is 0, so the requested permission bits must be OR-ed in.
        // AND-ing them (as before) always left the mask at 0.
        if (mode & (S_IRUSR | S_IRGRP | S_IROTH))
            mask |= R_OK;
        if (mode & (S_IWUSR | S_IWGRP | S_IWOTH))
            mask |= W_OK;
        if (mode & (S_IXUSR | S_IXGRP | S_IXOTH))
            mask |= X_OK;
#endif
#if defined(DO_LOOKUP)
        // check if file exists
        err = rpc_send_access(path, mask);
#else
        // file is assumed to be existing, even though it might not
        err = 0;
#endif
    }
    if (err != 0) {
        // roll back the descriptor that was handed out optimistically above
        file_map.remove(fd);
        return -1;
    }
    return fd;
}
/**
 * Creates the node `path` with the given `mode` by delegating to rpc_send_mk_node().
 *
 * @param path path of the node to create
 * @param mode mode bits passed through unchanged
 * @return the value returned by rpc_send_mk_node()
 */
int adafs_mk_node(const std::string& path, const mode_t mode) {
    // the preload environment has to be set up before any RPC is issued
    init_ld_env_if_needed();
    const auto rpc_result = rpc_send_mk_node(path, mode);
    return rpc_result;
}
/**
 * Removes the node `path` by delegating to rpc_send_rm_node().
 *
 * @param path path of the node to remove
 * @return the value returned by rpc_send_rm_node()
 */
int adafs_rm_node(const std::string& path) {
    // the preload environment has to be set up before any RPC is issued
    init_ld_env_if_needed();
    const auto rpc_result = rpc_send_rm_node(path);
    return rpc_result;
}
/**
 * Checks whether `path` is accessible.
 *
 * Without DO_LOOKUP the object is assumed to exist and 0 is returned without any
 * RPC. With DO_LOOKUP only existence (F_OK) is checked through rpc_send_access().
 *
 * @param path path of the object to check
 * @param mask requested access mask; currently not forwarded, only F_OK is checked
 * @return 0 if the object is (assumed to be) accessible, otherwise the value
 *         returned by rpc_send_access()
 */
int adafs_access(const std::string& path, const int mask) {
    init_ld_env_if_needed();
#if !defined(DO_LOOKUP)
    // object is assumed to be existing, even though it might not
    return 0;
#else
    return rpc_send_access(path, F_OK); // Only check for file exists
#endif
}
// TODO combine adafs_stat and adafs_stat64
/**
 * Fetches the attributes of `path` via rpc_send_stat() and, on success, converts
 * them into `*buf` with db_val_to_stat().
 *
 * @param path path of the object to stat
 * @param buf  output buffer; only written when the RPC returned 0
 * @return the value returned by rpc_send_stat() (0 on success)
 */
int adafs_stat(const string& path, struct stat* buf) {
    init_ld_env_if_needed();
    string attr = ""s;
    auto err = rpc_send_stat(path, attr);
    if (err == 0)
        db_val_to_stat(path, attr, *buf);
    return err;
}
/**
 * 64-bit variant of adafs_stat(): fetches the attributes of `path` via
 * rpc_send_stat() and, on success, converts them into `*buf` with db_val_to_stat64().
 *
 * @param path path of the object to stat
 * @param buf  output buffer; only written when the RPC returned 0
 * @return the value returned by rpc_send_stat() (0 on success)
 */
int adafs_stat64(const string& path, struct stat64* buf) {
    init_ld_env_if_needed();
    string attr = ""s;
    auto err = rpc_send_stat(path, attr);
    if (err == 0)
        db_val_to_stat64(path, attr, *buf);
    return err;
}
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
/**
 * Fills `adafs_buf` with made-up file system statistics for `path`.
 *
 * The path must exist (checked with rpc_send_access()). Block and inode counters
 * are estimated by scaling the values of `realfs_buf` with fs_config->host_size.
 *
 * @param path       path that must exist in the file system
 * @param adafs_buf  output buffer that receives the estimated statistics
 * @param realfs_buf statistics of the underlying real file system used as baseline
 * @return 0 on success, -1 on failure (errno is set to the positive value returned
 *         by rpc_send_access(), or left as set by the RPC layer for negative values)
 */
int adafs_statfs(const string& path, struct statfs* adafs_buf, struct statfs& realfs_buf) {
    // Check that file path exists
    const auto access_rc = rpc_send_access(path, F_OK);
    if (access_rc != 0) {
        // Positive value: valid fs error. Negative value: RPC error (errno has been set)
        if (access_rc > 0)
            errno = access_rc;
        return -1;
    }

    // fs object exists. Let's make up some fs values
    auto& cfg = *fs_config;

    // values taken over from the real file system unchanged
    adafs_buf->f_fsid = realfs_buf.f_fsid; // "Nobody knows what f_fsid is supposed to contain"
    adafs_buf->f_namelen = realfs_buf.f_namelen;
    adafs_buf->f_frsize = realfs_buf.f_frsize;

    // fixed values
    adafs_buf->f_type = 0; // fs is not know to VFS. Therefore, no valid specifier possible
    adafs_buf->f_bsize = static_cast<int>(CHUNKSIZE);
    for (int spare_idx = 0; spare_idx < 4; ++spare_idx)
        adafs_buf->f_spare[spare_idx] = 0;

    // some rough estimations
    adafs_buf->f_blocks = realfs_buf.f_blocks * cfg.host_size;
    adafs_buf->f_bfree = realfs_buf.f_bfree * cfg.host_size;
    adafs_buf->f_bavail = realfs_buf.f_bavail * cfg.host_size;
    adafs_buf->f_files = realfs_buf.f_files * cfg.host_size;
    adafs_buf->f_ffree = realfs_buf.f_ffree * cfg.host_size;

    // NOTE(review): ST_NOATIME is part of the unconditional flag set, so the
    // atime_state check effectively only adds ST_NODIRATIME - confirm this is intended
    adafs_buf->f_flags = ST_NOATIME | ST_NOSUID | ST_NODEV | ST_SYNCHRONOUS;
    if (!cfg.atime_state)
        adafs_buf->f_flags |= ST_NOATIME | ST_NODIRATIME;

    return 0;
}
/**
 * Repositions the offset of the open file behind descriptor `fd`.
 *
 * Looks the descriptor up in file_map and delegates to the overload that works
 * on the open-file handle.
 *
 * @param fd     descriptor handed out by this layer
 * @param offset offset, interpreted according to `whence`
 * @param whence one of the SEEK_* constants
 * @return the value returned by the handle-based adafs_lseek() overload
 */
off64_t adafs_lseek(int fd, off64_t offset, int whence) {
    init_ld_env_if_needed();
    auto open_file = file_map.get(fd);
    return adafs_lseek(open_file, offset, whence);
}
/**
 * Repositions the offset stored in the open-file handle `adafs_fd`.
 *
 * SEEK_SET and SEEK_CUR only touch the local position. SEEK_END asks for the
 * current file size via rpc_send_get_metadentry_size(). SEEK_DATA, SEEK_HOLE and
 * unknown whence values are rejected with EINVAL.
 *
 * @param adafs_fd open-file handle; a null handle is rejected with EBADF
 * @param offset   offset, interpreted according to `whence`
 * @param whence   one of the SEEK_* constants
 * @return the new position on success, -1 on failure with errno set
 */
off64_t adafs_lseek(shared_ptr<OpenFile> adafs_fd, off64_t offset, int whence) {
    init_ld_env_if_needed();
    if (!adafs_fd) {
        // no open file behind the handle: fail instead of dereferencing a null pointer
        errno = EBADF;
        return -1;
    }
    switch (whence) {
        case SEEK_SET:
            adafs_fd->pos(offset);
            break;
        case SEEK_CUR:
            adafs_fd->pos(adafs_fd->pos() + offset);
            break;
        case SEEK_END: {
            off64_t file_size{}; // filled by the RPC below
            auto err = rpc_send_get_metadentry_size(adafs_fd->path(), file_size);
            if (err < 0) {
                // NOTE(review): errno receives the negative value as-is - confirm the sign convention
                errno = err; // Negative numbers are explicitly for error codes
                return -1;
            }
            adafs_fd->pos(file_size + offset);
            break;
        }
        case SEEK_DATA: // We do not support this whence yet
        case SEEK_HOLE: // We do not support this whence yet
        default:
            errno = EINVAL;
            return -1;
    }
    return adafs_fd->pos();
}
/**
 * Duplicates the descriptor `oldfd` by delegating to file_map.dup().
 *
 * @param oldfd descriptor to duplicate
 * @return the value returned by file_map.dup()
 */
int adafs_dup(const int oldfd) {
    const auto duplicated_fd = file_map.dup(oldfd);
    return duplicated_fd;
}
/**
 * Duplicates the descriptor `oldfd` onto `newfd` by delegating to file_map.dup2().
 *
 * @param oldfd descriptor to duplicate
 * @param newfd descriptor number requested for the duplicate
 * @return the value returned by file_map.dup2()
 */
int adafs_dup2(const int oldfd, const int newfd) {
    const auto duplicated_fd = file_map.dup2(oldfd, newfd);
    return duplicated_fd;
}
ssize_t adafs_pread_ws(int fd, void* buf, size_t count, off64_t offset) {
init_ld_env_if_needed();
auto adafs_fd = file_map.get(fd);
auto path = make_shared<string>(adafs_fd->path());
auto read_size = static_cast<size_t>(0);
auto err = 0;
// Collect all chunk ids within count that have the same destination so that those are send in one rpc bulk transfer
Marc Vef
committed
auto chnk_start = static_cast<size_t>(offset) / CHUNKSIZE; // first chunk number
auto chnk_end = (offset + count) / CHUNKSIZE + 1; // last chunk number (right-open) [chnk_start,chnk_end)
if ((offset + count) % CHUNKSIZE == 0)
chnk_end--;
vector<unsigned long> dest_idx{}; // contains the recipient ids, used to access the dest_ids map
map<unsigned long, vector<unsigned long>> dest_ids{}; // contains the chnk ids (value list) per recipient (key)
Marc Vef
committed
for (unsigned long i = chnk_start; i < chnk_end; i++) {
auto recipient = get_rpc_node(*path + fmt::FormatInt(i).str());
if (dest_ids.count(recipient) == 0) {
Marc Vef
committed
dest_ids.insert(make_pair(recipient, vector<unsigned long>{i}));
dest_idx.push_back(recipient);
} else
Marc Vef
committed
dest_ids[recipient].push_back(i);
auto dest_n = dest_idx.size();
vector<ABT_thread> threads(dest_n);
vector<ABT_eventual> eventuals(dest_n);
vector<unique_ptr<struct read_args>> thread_args(dest_n);
for (unsigned long i = 0; i < dest_n; i++) {
ABT_eventual_create(sizeof(size_t), &eventuals[i]);
Marc Vef
committed
auto total_chunk_size = dest_ids[dest_idx[i]].size() * CHUNKSIZE;
if (i == 0) // receiver of first chunk must subtract the offset from first chunk
total_chunk_size -= offset % CHUNKSIZE;
if (i == dest_n - 1 && ((offset + count) % CHUNKSIZE) != 0) // receiver of last chunk must subtract
total_chunk_size -= CHUNKSIZE - ((offset + count) % CHUNKSIZE);
auto args = make_unique<read_args>();
args->path = path;
Marc Vef
committed
args->in_size = total_chunk_size;// total size to read
args->in_offset = offset % CHUNKSIZE;// reading offset only for the first chunk
args->buf = buf;
args->chnk_ids = &dest_ids[dest_idx[i]]; // pointer to list of chunk ids that all go to the same destination
Marc Vef
committed
args->chnk_start = chnk_start;
args->recipient = dest_idx[i];// recipient
args->eventual = &eventuals[i];// pointer to an eventual which has allocated memory for storing the written size
thread_args[i] = std::move(args);
ABT_thread_create(io_pool, rpc_send_read_abt, &(*thread_args[i]), ABT_THREAD_ATTR_NULL, &threads[i]);
}
for (unsigned long i = 0; i < dest_n; i++) {
size_t* thread_ret_size;
ABT_eventual_wait(eventuals[i], (void**) &thread_ret_size);
if (thread_ret_size == nullptr || *thread_ret_size == 0) {
err = -1;
ld_logger->error("{}() Reading thread {} did not read anything. NO ACTION WAS DONE", __func__, i);
} else
read_size += *thread_ret_size;
ABT_eventual_free(&eventuals[i]);
auto ret = ABT_thread_join(threads[i]);
if (ret != 0) {
ld_logger->error("{}() Unable to ABT_thread_join()", __func__);
err = -1;
}
ret = ABT_thread_free(&threads[i]);
if (ret != 0) {
ld_logger->warn("{}() Unable to ABT_thread_free()", __func__);
}
}
// XXX check how much we need to deal with the read_size
// XXX check that we don't try to read past end of the file
return err == 0 ? read_size : 0;
}
/**
 * Writes `count` bytes from `buf` at `offset` to the file behind `fd`.
 *
 * First the file size is updated via rpc_send_update_metadentry_size(). If the
 * file was opened with the append flag, the write offset is derived from the
 * updated size. The byte range is then split into chunks of CHUNKSIZE, grouped by
 * the recipient returned by get_rpc_node(), and one Argobots thread per recipient
 * is started with rpc_send_write_abt. The sizes reported through the eventuals
 * are summed up.
 *
 * @param fd     descriptor handed out by this layer
 * @param buf    source buffer
 * @param count  number of bytes to write
 * @param offset file offset to start writing at (replaced when appending)
 * @return number of bytes written, or -1 if the size update failed or a thread
 *         could not be joined
 */
ssize_t adafs_pwrite_ws(int fd, const void* buf, size_t count, off64_t offset) {
    init_ld_env_if_needed();
    auto adafs_fd = file_map.get(fd);
    auto path = make_shared<string>(adafs_fd->path());
    auto append_flag = adafs_fd->get_flag(OpenFile_flags::append);
    long updated_size = 0;
    auto write_size = static_cast<size_t>(0);
    auto join_failed = false;

    int err = rpc_send_update_metadentry_size(*path, count, offset, append_flag, updated_size);
    if (err != 0) {
        // Only report a failure when there is one, and do not write data for a size update that did not happen
        // NOTE(review): errno is left untouched here - confirm what the RPC layer guarantees
        ld_logger->error("{}() update_metadentry_size failed with err {}", __func__, err);
        return -1;
    }
    if (append_flag)
        offset = updated_size - count;

    auto chnk_start = static_cast<size_t>(offset) / CHUNKSIZE; // first chunk number
    auto chnk_end = (offset + count) / CHUNKSIZE + 1; // last chunk number (right-open) [chnk_start,chnk_end)
    if ((offset + count) % CHUNKSIZE == 0)
        chnk_end--;

    // Collect all chunk ids within count that have the same destination so that those are send in one rpc bulk transfer
    map<unsigned long, vector<unsigned long>> dest_ids{};
    // contains the recipient ids, used to access the dest_ids map. First idx is chunk with potential offset
    vector<unsigned long> dest_idx{};
    for (auto i = chnk_start; i < chnk_end; i++) {
        auto recipient = get_rpc_node(*path + fmt::FormatInt(i).str());
        if (dest_ids.count(recipient) == 0) {
            dest_ids.insert(make_pair(recipient, vector<unsigned long>{i}));
            dest_idx.push_back(recipient);
        } else
            dest_ids[recipient].push_back(i);
    }

    // Create an Argobots thread per destination, fill an appropriate struct with its destination chunk ids
    auto dest_n = dest_idx.size();
    vector<ABT_thread> threads(dest_n);
    vector<ABT_eventual> eventuals(dest_n);
    vector<unique_ptr<struct write_args>> thread_args(dest_n);
    for (unsigned long i = 0; i < dest_n; i++) {
        ABT_eventual_create(sizeof(size_t), &eventuals[i]);
        auto& chnk_ids = dest_ids[dest_idx[i]];
        auto total_chunk_size = chnk_ids.size() * CHUNKSIZE;
        // Chunk ids are appended in ascending order, so front()/back() are the smallest/largest chunk id
        // of this recipient. The recipient of the last chunk is not necessarily the last entry of
        // dest_idx, hence the check on the chunk id instead of on the loop index.
        if (chnk_ids.front() == chnk_start) // receiver of first chunk must subtract the offset from first chunk
            total_chunk_size -= offset % CHUNKSIZE;
        if (chnk_ids.back() == chnk_end - 1 && ((offset + count) % CHUNKSIZE) != 0) // receiver of last chunk must subtract
            total_chunk_size -= CHUNKSIZE - ((offset + count) % CHUNKSIZE);
        auto args = make_unique<write_args>();
        args->path = path; // path
        args->in_size = total_chunk_size; // total size to write
        args->in_offset = offset % CHUNKSIZE; // first offset in dest_idx is the chunk with a potential offset
        args->buf = buf; // pointer to write buffer
        args->chnk_start = chnk_start; // first chunk number of the whole write
        args->chnk_ids = &chnk_ids; // pointer to list of chunk ids that all go to the same destination
        args->recipient = dest_idx[i]; // recipient
        args->eventual = &eventuals[i]; // pointer to an eventual which has allocated memory for storing the written size
        thread_args[i] = std::move(args);
        ABT_thread_create(io_pool, rpc_send_write_abt, thread_args[i].get(), ABT_THREAD_ATTR_NULL, &threads[i]);
    }

    // Sum written sizes
    for (unsigned long i = 0; i < dest_n; i++) {
        size_t* thread_ret_size = nullptr;
        ABT_eventual_wait(eventuals[i], reinterpret_cast<void**>(&thread_ret_size));
        if (thread_ret_size == nullptr || *thread_ret_size == 0) {
            // TODO error handling if write of a thread failed. all data needs to be deleted and size update reverted
            ld_logger->error("{}() Writing thread {} did not write anything. NO ACTION WAS DONE", __func__, i);
        } else
            write_size += *thread_ret_size;
        ABT_eventual_free(&eventuals[i]);
        auto ret = ABT_thread_join(threads[i]);
        if (ret != 0) {
            // Remember the failure but keep going: the remaining threads still use thread_args and
            // eventuals, so they must be waited for and released before this function returns
            ld_logger->error("{}() Unable to ABT_thread_join()", __func__);
            join_failed = true;
        }
        ret = ABT_thread_free(&threads[i]);
        if (ret != 0) {
            ld_logger->warn("{}() Unable to ABT_thread_free()", __func__);
        }
    }
    if (join_failed)
        return -1;
    return write_size;
}