Verified Commit 4e7d301c authored by Marc Vef's avatar Marc Vef
Browse files

Adding gkfs read/write wrappers for client IO tracing

parent 3e80d1ad
Loading
Loading
Loading
Loading
+2 −2
Original line number Diff line number Diff line
@@ -51,9 +51,9 @@ static constexpr auto CWD = ADD_PREFIX("CWD");
static constexpr auto HOSTS_FILE = ADD_PREFIX("HOSTS_FILE");
static constexpr auto FORWARDING_MAP_FILE = ADD_PREFIX("FORWARDING_MAP_FILE");
#ifdef GKFS_ENABLE_CLIENT_METRICS
static constexpr auto METRICS_PATH = ADD_PREFIX("METRICS_PATH");
static constexpr auto METRICS_INTERVAL = ADD_PREFIX("METRICS_INTERVAL");
static constexpr auto ENABLE_METRICS = ADD_PREFIX("ENABLE_METRICS");
static constexpr auto METRICS_INTERVAL = ADD_PREFIX("METRICS_INTERVAL");
static constexpr auto METRICS_PATH = ADD_PREFIX("METRICS_PATH");
#endif

static constexpr auto NUM_REPL = ADD_PREFIX("NUM_REPL");
+10 −6
Original line number Diff line number Diff line
@@ -105,11 +105,13 @@ gkfs_readlink(const std::string& path, char* buf, int bufsize);
#endif

ssize_t
gkfs_pwrite(std::shared_ptr<gkfs::filemap::OpenFile> file, const char* buf,
            size_t count, off64_t offset, bool update_pos = false);
gkfs_do_write(gkfs::filemap::OpenFile& file, const char* buf, size_t count, off64_t offset, bool update_pos);

ssize_t
gkfs_pwrite_ws(int fd, const void* buf, size_t count, off64_t offset);
gkfs_write_ws(gkfs::filemap::OpenFile& file, const char* buf, size_t count, off64_t offset, bool update_pos = false);

ssize_t
gkfs_pwrite(int fd, const void* buf, size_t count, off64_t offset);

ssize_t
gkfs_write(int fd, const void* buf, size_t count);
@@ -121,11 +123,13 @@ ssize_t
gkfs_writev(int fd, const struct iovec* iov, int iovcnt);

ssize_t
gkfs_pread(std::shared_ptr<gkfs::filemap::OpenFile> file, char* buf,
           size_t count, off64_t offset);
gkfs_do_read(const gkfs::filemap::OpenFile& file, char* buf, size_t count, off64_t offset);

ssize_t
gkfs_read_ws(const gkfs::filemap::OpenFile& file, char* buf, size_t count, off64_t offset);

ssize_t
gkfs_pread_ws(int fd, void* buf, size_t count, off64_t offset);
gkfs_pread(int fd, void* buf, size_t count, off64_t offset);

ssize_t
gkfs_read(int fd, void* buf, size_t count);
+81 −42
Original line number Diff line number Diff line
@@ -38,6 +38,7 @@

#include <common/path_util.hpp>
#include <common/msgpack_util.hpp>
#include <utility>

extern "C" {
#include <dirent.h> // used for file types in the getdents{,64}() functions
@@ -860,7 +861,7 @@ gkfs_dup2(const int oldfd, const int newfd) {
}

/**
 * Wrapper function for all gkfs write operations
 * Actual write function for all gkfs write operations
 * errno may be set
 * @param file
 * @param buf
@@ -871,17 +872,15 @@ gkfs_dup2(const int oldfd, const int newfd) {
 * @return written size or -1 on error
 */
ssize_t
gkfs_pwrite(std::shared_ptr<gkfs::filemap::OpenFile> file, const char* buf,
            size_t count, off64_t offset, bool update_pos) {
    auto start_t = std::chrono::high_resolution_clock::now();
    if(file->type() != gkfs::filemap::FileType::regular) {
        assert(file->type() == gkfs::filemap::FileType::directory);
gkfs_do_write(gkfs::filemap::OpenFile& file, const char* buf, size_t count, off64_t offset, bool update_pos) {
    if(file.type() != gkfs::filemap::FileType::regular) {
        assert(file.type() == gkfs::filemap::FileType::directory);
        LOG(WARNING, "Cannot write to directory");
        errno = EISDIR;
        return -1;
    }
    auto path = make_unique<string>(file->path());
    auto is_append = file->get_flag(gkfs::filemap::OpenFile_flags::append);
    auto path = make_unique<string>(file.path());
    auto is_append = file.get_flag(gkfs::filemap::OpenFile_flags::append);
    auto write_size = 0;
    auto num_replicas = CTX->get_replicas();

@@ -931,7 +930,7 @@ gkfs_pwrite(std::shared_ptr<gkfs::filemap::OpenFile> file, const char* buf,
    }
    if(update_pos) {
        // Update offset in file descriptor in the file map
        file->pos(offset + write_size);
        file.pos(offset + write_size);
    }
    if(static_cast<size_t>(write_size) != count) {
        LOG(WARNING,
@@ -942,6 +941,28 @@ gkfs_pwrite(std::shared_ptr<gkfs::filemap::OpenFile> file, const char* buf,
    return write_size; // return written size
}

/**
 * Wrapper function for all gkfs write operations
 * errno may be set
 * @param file
 * @param buf
 * @param count
 * @param offset
 * @param update_pos pos should only be updated for some write operations (see
 * man 2 pwrite)
 * @return written size or -1 on error
 */
ssize_t
gkfs_write_ws(gkfs::filemap::OpenFile& file, const char* buf, size_t count, off64_t offset, bool update_pos) {
#ifdef GKFS_ENABLE_CLIENT_METRICS
    auto start_t = std::chrono::high_resolution_clock::now();
    auto written = gkfs_do_write(file, buf, count, offset, update_pos);
    CTX->write_metrics().add_event(written, start_t);
    return written;
#endif
    return gkfs_do_write(file, buf, count, offset, update_pos);
}

/**
 * gkfs wrapper for pwrite() system calls
 * errno may be set
@@ -952,9 +973,10 @@ gkfs_pwrite(std::shared_ptr<gkfs::filemap::OpenFile> file, const char* buf,
 * @return written size or -1 on error
 */
ssize_t
gkfs_pwrite_ws(int fd, const void* buf, size_t count, off64_t offset) {
gkfs_pwrite(int fd, const void* buf, size_t count, off64_t offset) {
    auto file = CTX->file_map()->get(fd);
    return gkfs_pwrite(file, reinterpret_cast<const char*>(buf), count, offset);
    return gkfs_write_ws(*file, reinterpret_cast<const char*>(buf), count,
                         offset);
}

/**
@@ -967,10 +989,10 @@ gkfs_pwrite_ws(int fd, const void* buf, size_t count, off64_t offset) {
 */
ssize_t
gkfs_write(int fd, const void* buf, size_t count) {
    auto gkfs_fd = CTX->file_map()->get(fd);
    auto gkfs_file = CTX->file_map()->get(fd);
    // call pwrite and update pos
    auto ret = gkfs_pwrite(gkfs_fd, reinterpret_cast<const char*>(buf), count,
                           gkfs_fd->pos(), true);
    auto ret = gkfs_write_ws(*gkfs_file, reinterpret_cast<const char*>(buf),
                             count, gkfs_file->pos(), true);
    return ret;
}

@@ -996,7 +1018,7 @@ gkfs_pwritev(int fd, const struct iovec* iov, int iovcnt, off_t offset) {
            continue;
        }
        auto buf = (iov + i)->iov_base;
        ret = gkfs_pwrite(file, reinterpret_cast<char*>(buf), count, pos);
        ret = gkfs_write_ws(*file, reinterpret_cast<char*>(buf), count, pos);
        if(ret == -1) {
            break;
        }
@@ -1037,7 +1059,7 @@ gkfs_writev(int fd, const struct iovec* iov, int iovcnt) {
}

/**
 * Wrapper function for all gkfs read operations
 * Actual read function for all gkfs read operations
 * @param file
 * @param buf
 * @param count
@@ -1045,11 +1067,9 @@ gkfs_writev(int fd, const struct iovec* iov, int iovcnt) {
 * @return read size or -1 on error
 */
ssize_t
gkfs_pread(std::shared_ptr<gkfs::filemap::OpenFile> file, char* buf,
           size_t count, off64_t offset) {
    auto start_t = std::chrono::high_resolution_clock::now();
    if(file->type() != gkfs::filemap::FileType::regular) {
        assert(file->type() == gkfs::filemap::FileType::directory);
gkfs_do_read(const gkfs::filemap::OpenFile& file, char* buf, size_t count, off64_t offset) {
    if(file.type() != gkfs::filemap::FileType::regular) {
        assert(file.type() == gkfs::filemap::FileType::directory);
        LOG(WARNING, "Cannot read from directory");
        errno = EISDIR;
        return -1;
@@ -1064,17 +1084,17 @@ gkfs_pread(std::shared_ptr<gkfs::filemap::OpenFile> file, char* buf,
    std::set<int8_t> failed; // set with failed targets.
    if(CTX->get_replicas() != 0) {

        ret = gkfs::rpc::forward_read(file->path(), buf, offset, count,
        ret = gkfs::rpc::forward_read(file.path(), buf, offset, count,
                                      CTX->get_replicas(), failed);
        while(ret.first == EIO) {
            ret = gkfs::rpc::forward_read(file->path(), buf, offset, count,
            ret = gkfs::rpc::forward_read(file.path(), buf, offset, count,
                                          CTX->get_replicas(), failed);
            LOG(WARNING, "gkfs::rpc::forward_read() failed with ret '{}'",
                ret.first);
        }

    } else {
        ret = gkfs::rpc::forward_read(file->path(), buf, offset, count, 0,
        ret = gkfs::rpc::forward_read(file.path(), buf, offset, count, 0,
                                      failed);
    }

@@ -1084,11 +1104,45 @@ gkfs_pread(std::shared_ptr<gkfs::filemap::OpenFile> file, char* buf,
        errno = err;
        return -1;
    }
    CTX->read_metrics().add_event(ret.second, start_t);
    // XXX check that we don't try to read past end of the file
    return ret.second; // return read size
}

/**
 * Wrapper function for all gkfs read operations
 * @param file
 * @param buf
 * @param count
 * @param offset
 * @return read size or -1 on error
 */
ssize_t
gkfs_read_ws(const gkfs::filemap::OpenFile& file, char* buf, size_t count, off64_t offset) {
#ifdef GKFS_ENABLE_CLIENT_METRICS
    auto start_t = std::chrono::high_resolution_clock::now();
    auto read = gkfs_do_read(file, buf, count, offset);
    CTX->read_metrics().add_event(read, start_t);
    return written;
#else
    return gkfs_do_read(file, buf, count, offset);
#endif
}

/**
 * gkfs wrapper for pread() system calls
 * errno may be set
 * @param fd
 * @param buf
 * @param count
 * @param offset
 * @return read size or -1 on error
 */
ssize_t
gkfs_pread(int fd, void* buf, size_t count, off64_t offset) {
    auto gkfs_fd = CTX->file_map()->get(fd);
    return gkfs_read_ws(*gkfs_fd, reinterpret_cast<char*>(buf), count, offset);
}

/**
 * gkfs wrapper for read() system calls
 * errno may be set
@@ -1101,7 +1155,7 @@ ssize_t
gkfs_read(int fd, void* buf, size_t count) {
    auto gkfs_fd = CTX->file_map()->get(fd);
    auto pos = gkfs_fd->pos(); // retrieve the current offset
    auto ret = gkfs_pread(gkfs_fd, reinterpret_cast<char*>(buf), count, pos);
    auto ret = gkfs_read_ws(*gkfs_fd, reinterpret_cast<char*>(buf), count, pos);
    // Update offset in file descriptor in the file map
    if(ret > 0) {
        gkfs_fd->pos(pos + ret);
@@ -1131,7 +1185,7 @@ gkfs_preadv(int fd, const struct iovec* iov, int iovcnt, off_t offset) {
            continue;
        }
        auto buf = (iov + i)->iov_base;
        ret = gkfs_pread(file, reinterpret_cast<char*>(buf), count, pos);
        ret = gkfs_read_ws(*file, reinterpret_cast<char*>(buf), count, pos);
        if(ret == -1) {
            break;
        }
@@ -1171,21 +1225,6 @@ gkfs_readv(int fd, const struct iovec* iov, int iovcnt) {
    return ret;
}

/**
 * gkfs wrapper for pread() system calls
 * errno may be set
 * @param fd
 * @param buf
 * @param count
 * @param offset
 * @return read size or -1 on error
 */
ssize_t
gkfs_pread_ws(int fd, void* buf, size_t count, off64_t offset) {
    auto gkfs_fd = CTX->file_map()->get(fd);
    return gkfs_pread(gkfs_fd, reinterpret_cast<char*>(buf), count, offset);
}

/**
 * wrapper function for opening directories
 * errno may be set
+2 −2
Original line number Diff line number Diff line
@@ -235,7 +235,7 @@ hook_pread(unsigned int fd, char* buf, size_t count, loff_t pos) {
        fd, fmt::ptr(buf), count, pos);

    if(CTX->file_map()->exist(fd)) {
        return with_errno(gkfs::syscall::gkfs_pread_ws(fd, buf, count, pos));
        return with_errno(gkfs::syscall::gkfs_pread(fd, buf, count, pos));
    }
    /* Since kernel 2.6: pread() became pread64(), and pwrite() became
     * pwrite64(). */
@@ -289,7 +289,7 @@ hook_pwrite(unsigned int fd, const char* buf, size_t count, loff_t pos) {
        fd, fmt::ptr(buf), count, pos);

    if(CTX->file_map()->exist(fd)) {
        return with_errno(gkfs::syscall::gkfs_pwrite_ws(fd, buf, count, pos));
        return with_errno(gkfs::syscall::gkfs_pwrite(fd, buf, count, pos));
    }
    /* Since kernel 2.6: pread() became pread64(), and pwrite() became
     * pwrite64(). */