#include <global/rpc/rpc_types.hpp>
#include <daemon/handler/rpc_defs.hpp>
#include <global/rpc/rpc_utils.hpp>
#include <global/rpc/distributor.hpp>
#include <daemon/adafs_daemon.hpp>
#include <daemon/backend/data/chunk_storage.hpp>
/**
 * Argument bundle passed to write_file_abt() through the task's void* argument.
 * Every member is value-initialized so that a default-constructed instance never holds indeterminate values.
 */
struct write_chunk_args {
    const std::string* path{}; // file path; only dereferenced by write_file_abt(), never freed there
    const char* buf{};         // source buffer forwarded to write_chunk()
    rpc_chnk_id_t chnk_id{};   // id of the chunk to write
    size_t size{};             // size forwarded to write_chunk()
    off64_t off{};             // offset forwarded to write_chunk()
    ABT_eventual eventual{};   // eventual forwarded to write_chunk(); carries the task's size_t result
};
/**
* Used by an argobots threads. Argument args has the following fields:
* const std::string* path;
const char* buf;
const rpc_chnk_id_t* chnk_id;
size_t size;
off64_t off;
ABT_eventual* eventual;
* This function is driven by the IO pool. so there is a maximum allowed number of concurrent IO operations per daemon.
* This function is called by tasklets, as this function cannot be allowed to block.
* @return written_size<size_t> is put into eventual and returned that way
*/
void write_file_abt(void* _arg) {
// Unpack args
auto* arg = static_cast<struct write_chunk_args*>(_arg);
const std::string& path = *(arg->path);
try {
ADAFS_DATA->storage()->write_chunk(path, arg->chnk_id,
arg->buf, arg->size, arg->off, arg->eventual);
} catch (const std::exception& e){
ADAFS_DATA->spdlogger()->error("{}() Error writing chunk {} of file {}", __func__, arg->chnk_id, path);
auto err = static_cast<size_t>(EIO);
ABT_eventual_set(arg->eventual, &err, sizeof(size_t));
}
}
/**
 * Argument bundle passed to read_file_abt() through the task's void* argument.
 * Every member is value-initialized so that a default-constructed instance never holds indeterminate values.
 */
struct read_chunk_args {
    const std::string* path{}; // file path; only dereferenced by read_file_abt(), never freed there
    char* buf{};               // destination buffer forwarded to read_chunk()
    rpc_chnk_id_t chnk_id{};   // id of the chunk to read
    size_t size{};             // size forwarded to read_chunk()
    off64_t off{};             // offset forwarded to read_chunk()
    ABT_eventual eventual{};   // eventual forwarded to read_chunk(); carries the task's size_t result
};
/**
* Used by an argobots threads. Argument args has the following fields:
* const std::string* path;
char* buf;
const rpc_chnk_id_t* chnk_id;
size_t size;
off64_t off;
ABT_eventual* eventual;
* This function is driven by the IO pool. so there is a maximum allowed number of concurrent IO operations per daemon.
* This function is called by tasklets, as this function cannot be allowed to block.
* @return read_size<size_t> is put into eventual and returned that way
*/
void read_file_abt(void* _arg) {
//unpack args
auto* arg = static_cast<struct read_chunk_args*>(_arg);
const std::string& path = *(arg->path);
try {
ADAFS_DATA->storage()->read_chunk(path, arg->chnk_id,
arg->buf, arg->size, arg->off, arg->eventual);
} catch (const std::exception& e){
ADAFS_DATA->spdlogger()->error("{}() Error reading chunk {} of file {}", __func__, arg->chnk_id, path);
auto err = static_cast<size_t>(EIO);
ABT_eventual_set(arg->eventual, &err, sizeof(size_t));
/**
 * Free Argobots tasks and eventual constructs in a given vector until max_idx.
 * Nothing is done for a vector if nullptr is given.
 * Tasks and eventuals are released in two independent passes. max_idx is clamped to each vector's size so
 * that this cleanup path cannot throw std::out_of_range.
 * @param abt_tasks tasks to cancel and free, or nullptr to skip
 * @param abt_eventuals eventuals to reset and free, or nullptr to skip
 * @param max_idx number of leading entries, i.e. indices [0, max_idx), to release in each vector
 */
void cancel_abt_io(vector<ABT_task>* abt_tasks, vector<ABT_eventual>* abt_eventuals, uint64_t max_idx) {
    if (abt_tasks != nullptr) {
        const uint64_t task_n = (max_idx < abt_tasks->size()) ? max_idx : abt_tasks->size();
        for (uint64_t i = 0; i < task_n; i++) {
            ABT_task_cancel((*abt_tasks)[i]);
            ABT_task_free(&(*abt_tasks)[i]);
        }
    }
    if (abt_eventuals != nullptr) {
        const uint64_t eventual_n = (max_idx < abt_eventuals->size()) ? max_idx : abt_eventuals->size();
        for (uint64_t i = 0; i < eventual_n; i++) {
            ABT_eventual_reset((*abt_eventuals)[i]);
            ABT_eventual_free(&(*abt_eventuals)[i]);
        }
    }
}
static hg_return_t rpc_srv_write_data(hg_handle_t handle) {
/*
* 1. Setup
*/
rpc_write_data_in_t in{};
rpc_data_out_t out{};
hg_bulk_t bulk_handle = nullptr;
out.res = EIO;
out.io_size = 0;
// Getting some information from margo
auto ret = margo_get_input(handle, &in);
if (ret != HG_SUCCESS) {
ADAFS_DATA->spdlogger()->error("{}() Could not get RPC input data with err {}", __func__, ret);
return rpc_cleanup_respond(&handle, &in, &out, &bulk_handle);
}
auto hgi = margo_get_info(handle);
auto mid = margo_hg_info_get_instance(hgi);
auto bulk_size = margo_bulk_get_size(in.bulk_handle);
ADAFS_DATA->spdlogger()->debug("{}() Got write RPC (local {}) with path {} size {} offset {}", __func__,
(margo_get_info(handle)->context_id == ADAFS_DATA->host_id()), in.path, bulk_size,
in.offset);
/*
* 2. Set up buffers for pull bulk transfers
*/
void* bulk_buf; // buffer for bulk transfer
vector<char*> bulk_buf_ptrs(in.chunk_n); // buffer-chunk offsets
// create bulk handle and allocated memory for buffer with buf_sizes information
ret = margo_bulk_create(mid, 1, nullptr, &in.total_chunk_size, HG_BULK_READWRITE, &bulk_handle);
if (ret != HG_SUCCESS) {
ADAFS_DATA->spdlogger()->error("{}() Failed to create bulk handle", __func__);
return rpc_cleanup_respond(&handle, &in, &out, static_cast<hg_bulk_t*>(nullptr));
// access the internally allocated memory buffer and put it into buf_ptrs
uint32_t actual_count;
ret = margo_bulk_access(bulk_handle, 0, in.total_chunk_size, HG_BULK_READWRITE, 1, &bulk_buf,
&in.total_chunk_size, &actual_count);
ADAFS_DATA->spdlogger()->error("{}() Failed to access allocated buffer from bulk handle", __func__);
return rpc_cleanup_respond(&handle, &in, &out, &bulk_handle);
}
// chnk_ids used by this host
// counter to track how many chunks have been assigned
auto chnk_id_curr = static_cast<uint64_t>(0);
// chnk sizes per chunk for this host
vector<uint64_t> chnk_sizes(in.chunk_n);
// how much size is left to assign chunks for writing
auto chnk_size_left_host = in.total_chunk_size;
// temporary traveling pointer
auto chnk_ptr = static_cast<char*>(bulk_buf);
/*
* consider the following cases:
* 1. Very first chunk has offset or not and is serviced by this node
* 2. If offset, will still be only 1 chunk written (small IO): (offset + bulk_size <= CHUNKSIZE) ? bulk_size
* 3. If no offset, will only be 1 chunk written (small IO): (bulk_size <= CHUNKSIZE) ? bulk_size
* 4. Chunks between start and end chunk have size of the CHUNKSIZE
* 5. Last chunk (if multiple chunks are written): Don't write CHUNKSIZE but chnk_size_left for this destination
* Last chunk can also happen if only one chunk is written. This is covered by 2 and 3.
*/
auto transfer_size = (bulk_size <= CHUNKSIZE) ? bulk_size : CHUNKSIZE;
uint64_t origin_offset;
uint64_t local_offset;
// task structures for async writing
vector<ABT_task> abt_tasks(in.chunk_n);
vector<ABT_eventual> task_eventuals(in.chunk_n);
vector<unique_ptr<struct write_chunk_args>> task_args(in.chunk_n);
/*
* 3. Calculate chunk sizes that correspond to this host, transfer data, and start tasks to write to disk
*/
// Start to look for a chunk that hashes to this host with the first chunk in the buffer
for (auto chnk_id_file = in.chunk_start; chnk_id_file < in.chunk_end || chnk_id_curr < in.chunk_n; chnk_id_file++) {
// Continue if chunk does not hash to this host
if (ADAFS_DATA->distributor()->locate_data(in.path, chnk_id_file) != ADAFS_DATA->host_id())
chnk_ids_host[chnk_id_curr] = chnk_id_file; // save this id to host chunk list
// offset case. Only relevant in the first iteration of the loop and if the chunk hashes to this host
if (chnk_id_file == in.chunk_start && in.offset > 0) {
// if only 1 destination and 1 chunk (small write) the transfer_size == bulk_size
auto offset_transfer_size = (in.offset + bulk_size <= CHUNKSIZE) ? bulk_size : static_cast<size_t>(
CHUNKSIZE - in.offset);
ret = margo_bulk_transfer(mid, HG_BULK_PULL, hgi->addr, in.bulk_handle, 0,
bulk_handle, 0, offset_transfer_size);
if (ret != HG_SUCCESS) {
ADAFS_DATA->spdlogger()->error(
"{}() Failed to pull data from client for chunk {} (startchunk {}; endchunk {}", __func__,
chnk_id_file, in.chunk_start, in.chunk_end - 1);
cancel_abt_io(&abt_tasks, &task_eventuals, chnk_id_curr);
return rpc_cleanup_respond(&handle, &in, &out, &bulk_handle);
bulk_buf_ptrs[chnk_id_curr] = chnk_ptr;
chnk_sizes[chnk_id_curr] = offset_transfer_size;
chnk_ptr += offset_transfer_size;
local_offset = in.total_chunk_size - chnk_size_left_host;
// origin offset of a chunk is dependent on a given offset in a write operation
if (in.offset > 0)
origin_offset = (CHUNKSIZE - in.offset) + ((chnk_id_file - in.chunk_start) - 1) * CHUNKSIZE;
origin_offset = (chnk_id_file - in.chunk_start) * CHUNKSIZE;
// last chunk might have different transfer_size
if (chnk_id_curr == in.chunk_n - 1)
transfer_size = chnk_size_left_host;
ADAFS_DATA->spdlogger()->trace(
"{}() BULK_TRANSFER hostid {} file {} chnkid {} total_Csize {} Csize_left {} origin offset {} local offset {} transfersize {}",
__func__, ADAFS_DATA->host_id(), in.path, chnk_id_file, in.total_chunk_size, chnk_size_left_host,
origin_offset, local_offset, transfer_size);
// RDMA the data to here
ret = margo_bulk_transfer(mid, HG_BULK_PULL, hgi->addr, in.bulk_handle, origin_offset,
bulk_handle, local_offset, transfer_size);
if (ret != HG_SUCCESS) {
ADAFS_DATA->spdlogger()->error(
"{}() Failed to pull data from client. file {} chunk {} (startchunk {}; endchunk {})", __func__,
*path, chnk_id_file, in.chunk_start, (in.chunk_end - 1));
cancel_abt_io(&abt_tasks, &task_eventuals, chnk_id_curr);
return rpc_cleanup_respond(&handle, &in, &out, &bulk_handle);
}
bulk_buf_ptrs[chnk_id_curr] = chnk_ptr;
chnk_sizes[chnk_id_curr] = transfer_size;
chnk_ptr += transfer_size;
// Delegate chunk I/O operation to local FS to an I/O dedicated ABT pool
// Starting tasklets for parallel I/O
ABT_eventual_create(sizeof(size_t), &task_eventuals[chnk_id_curr]); // written file return value
auto task_arg = make_unique<struct write_chunk_args>();
task_arg->path = path.get();
task_arg->buf = bulk_buf_ptrs[chnk_id_curr];
task_arg->chnk_id = chnk_ids_host[chnk_id_curr];
task_arg->size = chnk_sizes[chnk_id_curr];
// only the first chunk gets the offset. the chunks are sorted on the client side
task_arg->off = (chnk_id_file == in.chunk_start) ? in.offset : 0;
task_arg->eventual = task_eventuals[chnk_id_curr];
task_args[chnk_id_curr] = std::move(task_arg);
auto abt_ret = ABT_task_create(RPC_DATA->io_pool(), write_file_abt, &(*task_args[chnk_id_curr]),
&abt_tasks[chnk_id_curr]);
if (abt_ret != ABT_SUCCESS) {
ADAFS_DATA->spdlogger()->error("{}() task create failed", __func__);
cancel_abt_io(&abt_tasks, &task_eventuals, chnk_id_curr + 1);
return rpc_cleanup_respond(&handle, &in, &out, &bulk_handle);
// next chunk
// Sanity check that all chunks where detected in previous loop
if (chnk_size_left_host != 0)
ADAFS_DATA->spdlogger()->warn("{}() Not all chunks were detected!!! Size left {}", __func__,
chnk_size_left_host);
/*
* 4. Read task results and accumulate in out.io_size
*/
for (chnk_id_curr = 0; chnk_id_curr < in.chunk_n; chnk_id_curr++) {
size_t* task_written_size;
// wait causes the calling ult to go into BLOCKED state, implicitly yielding to the pool scheduler
ABT_eventual_wait(task_eventuals[chnk_id_curr], (void**) &task_written_size);
if (task_written_size == nullptr || *task_written_size == 0) {
ADAFS_DATA->spdlogger()->error("{}() Writing file task for chunk {} failed and did return anything.",
__func__, chnk_id_curr);
/*
* XXX We have to talk about how chunk errors are handled? Should we try to write again?
* In any case we just ignore this for now and return the out.io_size with as much has been written
* After all, we can decide on the semantics.
*/
out.io_size += *task_written_size; // add task written size to output size
ABT_eventual_free(&task_eventuals[chnk_id_curr]);
// Sanity check to see if all data has been written
if (in.total_chunk_size != out.io_size)
ADAFS_DATA->spdlogger()->warn("{}() total chunk size {} and out.io_size {} mismatch!", __func__,
in.total_chunk_size, out.io_size);
/*
* 5. Respond and cleanup
*/
out.res = 0; // Set errorcode to succcess
ADAFS_DATA->spdlogger()->debug("{}() Sending output response {}", __func__, out.res);
ret = rpc_cleanup_respond(&handle, &in, &out, &bulk_handle);
// free tasks after responding
for (auto&& task : abt_tasks) {
ABT_task_join(task);
ABT_task_free(&task);
}
return ret;
}
DEFINE_MARGO_RPC_HANDLER(rpc_srv_write_data)
static hg_return_t rpc_srv_read_data(hg_handle_t handle) {
/*
* 1. Setup
*/
rpc_read_data_in_t in{};
rpc_data_out_t out{};
hg_bulk_t bulk_handle = nullptr;
// Set default out for error
out.res = EIO;
out.io_size = 0;
// Getting some information from margo
auto ret = margo_get_input(handle, &in);
if (ret != HG_SUCCESS) {
ADAFS_DATA->spdlogger()->error("{}() Could not get RPC input data with err {}", __func__, ret);
return rpc_cleanup_respond(&handle, &in, &out, &bulk_handle);
}
auto hgi = margo_get_info(handle);
auto mid = margo_hg_info_get_instance(hgi);
auto bulk_size = margo_bulk_get_size(in.bulk_handle);
ADAFS_DATA->spdlogger()->debug("{}() Got read RPC (local {}) with path {} size {} offset {}", __func__,
(margo_get_info(handle)->context_id == ADAFS_DATA->host_id()), in.path, bulk_size,
in.offset);
/*
* 2. Set up buffers for pull bulk transfers
*/
void* bulk_buf; // buffer for bulk transfer
vector<char*> bulk_buf_ptrs(in.chunk_n); // buffer-chunk offsets
// create bulk handle and allocated memory for buffer with buf_sizes information
ret = margo_bulk_create(mid, 1, nullptr, &in.total_chunk_size, HG_BULK_READWRITE, &bulk_handle);
if (ret != HG_SUCCESS) {
ADAFS_DATA->spdlogger()->error("{}() Failed to create bulk handle", __func__);
return rpc_cleanup_respond(&handle, &in, &out, static_cast<hg_bulk_t*>(nullptr));
}
// access the internally allocated memory buffer and put it into buf_ptrs
uint32_t actual_count;
ret = margo_bulk_access(bulk_handle, 0, in.total_chunk_size, HG_BULK_READWRITE, 1, &bulk_buf,
&in.total_chunk_size, &actual_count);
if (ret != HG_SUCCESS || actual_count != 1) {
ADAFS_DATA->spdlogger()->error("{}() Failed to access allocated buffer from bulk handle", __func__);
return rpc_cleanup_respond(&handle, &in, &out, &bulk_handle);
}
auto path = make_shared<string>(in.path);
// chnk_ids used by this host
vector<uint64_t> chnk_ids_host(in.chunk_n);
// counter to track how many chunks have been assigned
auto chnk_id_curr = static_cast<uint64_t>(0);
// chnk sizes per chunk for this host
vector<uint64_t> chnk_sizes(in.chunk_n);
// local and origin offsets for bulk operations
vector<uint64_t> local_offsets(in.chunk_n);
vector<uint64_t> origin_offsets(in.chunk_n);
// how much size is left to assign chunks for reading
auto chnk_size_left_host = in.total_chunk_size;
// temporary traveling pointer
auto chnk_ptr = static_cast<char*>(bulk_buf);
// temporary variables
auto transfer_size = (bulk_size <= CHUNKSIZE) ? bulk_size : CHUNKSIZE;
// tasks structures
vector<ABT_task> abt_tasks(in.chunk_n);
vector<ABT_eventual> task_eventuals(in.chunk_n);
vector<unique_ptr<struct read_chunk_args>> task_args(in.chunk_n);
/*
* 3. Calculate chunk sizes that correspond to this host and start tasks to read from disk
*/
// Start to look for a chunk that hashes to this host with the first chunk in the buffer
for (auto chnk_id_file = in.chunk_start; chnk_id_file < in.chunk_end || chnk_id_curr < in.chunk_n; chnk_id_file++) {
// Continue if chunk does not hash to this host
if (ADAFS_DATA->distributor()->locate_data(in.path, chnk_id_file) != ADAFS_DATA->host_id())
continue;
chnk_ids_host[chnk_id_curr] = chnk_id_file; // save this id to host chunk list
// Only relevant in the first iteration of the loop and if the chunk hashes to this host
if (chnk_id_file == in.chunk_start && in.offset > 0) {
// if only 1 destination and 1 chunk (small read) the transfer_size == bulk_size
auto offset_transfer_size = (in.offset + bulk_size <= CHUNKSIZE) ? bulk_size : static_cast<size_t>(
CHUNKSIZE - in.offset);
// Setting later transfer offsets
local_offsets[chnk_id_curr] = 0;
origin_offsets[chnk_id_curr] = 0;
bulk_buf_ptrs[chnk_id_curr] = chnk_ptr;
chnk_sizes[chnk_id_curr] = offset_transfer_size;
// util variables
chnk_ptr += offset_transfer_size;
chnk_size_left_host -= offset_transfer_size;
} else {
local_offsets[chnk_id_curr] = in.total_chunk_size - chnk_size_left_host;
// origin offset of a chunk is dependent on a given offset in a write operation
if (in.offset > 0)
origin_offsets[chnk_id_curr] =
(CHUNKSIZE - in.offset) + ((chnk_id_file - in.chunk_start) - 1) * CHUNKSIZE;
else
origin_offsets[chnk_id_curr] = (chnk_id_file - in.chunk_start) * CHUNKSIZE;
// last chunk might have different transfer_size
if (chnk_id_curr == in.chunk_n - 1)
transfer_size = chnk_size_left_host;
bulk_buf_ptrs[chnk_id_curr] = chnk_ptr;
chnk_sizes[chnk_id_curr] = transfer_size;
// util variables
chnk_ptr += transfer_size;
chnk_size_left_host -= transfer_size;
}
// Delegate chunk I/O operation to local FS to an I/O dedicated ABT pool
// Starting tasklets for parallel I/O
ABT_eventual_create(sizeof(size_t), &task_eventuals[chnk_id_curr]); // written file return value
auto task_arg = make_unique<read_chunk_args>();
task_arg->path = path.get();
task_arg->buf = bulk_buf_ptrs[chnk_id_curr];
task_arg->chnk_id = chnk_ids_host[chnk_id_curr];
task_arg->size = chnk_sizes[chnk_id_curr];
// only the first chunk gets the offset. the chunks are sorted on the client side
task_arg->off = (chnk_id_file == in.chunk_start) ? in.offset : 0;
task_arg->eventual = task_eventuals[chnk_id_curr];
task_args[chnk_id_curr] = std::move(task_arg);
auto abt_ret = ABT_task_create(RPC_DATA->io_pool(), read_file_abt, &(*task_args[chnk_id_curr]),
&abt_tasks[chnk_id_curr]);
if (abt_ret != ABT_SUCCESS) {
ADAFS_DATA->spdlogger()->error("{}() task create failed", __func__);
cancel_abt_io(&abt_tasks, &task_eventuals, chnk_id_curr + 1);
return rpc_cleanup_respond(&handle, &in, &out, &bulk_handle);
}
chnk_id_curr++;
}
// Sanity check that all chunks where detected in previous loop
if (chnk_size_left_host != 0)
ADAFS_DATA->spdlogger()->warn("{}() Not all chunks were detected!!! Size left {}", __func__,
chnk_size_left_host);
/*
* 4. Read task results and accumulate in out.io_size
*/
for (chnk_id_curr = 0; chnk_id_curr < in.chunk_n; chnk_id_curr++) {
size_t* task_read_size;
// wait causes the calling ult to go into BLOCKED state, implicitly yielding to the pool scheduler
ABT_eventual_wait(task_eventuals[chnk_id_curr], (void**) &task_read_size);
if (task_read_size == nullptr || *task_read_size == 0) {
ADAFS_DATA->spdlogger()->error("{}() Reading file task for chunk {} failed and did return anything.",
__func__, chnk_id_curr);
/*
* XXX We have to talk about how chunk errors are handled? Should we try to read again?
* In any case we just ignore this for now and return the out.io_size with as much has been read
* After all, we can decide on the semantics.
*/
} else {
ret = margo_bulk_transfer(mid, HG_BULK_PUSH, hgi->addr, in.bulk_handle, origin_offsets[chnk_id_curr],
bulk_handle, local_offsets[chnk_id_curr], chnk_sizes[chnk_id_curr]);
if (ret != HG_SUCCESS) {
ADAFS_DATA->spdlogger()->error(
"{}() Failed push chnkid {} on path {} to client. origin offset {} local offset {} chunk size {}",
__func__, chnk_id_curr, in.path, origin_offsets[chnk_id_curr], local_offsets[chnk_id_curr],
chnk_sizes[chnk_id_curr]);
cancel_abt_io(&abt_tasks, &task_eventuals, in.chunk_n);
return rpc_cleanup_respond(&handle, &in, &out, &bulk_handle);
}
out.io_size += *task_read_size; // add task read size to output size
}
ABT_eventual_free(&task_eventuals[chnk_id_curr]);
}
// Sanity check to see if all data has been written
if (in.total_chunk_size != out.io_size)
ADAFS_DATA->spdlogger()->warn("{}() total chunk size {} and out.io_size {} mismatch!", __func__,
in.total_chunk_size, out.io_size);
/*
* 5. Respond and cleanup
*/
out.res = 0; // Set errorcode to succcess
ADAFS_DATA->spdlogger()->debug("{}() Sending output response {}", __func__, out.res);
ret = rpc_cleanup_respond(&handle, &in, &out, &bulk_handle);
// free tasks after responding
for (auto&& task : abt_tasks) {
ABT_task_join(task);
ABT_task_free(&task);
}
return ret;