Commit 100764f5 authored by Marc Vef's avatar Marc Vef
Browse files

ifs: purging read,write,update ipcs and rpcs, merging them in one

parent f8c664e5
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -57,7 +57,7 @@ set(SOURCE_FILES main.cpp main.hpp configure.hpp util.cpp
        src/rpc/handler/h_metadentry.cpp
        src/adafs_ops/data.cpp include/adafs_ops/data.hpp src/rpc/handler/h_data.cpp
        src/rpc/handler/h_preload.cpp
        include/rpc/rpc_utils.hpp)
        include/rpc/rpc_utils.hpp src/rpc/rpc_utils.cpp)
add_executable(adafs_daemon ${SOURCE_FILES})
target_link_libraries(adafs_daemon ${ROCKSDB_LIBRARIES}
        # rocksdb libs
+0 −44
Original line number Diff line number Diff line
@@ -39,48 +39,4 @@ MERCURY_GEN_PROC(ipc_stat_out_t, ((hg_int32_t) (err))

MERCURY_GEN_PROC(ipc_unlink_in_t, ((hg_const_string_t) (path)))

MERCURY_GEN_PROC(update_metadentry_in_t,
                 ((hg_const_string_t) (path))\
((uint64_t) (nlink))\
((hg_uint32_t) (mode))\
((hg_uint32_t) (uid))\
((hg_uint32_t) (gid))\
((hg_int64_t) (size))\
((hg_uint64_t) (inode_no))\
((hg_int64_t) (blocks))\
((hg_int64_t) (atime))\
((hg_int64_t) (mtime))\
((hg_int64_t) (ctime))\
((hg_bool_t) (nlink_flag))\
((hg_bool_t) (mode_flag))\
((hg_bool_t) (uid_flag))\
((hg_bool_t) (gid_flag))\
((hg_bool_t) (size_flag))\
((hg_bool_t) (inode_no_flag))\
((hg_bool_t) (block_flag))\
((hg_bool_t) (atime_flag))\
((hg_bool_t) (mtime_flag))\
((hg_bool_t) (ctime_flag)))

MERCURY_GEN_PROC(update_metadentry_size_in_t, ((hg_const_string_t) (path))
        ((hg_int64_t) (size)))

// data
MERCURY_GEN_PROC(ipc_read_data_in_t,
                 ((hg_const_string_t) (path))\
((hg_size_t) (size))\
((hg_int64_t) (offset))\
((hg_bulk_t) (bulk_handle)))

MERCURY_GEN_PROC(ipc_data_out_t,
                 ((int32_t) (res))\
((hg_size_t) (io_size)))

MERCURY_GEN_PROC(ipc_write_data_in_t,
                 ((hg_const_string_t) (path))\
((hg_size_t) (size))\
((hg_int64_t) (offset))\
((hg_bool_t) (append))\
((hg_bulk_t) (bulk_handle)))

#endif //IFS_IPC_TYPES_HPP
+0 −68
Original line number Diff line number Diff line
@@ -22,72 +22,4 @@ int ipc_send_stat(const string& path, std::string& attr, const hg_id_t ipc_stat_

int ipc_send_unlink(const string& path, const hg_id_t ipc_unlink_id);

int ipc_send_update_metadentry(const string& path, const hg_id_t ipc_update_metadentry_id, const Metadentry& md,
                               const MetadentryUpdateFlags& md_flags);

int ipc_send_update_metadentry_size(const string& path, const hg_id_t ipc_update_metadentry_size_id, const off_t size);

int ipc_send_write(const string& path, const size_t in_size, const off_t in_offset,
                   const void* buf, size_t& write_size, const bool append, const hg_id_t ipc_write_id);

template<typename T>
int ipc_send_read(const string& path, const size_t in_size, const off_t in_offset,
                  T* tar_buf, size_t& read_size, const hg_id_t ipc_read_id) {

    hg_handle_t handle;
    ipc_read_data_in_t in;
    ipc_data_out_t out;
    int err;
    // fill in
    in.path = path.c_str();
    in.size = in_size;
    in.offset = in_offset;

    auto ret = HG_Create(margo_get_context(ld_margo_ipc_id()), daemon_addr(), ipc_read_id, &handle);
    if (ret != HG_SUCCESS) {
        LD_LOG_ERROR0(debug_fd, "creating handle FAILED\n");
        return 1;
    }

    auto hgi = HG_Get_info(handle);
    /* register local target buffer for bulk access */
    auto b_buf = static_cast<void*>(tar_buf);
    ret = HG_Bulk_create(hgi->hg_class, 1, &b_buf, &in_size, HG_BULK_WRITE_ONLY, &in.bulk_handle);
    if (ret != 0) {
        LD_LOG_ERROR0(debug_fd, "failed to create bulkd on client when reading\n");
        return 1;
    }

    int send_ret = HG_FALSE;
    for (int i = 0; i < RPC_TRIES; ++i) {
        send_ret = margo_forward_timed(ld_margo_ipc_id(), handle, &in, RPC_TIMEOUT);
        if (send_ret == HG_SUCCESS) {
            break;
        }
    }
    if (send_ret == HG_SUCCESS) {
        /* decode response */
        ret = HG_Get_output(handle,
                            &out); // XXX handle ret out.res can inidicate a failure with reading on the other side.
        tar_buf = static_cast<T*>(b_buf); // XXX wtf am I doing here?
        read_size = static_cast<size_t>(out.io_size);
        err = out.res;
        LD_LOG_DEBUG(debug_fd, "Got response %d\n", out.res);

        /* clean up resources consumed by this rpc */
        HG_Free_output(handle, &out);
    } else {
        LD_LOG_ERROR0(debug_fd, "ipc_send_read (timed out)\n");
        err = EAGAIN;
    }

    in.path = nullptr;

    HG_Bulk_free(in.bulk_handle);
    HG_Free_input(handle, &in);
    HG_Destroy(handle);

    return err;
}

#endif //IFS_MARGO_IPC_HPP
+74 −5
Original line number Diff line number Diff line
@@ -19,7 +19,7 @@ extern "C" {
#include <iostream>

template<typename T>
int rpc_send_read(const size_t recipient, const std::string& path, const size_t in_size, const off_t in_offset,
int pc_send_read(const size_t recipient, const std::string& path, const size_t in_size, const off_t in_offset,
                 T* tar_buf, size_t& read_size, const hg_id_t rpc_read_data_id) {
    hg_handle_t handle;
    hg_addr_t svr_addr = HG_ADDR_NULL;
@@ -77,11 +77,80 @@ int rpc_send_read(const size_t recipient, const std::string& path, const size_t
    HG_Destroy(handle);

    return err;
}

template<typename T>
int rpc_send_read(const hg_id_t ipc_read_data_id, const hg_id_t rpc_read_data_id, const std::string& path,
                  const size_t in_size, const off_t in_offset, T* tar_buf, size_t& read_size) {
    hg_handle_t handle;
    hg_addr_t svr_addr = HG_ADDR_NULL;
    bool local_op = true;
    rpc_read_data_in_t in;
    rpc_data_out_t out;
    int err;
    hg_return_t ret;
    // fill in
    in.path = path.c_str();
    in.size = in_size;
    in.offset = in_offset;

    auto recipient = get_rpc_node(path);
    if (is_local_op(recipient)) { // local
        ret = HG_Create(margo_get_context(ld_margo_ipc_id()), daemon_addr(), ipc_read_data_id, &handle);
        LD_LOG_TRACE0(debug_fd, "rpc_send_read to local daemon (IPC)\n");
    } else { // remote
        local_op = false;
        // TODO HG_ADDR_T is never freed atm. Need to change LRUCache
        if (!get_addr_by_hostid(recipient, svr_addr)) {
            LD_LOG_ERROR(debug_fd, "server address not resolvable for host id %lu\n", recipient);
            return 1;
        }
        ret = HG_Create(margo_get_context(ld_margo_rpc_id()), svr_addr, rpc_read_data_id, &handle);
        LD_LOG_TRACE0(debug_fd, "rpc_send_read to remote daemon (RPC)\n");
    }
    if (ret != HG_SUCCESS) {
        LD_LOG_ERROR0(debug_fd, "creating handle FAILED\n");
        return 1;
    }
    auto hgi = HG_Get_info(handle);
    /* register local target buffer for bulk access */
    auto b_buf = static_cast<void*>(tar_buf);
    ret = HG_Bulk_create(hgi->hg_class, 1, &b_buf, &in_size, HG_BULK_WRITE_ONLY, &in.bulk_handle);
    if (ret != 0)
        LD_LOG_ERROR0(debug_fd, "failed to create bulk on client\n");

    return 0;
    int send_ret = HG_FALSE;
    for (int i = 0; i < RPC_TRIES; ++i) {
        send_ret = margo_forward_timed(local_op ? ld_margo_ipc_id() : ld_margo_rpc_id(), handle, &in, RPC_TIMEOUT);
        if (send_ret == HG_SUCCESS) {
            break;
        }
    }
    if (send_ret == HG_SUCCESS) {
        /* decode response */
        ret = HG_Get_output(handle,
                            &out); // XXX handle ret out.res can inidicate a failure with reading on the other side.
        tar_buf = static_cast<T*>(b_buf);
        read_size = static_cast<size_t>(out.io_size);
        err = out.res;
        LD_LOG_TRACE(debug_fd, "Got response %d\n", out.res);
        /* clean up resources consumed by this rpc */
        HG_Free_output(handle, &out);
    } else {
        LD_LOG_ERROR0(debug_fd, "RPC rpc_send_read (timed out)");
        err = EAGAIN;
    }

    in.path = nullptr;

    HG_Bulk_free(in.bulk_handle);
    HG_Free_input(handle, &in);
    HG_Destroy(handle);

    return err;
}

int rpc_send_write(const size_t recipient, const std::string& path, const size_t in_size, const off_t in_offset,
                   const void* buf, size_t& write_size, const bool append, const hg_id_t rpc_write_data_id);
int rpc_send_write(const hg_id_t ipc_write_data_id, const hg_id_t rpc_write_data_id, const string& path,
                   const size_t in_size, const off_t in_offset, const void* buf, size_t& write_size, const bool append);

#endif //IFS_PRELOAD_C_DATA_HPP
+6 −0
Original line number Diff line number Diff line
@@ -19,4 +19,10 @@ rpc_send_get_attr(const hg_id_t rpc_get_attr_id, const size_t recipient, const s

int rpc_send_remove_node(const hg_id_t rpc_remove_node_id, const size_t recipient, const std::string& path);

int rpc_send_update_metadentry(const hg_id_t ipc_update_metadentry_id, const hg_id_t rpc_update_metadentry_id,
                               const string& path, const Metadentry& md, const MetadentryUpdateFlags& md_flags);

int rpc_send_update_metadentry_size(const hg_id_t ipc_update_metadentry_size_id,
                                    const hg_id_t rpc_update_metadentry_size_id, const string& path, const off_t size);

#endif //IFS_PRELOAD_C_METADENTRY_HPP
Loading