Newer
Older
#include <preload/adafs_functions.hpp>
#include <preload/rpc/ld_rpc_metadentry.hpp>
#include <preload/rpc/ld_rpc_data_ws.hpp>
using namespace std;
/**
 * Opens `path` for the preload layer and hands back a descriptor taken from file_map.
 *
 * The path is registered in file_map first, together with whether O_APPEND was requested.
 * If O_CREAT is set, rpc_send_open() is issued and its result decides success. Without
 * O_CREAT no RPC is sent at all and the file is simply assumed to exist.
 *
 * @param path  path of the file to open
 * @param mode  mode forwarded to rpc_send_open()
 * @param flags open flags; only O_APPEND and O_CREAT are inspected here, the full value is
 *              forwarded to rpc_send_open()
 * @return the descriptor returned by file_map.add() on success; -1 on failure, in which case
 *         the descriptor has been removed from file_map again
 */
int adafs_open(const std::string& path, mode_t mode, int flags) {
    auto err = 1;
    auto fd = file_map.add(path, (flags & O_APPEND) != 0);
    // TODO look up if file exists configurable
    if (flags & O_CREAT) {
        err = rpc_send_open(path, mode, flags);
    } else {
        // TODO default if no o_creat flag, assume file exists. This should be an rpc to see if file is there
        err = 0;
    }
    if (err == 0)
        return fd;
    // Roll back the registration done above so that no stale descriptor stays in file_map.
    file_map.remove(fd);
    return -1;
}
// TODO combine adafs_stat and adafs_stat64
/**
 * Retrieves the attributes of `path` through rpc_send_stat() and converts them into `*buf`.
 *
 * @param path path of the file to stat
 * @param buf  destination stat structure; it is only written when the RPC returned 0
 * @return the value returned by rpc_send_stat() (0 is treated as success)
 */
int adafs_stat(const std::string& path, struct stat* buf) {
    std::string attr{};
    const auto err = rpc_send_stat(path, attr);
    // Convert only after a successful RPC: on failure `attr` holds no valid attribute string
    // and converting it would fill *buf with meaningless values.
    if (err == 0)
        db_val_to_stat(path, attr, *buf);
    return err;
}
/**
 * 64-bit variant of the stat call: retrieves the attributes of `path` through rpc_send_stat()
 * and converts them into `*buf` with db_val_to_stat64().
 *
 * @param path path of the file to stat
 * @param buf  destination stat64 structure; it is only written when the RPC returned 0
 * @return the value returned by rpc_send_stat() (0 is treated as success)
 */
int adafs_stat64(const std::string& path, struct stat64* buf) {
    std::string attr{};
    const auto err = rpc_send_stat(path, attr);
    // Convert only after a successful RPC: on failure `attr` holds no valid attribute string
    // and converting it would fill *buf with meaningless values.
    if (err == 0)
        db_val_to_stat64(path, attr, *buf);
    return err;
}
ssize_t adafs_pread_ws(int fd, void* buf, size_t count, off_t offset) {
auto adafs_fd = file_map.get(fd);
auto path = adafs_fd->path();
auto read_size = static_cast<size_t>(0);
auto err = 0;
// Collect all chunk ids within count that have the same destination so that those are send in one rpc bulk transfer
auto chunk_n = static_cast<size_t>(ceil(
count / static_cast<float>(CHUNKSIZE))); // get number of chunks needed for writing
auto chnk_id_start = offset / CHUNKSIZE;
vector<unsigned long> dest_idx{}; // contains the recipient ids, used to access the dest_ids map
map<unsigned long, vector<unsigned long>> dest_ids{}; // contains the chnk ids (value list) per recipient (key)
for (unsigned long i = 0; i < chunk_n; i++) {
auto chnk_id = i + chnk_id_start;
auto recipient = get_rpc_node(path + fmt::FormatInt(chnk_id).str());
if (dest_ids.count(recipient) == 0) {
dest_ids.insert(make_pair(recipient, vector<unsigned long>{chnk_id}));
dest_idx.push_back(recipient);
} else
dest_ids[recipient].push_back(chnk_id);
}
// Create an Argobots thread per destination, fill an appropriate struct with its destination chunk ids
ABT_xstream xstream;
ABT_pool pool;
auto ret = ABT_xstream_self(&xstream);
if (ret != 0) {
ld_logger->error("{}() Unable to get self xstream. Is Argobots initialized?", __func__);
return -1;
}
ret = ABT_xstream_get_main_pools(xstream, 1, &pool);
if (ret != 0) {
ld_logger->error("{}() Unable to get main pools from ABT xstream", __func__);
return -1;
}
auto dest_n = dest_idx.size();
vector<ABT_thread> threads(dest_n);
vector<ABT_eventual> eventuals(dest_n);
vector<struct read_args*> thread_args(dest_n);
for (unsigned long i = 0; i < dest_n; i++) {
ABT_eventual_create(sizeof(size_t), &eventuals[i]);
struct read_args args = {
path, // path
count, // total size to read
0, // reading offset only for the first chunk
buf, // pointer to write buffer
dest_ids[dest_idx[i]], // pointer to list of chunk ids that all go to the same destination
&eventuals[i], // pointer to an eventual which has allocated memory for storing the written size
};
if (i == 0)
args.in_offset = offset % CHUNKSIZE;
thread_args[i] = &args;
ABT_thread_create(pool, rpc_send_read_abt, &(*thread_args[i]), ABT_THREAD_ATTR_NULL, &threads[i]);
}
for (unsigned long i = 0; i < dest_n; i++) {
size_t* thread_ret_size;
ABT_eventual_wait(eventuals[i], (void**) &thread_ret_size);
if (thread_ret_size == nullptr || *thread_ret_size == 0) {
err = -1;
ld_logger->error("{}() Reading thread {} did not read anything. NO ACTION WAS DONE", __func__, i);
} else
read_size += *thread_ret_size;
ABT_eventual_free(&eventuals[i]);
ret = ABT_thread_join(threads[i]);
if (ret != 0) {
ld_logger->error("{}() Unable to ABT_thread_join()", __func__);
err = -1;
}
ret = ABT_thread_free(&threads[i]);
if (ret != 0) {
ld_logger->error("{}() Unable to ABT_thread_free()", __func__);
err = -1;
}
}
// XXX check how much we need to deal with the read_size
return err == 0 ? read_size : 0;
}
ssize_t adafs_pwrite_ws(int fd, const void* buf, size_t count, off_t offset) {
auto adafs_fd = file_map.get(fd);
auto path = adafs_fd->path();
auto append_flag = adafs_fd->append_flag();
int err = 0;
long updated_size = 0;
auto write_size = static_cast<size_t>(0);
err = rpc_send_update_metadentry_size(path, count, offset, append_flag, updated_size);
ld_logger->error("{}() update_metadentry_size failed with err {}", __func__, err);
if (append_flag)
offset = updated_size - count;
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
auto chnk_start = static_cast<size_t>(offset) / CHUNKSIZE; // first chunk number
auto chnk_end = (offset + count) / CHUNKSIZE + 1; // last chunk number (right-open) [chnk_start,chnk_end)
if ((offset + count) % CHUNKSIZE == 0)
chnk_end--;
// Collect all chunk ids within count that have the same destination so that those are send in one rpc bulk transfer
map<unsigned long, vector<unsigned long>> dest_ids{};
// contains the recipient ids, used to access the dest_ids map. First idx is chunk with potential offset
vector<unsigned long> dest_idx{};
for (auto i = chnk_start; i < chnk_end; i++) {
auto recipient = get_rpc_node(path + fmt::FormatInt(i).str());
if (dest_ids.count(recipient) == 0) {
dest_ids.insert(make_pair(recipient, vector<unsigned long>{i}));
dest_idx.push_back(recipient);
} else
dest_ids[recipient].push_back(i);
}
// Create an Argobots thread per destination, fill an appropriate struct with its destination chunk ids
ABT_xstream xstream;
ABT_pool pool;
auto ret = ABT_xstream_self(&xstream);
if (ret != 0) {
ld_logger->error("{}() Unable to get self xstream. Is Argobots initialized?", __func__);
return -1;
}
ret = ABT_xstream_get_main_pools(xstream, 1, &pool);
if (ret != 0) {
ld_logger->error("{}() Unable to get main pools from ABT xstream", __func__);
return -1;
}
auto dest_n = dest_idx.size();
vector<ABT_thread> threads(dest_n);
vector<ABT_eventual> eventuals(dest_n);
vector<struct write_args*> thread_args(dest_n);
for (unsigned long i = 0; i < dest_n; i++) {
ABT_eventual_create(sizeof(size_t), &eventuals[i]);
auto total_chunk_size = dest_ids[dest_idx[i]].size() * CHUNKSIZE;
if (i == 0) // receiver of first chunk must subtract the offset from first chunk
total_chunk_size -= offset % CHUNKSIZE;
if (i == dest_n - 1 && ((offset + count) % CHUNKSIZE) != 0) // receiver of last chunk must subtract
total_chunk_size -= CHUNKSIZE - ((offset + count) % CHUNKSIZE);
struct write_args args = {
path, // path
total_chunk_size, // total size to write
0, // writing offset only relevant for the first chunk that is written
buf, // pointer to write buffer
chnk_start, // append flag when file was opened
updated_size, // for append truncate TODO needed?
dest_ids[dest_idx[i]], // pointer to list of chunk ids that all go to the same destination
&eventuals[i], // pointer to an eventual which has allocated memory for storing the written size
};
if (i == 0) // first offset in dest_idx is the chunk with a potential offset
args.in_offset = offset % CHUNKSIZE;
thread_args[i] = &args;
ABT_thread_create(pool, rpc_send_write_abt, &(*thread_args[i]), ABT_THREAD_ATTR_NULL, &threads[i]);
}
for (unsigned long i = 0; i < dest_n; i++) {
size_t* thread_ret_size;
ABT_eventual_wait(eventuals[i], (void**) &thread_ret_size);
if (thread_ret_size == nullptr || *thread_ret_size == 0) {
// TODO error handling if write of a thread failed. all data needs to be deleted and size update reverted
ld_logger->error("{}() Writing thread {} did not write anything. NO ACTION WAS DONE", __func__, i);
} else
write_size += *thread_ret_size;
ABT_eventual_free(&eventuals[i]);
ret = ABT_thread_join(threads[i]);
if (ret != 0) {
ld_logger->error("{}() Unable to ABT_thread_join()", __func__);
return -1;
}
ret = ABT_thread_free(&threads[i]);
if (ret != 0) {
ld_logger->error("{}() Unable to ABT_thread_free()", __func__);
return -1;
}
}
return write_size;
}