Loading ifs/include/global/configure.hpp +13 −6 Original line number Diff line number Diff line Loading @@ -45,14 +45,21 @@ // However, when full the application blocks until **all** entries are flushed to disk. //#define KV_WRITE_BUFFER 16384 // Margo configuration // Margo and Argobots configuration // Number of threads used for concurrent I/O in the daemon and preload library per process #define IO_THREADS 8 #define IO_LIBRARY_THREADS 8 /* * Indicates the number of concurrent progress to drive I/O operations of chunk files to and from local file systems * The value is directly mapped to created Argobots xstreams, controlled in a single pool with ABT_snoozer scheduler */ #define DAEMON_IO_XSTREAMS 8 /* * Sets the number of concurrent progress for sending I/O related RPCs to daemons * The value is directly mapped to created Argobots xstreams, controlled in a single pool with ABT_snoozer scheduler */ #define PRELOAD_IORPC_XSTREAMS 8 // Number of threads used for RPC and IPC handlers at the daemon #define RPC_HANDLER_THREADS 8 #define IPC_HANDLER_THREADS 8 #define DAEMON_RPC_HANDLER_XSTREAMS 8 #define DAEMON_IPC_HANDLER_XSTREAMS 8 #define RPC_PORT 4433 #define RPC_TRIES 3 // rpc timeout to try again in milliseconds Loading ifs/src/daemon/adafs_daemon.cpp +4 −4 Original line number Diff line number Diff line Loading @@ -92,9 +92,9 @@ void destroy_enviroment() { } bool init_io_tasklet_pool() { vector<ABT_xstream> io_streams_tmp(IO_THREADS); vector<ABT_xstream> io_streams_tmp(DAEMON_IO_XSTREAMS); ABT_pool io_pools_tmp; auto ret = ABT_snoozer_xstream_create(IO_THREADS, &io_pools_tmp, io_streams_tmp.data()); auto ret = ABT_snoozer_xstream_create(DAEMON_IO_XSTREAMS, &io_pools_tmp, io_streams_tmp.data()); if (ret != ABT_SUCCESS) { ADAFS_DATA->spdlogger()->error( "{}() ABT_snoozer_xstream_create() failed to initialize ABT_pool for I/O operations", __func__); Loading @@ -114,7 +114,7 @@ bool init_ipc_server() { ADAFS_DATA->spdlogger()->debug("{}() Initializing Margo IPC server...", __func__); // Start Margo (this will also initialize Argobots and Mercury internally) auto mid = margo_init(protocol_port.c_str(), MARGO_SERVER_MODE, 1, IPC_HANDLER_THREADS); auto mid = margo_init(protocol_port.c_str(), MARGO_SERVER_MODE, 1, DAEMON_IPC_HANDLER_XSTREAMS); if (mid == MARGO_INSTANCE_NULL) { ADAFS_DATA->spdlogger()->error("{}() margo_init() failed to initialize the Margo IPC server", __func__); Loading Loading @@ -158,7 +158,7 @@ bool init_rpc_server() { char addr_self_cstring[128]; ADAFS_DATA->spdlogger()->debug("{}() Initializing Margo RPC server...", __func__); // Start Margo (this will also initialize Argobots and Mercury internally) auto mid = margo_init(protocol_port.c_str(), MARGO_SERVER_MODE, 1, RPC_HANDLER_THREADS); auto mid = margo_init(protocol_port.c_str(), MARGO_SERVER_MODE, 1, DAEMON_RPC_HANDLER_XSTREAMS); if (mid == MARGO_INSTANCE_NULL) { ADAFS_DATA->spdlogger()->error("{}() margo_init failed to initialize the Margo RPC server", __func__); return false; Loading ifs/src/preload/preload.cpp +2 −2 Original line number Diff line number Diff line Loading @@ -78,8 +78,8 @@ bool init_ld_argobots() { */ putenv(const_cast<char*>("ABT_MEM_MAX_NUM_STACKS=8")); // Creating pool for driving IO RPCs vector<ABT_xstream> io_streams_tmp(IO_LIBRARY_THREADS); argo_err = ABT_snoozer_xstream_create(IO_LIBRARY_THREADS, &io_pool, io_streams_tmp.data()); vector<ABT_xstream> io_streams_tmp(PRELOAD_IORPC_XSTREAMS); argo_err = ABT_snoozer_xstream_create(PRELOAD_IORPC_XSTREAMS, &io_pool, io_streams_tmp.data()); if (argo_err != ABT_SUCCESS) { ld_logger->error("{}() ABT_snoozer_xstream_create() (client)", __func__); return false; Loading Loading
ifs/include/global/configure.hpp +13 −6 Original line number Diff line number Diff line Loading @@ -45,14 +45,21 @@ // However, when full the application blocks until **all** entries are flushed to disk. //#define KV_WRITE_BUFFER 16384 // Margo configuration // Margo and Argobots configuration // Number of threads used for concurrent I/O in the daemon and preload library per process #define IO_THREADS 8 #define IO_LIBRARY_THREADS 8 /* * Indicates the number of concurrent progress to drive I/O operations of chunk files to and from local file systems * The value is directly mapped to created Argobots xstreams, controlled in a single pool with ABT_snoozer scheduler */ #define DAEMON_IO_XSTREAMS 8 /* * Sets the number of concurrent progress for sending I/O related RPCs to daemons * The value is directly mapped to created Argobots xstreams, controlled in a single pool with ABT_snoozer scheduler */ #define PRELOAD_IORPC_XSTREAMS 8 // Number of threads used for RPC and IPC handlers at the daemon #define RPC_HANDLER_THREADS 8 #define IPC_HANDLER_THREADS 8 #define DAEMON_RPC_HANDLER_XSTREAMS 8 #define DAEMON_IPC_HANDLER_XSTREAMS 8 #define RPC_PORT 4433 #define RPC_TRIES 3 // rpc timeout to try again in milliseconds Loading
ifs/src/daemon/adafs_daemon.cpp +4 −4 Original line number Diff line number Diff line Loading @@ -92,9 +92,9 @@ void destroy_enviroment() { } bool init_io_tasklet_pool() { vector<ABT_xstream> io_streams_tmp(IO_THREADS); vector<ABT_xstream> io_streams_tmp(DAEMON_IO_XSTREAMS); ABT_pool io_pools_tmp; auto ret = ABT_snoozer_xstream_create(IO_THREADS, &io_pools_tmp, io_streams_tmp.data()); auto ret = ABT_snoozer_xstream_create(DAEMON_IO_XSTREAMS, &io_pools_tmp, io_streams_tmp.data()); if (ret != ABT_SUCCESS) { ADAFS_DATA->spdlogger()->error( "{}() ABT_snoozer_xstream_create() failed to initialize ABT_pool for I/O operations", __func__); Loading @@ -114,7 +114,7 @@ bool init_ipc_server() { ADAFS_DATA->spdlogger()->debug("{}() Initializing Margo IPC server...", __func__); // Start Margo (this will also initialize Argobots and Mercury internally) auto mid = margo_init(protocol_port.c_str(), MARGO_SERVER_MODE, 1, IPC_HANDLER_THREADS); auto mid = margo_init(protocol_port.c_str(), MARGO_SERVER_MODE, 1, DAEMON_IPC_HANDLER_XSTREAMS); if (mid == MARGO_INSTANCE_NULL) { ADAFS_DATA->spdlogger()->error("{}() margo_init() failed to initialize the Margo IPC server", __func__); Loading Loading @@ -158,7 +158,7 @@ bool init_rpc_server() { char addr_self_cstring[128]; ADAFS_DATA->spdlogger()->debug("{}() Initializing Margo RPC server...", __func__); // Start Margo (this will also initialize Argobots and Mercury internally) auto mid = margo_init(protocol_port.c_str(), MARGO_SERVER_MODE, 1, RPC_HANDLER_THREADS); auto mid = margo_init(protocol_port.c_str(), MARGO_SERVER_MODE, 1, DAEMON_RPC_HANDLER_XSTREAMS); if (mid == MARGO_INSTANCE_NULL) { ADAFS_DATA->spdlogger()->error("{}() margo_init failed to initialize the Margo RPC server", __func__); return false; Loading
ifs/src/preload/preload.cpp +2 −2 Original line number Diff line number Diff line Loading @@ -78,8 +78,8 @@ bool init_ld_argobots() { */ putenv(const_cast<char*>("ABT_MEM_MAX_NUM_STACKS=8")); // Creating pool for driving IO RPCs vector<ABT_xstream> io_streams_tmp(IO_LIBRARY_THREADS); argo_err = ABT_snoozer_xstream_create(IO_LIBRARY_THREADS, &io_pool, io_streams_tmp.data()); vector<ABT_xstream> io_streams_tmp(PRELOAD_IORPC_XSTREAMS); argo_err = ABT_snoozer_xstream_create(PRELOAD_IORPC_XSTREAMS, &io_pool, io_streams_tmp.data()); if (argo_err != ABT_SUCCESS) { ld_logger->error("{}() ABT_snoozer_xstream_create() (client)", __func__); return false; Loading