Line data Source code
1 : /*
2 : Copyright 2018-2024, Barcelona Supercomputing Center (BSC), Spain
3 : Copyright 2015-2024, Johannes Gutenberg Universitaet Mainz, Germany
4 :
5 : This software was partially supported by the
6 : EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu).
7 :
8 : This software was partially supported by the
9 : ADA-FS project under the SPPEXA project funded by the DFG.
10 :
11 : This file is part of GekkoFS.
12 :
13 : GekkoFS is free software: you can redistribute it and/or modify
14 : it under the terms of the GNU General Public License as published by
15 : the Free Software Foundation, either version 3 of the License, or
16 : (at your option) any later version.
17 :
18 : GekkoFS is distributed in the hope that it will be useful,
19 : but WITHOUT ANY WARRANTY; without even the implied warranty of
20 : MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21 : GNU General Public License for more details.
22 :
23 : You should have received a copy of the GNU General Public License
24 : along with GekkoFS. If not, see <https://www.gnu.org/licenses/>.
25 :
26 : SPDX-License-Identifier: GPL-3.0-or-later
27 : */
28 : /**
29 : * @brief Provides all Margo RPC handler definitions called by Mercury on client
30 : * request for all file system data operations.
31 : * @internal
32 : * The end of the file defines the associates the Margo RPC handler functions
33 : * and associates them with their corresponding GekkoFS handler functions.
34 : * @endinternal
35 : */
36 : #include <daemon/daemon.hpp>
37 : #include <daemon/handler/rpc_defs.hpp>
38 : #include <daemon/handler/rpc_util.hpp>
39 : #include <daemon/backend/data/chunk_storage.hpp>
40 : #include <daemon/ops/data.hpp>
41 :
42 : #include <common/rpc/rpc_types.hpp>
43 : #include <common/rpc/rpc_util.hpp>
44 : #include <common/rpc/distributor.hpp>
45 : #include <common/arithmetic/arithmetic.hpp>
46 : #include <common/statistics/stats.hpp>
47 :
48 : #ifdef GKFS_ENABLE_AGIOS
49 : #include <daemon/scheduler/agios.hpp>
50 :
51 : #define AGIOS_READ 0
52 : #define AGIOS_WRITE 1
53 : #define AGIOS_SERVER_ID_IGNORE 0
54 : #endif
55 : using namespace std;
56 :
57 :
58 : namespace {
59 :
60 : /**
61 : * @brief Serves a write request transferring the chunks associated with this
62 : * daemon and store them on the node-local FS.
63 : * @internal
64 : * The write operation has multiple steps:
65 : * 1. Setting up all RPC related information
66 : * 2. Allocating space for bulk transfer buffers
67 : * 3. By processing the RPC input, the chunk IDs that are hashing to this daemon
68 : * are computed based on a client-defined interval (start and endchunk id for
69 : * this write operation). The client does _not_ provide the daemons with a list
70 : * of chunk IDs because it is dynamic data that cannot be part of an RPC input
71 : * struct. Therefore, this information would need to be pulled with a bulk
72 : * transfer as well, adding unnecessary latency to the overall write operation.
73 : *
74 : * For each relevant chunk, a PULL bulk transfer is issued. Once finished, a
75 : * non-blocking Argobots tasklet is launched to write the data chunk to the
76 : * backend storage. Therefore, bulk transfer and the backend I/O operation are
77 : * overlapping for efficiency.
78 : * 4. Wait for all tasklets to complete adding up all the complete written data
79 : * size as reported by each task.
80 : * 5. Respond to client (when all backend write operations are finished) and
81 : * cleanup RPC resources. Any error is reported in the RPC output struct. Note,
82 : * that backend write operations are not canceled while in-flight when a task
83 : * encounters an error.
84 : *
85 : * Note, refer to the data backend documentation w.r.t. how Argobots tasklets
86 : * work and why they are used.
87 : *
88 : * All exceptions must be caught here and dealt with accordingly.
89 : * @endinteral
90 : * @param handle Mercury RPC handle
91 : * @return Mercury error code to Mercury
92 : */
93 : hg_return_t
94 41 : rpc_srv_write(hg_handle_t handle) {
95 : /*
96 : * 1. Setup
97 : */
98 41 : rpc_write_data_in_t in{};
99 41 : rpc_data_out_t out{};
100 41 : hg_bulk_t bulk_handle = nullptr;
101 : // default out for error
102 41 : out.err = EIO;
103 41 : out.io_size = 0;
104 : // Getting some information from margo
105 41 : auto ret = margo_get_input(handle, &in);
106 41 : if(ret != HG_SUCCESS) {
107 0 : GKFS_DATA->spdlogger()->error(
108 0 : "{}() Could not get RPC input data with err {}", __func__, ret);
109 0 : return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle);
110 : }
111 41 : auto hgi = margo_get_info(handle);
112 41 : auto mid = margo_hg_info_get_instance(hgi);
113 41 : auto bulk_size = margo_bulk_get_size(in.bulk_handle);
114 41 : GKFS_DATA->spdlogger()->debug(
115 : "{}() path: '{}' chunk_start '{}' chunk_end '{}' chunk_n '{}' total_chunk_size '{}' bulk_size: '{}' offset: '{}'",
116 : __func__, in.path, in.chunk_start, in.chunk_end, in.chunk_n,
117 41 : in.total_chunk_size, bulk_size, in.offset);
118 :
119 41 : std::vector<uint8_t> write_ops_vect =
120 123 : gkfs::rpc::decompress_bitset(in.wbitset);
121 :
122 : #ifdef GKFS_ENABLE_AGIOS
123 : int* data;
124 : ABT_eventual eventual = ABT_EVENTUAL_NULL;
125 :
126 : /* creating eventual */
127 : ABT_eventual_create(sizeof(int64_t), &eventual);
128 :
129 : unsigned long long int request_id = generate_unique_id();
130 : char* agios_path = (char*) in.path;
131 :
132 : // We should call AGIOS before chunking (as that is an internal way to
133 : // handle the requests)
134 : if(!agios_add_request(agios_path, AGIOS_WRITE, in.offset,
135 : in.total_chunk_size, request_id,
136 : AGIOS_SERVER_ID_IGNORE, agios_eventual_callback,
137 : eventual)) {
138 : GKFS_DATA->spdlogger()->error("{}() Failed to send request to AGIOS",
139 : __func__);
140 : } else {
141 : GKFS_DATA->spdlogger()->debug("{}() request {} was sent to AGIOS",
142 : __func__, request_id);
143 : }
144 :
145 : /* Block until the eventual is signaled */
146 : ABT_eventual_wait(eventual, (void**) &data);
147 :
148 : unsigned long long int result = *data;
149 : GKFS_DATA->spdlogger()->debug(
150 : "{}() request {} was unblocked (offset = {})!", __func__, result,
151 : in.offset);
152 :
153 : ABT_eventual_free(&eventual);
154 :
155 : // Let AGIOS knows it can release the request, as it is completed
156 : if(!agios_release_request(agios_path, AGIOS_WRITE, in.total_chunk_size,
157 : in.offset)) {
158 : GKFS_DATA->spdlogger()->error(
159 : "{}() Failed to release request from AGIOS", __func__);
160 : }
161 : #endif
162 :
163 : /*
164 : * 2. Set up buffers for pull bulk transfers
165 : */
166 41 : void* bulk_buf; // buffer for bulk transfer
167 82 : vector<char*> bulk_buf_ptrs(in.chunk_n); // buffer-chunk offsets
168 : // create bulk handle and allocated memory for buffer with buf_sizes
169 : // information
170 41 : ret = margo_bulk_create(mid, 1, nullptr, &in.total_chunk_size,
171 : HG_BULK_READWRITE, &bulk_handle);
172 41 : if(ret != HG_SUCCESS) {
173 0 : GKFS_DATA->spdlogger()->error("{}() Failed to create bulk handle",
174 0 : __func__);
175 0 : return gkfs::rpc::cleanup_respond(&handle, &in, &out,
176 : static_cast<hg_bulk_t*>(nullptr));
177 : }
178 : // access the internally allocated memory buffer and put it into buf_ptrs
179 41 : uint32_t actual_count;
180 41 : ret = margo_bulk_access(bulk_handle, 0, in.total_chunk_size,
181 : HG_BULK_READWRITE, 1, &bulk_buf,
182 : &in.total_chunk_size, &actual_count);
183 41 : if(ret != HG_SUCCESS || actual_count != 1) {
184 0 : GKFS_DATA->spdlogger()->error(
185 : "{}() Failed to access allocated buffer from bulk handle",
186 0 : __func__);
187 0 : return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle);
188 : }
189 41 : auto const host_id = in.host_id;
190 41 : [[maybe_unused]] auto const host_size = in.host_size;
191 :
192 82 : auto path = make_shared<string>(in.path);
193 : // chnk_ids used by this host
194 82 : vector<uint64_t> chnk_ids_host(in.chunk_n);
195 : // counter to track how many chunks have been assigned
196 41 : auto chnk_id_curr = static_cast<uint64_t>(0);
197 : // chnk sizes per chunk for this host
198 82 : vector<uint64_t> chnk_sizes(in.chunk_n);
199 : // how much size is left to assign chunks for writing
200 41 : auto chnk_size_left_host = in.total_chunk_size;
201 : // temporary traveling pointer
202 41 : auto chnk_ptr = static_cast<char*>(bulk_buf);
203 : /*
204 : * consider the following cases:
205 : * 1. Very first chunk has offset or not and is serviced by this node
206 : * 2. If offset, will still be only 1 chunk written (small IO): (offset +
207 : * bulk_size <= CHUNKSIZE) ? bulk_size
208 : * 3. If no offset, will only be 1 chunk written (small IO): (bulk_size <=
209 : * CHUNKSIZE) ? bulk_size
210 : * 4. Chunks between start and end chunk have size of the CHUNKSIZE
211 : * 5. Last chunk (if multiple chunks are written): Don't write CHUNKSIZE but
212 : * chnk_size_left for this destination Last chunk can also happen if only
213 : * one chunk is written. This is covered by 2 and 3.
214 : */
215 : // temporary variables
216 41 : auto transfer_size = (bulk_size <= gkfs::config::rpc::chunksize)
217 41 : ? bulk_size
218 : : gkfs::config::rpc::chunksize;
219 41 : uint64_t origin_offset;
220 41 : uint64_t local_offset;
221 : // object for asynchronous disk IO
222 123 : gkfs::data::ChunkWriteOperation chunk_op{in.path, in.chunk_n};
223 :
224 : /*
225 : * 3. Calculate chunk sizes that correspond to this host, transfer data, and
226 : * start tasks to write to disk
227 : */
228 : // Start to look for a chunk that hashes to this host with the first chunk
229 : // in the buffer
230 141 : for(auto chnk_id_file = in.chunk_start;
231 141 : chnk_id_file <= in.chunk_end && chnk_id_curr < in.chunk_n;
232 : chnk_id_file++) {
233 : // Continue if chunk does not hash to this host
234 :
235 100 : if(!(gkfs::rpc::get_bitset(write_ops_vect,
236 100 : chnk_id_file - in.chunk_start))) {
237 0 : GKFS_DATA->spdlogger()->trace(
238 : "{}() chunkid '{}' ignored as it does not match to this host with id '{}'. chnk_id_curr '{}'",
239 0 : __func__, chnk_id_file, host_id, chnk_id_curr);
240 0 : continue;
241 : }
242 :
243 100 : if(GKFS_DATA->enable_chunkstats()) {
244 200 : GKFS_DATA->stats()->add_write(in.path, chnk_id_file);
245 : }
246 :
247 100 : GKFS_DATA->spdlogger()->error("{}() Processing at host {} -> {}",
248 100 : __func__, host_id, chnk_id_file);
249 100 : chnk_ids_host[chnk_id_curr] =
250 : chnk_id_file; // save this id to host chunk list
251 : // offset case. Only relevant in the first iteration of the loop and if
252 : // the chunk hashes to this host
253 100 : if(chnk_id_file == in.chunk_start && in.offset > 0) {
254 : // if only 1 destination and 1 chunk (small write) the transfer_size
255 : // == bulk_size
256 11 : size_t offset_transfer_size = 0;
257 11 : if(in.offset + bulk_size <= gkfs::config::rpc::chunksize)
258 : offset_transfer_size = bulk_size;
259 : else
260 0 : offset_transfer_size = static_cast<size_t>(
261 0 : gkfs::config::rpc::chunksize - in.offset);
262 11 : ret = margo_bulk_transfer(mid, HG_BULK_PULL, hgi->addr,
263 : in.bulk_handle, 0, bulk_handle, 0,
264 : offset_transfer_size);
265 11 : if(ret != HG_SUCCESS) {
266 0 : GKFS_DATA->spdlogger()->error(
267 : "{}() Failed to pull data from client for chunk {} (startchunk {}; endchunk {}",
268 : __func__, chnk_id_file, in.chunk_start,
269 0 : in.chunk_end - 1);
270 0 : out.err = EBUSY;
271 0 : return gkfs::rpc::cleanup_respond(&handle, &in, &out,
272 0 : &bulk_handle);
273 : }
274 11 : bulk_buf_ptrs[chnk_id_curr] = chnk_ptr;
275 11 : chnk_sizes[chnk_id_curr] = offset_transfer_size;
276 11 : chnk_ptr += offset_transfer_size;
277 11 : chnk_size_left_host -= offset_transfer_size;
278 : } else {
279 89 : local_offset = in.total_chunk_size - chnk_size_left_host;
280 : // origin offset of a chunk is dependent on a given offset in a
281 : // write operation
282 89 : if(in.offset > 0)
283 0 : origin_offset = (gkfs::config::rpc::chunksize - in.offset) +
284 0 : ((chnk_id_file - in.chunk_start) - 1) *
285 : gkfs::config::rpc::chunksize;
286 : else
287 89 : origin_offset = (chnk_id_file - in.chunk_start) *
288 : gkfs::config::rpc::chunksize;
289 : // last chunk might have different transfer_size
290 89 : if(chnk_id_curr == in.chunk_n - 1)
291 30 : transfer_size = chnk_size_left_host;
292 89 : GKFS_DATA->spdlogger()->trace(
293 : "{}() BULK_TRANSFER_PULL hostid {} file {} chnkid {} total_Csize {} Csize_left {} origin offset {} local offset {} transfersize {}",
294 : __func__, host_id, in.path, chnk_id_file,
295 : in.total_chunk_size, chnk_size_left_host, origin_offset,
296 89 : local_offset, transfer_size);
297 : // RDMA the data to here
298 89 : ret = margo_bulk_transfer(mid, HG_BULK_PULL, hgi->addr,
299 : in.bulk_handle, origin_offset,
300 : bulk_handle, local_offset, transfer_size);
301 89 : if(ret != HG_SUCCESS) {
302 0 : GKFS_DATA->spdlogger()->error(
303 : "{}() Failed to pull data from client. file {} chunk {} (startchunk {}; endchunk {})",
304 : __func__, in.path, chnk_id_file, in.chunk_start,
305 0 : (in.chunk_end - 1));
306 0 : out.err = EBUSY;
307 0 : return gkfs::rpc::cleanup_respond(&handle, &in, &out,
308 : &bulk_handle);
309 : }
310 89 : bulk_buf_ptrs[chnk_id_curr] = chnk_ptr;
311 89 : chnk_sizes[chnk_id_curr] = transfer_size;
312 89 : chnk_ptr += transfer_size;
313 89 : chnk_size_left_host -= transfer_size;
314 : }
315 100 : try {
316 : // start tasklet for writing chunk
317 41 : chunk_op.write_nonblock(
318 100 : chnk_id_curr, chnk_ids_host[chnk_id_curr],
319 100 : bulk_buf_ptrs[chnk_id_curr], chnk_sizes[chnk_id_curr],
320 100 : (chnk_id_file == in.chunk_start) ? in.offset : 0);
321 0 : } catch(const gkfs::data::ChunkWriteOpException& e) {
322 : // This exception is caused by setup of Argobots variables. If this
323 : // fails, something is really wrong
324 0 : GKFS_DATA->spdlogger()->error("{}() while write_nonblock err '{}'",
325 0 : __func__, e.what());
326 0 : return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle);
327 : }
328 : // next chunk
329 100 : chnk_id_curr++;
330 : }
331 : // Sanity check that all chunks where detected in previous loop
332 : // TODO don't proceed if that happens.
333 41 : if(chnk_size_left_host != 0)
334 0 : GKFS_DATA->spdlogger()->warn(
335 : "{}() Not all chunks were detected!!! Size left {}", __func__,
336 0 : chnk_size_left_host);
337 : /*
338 : * 4. Read task results and accumulate in out.io_size
339 : */
340 41 : auto write_result = chunk_op.wait_for_tasks();
341 41 : out.err = write_result.first;
342 41 : out.io_size = write_result.second;
343 :
344 : // Sanity check to see if all data has been written
345 41 : if(in.total_chunk_size != out.io_size) {
346 0 : GKFS_DATA->spdlogger()->warn(
347 : "{}() total chunk size {} and out.io_size {} mismatch!",
348 0 : __func__, in.total_chunk_size, out.io_size);
349 : }
350 :
351 : /*
352 : * 5. Respond and cleanup
353 : */
354 41 : GKFS_DATA->spdlogger()->debug("{}() Sending output response {}", __func__,
355 41 : out.err);
356 41 : auto handler_ret =
357 41 : gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle);
358 41 : if(GKFS_DATA->enable_stats()) {
359 41 : GKFS_DATA->stats()->add_value_size(
360 : gkfs::utils::Stats::SizeOp::write_size, bulk_size);
361 : }
362 : return handler_ret;
363 : }
364 :
365 : /**
366 : * @brief Serves a read request reading the chunks associated with this
367 : * daemon from the node-local FS and transferring them back to the client.
368 : * @internal
369 : * The read operation has multiple steps:
370 : * 1. Setting up all RPC related information
371 : * 2. Allocating space for bulk transfer buffers
372 : * 3. By processing the RPC input, the chunk IDs that are hashing to this daemon
373 : * are computed based on a client-defined interval (start and endchunk id for
374 : * this read operation). The client does _not_ provide the daemons with a list
375 : * of chunk IDs because it is dynamic data that cannot be part of an RPC input
376 : * struct. Therefore, this information would need to be pulled with a bulk
377 : * transfer as well, adding unnecessary latency to the overall write operation.
378 : *
379 : * For each relevant chunk, a non-blocking Arbobots tasklet is launched to read
380 : * the data chunk from the backend storage to the allocated buffers.
381 : * 4. Wait for all tasklets to finish the read operation while PUSH bulk
382 : * transferring each chunk back to the client when a tasklet finishes.
383 : * Therefore, bulk transfer and the backend I/O operation are overlapping for
384 : * efficiency. The read size is added up for all tasklets.
385 : * 5. Respond to client (when all bulk transfers are finished) and cleanup RPC
386 : * resources. Any error is reported in the RPC output struct. Note, that backend
387 : * read operations are not canceled while in-flight when a task encounters an
388 : * error.
389 : *
390 : * Note, refer to the data backend documentation w.r.t. how Argobots tasklets
391 : * work and why they are used.
392 : *
393 : * All exceptions must be caught here and dealt with accordingly.
394 : * @endinteral
395 : * @param handle Mercury RPC handle
396 : * @return Mercury error code to Mercury
397 : */
398 : hg_return_t
399 28 : rpc_srv_read(hg_handle_t handle) {
400 : /*
401 : * 1. Setup
402 : */
403 28 : rpc_read_data_in_t in{};
404 28 : rpc_data_out_t out{};
405 28 : hg_bulk_t bulk_handle = nullptr;
406 : // Set default out for error
407 28 : out.err = EIO;
408 28 : out.io_size = 0;
409 : // Getting some information from margo
410 28 : auto ret = margo_get_input(handle, &in);
411 28 : if(ret != HG_SUCCESS) {
412 0 : GKFS_DATA->spdlogger()->error(
413 0 : "{}() Could not get RPC input data with err {}", __func__, ret);
414 0 : return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle);
415 : }
416 28 : auto hgi = margo_get_info(handle);
417 28 : auto mid = margo_hg_info_get_instance(hgi);
418 28 : auto bulk_size = margo_bulk_get_size(in.bulk_handle);
419 :
420 28 : GKFS_DATA->spdlogger()->debug(
421 : "{}() path: '{}' chunk_start '{}' chunk_end '{}' chunk_n '{}' total_chunk_size '{}' bulk_size: '{}' offset: '{}'",
422 : __func__, in.path, in.chunk_start, in.chunk_end, in.chunk_n,
423 28 : in.total_chunk_size, bulk_size, in.offset);
424 28 : std::vector<uint8_t> read_bitset_vect =
425 84 : gkfs::rpc::decompress_bitset(in.wbitset);
426 : #ifdef GKFS_ENABLE_AGIOS
427 : int* data;
428 : ABT_eventual eventual = ABT_EVENTUAL_NULL;
429 :
430 : /* creating eventual */
431 : ABT_eventual_create(sizeof(int64_t), &eventual);
432 :
433 : unsigned long long int request_id = generate_unique_id();
434 : char* agios_path = (char*) in.path;
435 :
436 : // We should call AGIOS before chunking (as that is an internal way to
437 : // handle the requests)
438 : if(!agios_add_request(agios_path, AGIOS_READ, in.offset,
439 : in.total_chunk_size, request_id,
440 : AGIOS_SERVER_ID_IGNORE, agios_eventual_callback,
441 : eventual)) {
442 : GKFS_DATA->spdlogger()->error("{}() Failed to send request to AGIOS",
443 : __func__);
444 : } else {
445 : GKFS_DATA->spdlogger()->debug("{}() request {} was sent to AGIOS",
446 : __func__, request_id);
447 : }
448 :
449 : /* block until the eventual is signaled */
450 : ABT_eventual_wait(eventual, (void**) &data);
451 :
452 : unsigned long long int result = *data;
453 : GKFS_DATA->spdlogger()->debug(
454 : "{}() request {} was unblocked (offset = {})!", __func__, result,
455 : in.offset);
456 :
457 : ABT_eventual_free(&eventual);
458 :
459 : // let AGIOS knows it can release the request, as it is completed
460 : if(!agios_release_request(agios_path, AGIOS_READ, in.total_chunk_size,
461 : in.offset)) {
462 : GKFS_DATA->spdlogger()->error(
463 : "{}() Failed to release request from AGIOS", __func__);
464 : }
465 : #endif
466 :
467 : /*
468 : * 2. Set up buffers for push bulk transfers
469 : */
470 28 : void* bulk_buf; // buffer for bulk transfer
471 56 : vector<char*> bulk_buf_ptrs(in.chunk_n); // buffer-chunk offsets
472 : // create bulk handle and allocated memory for buffer with buf_sizes
473 : // information
474 28 : ret = margo_bulk_create(mid, 1, nullptr, &in.total_chunk_size,
475 : HG_BULK_READWRITE, &bulk_handle);
476 28 : if(ret != HG_SUCCESS) {
477 0 : GKFS_DATA->spdlogger()->error("{}() Failed to create bulk handle",
478 0 : __func__);
479 0 : return gkfs::rpc::cleanup_respond(&handle, &in, &out,
480 : static_cast<hg_bulk_t*>(nullptr));
481 : }
482 : // access the internally allocated memory buffer and put it into buf_ptrs
483 28 : uint32_t actual_count;
484 28 : ret = margo_bulk_access(bulk_handle, 0, in.total_chunk_size,
485 : HG_BULK_READWRITE, 1, &bulk_buf,
486 : &in.total_chunk_size, &actual_count);
487 28 : if(ret != HG_SUCCESS || actual_count != 1) {
488 0 : GKFS_DATA->spdlogger()->error(
489 : "{}() Failed to access allocated buffer from bulk handle",
490 0 : __func__);
491 0 : return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle);
492 : }
493 :
494 28 : auto const host_id = in.host_id;
495 :
496 56 : auto path = make_shared<string>(in.path);
497 : // chnk_ids used by this host
498 56 : vector<uint64_t> chnk_ids_host(in.chunk_n);
499 : // counter to track how many chunks have been assigned
500 28 : auto chnk_id_curr = static_cast<uint64_t>(0);
501 : // chnk sizes per chunk for this host
502 56 : vector<uint64_t> chnk_sizes(in.chunk_n);
503 : // local and origin offsets for bulk operations
504 56 : vector<uint64_t> local_offsets(in.chunk_n);
505 56 : vector<uint64_t> origin_offsets(in.chunk_n);
506 : // how much size is left to assign chunks for reading
507 28 : auto chnk_size_left_host = in.total_chunk_size;
508 : // temporary traveling pointer
509 28 : auto chnk_ptr = static_cast<char*>(bulk_buf);
510 : // temporary variables
511 28 : auto transfer_size = (bulk_size <= gkfs::config::rpc::chunksize)
512 28 : ? bulk_size
513 : : gkfs::config::rpc::chunksize;
514 : // object for asynchronous disk IO
515 84 : gkfs::data::ChunkReadOperation chunk_read_op{in.path, in.chunk_n};
516 : /*
517 : * 3. Calculate chunk sizes that correspond to this host and start tasks to
518 : * read from disk
519 : */
520 : // Start to look for a chunk that hashes to this host with the first chunk
521 : // in the buffer
522 102 : for(auto chnk_id_file = in.chunk_start;
523 102 : chnk_id_file <= in.chunk_end && chnk_id_curr < in.chunk_n;
524 : chnk_id_file++) {
525 : // Continue if chunk does not hash to this host
526 :
527 : // We only check if we are not using replicas
528 :
529 74 : if(!(gkfs::rpc::get_bitset(read_bitset_vect,
530 74 : chnk_id_file - in.chunk_start))) {
531 0 : GKFS_DATA->spdlogger()->trace(
532 : "{}() chunkid '{}' ignored as it does not match to this host with id '{}'. chnk_id_curr '{}'",
533 0 : __func__, chnk_id_file, host_id, chnk_id_curr);
534 0 : continue;
535 : }
536 74 : if(GKFS_DATA->enable_chunkstats()) {
537 148 : GKFS_DATA->stats()->add_read(in.path, chnk_id_file);
538 : }
539 :
540 :
541 74 : chnk_ids_host[chnk_id_curr] =
542 : chnk_id_file; // save this id to host chunk list
543 : // Only relevant in the first iteration of the loop and if the chunk
544 : // hashes to this host
545 74 : if(chnk_id_file == in.chunk_start && in.offset > 0) {
546 : // if only 1 destination and 1 chunk (small read) the transfer_size
547 : // == bulk_size
548 14 : size_t offset_transfer_size = 0;
549 14 : if(in.offset + bulk_size <= gkfs::config::rpc::chunksize)
550 : offset_transfer_size = bulk_size;
551 : else
552 6 : offset_transfer_size = static_cast<size_t>(
553 6 : gkfs::config::rpc::chunksize - in.offset);
554 : // Setting later transfer offsets
555 14 : local_offsets[chnk_id_curr] = 0;
556 14 : origin_offsets[chnk_id_curr] = 0;
557 14 : bulk_buf_ptrs[chnk_id_curr] = chnk_ptr;
558 14 : chnk_sizes[chnk_id_curr] = offset_transfer_size;
559 : // utils variables
560 14 : chnk_ptr += offset_transfer_size;
561 14 : chnk_size_left_host -= offset_transfer_size;
562 : } else {
563 60 : local_offsets[chnk_id_curr] =
564 60 : in.total_chunk_size - chnk_size_left_host;
565 : // origin offset of a chunk is dependent on a given offset in a
566 : // write operation
567 60 : if(in.offset > 0)
568 14 : origin_offsets[chnk_id_curr] =
569 14 : (gkfs::config::rpc::chunksize - in.offset) +
570 14 : ((chnk_id_file - in.chunk_start) - 1) *
571 : gkfs::config::rpc::chunksize;
572 : else
573 46 : origin_offsets[chnk_id_curr] = (chnk_id_file - in.chunk_start) *
574 : gkfs::config::rpc::chunksize;
575 : // last chunk might have different transfer_size
576 60 : if(chnk_id_curr == in.chunk_n - 1)
577 20 : transfer_size = chnk_size_left_host;
578 60 : bulk_buf_ptrs[chnk_id_curr] = chnk_ptr;
579 60 : chnk_sizes[chnk_id_curr] = transfer_size;
580 : // utils variables
581 60 : chnk_ptr += transfer_size;
582 60 : chnk_size_left_host -= transfer_size;
583 : }
584 74 : try {
585 : // start tasklet for read operation
586 28 : chunk_read_op.read_nonblock(
587 74 : chnk_id_curr, chnk_ids_host[chnk_id_curr],
588 74 : bulk_buf_ptrs[chnk_id_curr], chnk_sizes[chnk_id_curr],
589 74 : (chnk_id_file == in.chunk_start) ? in.offset : 0);
590 0 : } catch(const gkfs::data::ChunkReadOpException& e) {
591 : // This exception is caused by setup of Argobots variables. If this
592 : // fails, something is really wrong
593 0 : GKFS_DATA->spdlogger()->error("{}() while read_nonblock err '{}'",
594 0 : __func__, e.what());
595 0 : return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle);
596 : }
597 74 : chnk_id_curr++;
598 : }
599 : // Sanity check that all chunks where detected in previous loop
600 : // TODO error out. If we continue this will crash the server when sending
601 : // results back that don't exist.
602 28 : if(chnk_size_left_host != 0)
603 0 : GKFS_DATA->spdlogger()->warn(
604 : "{}() Not all chunks were detected!!! Size left {}", __func__,
605 0 : chnk_size_left_host);
606 :
607 28 : if(chnk_size_left_host == in.total_chunk_size)
608 : return HG_CANCELED;
609 :
610 : /*
611 : * 4. Read task results and accumulate in out.io_size
612 : */
613 28 : gkfs::data::ChunkReadOperation::bulk_args bulk_args{};
614 28 : bulk_args.mid = mid;
615 28 : bulk_args.origin_addr = hgi->addr;
616 28 : bulk_args.origin_bulk_handle = in.bulk_handle;
617 28 : bulk_args.origin_offsets = &origin_offsets;
618 28 : bulk_args.local_bulk_handle = bulk_handle;
619 28 : bulk_args.local_offsets = &local_offsets;
620 28 : bulk_args.chunk_ids = &chnk_ids_host;
621 : // wait for all tasklets and push read data back to client
622 28 : auto read_result = chunk_read_op.wait_for_tasks_and_push_back(bulk_args);
623 28 : out.err = read_result.first;
624 28 : out.io_size = read_result.second;
625 :
626 : /*
627 : * 5. Respond and cleanup
628 : */
629 28 : GKFS_DATA->spdlogger()->debug("{}() Sending output response, err: {}",
630 28 : __func__, out.err);
631 28 : auto handler_ret =
632 28 : gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle);
633 28 : if(GKFS_DATA->enable_stats()) {
634 28 : GKFS_DATA->stats()->add_value_size(
635 : gkfs::utils::Stats::SizeOp::read_size, bulk_size);
636 : }
637 : return handler_ret;
638 : }
639 :
640 :
641 : /**
642 : * @brief Serves a file truncate request and remove all corresponding chunk
643 : * files on this daemon.
644 : * @internal
645 : * A truncate operation includes decreasing the file size of the metadata entry
646 : * (if hashing to this daemon) and removing all corresponding chunks exceeding
647 : * the new file size.
648 : *
649 : * All exceptions must be caught here and dealt with accordingly.
650 : * @endinteral
651 : * @param handle Mercury RPC handle
652 : * @return Mercury error code to Mercury
653 : */
654 : hg_return_t
655 2 : rpc_srv_truncate(hg_handle_t handle) {
656 2 : rpc_trunc_in_t in{};
657 2 : rpc_err_out_t out{};
658 2 : out.err = EIO;
659 : // Getting some information from margo
660 2 : auto ret = margo_get_input(handle, &in);
661 2 : if(ret != HG_SUCCESS) {
662 0 : GKFS_DATA->spdlogger()->error(
663 0 : "{}() Could not get RPC input data with err {}", __func__, ret);
664 0 : return gkfs::rpc::cleanup_respond(&handle, &in, &out);
665 : }
666 2 : GKFS_DATA->spdlogger()->debug("{}() path: '{}', length: '{}'", __func__,
667 2 : in.path, in.length);
668 :
669 6 : gkfs::data::ChunkTruncateOperation chunk_op{in.path};
670 2 : try {
671 : // start tasklet for truncate operation
672 2 : chunk_op.truncate(in.length);
673 0 : } catch(const gkfs::data::ChunkMetaOpException& e) {
674 : // This exception is caused by setup of Argobots variables. If this
675 : // fails, something is really wrong
676 0 : GKFS_DATA->spdlogger()->error("{}() while truncate err '{}'", __func__,
677 0 : e.what());
678 0 : return gkfs::rpc::cleanup_respond(&handle, &in, &out);
679 : }
680 :
681 : // wait and get output
682 2 : out.err = chunk_op.wait_for_task();
683 :
684 2 : GKFS_DATA->spdlogger()->debug("{}() Sending output response '{}'", __func__,
685 2 : out.err);
686 2 : return gkfs::rpc::cleanup_respond(&handle, &in, &out);
687 : }
688 :
689 :
690 : /**
691 : * @brief Serves a chunk stat request, responding with space information of the
692 : * node local file system.
693 : * @internal
694 : * All exceptions must be caught here and dealt with accordingly.
695 : * @endinteral
696 : * @param handle Mercury RPC handle
697 : * @return Mercury error code to Mercury
698 : */
699 : hg_return_t
700 2 : rpc_srv_get_chunk_stat(hg_handle_t handle) {
701 2 : GKFS_DATA->spdlogger()->debug("{}() enter", __func__);
702 2 : rpc_chunk_stat_out_t out{};
703 2 : out.err = EIO;
704 2 : try {
705 2 : auto chk_stat = GKFS_DATA->storage()->chunk_stat();
706 2 : out.chunk_size = chk_stat.chunk_size;
707 2 : out.chunk_total = chk_stat.chunk_total;
708 2 : out.chunk_free = chk_stat.chunk_free;
709 2 : out.err = 0;
710 0 : } catch(const gkfs::data::ChunkStorageException& err) {
711 0 : GKFS_DATA->spdlogger()->error("{}() {}", __func__, err.what());
712 0 : out.err = err.code().value();
713 0 : } catch(const ::exception& err) {
714 0 : GKFS_DATA->spdlogger()->error(
715 : "{}() Unexpected error when chunk stat '{}'", __func__,
716 0 : err.what());
717 0 : out.err = EAGAIN;
718 : }
719 :
720 : // Create output and send it back
721 2 : return gkfs::rpc::cleanup_respond(&handle, &out);
722 : }
723 :
724 : } // namespace
725 :
726 82 : DEFINE_MARGO_RPC_HANDLER(rpc_srv_write)
727 :
728 56 : DEFINE_MARGO_RPC_HANDLER(rpc_srv_read)
729 :
730 4 : DEFINE_MARGO_RPC_HANDLER(rpc_srv_truncate)
731 :
732 4 : DEFINE_MARGO_RPC_HANDLER(rpc_srv_get_chunk_stat)
733 :
734 : #ifdef GKFS_ENABLE_AGIOS
735 : void*
736 : agios_eventual_callback(int64_t request_id, void* info) {
737 : GKFS_DATA->spdlogger()->debug("{}() custom callback request {} is ready",
738 : __func__, request_id);
739 :
740 : ABT_eventual_set((ABT_eventual) info, &request_id, sizeof(int64_t));
741 :
742 : return 0;
743 : }
744 : #endif
|