Newer
Older
#include <daemon/adafs_daemon.hpp>
#include <db/db_util.hpp>
#include <rpc/rpc_types.hpp>
#include <rpc/rpc_defs.hpp>
#include <preload/ipc_types.hpp>
void daemon_loop(void* arg) {
ADAFS_DATA->spdlogger()->info("Starting application loop ...");
/* TODO for Nafiseh
* Connect to the IPC socket with the looping thread and listed for messages from the preload lib
* When new message is received spawn a new thread that will trigger the operation and respond to preload lib
* Ensure that messages from the lib are not lost. XXX
*/
// connect to the ipc socket and a separate thread retrieves the message from the preload lib. in
ADAFS_DATA->spdlogger()->info("done sleeping. exiting ...");
break;
}
}
void run_daemon() {
ABT_xstream xstream;
ABT_pool pool;
ABT_thread loop_thread;
auto argo_ret = ABT_xstream_self(
&xstream); // get the current execution stream (1 core) to use (started with ABT_init())
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
if (argo_ret != 0) {
ADAFS_DATA->spdlogger()->error("Error getting the execution stream when starting the daemon.");
return;
}
argo_ret = ABT_xstream_get_main_pools(xstream, 1, &pool); // get the thread pool
if (argo_ret != 0) {
ADAFS_DATA->spdlogger()->error("Error getting the thread pool when starting the daemon.");
return;
}
argo_ret = ABT_thread_create(pool, daemon_loop, nullptr, ABT_THREAD_ATTR_NULL, &loop_thread);
if (argo_ret != 0) {
ADAFS_DATA->spdlogger()->error("Error creating loop thread");
return;
}
// wait for the daemon to be closed and free the loop thread
ABT_thread_yield_to(loop_thread);
argo_ret = ABT_thread_join(loop_thread);
if (argo_ret != 0) {
ADAFS_DATA->spdlogger()->error("Error joining loop thread");
return;
}
argo_ret = ABT_thread_free(&loop_thread);
if (argo_ret != 0) {
ADAFS_DATA->spdlogger()->error("Error freeing loop thread.");
return;
}
}
void init_environment() {
// Initialize rocksdb
auto err = init_rocksdb();
assert(err);
err = init_argobots();
assert(err);
// init margo
err = init_rpc_server();
assert(err);
err = init_ipc_server();
assert(err);
Marc Vef
committed
err = init_rpc_client();
// TODO set metadata configurations. these have to go into a user configurable file that is parsed here
ADAFS_DATA->atime_state(false);
ADAFS_DATA->mtime_state(false);
ADAFS_DATA->ctime_state(false);
ADAFS_DATA->uid_state(false);
ADAFS_DATA->gid_state(false);
ADAFS_DATA->inode_no_state(false);
ADAFS_DATA->link_cnt_state(false);
ADAFS_DATA->blocks_state(false);
}
/**
* Destroys the margo, argobots, and mercury environments
*/
void destroy_enviroment() {
ADAFS_DATA->spdlogger()->info("About to finalize the margo server and client");
margo_finalize(RPC_DATA->client_mid());
margo_finalize(RPC_DATA->server_rpc_mid());
margo_finalize(RPC_DATA->server_ipc_mid());
ADAFS_DATA->spdlogger()->info("Success.");
destroy_argobots();
ADAFS_DATA->spdlogger()->info("About to destroy the mercury context");
HG_Context_destroy(RPC_DATA->client_hg_context());
HG_Context_destroy(RPC_DATA->server_rpc_hg_context());
HG_Context_destroy(RPC_DATA->server_ipc_hg_context());
ADAFS_DATA->spdlogger()->info("Success.");
ADAFS_DATA->spdlogger()->info("About to destroy the mercury class");
HG_Finalize(RPC_DATA->client_hg_class());
HG_Finalize(RPC_DATA->server_rpc_hg_class());
HG_Finalize(RPC_DATA->server_ipc_hg_class());
ADAFS_DATA->spdlogger()->info("Success.");
ADAFS_DATA->spdlogger()->info("All services shut down.");
}
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
/**
* Initializes the Argobots environment
* @return
*/
bool init_argobots() {
ADAFS_DATA->spdlogger()->info("Initializing Argobots ...");
// We need no arguments to init
auto argo_err = ABT_init(0, nullptr);
if (argo_err != 0) {
ADAFS_DATA->spdlogger()->error("ABT_init() Failed to init Argobots (client)");
return false;
}
// Set primary execution stream to idle without polling. Normally xstreams cannot sleep. This is what ABT_snoozer does
argo_err = ABT_snoozer_xstream_self_set();
if (argo_err != 0) {
ADAFS_DATA->spdlogger()->error("ABT_snoozer_xstream_self_set() (client)");
return false;
}
ADAFS_DATA->spdlogger()->info("Success.");
return true;
}
/**
* Shuts down Argobots
*/
void destroy_argobots() {
ADAFS_DATA->spdlogger()->info("About to shut Argobots down"s);
auto ret = ABT_finalize();
if (ret == ABT_SUCCESS) {
ADAFS_DATA->spdlogger()->info("Argobots successfully shutdown.");
} else {
ADAFS_DATA->spdlogger()->error("Argobots shutdown FAILED with err code {}", ret);
}
}
auto protocol_port = RPC_PROTOCOL + "://localhost:"s + to_string(RPCPORT);
hg_addr_t addr_self;
hg_size_t addr_self_cstring_sz = 128;
char addr_self_cstring[128];
// Mercury class and context pointer that go into RPC_data class
hg_class_t* hg_class;
hg_context_t* hg_context;
ADAFS_DATA->spdlogger()->info("Initializing Mercury RPC server ...");
/* MERCURY PART */
// Init Mercury layer (must be finalized when finished)
hg_class = HG_Init(protocol_port.c_str(), HG_TRUE);
if (hg_class == nullptr) {
ADAFS_DATA->spdlogger()->error("HG_Init() Failed to init Mercury RPC server layer");
return false;
}
// Create a new Mercury context (must be destroyed when finished)
hg_context = HG_Context_create(hg_class);
if (hg_context == nullptr) {
ADAFS_DATA->spdlogger()->error("HG_Context_create() Failed to create Mercury RPC server context");
HG_Finalize(hg_class);
return false;
}
// Below is just for logging purposes
// Figure out what address this server is listening on (must be freed when finished)
auto hg_ret = HG_Addr_self(hg_class, &addr_self);
if (hg_ret != HG_SUCCESS) {
ADAFS_DATA->spdlogger()->error("HG_Addr_self() Failed to retrieve server RPC address");
HG_Context_destroy(hg_context);
HG_Finalize(hg_class);
return false;
}
// Convert the address to a cstring (with \0 terminator).
hg_ret = HG_Addr_to_string(hg_class, addr_self_cstring, &addr_self_cstring_sz, addr_self);
if (hg_ret != HG_SUCCESS) {
ADAFS_DATA->spdlogger()->error("HG_Addr_to_string Failed to convert address to cstring");
HG_Context_destroy(hg_context);
HG_Finalize(hg_class);
HG_Addr_free(hg_class, addr_self);
return false;
}
HG_Addr_free(hg_class, addr_self);
ADAFS_DATA->spdlogger()->info("Success. Accepting RPCs on address {}", addr_self_cstring);
/* MARGO PART */
ADAFS_DATA->spdlogger()->info("Initializing Margo RPC server...");
// Start Margo
auto mid = margo_init(1, 16, hg_context);
if (mid == MARGO_INSTANCE_NULL) {
ADAFS_DATA->spdlogger()->error("margo_init failed to initialize the Margo RPC server");
HG_Context_destroy(hg_context);
HG_Finalize(hg_class);
return false;
}
ADAFS_DATA->spdlogger()->info("Success.");
// Put context and class into RPC_data object
RPC_DATA->server_rpc_hg_class(hg_class);
RPC_DATA->server_rpc_hg_context(hg_context);
RPC_DATA->server_rpc_mid(mid);
register_server_rpcs();
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
bool init_ipc_server() {
auto protocol_port = "na+sm://"s;
hg_addr_t addr_self;
hg_size_t addr_self_cstring_sz = 128;
char addr_self_cstring[128];
// Mercury class and context pointer that go into RPC_data class
hg_class_t* hg_class;
hg_context_t* hg_context;
ADAFS_DATA->spdlogger()->info("Initializing Mercury IPC server ...");
/* MERCURY PART */
// Init Mercury layer (must be finalized when finished)
hg_class = HG_Init(protocol_port.c_str(), HG_TRUE);
if (hg_class == nullptr) {
ADAFS_DATA->spdlogger()->error("HG_Init() Failed to init Mercury IPC server layer");
return false;
}
// Create a new Mercury context (must be destroyed when finished)
hg_context = HG_Context_create(hg_class);
if (hg_context == nullptr) {
ADAFS_DATA->spdlogger()->error("HG_Context_create() Failed to create Mercury IPC server context");
HG_Finalize(hg_class);
return false;
}
// Below is just for logging purposes
// Figure out what address this server is listening on (must be freed when finished)
auto hg_ret = HG_Addr_self(hg_class, &addr_self);
if (hg_ret != HG_SUCCESS) {
ADAFS_DATA->spdlogger()->error("HG_Addr_self() Failed to retrieve IPC server address");
HG_Context_destroy(hg_context);
HG_Finalize(hg_class);
return false;
}
// Convert the address to a cstring (with \0 terminator).
hg_ret = HG_Addr_to_string(hg_class, addr_self_cstring, &addr_self_cstring_sz, addr_self);
if (hg_ret != HG_SUCCESS) {
ADAFS_DATA->spdlogger()->error("HG_Addr_to_string Failed to convert address to cstring");
HG_Context_destroy(hg_context);
HG_Finalize(hg_class);
HG_Addr_free(hg_class, addr_self);
return false;
}
HG_Addr_free(hg_class, addr_self);
ADAFS_DATA->spdlogger()->info("Success. Accepting IPCs on PID {}", addr_self_cstring);
/* MARGO PART */
ADAFS_DATA->spdlogger()->info("Initializing Margo IPC server...");
// Start Margo
auto mid = margo_init(1, 16, hg_context);
if (mid == MARGO_INSTANCE_NULL) {
ADAFS_DATA->spdlogger()->error("margo_init failed to initialize the Margo IPC server");
HG_Context_destroy(hg_context);
HG_Finalize(hg_class);
return false;
}
ADAFS_DATA->spdlogger()->info("Success.");
// Put context and class into RPC_data object
RPC_DATA->server_ipc_hg_class(hg_class);
RPC_DATA->server_ipc_hg_context(hg_context);
RPC_DATA->server_ipc_mid(mid);
// register RPCs
register_server_ipcs();
return true;
}
/**
* Register the rpcs for the server. There is no need to store rpc ids for the server
* @param hg_class
*/
void register_server_rpcs() {
auto hg_class = RPC_DATA->server_rpc_hg_class();
MERCURY_REGISTER(hg_class, "rpc_minimal", rpc_minimal_in_t, rpc_minimal_out_t, rpc_minimal_handler);
MERCURY_REGISTER(hg_class, "rpc_srv_create_node", rpc_create_node_in_t, rpc_res_out_t, rpc_srv_create_node_handler);
MERCURY_REGISTER(hg_class, "rpc_srv_attr", rpc_get_attr_in_t, rpc_get_attr_out_t, rpc_srv_attr_handler);
MERCURY_REGISTER(hg_class, "rpc_srv_remove_node", rpc_remove_node_in_t, rpc_res_out_t, rpc_srv_remove_node_handler);
MERCURY_REGISTER(hg_class, "rpc_srv_write_data", rpc_write_data_in_t, rpc_data_out_t, rpc_srv_write_data_handler);
MERCURY_REGISTER(hg_class, "rpc_srv_read_data", rpc_read_data_in_t, rpc_data_out_t, rpc_srv_read_data_handler);
}
void register_server_ipcs() {
auto hg_class = RPC_DATA->server_ipc_hg_class();
MERCURY_REGISTER(hg_class, "ipc_srv_fs_config", ipc_config_in_t, ipc_config_out_t, ipc_srv_fs_config_handler);
MERCURY_REGISTER(hg_class, "ipc_srv_open", ipc_open_in_t, ipc_res_out_t, ipc_srv_open_handler);
MERCURY_REGISTER(hg_class, "ipc_srv_stat", ipc_stat_in_t, ipc_stat_out_t, ipc_srv_stat_handler);
MERCURY_REGISTER(hg_class, "ipc_srv_unlink", ipc_unlink_in_t, ipc_res_out_t, ipc_srv_unlink_handler);
Marc Vef
committed
bool init_rpc_client() {
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
ADAFS_DATA->spdlogger()->info("Initializing Mercury client ...");
/* MERCURY PART */
// Init Mercury layer (must be finalized when finished)
hg_class_t* hg_class;
hg_context_t* hg_context;
hg_class = HG_Init(protocol_port.c_str(), HG_FALSE);
if (hg_class == nullptr) {
ADAFS_DATA->spdlogger()->error("HG_Init() Failed to init Mercury client layer");
return false;
}
// Create a new Mercury context (must be destroyed when finished)
hg_context = HG_Context_create(hg_class);
if (hg_context == nullptr) {
ADAFS_DATA->spdlogger()->error("HG_Context_create() Failed to create Mercury client context");
HG_Finalize(hg_class);
return false;
}
ADAFS_DATA->spdlogger()->info("Success.");
/* MARGO PART */
ADAFS_DATA->spdlogger()->info("Initializing Margo client ...");
// Start Margo
auto mid = margo_init(0, 0,
hg_context);
if (mid == MARGO_INSTANCE_NULL) {
ADAFS_DATA->spdlogger()->info("[ERR]: margo_init failed to initialize the Margo client");
HG_Context_destroy(hg_context);
HG_Finalize(hg_class);
return false;
}
ADAFS_DATA->spdlogger()->info("Success.");
// Put context and class into RPC_data object
RPC_DATA->client_hg_class(hg_class);
RPC_DATA->client_hg_context(hg_context);
RPC_DATA->client_mid(mid);
register_client_rpcs();
}
/**
* Register rpcs for the client and add the rpc id to rpc_data
* @param hg_class
*/
void register_client_rpcs() {
auto hg_class = RPC_DATA->client_hg_class();
RPC_DATA->rpc_minimal_id(MERCURY_REGISTER(hg_class, "rpc_minimal", rpc_minimal_in_t, rpc_minimal_out_t, nullptr));
RPC_DATA->rpc_srv_create_node_id(
MERCURY_REGISTER(hg_class, "rpc_srv_create_node", rpc_create_node_in_t, rpc_res_out_t, nullptr));
RPC_DATA->rpc_srv_attr_id(
MERCURY_REGISTER(hg_class, "rpc_srv_attr", rpc_get_attr_in_t, rpc_get_attr_out_t, nullptr));
RPC_DATA->rpc_srv_remove_node_id(
MERCURY_REGISTER(hg_class, "rpc_srv_remove_node", rpc_remove_node_in_t, rpc_res_out_t, nullptr));
RPC_DATA->rpc_srv_write_data_id(
MERCURY_REGISTER(hg_class, "rpc_srv_write_data", rpc_write_data_in_t, rpc_data_out_t, nullptr));
RPC_DATA->rpc_srv_read_data_id(
MERCURY_REGISTER(hg_class, "rpc_srv_read_data", rpc_read_data_in_t, rpc_data_out_t, nullptr));