Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
//
// Created by evie on 9/8/17.
//
#include <rpc/rpc_types.hpp>
#include <rpc/rpc_defs.hpp>
#include <adafs_ops/data.hpp>
using namespace std;
static hg_return_t rpc_srv_read_data(hg_handle_t handle) {
rpc_read_data_in_t in;
rpc_data_out_t out;
void* b_buf;
int err;
hg_bulk_t bulk_handle;
auto ret = HG_Get_input(handle, &in);
assert(ret == HG_SUCCESS);
ADAFS_DATA->spdlogger()->debug("Got read RPC with path {} size {} offset {}", in.path, in.size, in.offset);
auto hgi = HG_Get_info(handle);
auto mid = margo_hg_class_to_instance(hgi->hg_class);
// set up buffer to read
auto buf = make_unique<char[]>(in.size);
err = read_file(buf.get(), out.io_size, in.path, in.size, in.offset);
if (err != 0) {
ADAFS_DATA->spdlogger()->error("Could not open file with path: {}", in.path);
out.res = err;
ADAFS_DATA->spdlogger()->debug("Sending output response {}", out.res);
auto hret = margo_respond(mid, handle, &out);
if (hret != HG_SUCCESS) {
ADAFS_DATA->spdlogger()->error("Failed to respond to read request");
}
} else {
// set up buffer for bulk transfer
b_buf = (void*) buf.get();
ret = HG_Bulk_create(hgi->hg_class, 1, &b_buf, &in.size, HG_BULK_READ_ONLY, &bulk_handle);
// push data to client
if (ret == HG_SUCCESS)
margo_bulk_transfer(mid, HG_BULK_PUSH, hgi->addr, in.bulk_handle, 0, bulk_handle, 0, in.size);
else {
ADAFS_DATA->spdlogger()->error("Failed to send data to client in read operation");
out.res = EIO;
out.io_size = 0;
}
ADAFS_DATA->spdlogger()->debug("Sending output response {}", out.res);
// respond rpc
auto hret = margo_respond(mid, handle, &out);
if (hret != HG_SUCCESS) {
ADAFS_DATA->spdlogger()->error("Failed to respond to read request");
}
HG_Bulk_free(bulk_handle);
}
in.path = nullptr;
// Destroy handle when finished
HG_Free_input(handle, &in);
HG_Free_output(handle, &out);
HG_Destroy(handle);
return HG_SUCCESS;
}
DEFINE_MARGO_RPC_HANDLER(rpc_srv_read_data)
static hg_return_t rpc_srv_write_data(hg_handle_t handle) {
rpc_write_data_in_t in;
rpc_data_out_t out;
void* b_buf;
hg_bulk_t bulk_handle;
auto ret = HG_Get_input(handle, &in);
assert(ret == HG_SUCCESS);
ADAFS_DATA->spdlogger()->debug("Got write RPC with path {} size {} offset {}", in.path, in.size, in.offset);
auto hgi = HG_Get_info(handle);
auto mid = margo_hg_class_to_instance(hgi->hg_class);
// register local buffer to fill for bulk pull
auto b_buf_wrap = make_unique<char[]>(in.size);
b_buf = static_cast<void*>(b_buf_wrap.get());
ret = HG_Bulk_create(hgi->hg_class, 1, &b_buf, &in.size, HG_BULK_WRITE_ONLY, &bulk_handle);
// push data to client
if (ret == HG_SUCCESS) {
// pull data from client here
margo_bulk_transfer(mid, HG_BULK_PULL, hgi->addr, in.bulk_handle, 0, bulk_handle, 0, in.size);
// do write operation
auto buf = static_cast<char*>(b_buf);
out.res = write_file(in.path, buf, out.io_size, in.size, in.offset, (in.append == HG_TRUE), in.updated_size);
if (out.res != 0) {
ADAFS_DATA->spdlogger()->error("Failed to write data to local disk.");
out.io_size = 0;
}
HG_Bulk_free(bulk_handle);
} else {
ADAFS_DATA->spdlogger()->error("Failed to pull data from client in write operation");
out.res = EIO;
out.io_size = 0;
}
ADAFS_DATA->spdlogger()->debug("Sending output response {}", out.res);
auto hret = margo_respond(mid, handle, &out);
if (hret != HG_SUCCESS) {
ADAFS_DATA->spdlogger()->error("Failed to respond to write request");
}
in.path = nullptr;
// Destroy handle when finished
HG_Free_input(handle, &in);
HG_Free_output(handle, &out);
HG_Destroy(handle);
return HG_SUCCESS;
}
DEFINE_MARGO_RPC_HANDLER(rpc_srv_write_data)