Commit e6b1e9ac authored by Marc Vef's avatar Marc Vef
Browse files

ifs: preload write ipc/rpc (without chunking and size update)

parent b9e16540
Loading
Loading
Loading
Loading
+3 −3
Original line number Diff line number Diff line
@@ -12,11 +12,11 @@ void send_minimal_ipc(const hg_id_t minimal_id);

bool ipc_send_get_fs_config(const hg_id_t ipc_get_config_id);

int ipc_send_open(const char* path, int flags, const mode_t mode, const hg_id_t ipc_open_id);
int ipc_send_open(const string& path, int flags, const mode_t mode, const hg_id_t ipc_open_id);

int ipc_send_stat(const char* path, std::string& attr, const hg_id_t ipc_stat_id);
int ipc_send_stat(const string& path, std::string& attr, const hg_id_t ipc_stat_id);

int ipc_send_unlink(const char* path, const hg_id_t ipc_unlink_id);
int ipc_send_unlink(const string& path, const hg_id_t ipc_unlink_id);

int ipc_send_write(const string& path, const size_t in_size, const off_t in_offset,
                   const void* buf, size_t& write_size, const bool append, const hg_id_t ipc_write_id);
+7 −6
Original line number Diff line number Diff line
@@ -7,26 +7,27 @@

#include <map>
#include <mutex>
#include <memory>

class OpenFile {
private:
    const char* path_;
    std::string path_;
    bool append_flag_;

    int fd_;
    FILE* tmp_file_;

public:
    OpenFile(const char* path, const bool append_flag);
    OpenFile(const std::string& path, const bool append_flag);

    ~OpenFile();

    void annul_fd();

    // getter/setter
    const char* path() const;
    std::string path() const;

    void path(const char* path_);
    void path(const std::string& path_);

    int fd() const;

@@ -42,7 +43,7 @@ public:
class OpenFileMap {

private:
    std::map<int, OpenFile*> files_;
    std::map<int, std::shared_ptr<OpenFile>> files_;
    std::mutex files_mutex_;


@@ -52,7 +53,7 @@ public:
    OpenFile* get(int fd);
    bool exist(const int fd);

    int add(const char* path, const bool append);
    int add(std::string path, const bool append);
    bool remove(const int fd);

};
+1 −1
Original line number Diff line number Diff line
@@ -82,6 +82,6 @@ int rpc_send_read(const size_t recipient, const std::string& path, const size_t
}

int rpc_send_write(const size_t recipient, const std::string& path, const size_t in_size, const off_t in_offset,
                   const char* buf, size_t& write_size, const bool append);
                   const void* buf, size_t& write_size, const bool append, const hg_id_t rpc_write_data_id);

#endif //IFS_PRELOAD_C_DATA_HPP
+3 −2
Original line number Diff line number Diff line
@@ -12,7 +12,7 @@ std::string path_to_fspath(const std::string& path) {
    string fs_path;
    set_difference(path.begin(), path.end(), ADAFS_DATA->mountdir().begin(), ADAFS_DATA->mountdir().end(),
                   std::back_inserter(fs_path));
    if (fs_path.at(1) == '/') {
    if (fs_path.at(0) == '/') {
        fs_path = fs_path.substr(1, fs_path.size());
    }
    // replace / with : to avoid making a bunch of mkdirs to store the data in the underlying fs. XXX Can this be done with hashing?
@@ -90,9 +90,10 @@ int write_file(const string& path, const char* buf, size_t& write_size, const si
    auto fs_path = path_to_fspath(path);
    auto chnk_path = bfs::path(ADAFS_DATA->chunk_path());
    chnk_path /= fs_path;
    bfs::create_directories(chnk_path);
    chnk_path /= "data"s;
    // write to local file
    int fd = open(chnk_path.c_str(), W_OK);
    int fd = open(chnk_path.c_str(), O_WRONLY | O_CREAT, 0777);
    if (fd < 0)
        return EIO;
    write_size = static_cast<size_t>(pwrite(fd, buf, size, off));
+13 −13
Original line number Diff line number Diff line
@@ -20,7 +20,7 @@ void send_minimal_ipc(const hg_id_t minimal_id) {
    printf("minimal RPC is running...\n");

    /* create handle */
    auto ret = HG_Create(ld_mercury_ipc_context(), daemon_addr(), minimal_id, &handle);
    auto ret = HG_Create(margo_get_context(ld_margo_ipc_id()), daemon_addr(), minimal_id, &handle);
    assert(ret == HG_SUCCESS);

    /* Send rpc. Note that we are also transmitting the bulk handle in the
@@ -49,7 +49,7 @@ bool ipc_send_get_fs_config(const hg_id_t ipc_get_config_id) {
    ipc_config_out_t out;
    // fill in
    in.dummy = 0; // XXX should be removed. havent checked yet how empty input with margo works
    auto ret = HG_Create(ld_mercury_ipc_context(), daemon_addr(), ipc_get_config_id, &handle);
    auto ret = HG_Create(margo_get_context(ld_margo_ipc_id()), daemon_addr(), ipc_get_config_id, &handle);
    if (ret != HG_SUCCESS) {
        LD_LOG_DEBUG0(debug_fd, "creating handle FAILED\n");
        return 1;
@@ -111,16 +111,16 @@ bool ipc_send_get_fs_config(const hg_id_t ipc_get_config_id) {
    return ret == HG_SUCCESS;
}

int ipc_send_open(const char* path, int flags, const mode_t mode, const hg_id_t ipc_open_id) {
int ipc_send_open(const string& path, int flags, const mode_t mode, const hg_id_t ipc_open_id) {
    hg_handle_t handle;
    ipc_open_in_t in;
    ipc_err_out_t out;
    // fill in
    in.mode = mode;
    in.flags = flags;
    in.path = path;
    in.path = path.c_str();
    int err = EUNKNOWN;
    auto ret = HG_Create(ld_mercury_ipc_context(), daemon_addr(), ipc_open_id, &handle);
    auto ret = HG_Create(margo_get_context(ld_margo_ipc_id()), daemon_addr(), ipc_open_id, &handle);
    if (ret != HG_SUCCESS) {
        LD_LOG_DEBUG0(debug_fd, "creating handle FAILED\n");
        return 1;
@@ -153,14 +153,14 @@ int ipc_send_open(const char* path, int flags, const mode_t mode, const hg_id_t
    return err;
}

int ipc_send_stat(const char* path, string& attr, const hg_id_t ipc_stat_id) {
int ipc_send_stat(const string& path, string& attr, const hg_id_t ipc_stat_id) {
    hg_handle_t handle;
    ipc_stat_in_t in;
    ipc_stat_out_t out;
    // fill in
    in.path = path;
    in.path = path.c_str();
    int err;
    auto ret = HG_Create(ld_mercury_ipc_context(), daemon_addr(), ipc_stat_id, &handle);
    auto ret = HG_Create(margo_get_context(ld_margo_ipc_id()), daemon_addr(), ipc_stat_id, &handle);
    if (ret != HG_SUCCESS) {
        LD_LOG_DEBUG0(debug_fd, "creating handle FAILED\n");
        return 1;
@@ -198,14 +198,14 @@ int ipc_send_stat(const char* path, string& attr, const hg_id_t ipc_stat_id) {
    return err;
}

int ipc_send_unlink(const char* path, const hg_id_t ipc_unlink_id) {
int ipc_send_unlink(const string& path, const hg_id_t ipc_unlink_id) {
    hg_handle_t handle;
    ipc_unlink_in_t in;
    ipc_err_out_t out;
    // fill in
    in.path = path;
    in.path = path.c_str();
    int err = EUNKNOWN;
    auto ret = HG_Create(ld_mercury_ipc_context(), daemon_addr(), ipc_unlink_id, &handle);
    auto ret = HG_Create(margo_get_context(ld_margo_ipc_id()), daemon_addr(), ipc_unlink_id, &handle);
    if (ret != HG_SUCCESS) {
        LD_LOG_DEBUG0(debug_fd, "creating handle FAILED\n");
        return 1;
@@ -253,7 +253,7 @@ int ipc_send_write(const string& path, const size_t in_size, const off_t in_offs
        in.append = HG_TRUE;
    else
        in.append = HG_FALSE;
    auto ret = HG_Create(ld_mercury_ipc_context(), daemon_addr(), ipc_write_id, &handle);
    auto ret = HG_Create(margo_get_context(ld_margo_ipc_id()), daemon_addr(), ipc_write_id, &handle);
    if (ret != HG_SUCCESS) {
        LD_LOG_ERROR0(debug_fd, "creating handle FAILED\n");
        return 1;
@@ -286,7 +286,7 @@ int ipc_send_write(const string& path, const size_t in_size, const off_t in_offs
        /* clean up resources consumed by this rpc */
        HG_Free_output(handle, &out);
    } else {
        LD_LOG_ERROR0(debug_fd, "RPC rpc_send_read (timed out)\n");
        LD_LOG_ERROR0(debug_fd, "RPC rpc_send_write (timed out)\n");
        err = EAGAIN;
    }

Loading