Commit e345f219 authored by Marc Vef's avatar Marc Vef
Browse files

First cut, read/write rpc infrastructure

parent 1aae0185
Loading
Loading
Loading
Loading
+0 −1
Original line number Diff line number Diff line
@@ -78,4 +78,3 @@ Thumbs.db
cmake-build-debug
playground
.fuse_hidden*
configure.hpp
+5 −3
Original line number Diff line number Diff line
@@ -60,7 +60,7 @@ set(SOURCE_FILES src/main.cpp src/main.hpp src/fuse_ops.hpp src/configure.hpp
        src/rpc/rpc_util.hpp src/rpc/rpc_types.hpp src/rpc/rpc_defs.hpp

        # rpcs header
        src/rpc/client/c_metadata.hpp src/rpc/client/c_dentry.hpp
        src/rpc/client/c_metadata.hpp src/rpc/client/c_dentry.hpp src/rpc/client/c_data.hpp

        # util
        src/util.cpp
@@ -75,8 +75,10 @@ set(SOURCE_FILES src/main.cpp src/main.hpp src/fuse_ops.hpp src/configure.hpp
        # rpc src
        src/rpc/rpc_util.cpp

        # rpcs src
        src/rpc/server/s_metadata.cpp src/rpc/server/s_dentry.cpp src/rpc/client/c_metadata.cpp src/rpc/client/c_dentry.cpp
        # rpcs src server
        src/rpc/server/s_metadata.cpp src/rpc/server/s_dentry.cpp src/rpc/server/s_data.cpp
        # rpcs src client
        src/rpc/client/c_metadata.cpp src/rpc/client/c_dentry.cpp src/rpc/client/c_data.cpp
        )
add_executable(adafs ${SOURCE_FILES} src/main.cpp)
target_link_libraries(adafs ${FUSE3_LIBRARIES} ${ROCKSDB_LIBRARIES}
+16 −0
Original line number Diff line number Diff line
@@ -145,6 +145,22 @@ void RPCData::rpc_srv_remove_mdata_id(hg_id_t rpc_srv_remove_mdata_id) {
    RPCData::rpc_srv_remove_mdata_id_ = rpc_srv_remove_mdata_id;
}

hg_id_t RPCData::rpc_srv_read_data_id() const {
    return rpc_srv_read_data_id_;
}

void RPCData::rpc_srv_read_data_id(hg_id_t rpc_srv_read_data_id) {
    RPCData::rpc_srv_read_data_id_ = rpc_srv_read_data_id;
}

hg_id_t RPCData::rpc_srv_write_data_id() const {
    return rpc_srv_write_data_id_;
}

void RPCData::rpc_srv_write_data_id(hg_id_t rpc_srv_write_data_id) {
    RPCData::rpc_srv_write_data_id_ = rpc_srv_write_data_id;
}




+10 −0
Original line number Diff line number Diff line
@@ -38,6 +38,8 @@ private:
    hg_id_t rpc_srv_lookup_id_;
    hg_id_t rpc_srv_remove_dentry_id_;
    hg_id_t rpc_srv_remove_mdata_id_;
    hg_id_t rpc_srv_read_data_id_;
    hg_id_t rpc_srv_write_data_id_;


public:
@@ -115,6 +117,14 @@ public:
    hg_id_t rpc_srv_remove_mdata_id() const;

    void rpc_srv_remove_mdata_id(hg_id_t rpc_srv_remove_mdata_id);

    hg_id_t rpc_srv_read_data_id() const;

    void rpc_srv_read_data_id(hg_id_t rpc_srv_read_data_id);

    hg_id_t rpc_srv_write_data_id() const;

    void rpc_srv_write_data_id(hg_id_t rpc_srv_write_data_id);
};


+77 −0
Original line number Diff line number Diff line
//
// Created by evie on 7/13/17.
//

#include "c_data.hpp"
#include "../rpc_types.hpp"

using namespace std;

static int max_retries = 3;

int rpc_send_read(const size_t recipient, const fuse_ino_t inode, const size_t in_size, const off_t in_offset,
                  char* tar_buf) {
    hg_handle_t handle;
    hg_addr_t svr_addr = HG_ADDR_NULL;
    rpc_data_in_t in;
    rpc_res_out_t out;
    hg_size_t b_size;
    void* b_buf;
    const struct hg_info* hgi;
    // fill in
    in.inode = inode;
    in.size = in_size;
    in.offset = in_offset;
    // TODO HG_ADDR_T is never freed atm. Need to change LRUCache
    if (!RPC_DATA->get_addr_by_hostid(recipient, svr_addr)) {
        ADAFS_DATA->spdlogger()->error("server address not resolvable for host id {}", recipient);
        return 1;
    }
    auto ret = HG_Create(RPC_DATA->client_hg_context(), svr_addr, RPC_DATA->rpc_srv_read_data_id(), &handle);
    if (ret != HG_SUCCESS) {
        ADAFS_DATA->spdlogger()->error("creating handle FAILED");
        return 1;
    }

    b_size = 512;
    b_buf = calloc(1, 512);
    /* register local target buffer for bulk access */
    hgi = HG_Get_info(handle);
    ret = HG_Bulk_create(hgi->hg_class, 1, &b_buf, &b_size, HG_BULK_WRITE_ONLY, &in.bulk_handle);
    if (ret != 0)
        ADAFS_DATA->spdlogger()->error("failed to create bulkd on client");

    int send_ret = HG_FALSE;
    for (int i = 0; i < max_retries; ++i) {
        send_ret = margo_forward_timed(RPC_DATA->client_mid(), handle, &in, 15000);
        if (send_ret == HG_SUCCESS) {
            break;
        }
    }

    if (send_ret == HG_SUCCESS) {

        /* decode response */
        ret = HG_Get_output(handle, &out);

        ADAFS_DATA->spdlogger()->debug("Got response mode {}", out.res);
        ADAFS_DATA->spdlogger()->debug("Filled buffer looks like this: {}", b_buf);
        /* clean up resources consumed by this rpc */
        HG_Free_output(handle, &out);
    } else {
        ADAFS_DATA->spdlogger()->error("RPC send_get_attr (timed out)");
    }

    HG_Free_input(handle, &in);
    HG_Destroy(handle);

    return 0;

    return 0;
}

int rpc_send_write() {
    // TODO

    return 0;
}
 No newline at end of file
Loading