Commit 753a1901 authored by Ramon Nou's avatar Ramon Nou
Browse files

enable_chunk_read_before_write

parent ebc23cb9
Loading
Loading
Loading
Loading
Loading
+823 −771
Original line number Diff line number Diff line
@@ -1104,6 +1104,60 @@ gkfs_do_write(gkfs::filemap::OpenFile& file, const char* buf, size_t count,
       count > gkfs::config::proxy::fwd_io_count_threshold) {
        ret_write = gkfs::rpc::forward_write_proxy(*path, buf, offset, count);
    } else {
        // If we are going to write chunk 0 (0 to chunksize) with an offset >
        // 0, and we are using ofi+verbs: do a forward_read from 0 to offset,
        // concatenate the read buffer (0..offset) with the rest of the chunk,
        // and write the first chunk. Then do a forward_write of the rest of
        // the data.
        if(gkfs::config::io::enable_chunk_read_before_write &&
           gkfs::config::rpc::chunksize > 0 &&
           offset < gkfs::config::rpc::chunksize &&
           gkfs::config::rpc::chunksize < count) {
            // Read the existing bytes of the first chunk that precede the
            // write offset (0..offset)
            char* read_buf = new char[offset];
            std::set<int8_t> failed;
            auto ret_read = gkfs::rpc::forward_read(*path, read_buf, 0, offset,
                                                    0, failed);
            if(ret_read.first != 0) {
                LOG(ERROR, "Failed to read first chunk for RMW: {}",
                    ret_read.first);
                delete[] read_buf;
                errno = ret_read.first;
                return -1;
            }

            // Create a new buffer for the first chunk write
            char* first_chunk_write_buf =
                    new char[gkfs::config::rpc::chunksize];
            // Copy existing data up to offset
            memcpy(first_chunk_write_buf, read_buf, offset);
            // Copy new data into the first chunk
            memcpy(first_chunk_write_buf + offset, buf,
                   gkfs::config::rpc::chunksize - offset);

            // Write the first chunk
            ret_write =
                    gkfs::rpc::forward_write(*path, first_chunk_write_buf, 0,
                                             gkfs::config::rpc::chunksize, 0);
            delete[] read_buf;
            delete[] first_chunk_write_buf;

            if(ret_write.first != 0) {
                LOG(ERROR, "Failed to write first chunk during RMW: {}",
                    ret_write.first);
                errno = ret_write.first;
                return -1;
            }

            // Write the rest of the data
            if(count > gkfs::config::rpc::chunksize) {
                ret_write = gkfs::rpc::forward_write(
                        *path, buf + (gkfs::config::rpc::chunksize - offset),
                        gkfs::config::rpc::chunksize,
                        count - (gkfs::config::rpc::chunksize - offset), 0);
            }
        } else {


            ret_write = gkfs::rpc::forward_write(*path, buf, offset, count, 0);
        }
        err = ret_write.first;
@@ -1122,7 +1176,8 @@ gkfs_do_write(gkfs::filemap::OpenFile& file, const char* buf, size_t count,
        }

        if(err) {
        LOG(WARNING, "gkfs::rpc::forward_write() failed with err '{}'", err);
            LOG(WARNING, "gkfs::rpc::forward_write() failed with err '{}'",
                err);
            errno = err;
            return -1;
        }
@@ -1149,9 +1204,8 @@ gkfs_do_write(gkfs::filemap::OpenFile& file, const char* buf, size_t count,
     * operations (see man 2 pwrite)
     * @return written size or -1 on error
     */
ssize_t
gkfs_write_ws(gkfs::filemap::OpenFile& file, const char* buf, size_t count,
              off64_t offset, bool update_pos) {
    ssize_t gkfs_write_ws(gkfs::filemap::OpenFile & file, const char* buf,
                          size_t count, off64_t offset, bool update_pos) {
#ifdef GKFS_ENABLE_CLIENT_METRICS
        auto start_t = std::chrono::high_resolution_clock::now();
        auto written = gkfs_do_write(file, buf, count, offset, update_pos);
@@ -1171,8 +1225,7 @@ gkfs_write_ws(gkfs::filemap::OpenFile& file, const char* buf, size_t count,
     * @param offset
     * @return written size or -1 on error
     */
ssize_t
gkfs_pwrite(int fd, const void* buf, size_t count, off64_t offset) {
    ssize_t gkfs_pwrite(int fd, const void* buf, size_t count, off64_t offset) {
        auto file = CTX->file_map()->get(fd);
        if(!file)
            return 0;
@@ -1188,8 +1241,7 @@ gkfs_pwrite(int fd, const void* buf, size_t count, off64_t offset) {
     * @param count
     * @return written size or -1 on error
     */
ssize_t
gkfs_write(int fd, const void* buf, size_t count) {
    ssize_t gkfs_write(int fd, const void* buf, size_t count) {
        auto gkfs_fd = CTX->file_map()->get(fd);
        if(!gkfs_fd)
            return 0;
@@ -1208,8 +1260,8 @@ gkfs_write(int fd, const void* buf, size_t count) {
     * @param offset
     * @return written size or -1 on error
     */
ssize_t
gkfs_pwritev(int fd, const struct iovec* iov, int iovcnt, off_t offset) {
    ssize_t gkfs_pwritev(int fd, const struct iovec* iov, int iovcnt,
                         off_t offset) {

        auto file = CTX->file_map()->get(fd);
        if(!file)
@@ -1223,7 +1275,8 @@ gkfs_pwritev(int fd, const struct iovec* iov, int iovcnt, off_t offset) {
                continue;
            }
            auto buf = (iov + i)->iov_base;
        ret = gkfs_write_ws(*file, reinterpret_cast<char*>(buf), count, pos);
            ret = gkfs_write_ws(*file, reinterpret_cast<char*>(buf), count,
                                pos);
            if(ret == -1) {
                break;
            }
@@ -1249,8 +1302,7 @@ gkfs_pwritev(int fd, const struct iovec* iov, int iovcnt, off_t offset) {
     * @param iovcnt
     * @return written size or -1 on error
     */
ssize_t
gkfs_writev(int fd, const struct iovec* iov, int iovcnt) {
    ssize_t gkfs_writev(int fd, const struct iovec* iov, int iovcnt) {

        auto gkfs_fd = CTX->file_map()->get(fd);
        if(!gkfs_fd)
@@ -1273,9 +1325,8 @@ gkfs_writev(int fd, const struct iovec* iov, int iovcnt) {
     * @param offset
     * @return read size or -1 on error
     */
ssize_t
gkfs_do_read(const gkfs::filemap::OpenFile& file, char* buf, size_t count,
             off64_t offset) {
    ssize_t gkfs_do_read(const gkfs::filemap::OpenFile& file, char* buf,
                         size_t count, off64_t offset) {
        if(file.type() != gkfs::filemap::FileType::regular) {
            assert(file.type() == gkfs::filemap::FileType::directory);
            LOG(WARNING, "Cannot read from directory");
@@ -1292,7 +1343,8 @@ gkfs_do_read(const gkfs::filemap::OpenFile& file, char* buf, size_t count,
        pair<int, long> ret;
        if(gkfs::config::proxy::fwd_io && CTX->use_proxy() &&
           count > gkfs::config::proxy::fwd_io_count_threshold) {
        ret = gkfs::rpc::forward_read_proxy(file.path(), buf, offset, count);
            ret = gkfs::rpc::forward_read_proxy(file.path(), buf, offset,
                                                count);
        } else {
            std::set<int8_t> failed; // set with failed targets.
            if(CTX->get_replicas() != 0) {
@@ -1300,15 +1352,17 @@ gkfs_do_read(const gkfs::filemap::OpenFile& file, char* buf, size_t count,
                ret = gkfs::rpc::forward_read(file.path(), buf, offset, count,
                                              CTX->get_replicas(), failed);
                while(ret.first == EIO) {
                ret = gkfs::rpc::forward_read(file.path(), buf, offset, count,
                                              CTX->get_replicas(), failed);
                LOG(WARNING, "gkfs::rpc::forward_read() failed with ret '{}'",
                    ret = gkfs::rpc::forward_read(file.path(), buf, offset,
                                                  count, CTX->get_replicas(),
                                                  failed);
                    LOG(WARNING,
                        "gkfs::rpc::forward_read() failed with ret '{}'",
                        ret.first);
                }

            } else {
            ret = gkfs::rpc::forward_read(file.path(), buf, offset, count, 0,
                                          failed);
                ret = gkfs::rpc::forward_read(file.path(), buf, offset, count,
                                              0, failed);
            }
        }
        auto err = ret.first;
@@ -1329,9 +1383,8 @@ gkfs_do_read(const gkfs::filemap::OpenFile& file, char* buf, size_t count,
     * @param offset
     * @return read size or -1 on error
     */
ssize_t
gkfs_read_ws(const gkfs::filemap::OpenFile& file, char* buf, size_t count,
             off64_t offset) {
    ssize_t gkfs_read_ws(const gkfs::filemap::OpenFile& file, char* buf,
                         size_t count, off64_t offset) {
#ifdef GKFS_ENABLE_CLIENT_METRICS
        auto start_t = std::chrono::high_resolution_clock::now();
        auto read = gkfs_do_read(file, buf, count, offset);
@@ -1351,12 +1404,12 @@ gkfs_read_ws(const gkfs::filemap::OpenFile& file, char* buf, size_t count,
     * @param offset
     * @return read size or -1 on error
     */
ssize_t
gkfs_pread(int fd, void* buf, size_t count, off64_t offset) {
    ssize_t gkfs_pread(int fd, void* buf, size_t count, off64_t offset) {
        auto gkfs_fd = CTX->file_map()->get(fd);
        if(!gkfs_fd)
            return 0;
    return gkfs_read_ws(*gkfs_fd, reinterpret_cast<char*>(buf), count, offset);
        return gkfs_read_ws(*gkfs_fd, reinterpret_cast<char*>(buf), count,
                            offset);
    }

    /**
@@ -1367,13 +1420,13 @@ gkfs_pread(int fd, void* buf, size_t count, off64_t offset) {
     * @param count
     * @return read size or -1 on error
     */
ssize_t
gkfs_read(int fd, void* buf, size_t count) {
    ssize_t gkfs_read(int fd, void* buf, size_t count) {
        auto gkfs_fd = CTX->file_map()->get(fd);
        if(!gkfs_fd)
            return 0;
        auto pos = gkfs_fd->pos(); // retrieve the current offset
    auto ret = gkfs_read_ws(*gkfs_fd, reinterpret_cast<char*>(buf), count, pos);
        auto ret = gkfs_read_ws(*gkfs_fd, reinterpret_cast<char*>(buf), count,
                                pos);
        // Update offset in file descriptor in the file map
        if(ret > 0) {
            gkfs_fd->pos(pos + ret);
@@ -1390,8 +1443,8 @@ gkfs_read(int fd, void* buf, size_t count) {
     * @param offset
     * @return read size or -1 on error
     */
ssize_t
gkfs_preadv(int fd, const struct iovec* iov, int iovcnt, off_t offset) {
    ssize_t gkfs_preadv(int fd, const struct iovec* iov, int iovcnt,
                        off_t offset) {

        auto file = CTX->file_map()->get(fd);
        if(!file)
@@ -1431,8 +1484,7 @@ gkfs_preadv(int fd, const struct iovec* iov, int iovcnt, off_t offset) {
     * @param iovcnt
     * @return read size or -1 on error
     */
ssize_t
gkfs_readv(int fd, const struct iovec* iov, int iovcnt) {
    ssize_t gkfs_readv(int fd, const struct iovec* iov, int iovcnt) {

        auto gkfs_fd = CTX->file_map()->get(fd);
        if(!gkfs_fd)
@@ -1453,8 +1505,7 @@ gkfs_readv(int fd, const struct iovec* iov, int iovcnt) {
     * @param path
     * @return 0 on success or -1 on error
     */
int
gkfs_opendir(const std::string& path) {
    int gkfs_opendir(const std::string& path) {
        auto md = gkfs::utils::get_metadata(path);
        if(!md) {
            return -1;
@@ -1479,10 +1530,12 @@ gkfs_opendir(const std::string& path) {
                __func__, path, CTX->hosts().size());
            // Launch RPC calls asynchronously
            for(uint64_t i = 0; i < CTX->hosts().size(); i++) {
            dcache_futures.push_back(std::async(std::launch::async, [&, i]() {
                dcache_futures.push_back(std::async(std::launch::async, [&,
                                                                         i]() {
                    if(gkfs::config::proxy::fwd_get_dirents_single &&
                       CTX->use_proxy()) {
                    return gkfs::rpc::forward_get_dirents_single_proxy(path, i);
                        return gkfs::rpc::forward_get_dirents_single_proxy(path,
                                                                           i);
                    } else {
                        return gkfs::rpc::forward_get_dirents_single(path, i);
                    }
@@ -1500,7 +1553,8 @@ gkfs_opendir(const std::string& path) {
                    LOG(DEBUG, "name: {} type: {} size: {} ctime: {}",
                        get<0>(dentry), get<1>(dentry), get<2>(dentry),
                        get<3>(dentry));
                auto ftype = get<1>(dentry) ? gkfs::filemap::FileType::directory
                    auto ftype = get<1>(dentry)
                                         ? gkfs::filemap::FileType::directory
                                         : gkfs::filemap::FileType::regular;
                    // filename, is_dir, size, ctime
                    ret.second->add(get<0>(dentry), ftype);
@@ -1512,7 +1566,8 @@ gkfs_opendir(const std::string& path) {
                }
                ret.first = res.first;
            }
        LOG(DEBUG, "{}() Unpacked dirents for path '{}' counted '{}' entries",
            LOG(DEBUG,
                "{}() Unpacked dirents for path '{}' counted '{}' entries",
                __func__, path, cnt);
        } else {
            ret = gkfs::rpc::forward_get_dirents(path);
@@ -1532,8 +1587,7 @@ gkfs_opendir(const std::string& path) {
     * @param path
     * @return 0 on success or -1 on error
     */
int
gkfs_rmdir(const std::string& path) {
    int gkfs_rmdir(const std::string& path) {
        int err;
        // check that directory is empty if a strict dir hierarchy is
        // enforced
@@ -1541,7 +1595,8 @@ gkfs_rmdir(const std::string& path) {
#if GKFS_CREATE_CHECK_PARENTS
        auto md = gkfs::utils::get_metadata(path);
        if(!md) {
        LOG(DEBUG, "Error: Path '{}' err code '{}' ", path, strerror(errno));
            LOG(DEBUG, "Error: Path '{}' err code '{}' ", path,
                strerror(errno));
            return -1;
        }
        if(!S_ISDIR(md->mode())) {
@@ -1582,8 +1637,8 @@ gkfs_rmdir(const std::string& path) {
     * @param count
     * @return 0 on success or -1 on error
     */
int
gkfs_getdents(unsigned int fd, struct linux_dirent* dirp, unsigned int count) {
    int gkfs_getdents(unsigned int fd, struct linux_dirent* dirp,
                      unsigned int count) {
        // Get opendir object (content was downloaded with opendir() call)
        auto open_dir = CTX->file_map()->get_dir(fd);
        if(open_dir == nullptr) {
@@ -1603,7 +1658,8 @@ gkfs_getdents(unsigned int fd, struct linux_dirent* dirp, unsigned int count) {
        while(pos < open_dir->size()) {
            // get dentry for current position
            auto de = open_dir->getdent(pos);
        if(CTX->protect_files_consumer() or CTX->protect_files_generator()) {
            if(CTX->protect_files_consumer() or
               CTX->protect_files_generator()) {
                // if de.name ends with lockgekko jump to the next file
                if(de.name().size() >= 10 &&
                   de.name().substr(de.name().size() - 10) == ".lockgekko") {
@@ -1631,8 +1687,8 @@ gkfs_getdents(unsigned int fd, struct linux_dirent* dirp, unsigned int count) {
            }
            current_dirp = reinterpret_cast<struct linux_dirent*>(
                    reinterpret_cast<char*>(dirp) + written);
        current_dirp->d_ino =
                std::hash<std::string>()(open_dir->path() + "/" + de.name());
            current_dirp->d_ino = std::hash<std::string>()(open_dir->path() +
                                                           "/" + de.name());

            current_dirp->d_reclen = total_size;

@@ -1664,8 +1720,7 @@ gkfs_getdents(unsigned int fd, struct linux_dirent* dirp, unsigned int count) {
     * @param count
     * @return 0 on success or -1 on error
     */
int
gkfs_getdents64(unsigned int fd, struct linux_dirent64* dirp,
    int gkfs_getdents64(unsigned int fd, struct linux_dirent64* dirp,
                        unsigned int count) {
        auto open_dir = CTX->file_map()->get_dir(fd);
        if(open_dir == nullptr) {
@@ -1681,7 +1736,8 @@ gkfs_getdents64(unsigned int fd, struct linux_dirent64* dirp,
        struct linux_dirent64* current_dirp = nullptr;
        while(pos < open_dir->size()) {
            auto de = open_dir->getdent(pos);
        if(CTX->protect_files_consumer() or CTX->protect_files_generator()) {
            if(CTX->protect_files_consumer() or
               CTX->protect_files_generator()) {
                // if de.name ends with lockgekko jump to the next file
                if(de.name().size() >= 10 &&
                   de.name().substr(de.name().size() - 10) == ".lockgekko") {
@@ -1713,8 +1769,8 @@ gkfs_getdents64(unsigned int fd, struct linux_dirent64* dirp,
            }
            current_dirp = reinterpret_cast<struct linux_dirent64*>(
                    reinterpret_cast<char*>(dirp) + written);
        current_dirp->d_ino =
                std::hash<std::string>()(open_dir->path() + "/" + de.name());
            current_dirp->d_ino = std::hash<std::string>()(open_dir->path() +
                                                           "/" + de.name());

            current_dirp->d_reclen = total_size;
            current_dirp->d_type =
@@ -1736,8 +1792,7 @@ gkfs_getdents64(unsigned int fd, struct linux_dirent64* dirp,
        return written;
    }

int
gkfs_fsync(unsigned int fd) {
    int gkfs_fsync(unsigned int fd) {
        auto file = CTX->file_map()->get(fd);
        if(!file) {
            errno = 0;
@@ -1747,8 +1802,8 @@ gkfs_fsync(unsigned int fd) {
        if(CTX->use_write_size_cache()) {
            auto err = CTX->write_size_cache()->flush(file->path(), true).first;
            if(err) {
            LOG(ERROR, "{}() write_size_cache() failed with err '{}'", __func__,
                err);
                LOG(ERROR, "{}() write_size_cache() failed with err '{}'",
                    __func__, err);
                errno = err;
                return -1;
            }
@@ -1763,13 +1818,14 @@ gkfs_fsync(unsigned int fd) {
     * @param fd
     * @return int
     */
int
gkfs_close(unsigned int fd) {
    int gkfs_close(unsigned int fd) {
        auto file = CTX->file_map()->get(fd);
        if(file) {
            // flush write size cache to be server consistent
            if(CTX->use_write_size_cache()) {
            auto err = CTX->write_size_cache()->flush(file->path(), true).first;
                auto err = CTX->write_size_cache()
                                   ->flush(file->path(), true)
                                   .first;
                if(err) {
                    LOG(ERROR, "{}() write_size_cache() failed with err '{}'",
                        __func__, err);
@@ -1818,8 +1874,8 @@ gkfs_close(unsigned int fd) {
     * @param target_path
     * @return 0 on success or -1 on error
     */
int
gkfs_mk_symlink(const std::string& path, const std::string& target_path) {
    int gkfs_mk_symlink(const std::string& path,
                        const std::string& target_path) {
        /* The following check is not POSIX compliant.
         * In POSIX the target is not checked at all.
         *  Here if the target is a directory we raise a NOTSUP error.
@@ -1873,8 +1929,7 @@ gkfs_mk_symlink(const std::string& path, const std::string& target_path) {
     * @param bufsize
     * @return 0 on success or -1 on error
     */
int
gkfs_readlink(const std::string& path, char* buf, int bufsize) {
    int gkfs_readlink(const std::string& path, char* buf, int bufsize) {
        auto md = gkfs::utils::get_metadata(path, false);
        if(!md) {
            LOG(DEBUG, "Named link doesn't exist");
@@ -1900,8 +1955,7 @@ gkfs_readlink(const std::string& path, char* buf, int bufsize) {
#endif


std::vector<std::string>
gkfs_get_file_list(const std::string& path) {
    std::vector<std::string> gkfs_get_file_list(const std::string& path) {
        auto ret = gkfs::rpc::forward_get_dirents(path);
        auto err = ret.first;
        if(err) {
@@ -1916,7 +1970,8 @@ gkfs_get_file_list(const std::string& path) {

        while(pos < open_dir->size()) {
            auto de = open_dir->getdent(pos++);
        if(CTX->protect_files_consumer() or CTX->protect_files_generator()) {
            if(CTX->protect_files_consumer() or
               CTX->protect_files_generator()) {
                // if de.name ends with lockgekko jump to the next file
                if(de.name().size() >= 10 &&
                   de.name().substr(de.name().size() - 10) == ".lockgekko") {
@@ -1929,8 +1984,7 @@ gkfs_get_file_list(const std::string& path) {
    }


void*
gkfs_mmap(void* addr, size_t length, int prot, int flags, int fd,
    void* gkfs_mmap(void* addr, size_t length, int prot, int flags, int fd,
                    off_t offset) {
        void* ptr = malloc(length);
        if(ptr == nullptr) {
@@ -1942,8 +1996,7 @@ gkfs_mmap(void* addr, size_t length, int prot, int flags, int fd,
        return ptr;
    }

int
gkfs_msync(void* addr, size_t length, int flags) {
    int gkfs_msync(void* addr, size_t length, int flags) {
        // check if addr is from gekkofs (mmap_set)
        // if so, get the fd and offset
        // pwrite length to the original offset
@@ -1960,8 +2013,7 @@ gkfs_msync(void* addr, size_t length, int flags) {
    }


int
gkfs_munmap(void* addr, size_t length) {
    int gkfs_munmap(void* addr, size_t length) {
        // check if addr is from gekkofs (mmap_set)
        // if so, get the fd and offset
        // pwrite length to the original offset