Verified Commit 73506a6b authored by Marc Vef's avatar Marc Vef
Browse files

Daemon: Using tasklets for parallel writes

parent a282f2a5
Loading
Loading
Loading
Loading
+0 −1
Original line number Diff line number Diff line
@@ -53,7 +53,6 @@ find_package(Mercury REQUIRED)
find_package(MercuryUtil REQUIRED)
find_package(Abt REQUIRED)
find_package(Abt-Snoozer REQUIRED)
find_package(Abt-IO REQUIRED)
find_package(Margo REQUIRED)

option(USE_OFI_VERBS "Use libfabric plugin with verbs." OFF)
+1 −1
Original line number Diff line number Diff line
@@ -33,7 +33,6 @@
extern "C" {
#include <abt.h>
#include <abt-snoozer.h>
#include <abt-io.h>
#include <mercury.h>
#include <margo.h>
}
@@ -50,6 +49,7 @@ namespace bfs = boost::filesystem;
bool init_environment();
void destroy_enviroment();

bool init_io_tasklet_pool();
bool init_ipc_server();
bool init_rpc_server();

+0 −1
Original line number Diff line number Diff line
@@ -10,7 +10,6 @@ struct write_chunk_args {
    const rpc_chnk_id_t* chnk_id;
    size_t size;
    off64_t off;
    abt_io_instance_id aid;
    ABT_eventual* eventual;
};

+8 −42
Original line number Diff line number Diff line
@@ -14,38 +14,21 @@ private:
    margo_instance_id server_rpc_mid_;
    margo_instance_id server_ipc_mid_;

    lru11::Cache<uint64_t, hg_addr_t> address_cache_{32768, 4096}; // XXX Set values are not based on anything...

    // TODO RPC client IDs
    // RPC client IDs
    hg_id_t rpc_minimal_id_;
    hg_id_t rpc_srv_create_node_id_;
    hg_id_t rpc_srv_attr_id_;
    hg_id_t rpc_srv_remove_node_id_;
    hg_id_t rpc_srv_read_data_id_;
    hg_id_t rpc_srv_write_data_id_;

    // Argobots I/O pools and execution streams
    ABT_pool io_pool_;
    std::vector<ABT_xstream> io_streams_;

public:

    static RPCData* getInstance() {
        static RPCData instance;
        return &instance;
    }

    hg_addr_t svr_addr_ = HG_ADDR_NULL; // XXX TEMPORARY! server addresses will be put into a LRU map

    RPCData(RPCData const&) = delete;

    void operator=(RPCData const&) = delete;

    // Utility functions

    bool get_addr_by_hostid(const uint64_t hostid, hg_addr_t& svr_addr);

    size_t get_rpc_node(const std::string& to_hash);

//    std::string get_dentry_hashable(const fuse_ino_t parent, const char* name);

    // Getter/Setter

    margo_instance* server_rpc_mid();
@@ -56,31 +39,14 @@ public:

    void server_ipc_mid(margo_instance* server_ipc_mid);

    hg_id_t rpc_minimal_id() const;

    void rpc_minimal_id(hg_id_t rpc_minimal_id);

    lru11::Cache<uint64_t, hg_addr_t>& address_cache();

    hg_id_t rpc_srv_create_node_id() const;

    void rpc_srv_create_node_id(hg_id_t rpc_srv_create_node_id);

    hg_id_t rpc_srv_attr_id() const;

    void rpc_srv_attr_id(hg_id_t rpc_srv_attr_id);

    hg_id_t rpc_srv_read_data_id() const;

    hg_id_t rpc_srv_remove_node_id() const;
    ABT_pool io_pool() const;

    void rpc_srv_remove_node_id(hg_id_t rpc_srv_remove_node_id);
    void io_pool(ABT_pool io_pool);

    void rpc_srv_read_data_id(hg_id_t rpc_srv_read_data_id);
    std::vector<ABT_xstream>& io_streams();

    hg_id_t rpc_srv_write_data_id() const;
    void io_streams(const std::vector<ABT_xstream>& io_streams);

    void rpc_srv_write_data_id(hg_id_t rpc_srv_write_data_id);

};

+4 −2
Original line number Diff line number Diff line
@@ -47,9 +47,11 @@

// Margo configuration

// Number of threads used for concurrent I/O
#define IO_THREADS 8
// Number of threads used for RPC and IPC handlers at the daemon
#define RPC_HANDLER_THREADS 16
#define IPC_HANDLER_THREADS 16
#define RPC_HANDLER_THREADS 8
#define IPC_HANDLER_THREADS 8
#define RPC_PORT 4433
#define RPC_TRIES 3
// rpc timeout to try again in milliseconds
Loading