Commit 3dadee39 authored by Marc Vef's avatar Marc Vef
Browse files

parallel write with argobots over rpc done (TODO ipc, offset, append)

parent 14a95a6b
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -17,6 +17,7 @@
// If ACM time should be considered
#define ACMtime //unused
#define BLOCKSIZE 4 // in kilobytes
#define CHUNKSIZE 400 // in bytes

// What metadata is used TODO this has to be parametrized or put into a configuration file
#define MDATA_USE_ATIME false
+7 −3
Original line number Diff line number Diff line
@@ -3,6 +3,7 @@
#define IFS_DATA_HPP

#include "../../main.hpp"
#include <preload/preload_util.hpp>

std::string path_to_fspath(const std::string& path);

@@ -10,9 +11,12 @@ int init_chunk_space(const std::string& path);

int destroy_chunk_space(const std::string& path);

int read_file(char* buf, size_t& read_size, const std::string& path, const size_t size, const off_t off);
int read_file(char* buf, size_t& read_size, const std::string& path, size_t size, off_t off);

int write_file(const std::string& path, const char* buf, size_t& write_size, const size_t size, const off_t off,
               const bool append, const off_t updated_size);
int write_file(const std::string& path, const char* buf, rpc_chnk_id_t chnk_id, size_t size, off_t off,
               bool append, off_t updated_size, size_t& write_size);

int write_chunks(const std::string& path, const std::vector<void*>& buf_ptrs, const std::vector<hg_size_t>& buf_sizes,
                 size_t& write_size);

#endif //IFS_DATA_HPP
+5 −4
Original line number Diff line number Diff line
@@ -8,8 +8,6 @@
#include <extern/lrucache/LRUCache11.hpp>
#include <string>

using namespace std;


struct FsConfig {
    // configurable metadata
@@ -95,6 +93,9 @@ extern std::shared_ptr<spdlog::logger> ld_logger;
typedef lru11::Cache<uint64_t, hg_addr_t> KVCache;
extern KVCache rpc_address_cache;

// typedefs
typedef unsigned long rpc_chnk_id_t;

bool is_fs_path(const char* path);

// TODO template these two suckers
@@ -110,7 +111,7 @@ size_t get_rpc_node(const std::string& to_hash);

bool is_local_op(size_t recipient);

hg_return margo_create_wrap(hg_id_t ipc_id, hg_id_t rpc_id, const std::string& path, hg_handle_t& handle,
                            hg_addr_t& svr_addr);
hg_return margo_create_wrap(const hg_id_t ipc_id, const hg_id_t rpc_id, const std::string& path, hg_handle_t& handle,
                            hg_addr_t& svr_addr, bool force_rpc);

#endif //IFS_PRELOAD_UTIL_HPP
+15 −2
Original line number Diff line number Diff line
@@ -15,6 +15,19 @@ extern "C" {

#include <iostream>

struct write_args {
    std::string& path;
    size_t in_size;
    off_t in_offset;
    const void* buf;
    bool append;
    off_t updated_size;
    std::vector<unsigned long>& chnk_ids;
    size_t write_size;
};

void rpc_send_write_abt(void* _arg);

template<typename T>
int rpc_send_read(const std::string& path, const size_t in_size, const off_t in_offset, T* tar_buf, size_t& read_size) {
    hg_handle_t handle;
@@ -28,7 +41,7 @@ int rpc_send_read(const std::string& path, const size_t in_size, const off_t in_
    in.size = in_size;
    in.offset = in_offset;

    margo_create_wrap(ipc_read_data_id, rpc_read_data_id, path, handle, svr_addr);
    margo_create_wrap(ipc_read_data_id, rpc_read_data_id, path, handle, svr_addr, false);

    auto used_mid = margo_hg_handle_get_instance(handle);
    /* register local target buffer for bulk access */
@@ -65,7 +78,7 @@ int rpc_send_read(const std::string& path, const size_t in_size, const off_t in_
    return err;
}

int rpc_send_write(const string& path, size_t in_size, off_t in_offset, const void* buf, size_t& write_size,
int rpc_send_write(const std::string& path, size_t in_size, off_t in_offset, void* buf, size_t& write_size,
                   bool append, off_t updated_size);

#endif //IFS_PRELOAD_C_DATA_HPP
+3 −3
Original line number Diff line number Diff line
@@ -10,12 +10,12 @@ void send_minimal_rpc(hg_id_t minimal_id);

int rpc_send_open(const std::string& path, mode_t mode, int flags);

int rpc_send_stat(const std::string& path, string& attr);
int rpc_send_stat(const std::string& path, std::string& attr);

int rpc_send_unlink(const std::string& path);

int rpc_send_update_metadentry(const string& path, const Metadentry& md, const MetadentryUpdateFlags& md_flags);
int rpc_send_update_metadentry(const std::string& path, const Metadentry& md, const MetadentryUpdateFlags& md_flags);

int rpc_send_update_metadentry_size(const string& path, off_t size, bool append_flag, off_t& ret_size);
int rpc_send_update_metadentry_size(const std::string& path, off_t size, bool append_flag, off_t& ret_size);

#endif //IFS_PRELOAD_C_METADENTRY_HPP
Loading