//
// Created by evie on 8/31/17.
//
#include "daemon/adafs_daemon.hpp"
#include "db/db_util.hpp"
#include "rpc/rpc_types.hpp"
#include "rpc/rpc_defs.hpp"
/**
 * Application loop of the daemon. run_daemon() passes this function to ABT_thread_create().
 *
 * Currently a placeholder: it logs a start message, sleeps for one second, logs again and leaves
 * the loop at the end of the first iteration.
 *
 * @param arg opaque argument supplied by ABT_thread_create(); not used
 */
void daemon_loop(void* arg) {
    (void) arg; // unused, required by the thread entry-point signature
    ADAFS_DATA->spdlogger()->info("Starting application loop ...");
    while (true) {
        sleep(1);
        /* TODO for Nafiseh
         * Connect to the IPC socket with the looping thread and listen for messages from the preload lib
         * When new message is received spawn a new thread that will trigger the operation and respond to preload lib
         * Ensure that messages from the lib are not lost. XXX
         */
        // connect to the ipc socket; a separate thread retrieves the messages from the preload lib
        ADAFS_DATA->spdlogger()->info("done sleeping. exiting ...");
        // placeholder: leave after a single pass until the message handling above exists
        break;
    }
}
void run_daemon() {
ABT_xstream xstream;
ABT_pool pool;
ABT_thread loop_thread;
auto argo_ret = ABT_xstream_self(
&xstream); // get the current execution stream (1 core) to use (started with ABT_init())
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
if (argo_ret != 0) {
ADAFS_DATA->spdlogger()->error("Error getting the execution stream when starting the daemon.");
return;
}
argo_ret = ABT_xstream_get_main_pools(xstream, 1, &pool); // get the thread pool
if (argo_ret != 0) {
ADAFS_DATA->spdlogger()->error("Error getting the thread pool when starting the daemon.");
return;
}
argo_ret = ABT_thread_create(pool, daemon_loop, nullptr, ABT_THREAD_ATTR_NULL, &loop_thread);
if (argo_ret != 0) {
ADAFS_DATA->spdlogger()->error("Error creating loop thread");
return;
}
// wait for the daemon to be closed and free the loop thread
ABT_thread_yield_to(loop_thread);
argo_ret = ABT_thread_join(loop_thread);
if (argo_ret != 0) {
ADAFS_DATA->spdlogger()->error("Error joining loop thread");
return;
}
argo_ret = ABT_thread_free(&loop_thread);
if (argo_ret != 0) {
ADAFS_DATA->spdlogger()->error("Error freeing loop thread.");
return;
}
}
/**
 * Initializes the daemon's environment.
 *
 * Calls, in this order, init_rocksdb(), init_argobots(), init_rpc_server() and init_rpc_client(),
 * and checks each result with assert(). Afterwards every metadata state flag is set to false.
 *
 * NOTE: the results are only checked via assert(), i.e. not at all when compiled with NDEBUG.
 */
void init_environment() {
    // Initialize rocksdb
    const auto rocksdb_ok = init_rocksdb();
    assert(rocksdb_ok);

    const auto argobots_ok = init_argobots();
    assert(argobots_ok);

    // init margo
    const auto rpc_server_ok = init_rpc_server();
    assert(rpc_server_ok);

    const auto rpc_client_ok = init_rpc_client();
    assert(rpc_client_ok);

    // TODO set metadata configurations. these have to go into a user configurable file that is parsed here
    auto&& fs_data = ADAFS_DATA;
    fs_data->atime_state(false);
    fs_data->mtime_state(false);
    fs_data->ctime_state(false);
    fs_data->uid_state(false);
    fs_data->gid_state(false);
    fs_data->inode_no_state(false);
    fs_data->link_cnt_state(false);
    fs_data->blocks_state(false);
}
/**
* Destroys the margo, argobots, and mercury environments
*/
void destroy_enviroment() {
ADAFS_DATA->spdlogger()->info("About to finalize the margo server and client");
margo_finalize(RPC_DATA->client_mid());
margo_finalize(RPC_DATA->server_mid());
ADAFS_DATA->spdlogger()->info("Success.");
destroy_argobots();
ADAFS_DATA->spdlogger()->info("About to destroy the mercury context");
HG_Context_destroy(RPC_DATA->client_hg_context());
HG_Context_destroy(RPC_DATA->server_hg_context());
ADAFS_DATA->spdlogger()->info("Success.");
ADAFS_DATA->spdlogger()->info("About to destroy the mercury class");
HG_Finalize(RPC_DATA->client_hg_class());
HG_Finalize(RPC_DATA->server_hg_class());
ADAFS_DATA->spdlogger()->info("Success.");
ADAFS_DATA->spdlogger()->info("All services shut down.");
}
/**
 * Initializes the Argobots environment
 * @return true if ABT_init() and ABT_snoozer_xstream_self_set() both succeeded, false otherwise
 */
bool init_argobots() {
    ADAFS_DATA->spdlogger()->info("Initializing Argobots ...");

    // ABT_init() is called without any arguments (argc = 0, argv = nullptr)
    if (ABT_init(0, nullptr) != 0) {
        ADAFS_DATA->spdlogger()->error("ABT_init() Failed to init Argobots (client)");
        return false;
    }

    // Set primary execution stream to idle without polling. Normally xstreams cannot sleep.
    // This is what ABT_snoozer does
    if (ABT_snoozer_xstream_self_set() != 0) {
        ADAFS_DATA->spdlogger()->error("ABT_snoozer_xstream_self_set() (client)");
        return false;
    }

    ADAFS_DATA->spdlogger()->info("Success.");
    return true;
}
/**
 * Shuts down Argobots by calling ABT_finalize() and logs whether the call succeeded.
 */
void destroy_argobots() {
    ADAFS_DATA->spdlogger()->info("About to shut Argobots down"s);

    const auto abt_status = ABT_finalize();
    if (abt_status != ABT_SUCCESS) {
        // include the Argobots error code in the log message
        ADAFS_DATA->spdlogger()->error("Argobots shutdown FAILED with err code {}", abt_status);
        return;
    }
    ADAFS_DATA->spdlogger()->info("Argobots successfully shutdown.");
}
bool init_rpc_server() {
auto protocol_port = "bmi+tcp://localhost:" + to_string(RPCPORT);
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
hg_addr_t addr_self;
hg_size_t addr_self_cstring_sz = 128;
char addr_self_cstring[128];
// Mercury class and context pointer that go into RPC_data class
hg_class_t* hg_class;
hg_context_t* hg_context;
ADAFS_DATA->spdlogger()->info("Initializing Mercury server ...");
/* MERCURY PART */
// Init Mercury layer (must be finalized when finished)
hg_class = HG_Init(protocol_port.c_str(), HG_TRUE);
if (hg_class == nullptr) {
ADAFS_DATA->spdlogger()->error("HG_Init() Failed to init Mercury server layer");
return false;
}
// Create a new Mercury context (must be destroyed when finished)
hg_context = HG_Context_create(hg_class);
if (hg_context == nullptr) {
ADAFS_DATA->spdlogger()->error("HG_Context_create() Failed to create Mercury server context");
HG_Finalize(hg_class);
return false;
}
// Below is just for logging purposes
// Figure out what address this server is listening on (must be freed when finished)
auto hg_ret = HG_Addr_self(hg_class, &addr_self);
if (hg_ret != HG_SUCCESS) {
ADAFS_DATA->spdlogger()->error("HG_Addr_self() Failed to retrieve server address");
HG_Context_destroy(hg_context);
HG_Finalize(hg_class);
return false;
}
// Convert the address to a cstring (with \0 terminator).
hg_ret = HG_Addr_to_string(hg_class, addr_self_cstring, &addr_self_cstring_sz, addr_self);
if (hg_ret != HG_SUCCESS) {
ADAFS_DATA->spdlogger()->error("HG_Addr_to_string Failed to convert address to cstring");
HG_Context_destroy(hg_context);
HG_Finalize(hg_class);
HG_Addr_free(hg_class, addr_self);
return false;
}
HG_Addr_free(hg_class, addr_self);
ADAFS_DATA->spdlogger()->info("Success. Accepting RPCs on address {}", addr_self_cstring);
/* MARGO PART */
ADAFS_DATA->spdlogger()->info("Initializing Margo server...");
// Start Margo
auto mid = margo_init(1, 16, hg_context);
if (mid == MARGO_INSTANCE_NULL) {
ADAFS_DATA->spdlogger()->error("margo_init failed to initialize the Margo server");
HG_Context_destroy(hg_context);
HG_Finalize(hg_class);
return false;
}
ADAFS_DATA->spdlogger()->info("Success.");
// Put context and class into RPC_data object
RPC_DATA->server_hg_class(hg_class);
RPC_DATA->server_hg_context(hg_context);
RPC_DATA->server_mid(mid);
// register RPCs
// register_server_rpcs();
/**
* Register the rpcs for the server. There is no need to store rpc ids for the server
* @param hg_class
*/
void register_server_rpcs() {
auto hg_class = RPC_DATA->server_hg_class();
MERCURY_REGISTER(hg_class, "rpc_minimal", rpc_minimal_in_t, rpc_minimal_out_t, rpc_minimal_handler);
MERCURY_REGISTER(hg_class, "rpc_srv_create_node", rpc_create_node_in_t, rpc_res_out_t, rpc_srv_create_node_handler);
}
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
bool init_rpc_client() {
auto protocol_port = "bmi+tcp"s;
ADAFS_DATA->spdlogger()->info("Initializing Mercury client ...");
/* MERCURY PART */
// Init Mercury layer (must be finalized when finished)
hg_class_t* hg_class;
hg_context_t* hg_context;
hg_class = HG_Init(protocol_port.c_str(), HG_FALSE);
if (hg_class == nullptr) {
ADAFS_DATA->spdlogger()->error("HG_Init() Failed to init Mercury client layer");
return false;
}
// Create a new Mercury context (must be destroyed when finished)
hg_context = HG_Context_create(hg_class);
if (hg_context == nullptr) {
ADAFS_DATA->spdlogger()->error("HG_Context_create() Failed to create Mercury client context");
HG_Finalize(hg_class);
return false;
}
ADAFS_DATA->spdlogger()->info("Success.");
/* MARGO PART */
ADAFS_DATA->spdlogger()->info("Initializing Margo client ...");
// Start Margo
auto mid = margo_init(0, 0,
hg_context);
if (mid == MARGO_INSTANCE_NULL) {
ADAFS_DATA->spdlogger()->info("[ERR]: margo_init failed to initialize the Margo client");
HG_Context_destroy(hg_context);
HG_Finalize(hg_class);
return false;
}
ADAFS_DATA->spdlogger()->info("Success.");
// Put context and class into RPC_data object
RPC_DATA->client_hg_class(hg_class);
RPC_DATA->client_hg_context(hg_context);
RPC_DATA->client_mid(mid);
// register_client_rpcs();
return true;
}
/**
* Register rpcs for the client and add the rpc id to rpc_data
* @param hg_class
*/
void register_client_rpcs() {
auto hg_class = RPC_DATA->client_hg_class();
RPC_DATA->rpc_minimal_id(MERCURY_REGISTER(hg_class, "rpc_minimal", rpc_minimal_in_t, rpc_minimal_out_t, nullptr));
RPC_DATA->rpc_srv_create_node_id(
MERCURY_REGISTER(hg_class, "rpc_srv_create_node", rpc_create_node_in_t, rpc_res_out_t, nullptr));