Commit e4163dae authored by Marc Vef's avatar Marc Vef
Browse files

Merge branch 'ior_support' into 'master'

IOR support and I/O threading improvements

See merge request zdvresearch_bsc/adafs!12
parents 9ffcaaa0 0112dcc7
Loading
Loading
Loading
Loading
+44 −0
Original line number Diff line number Diff line
@@ -10,7 +10,6 @@ find_path(ABT_IO_INCLUDE_DIR abt-io.h
    HINTS
    ${ABT_IO_DIR}
    ${ADAFS_DEPS_INSTALL}
        $ENV{HOME}/opt
    /usr
    /usr/local
    /usr/local/adafs
@@ -23,7 +22,6 @@ find_library(ABT_IO_LIBRARY abt-io
    HINTS
    ${ABT_IO_DIR}
    ${ADAFS_DEPS_INSTALL}
        $ENV{HOME}/opt
    /usr
    /usr/local
    /usr/local/adafs
+2 −0
Original line number Diff line number Diff line
@@ -32,6 +32,7 @@
// margo
extern "C" {
#include <abt.h>
#include <abt-snoozer.h>
#include <mercury.h>
#include <margo.h>
}
@@ -48,6 +49,7 @@ namespace bfs = boost::filesystem;
bool init_environment();
void destroy_enviroment();

bool init_io_tasklet_pool();
bool init_ipc_server();
bool init_rpc_server();

+19 −3
Original line number Diff line number Diff line
@@ -4,16 +4,32 @@

#include <daemon/adafs_daemon.hpp>

struct write_chunk_args {
    const std::string* path;
    const char* buf;
    rpc_chnk_id_t chnk_id;
    size_t size;
    off64_t off;
    ABT_eventual eventual;
};
struct read_chunk_args {
    const std::string* path;
    char* buf;
    rpc_chnk_id_t chnk_id;
    size_t size;
    off64_t off;
    ABT_eventual eventual;
};

std::string path_to_fspath(const std::string& path);

int init_chunk_space(const std::string& path);

int destroy_chunk_space(const std::string& path);

int read_file(const std::string& path, rpc_chnk_id_t chnk_id, size_t size, off_t off, char* buf, size_t& read_size);
void read_file_abt(void* _arg);

int write_file(const std::string& path, const char* buf, rpc_chnk_id_t chnk_id, size_t size, off_t off,
               size_t& write_size);
void write_file_abt(void* _arg);

int write_chunks(const std::string& path, const std::vector<void*>& buf_ptrs, const std::vector<hg_size_t>& buf_sizes,
                 off_t offset, size_t& write_size);
+8 −42
Original line number Diff line number Diff line
@@ -14,38 +14,21 @@ private:
    margo_instance_id server_rpc_mid_;
    margo_instance_id server_ipc_mid_;

    lru11::Cache<uint64_t, hg_addr_t> address_cache_{32768, 4096}; // XXX Set values are not based on anything...

    // TODO RPC client IDs
    // RPC client IDs
    hg_id_t rpc_minimal_id_;
    hg_id_t rpc_srv_create_node_id_;
    hg_id_t rpc_srv_attr_id_;
    hg_id_t rpc_srv_remove_node_id_;
    hg_id_t rpc_srv_read_data_id_;
    hg_id_t rpc_srv_write_data_id_;

    // Argobots I/O pools and execution streams
    ABT_pool io_pool_;
    std::vector<ABT_xstream> io_streams_;

public:

    static RPCData* getInstance() {
        static RPCData instance;
        return &instance;
    }

    hg_addr_t svr_addr_ = HG_ADDR_NULL; // XXX TEMPORARY! server addresses will be put into a LRU map

    RPCData(RPCData const&) = delete;

    void operator=(RPCData const&) = delete;

    // Utility functions

    bool get_addr_by_hostid(const uint64_t hostid, hg_addr_t& svr_addr);

    size_t get_rpc_node(const std::string& to_hash);

//    std::string get_dentry_hashable(const fuse_ino_t parent, const char* name);

    // Getter/Setter

    margo_instance* server_rpc_mid();
@@ -56,31 +39,14 @@ public:

    void server_ipc_mid(margo_instance* server_ipc_mid);

    hg_id_t rpc_minimal_id() const;

    void rpc_minimal_id(hg_id_t rpc_minimal_id);

    lru11::Cache<uint64_t, hg_addr_t>& address_cache();

    hg_id_t rpc_srv_create_node_id() const;

    void rpc_srv_create_node_id(hg_id_t rpc_srv_create_node_id);

    hg_id_t rpc_srv_attr_id() const;

    void rpc_srv_attr_id(hg_id_t rpc_srv_attr_id);

    hg_id_t rpc_srv_read_data_id() const;

    hg_id_t rpc_srv_remove_node_id() const;
    ABT_pool io_pool() const;

    void rpc_srv_remove_node_id(hg_id_t rpc_srv_remove_node_id);
    void io_pool(ABT_pool io_pool);

    void rpc_srv_read_data_id(hg_id_t rpc_srv_read_data_id);
    std::vector<ABT_xstream>& io_streams();

    hg_id_t rpc_srv_write_data_id() const;
    void io_streams(const std::vector<ABT_xstream>& io_streams);

    void rpc_srv_write_data_id(hg_id_t rpc_srv_write_data_id);

};

+16 −5
Original line number Diff line number Diff line
@@ -12,8 +12,9 @@

// If ACM time should be considered
#define ACMtime //unused
#define BLOCKSIZE 4 // in kilobytes
#define CHUNKSIZE 400 // in bytes
// XXX Should blocksize and chunksize be merged?
#define BLOCKSIZE 524288 // in bytes 512KB
#define CHUNKSIZE 524288 // in bytes 512KB

// What metadata is used TODO this has to be parametrized or put into a configuration file
#define MDATA_USE_ATIME false
@@ -44,11 +45,21 @@
// However, when full the application blocks until **all** entries are flushed to disk.
//#define KV_WRITE_BUFFER 16384

// Margo configuration
// Margo and Argobots configuration

/*
 * Indicates the number of concurrent progress to drive I/O operations of chunk files to and from local file systems
 * The value is directly mapped to created Argobots xstreams, controlled in a single pool with ABT_snoozer scheduler
 */
#define DAEMON_IO_XSTREAMS 8
/*
 * Sets the number of concurrent progress for sending I/O related RPCs to daemons
 * The value is directly mapped to created Argobots xstreams, controlled in a single pool with ABT_snoozer scheduler
 */
#define PRELOAD_IORPC_XSTREAMS 8
// Number of threads used for RPC and IPC handlers at the daemon
#define RPC_HANDLER_THREADS 16
#define IPC_HANDLER_THREADS 16
#define DAEMON_RPC_HANDLER_XSTREAMS 8
#define DAEMON_IPC_HANDLER_XSTREAMS 8
#define RPC_PORT 4433
#define RPC_TRIES 3
// rpc timeout to try again in milliseconds
Loading