Commit 17ddecef authored by Marc Vef's avatar Marc Vef
Browse files

Working RPC read operation

parent 1052111c
Loading
Loading
Loading
Loading
+9 −0
Original line number Diff line number Diff line
@@ -43,3 +43,12 @@ int destroy_chunk_space(const fuse_ino_t inode) {
//    return static_cast<int>(!bfs::exists(chnk_path));
    return 0;
}

int read_file(char* buf, size_t& read_size, const char* path, const size_t size, const off_t off) {
    int fd = open(path, R_OK);
    if (fd < 0)
        return EIO;
    read_size = static_cast<size_t>(pread(fd, buf, size, off));
    close(fd);
    return 0;
}
 No newline at end of file
+2 −0
Original line number Diff line number Diff line
@@ -11,4 +11,6 @@ int init_chunk_space(const fuse_ino_t inode);

int destroy_chunk_space(const fuse_ino_t inode);

int read_file(char* buf, size_t& read_size, const char* path, const size_t size, const off_t off);

#endif //FS_IO_H
+1 −1
Original line number Diff line number Diff line
@@ -194,7 +194,7 @@ int get_attr(struct stat& attr, const fuse_ino_t inode) {
            err = get_metadata(md, inode);
            if (err == 0)
                metadata_to_stat(md, attr);
        } else {
        } else { // remote
            // attr is filled in here for rpcs
            err = rpc_send_get_attr(recipient, inode, attr);
        }
+24 −22
Original line number Diff line number Diff line
@@ -7,6 +7,7 @@
#include "../db/db_ops.hpp"
#include "../adafs_ops/mdata_ops.hpp"
#include "../rpc/client/c_data.hpp"
#include "../adafs_ops/io.hpp"

using namespace std;

@@ -36,33 +37,34 @@ using namespace std;
 * @param fi file information
 */
void adafs_ll_read(fuse_req_t req, fuse_ino_t ino, size_t size, off_t off, struct fuse_file_info* fi) {
    \
    ADAFS_DATA->spdlogger()->debug("adafs_ll_read() enter: inode {} size {} offset {}", ino, size, off);
#ifdef RPC_TEST
    rpc_send_read(0, ino, size, off, nullptr);
#endif

    // TODO Check out how splicing works. This uses fuse_reply_data
    ADAFS_DATA->spdlogger()->debug("adafs_ll_read() enter: inode {} size {} offset {}", ino, size, off);
    size_t read_size;
    auto buf = make_unique<char[]>(size);
    int err;
    if (ADAFS_DATA->host_size() > 1) { // multiple node operation
        auto recipient = RPC_DATA->get_rpc_node(fmt::FormatInt(ino).str());
        if (ADAFS_DATA->is_local_op(recipient)) { // local read operation
            auto chnk_path = bfs::path(ADAFS_DATA->chunk_path());
            chnk_path /= fmt::FormatInt(ino).c_str();
            chnk_path /= "data"s;

    int fd = open(chnk_path.c_str(), R_OK);

    if (fd < 0) {
            err = read_file(buf.get(), read_size, chnk_path.c_str(), size, off);
        } else { // remote read operation
            err = rpc_send_read(recipient, ino, size, off, buf.get(), read_size);
        }
    } else { //single node operation
        auto chnk_path = bfs::path(ADAFS_DATA->chunk_path());
        chnk_path /= fmt::FormatInt(ino).c_str();
        chnk_path /= "data"s;
        err = read_file(buf.get(), read_size, chnk_path.c_str(), size, off);
    }
    if (err != 0) {
        fuse_reply_err(req, EIO);
        return;
    }
    char* buf = new char[size]();

    auto read_size = static_cast<size_t>(pread(fd, buf, size, off));
    ADAFS_DATA->spdlogger()->trace("Read the following buf: {}", buf);

    fuse_reply_buf(req, buf, read_size);


    close(fd);
    free(buf);
    ADAFS_DATA->spdlogger()->trace("Sending buf to Fuse driver: {}", buf.get());
    fuse_reply_buf(req, buf.get(), read_size);
}

/**
+0 −61
Original line number Diff line number Diff line
@@ -3,72 +3,11 @@
//

#include "c_data.hpp"
#include "../rpc_types.hpp"

using namespace std;

static int max_retries = 3;

int rpc_send_read(const size_t recipient, const fuse_ino_t inode, const size_t in_size, const off_t in_offset,
                  char* tar_buf) {
    hg_handle_t handle;
    hg_addr_t svr_addr = HG_ADDR_NULL;
    rpc_data_in_t in;
    rpc_res_out_t out;
    hg_size_t b_size;
    void* b_buf;
    const struct hg_info* hgi;
    // fill in
    in.inode = inode;
    in.size = in_size;
    in.offset = in_offset;
    // TODO HG_ADDR_T is never freed atm. Need to change LRUCache
    if (!RPC_DATA->get_addr_by_hostid(recipient, svr_addr)) {
        ADAFS_DATA->spdlogger()->error("server address not resolvable for host id {}", recipient);
        return 1;
    }
    auto ret = HG_Create(RPC_DATA->client_hg_context(), svr_addr, RPC_DATA->rpc_srv_read_data_id(), &handle);
    if (ret != HG_SUCCESS) {
        ADAFS_DATA->spdlogger()->error("creating handle FAILED");
        return 1;
    }

    b_size = 512;
    b_buf = calloc(1, 512);
    /* register local target buffer for bulk access */
    hgi = HG_Get_info(handle);
    ret = HG_Bulk_create(hgi->hg_class, 1, &b_buf, &b_size, HG_BULK_WRITE_ONLY, &in.bulk_handle);
    if (ret != 0)
        ADAFS_DATA->spdlogger()->error("failed to create bulkd on client");

    int send_ret = HG_FALSE;
    for (int i = 0; i < max_retries; ++i) {
        send_ret = margo_forward_timed(RPC_DATA->client_mid(), handle, &in, 15000);
        if (send_ret == HG_SUCCESS) {
            break;
        }
    }

    if (send_ret == HG_SUCCESS) {

        /* decode response */
        ret = HG_Get_output(handle, &out);

        ADAFS_DATA->spdlogger()->debug("Got response mode {}", out.res);
        ADAFS_DATA->spdlogger()->debug("Filled buffer looks like this: {}", (char*) b_buf);
        /* clean up resources consumed by this rpc */
        HG_Free_output(handle, &out);
    } else {
        ADAFS_DATA->spdlogger()->error("RPC send_get_attr (timed out)");
    }

    HG_Free_input(handle, &in);
    HG_Destroy(handle);

    return 0;

    return 0;
}

int rpc_send_write() {
    // TODO
Loading