Commit 422d630f authored by Ramon Nou's avatar Ramon Nou
Browse files

fix verbs

parent 753a1901
Loading
Loading
Loading
Loading
Loading
+772 −769
Original line number Diff line number Diff line
@@ -1160,6 +1160,7 @@ gkfs_do_write(gkfs::filemap::OpenFile& file, const char* buf, size_t count,

            ret_write = gkfs::rpc::forward_write(*path, buf, offset, count, 0);
        }
    }
    err = ret_write.first;
    write_size = ret_write.second;

@@ -1176,8 +1177,7 @@ gkfs_do_write(gkfs::filemap::OpenFile& file, const char* buf, size_t count,
    }

    if(err) {
            LOG(WARNING, "gkfs::rpc::forward_write() failed with err '{}'",
                err);
        LOG(WARNING, "gkfs::rpc::forward_write() failed with err '{}'", err);
        errno = err;
        return -1;
    }
@@ -1204,8 +1204,9 @@ gkfs_do_write(gkfs::filemap::OpenFile& file, const char* buf, size_t count,
 * operations (see man 2 pwrite)
 * @return written size or -1 on error
 */
    ssize_t gkfs_write_ws(gkfs::filemap::OpenFile & file, const char* buf,
                          size_t count, off64_t offset, bool update_pos) {
ssize_t
gkfs_write_ws(gkfs::filemap::OpenFile& file, const char* buf, size_t count,
              off64_t offset, bool update_pos) {
#ifdef GKFS_ENABLE_CLIENT_METRICS
    auto start_t = std::chrono::high_resolution_clock::now();
    auto written = gkfs_do_write(file, buf, count, offset, update_pos);
@@ -1225,7 +1226,8 @@ gkfs_do_write(gkfs::filemap::OpenFile& file, const char* buf, size_t count,
 * @param offset
 * @return written size or -1 on error
 */
    ssize_t gkfs_pwrite(int fd, const void* buf, size_t count, off64_t offset) {
ssize_t
gkfs_pwrite(int fd, const void* buf, size_t count, off64_t offset) {
    auto file = CTX->file_map()->get(fd);
    if(!file)
        return 0;
@@ -1241,7 +1243,8 @@ gkfs_do_write(gkfs::filemap::OpenFile& file, const char* buf, size_t count,
 * @param count
 * @return written size or -1 on error
 */
    ssize_t gkfs_write(int fd, const void* buf, size_t count) {
ssize_t
gkfs_write(int fd, const void* buf, size_t count) {
    auto gkfs_fd = CTX->file_map()->get(fd);
    if(!gkfs_fd)
        return 0;
@@ -1260,8 +1263,8 @@ gkfs_do_write(gkfs::filemap::OpenFile& file, const char* buf, size_t count,
 * @param offset
 * @return written size or -1 on error
 */
    ssize_t gkfs_pwritev(int fd, const struct iovec* iov, int iovcnt,
                         off_t offset) {
ssize_t
gkfs_pwritev(int fd, const struct iovec* iov, int iovcnt, off_t offset) {

    auto file = CTX->file_map()->get(fd);
    if(!file)
@@ -1275,8 +1278,7 @@ gkfs_do_write(gkfs::filemap::OpenFile& file, const char* buf, size_t count,
            continue;
        }
        auto buf = (iov + i)->iov_base;
            ret = gkfs_write_ws(*file, reinterpret_cast<char*>(buf), count,
                                pos);
        ret = gkfs_write_ws(*file, reinterpret_cast<char*>(buf), count, pos);
        if(ret == -1) {
            break;
        }
@@ -1302,7 +1304,8 @@ gkfs_do_write(gkfs::filemap::OpenFile& file, const char* buf, size_t count,
 * @param iovcnt
 * @return written size or -1 on error
 */
    ssize_t gkfs_writev(int fd, const struct iovec* iov, int iovcnt) {
ssize_t
gkfs_writev(int fd, const struct iovec* iov, int iovcnt) {

    auto gkfs_fd = CTX->file_map()->get(fd);
    if(!gkfs_fd)
@@ -1325,8 +1328,9 @@ gkfs_do_write(gkfs::filemap::OpenFile& file, const char* buf, size_t count,
 * @param offset
 * @return read size or -1 on error
 */
    ssize_t gkfs_do_read(const gkfs::filemap::OpenFile& file, char* buf,
                         size_t count, off64_t offset) {
ssize_t
gkfs_do_read(const gkfs::filemap::OpenFile& file, char* buf, size_t count,
             off64_t offset) {
    if(file.type() != gkfs::filemap::FileType::regular) {
        assert(file.type() == gkfs::filemap::FileType::directory);
        LOG(WARNING, "Cannot read from directory");
@@ -1343,8 +1347,7 @@ gkfs_do_write(gkfs::filemap::OpenFile& file, const char* buf, size_t count,
    pair<int, long> ret;
    if(gkfs::config::proxy::fwd_io && CTX->use_proxy() &&
       count > gkfs::config::proxy::fwd_io_count_threshold) {
            ret = gkfs::rpc::forward_read_proxy(file.path(), buf, offset,
                                                count);
        ret = gkfs::rpc::forward_read_proxy(file.path(), buf, offset, count);
    } else {
        std::set<int8_t> failed; // set with failed targets.
        if(CTX->get_replicas() != 0) {
@@ -1352,17 +1355,15 @@ gkfs_do_write(gkfs::filemap::OpenFile& file, const char* buf, size_t count,
            ret = gkfs::rpc::forward_read(file.path(), buf, offset, count,
                                          CTX->get_replicas(), failed);
            while(ret.first == EIO) {
                    ret = gkfs::rpc::forward_read(file.path(), buf, offset,
                                                  count, CTX->get_replicas(),
                                                  failed);
                    LOG(WARNING,
                        "gkfs::rpc::forward_read() failed with ret '{}'",
                ret = gkfs::rpc::forward_read(file.path(), buf, offset, count,
                                              CTX->get_replicas(), failed);
                LOG(WARNING, "gkfs::rpc::forward_read() failed with ret '{}'",
                    ret.first);
            }

        } else {
                ret = gkfs::rpc::forward_read(file.path(), buf, offset, count,
                                              0, failed);
            ret = gkfs::rpc::forward_read(file.path(), buf, offset, count, 0,
                                          failed);
        }
    }
    auto err = ret.first;
@@ -1383,8 +1384,9 @@ gkfs_do_write(gkfs::filemap::OpenFile& file, const char* buf, size_t count,
 * @param offset
 * @return read size or -1 on error
 */
    ssize_t gkfs_read_ws(const gkfs::filemap::OpenFile& file, char* buf,
                         size_t count, off64_t offset) {
ssize_t
gkfs_read_ws(const gkfs::filemap::OpenFile& file, char* buf, size_t count,
             off64_t offset) {
#ifdef GKFS_ENABLE_CLIENT_METRICS
    auto start_t = std::chrono::high_resolution_clock::now();
    auto read = gkfs_do_read(file, buf, count, offset);
@@ -1404,12 +1406,12 @@ gkfs_do_write(gkfs::filemap::OpenFile& file, const char* buf, size_t count,
 * @param offset
 * @return read size or -1 on error
 */
    ssize_t gkfs_pread(int fd, void* buf, size_t count, off64_t offset) {
ssize_t
gkfs_pread(int fd, void* buf, size_t count, off64_t offset) {
    auto gkfs_fd = CTX->file_map()->get(fd);
    if(!gkfs_fd)
        return 0;
        return gkfs_read_ws(*gkfs_fd, reinterpret_cast<char*>(buf), count,
                            offset);
    return gkfs_read_ws(*gkfs_fd, reinterpret_cast<char*>(buf), count, offset);
}

/**
@@ -1420,13 +1422,13 @@ gkfs_do_write(gkfs::filemap::OpenFile& file, const char* buf, size_t count,
 * @param count
 * @return read size or -1 on error
 */
    ssize_t gkfs_read(int fd, void* buf, size_t count) {
ssize_t
gkfs_read(int fd, void* buf, size_t count) {
    auto gkfs_fd = CTX->file_map()->get(fd);
    if(!gkfs_fd)
        return 0;
    auto pos = gkfs_fd->pos(); // retrieve the current offset
        auto ret = gkfs_read_ws(*gkfs_fd, reinterpret_cast<char*>(buf), count,
                                pos);
    auto ret = gkfs_read_ws(*gkfs_fd, reinterpret_cast<char*>(buf), count, pos);
    // Update offset in file descriptor in the file map
    if(ret > 0) {
        gkfs_fd->pos(pos + ret);
@@ -1443,8 +1445,8 @@ gkfs_do_write(gkfs::filemap::OpenFile& file, const char* buf, size_t count,
 * @param offset
 * @return read size or -1 on error
 */
    ssize_t gkfs_preadv(int fd, const struct iovec* iov, int iovcnt,
                        off_t offset) {
ssize_t
gkfs_preadv(int fd, const struct iovec* iov, int iovcnt, off_t offset) {

    auto file = CTX->file_map()->get(fd);
    if(!file)
@@ -1484,7 +1486,8 @@ gkfs_do_write(gkfs::filemap::OpenFile& file, const char* buf, size_t count,
 * @param iovcnt
 * @return read size or -1 on error
 */
    ssize_t gkfs_readv(int fd, const struct iovec* iov, int iovcnt) {
ssize_t
gkfs_readv(int fd, const struct iovec* iov, int iovcnt) {

    auto gkfs_fd = CTX->file_map()->get(fd);
    if(!gkfs_fd)
@@ -1505,7 +1508,8 @@ gkfs_do_write(gkfs::filemap::OpenFile& file, const char* buf, size_t count,
 * @param path
 * @return 0 on success or -1 on error
 */
    int gkfs_opendir(const std::string& path) {
int
gkfs_opendir(const std::string& path) {
    auto md = gkfs::utils::get_metadata(path);
    if(!md) {
        return -1;
@@ -1530,12 +1534,10 @@ gkfs_do_write(gkfs::filemap::OpenFile& file, const char* buf, size_t count,
            __func__, path, CTX->hosts().size());
        // Launch RPC calls asynchronously
        for(uint64_t i = 0; i < CTX->hosts().size(); i++) {
                dcache_futures.push_back(std::async(std::launch::async, [&,
                                                                         i]() {
            dcache_futures.push_back(std::async(std::launch::async, [&, i]() {
                if(gkfs::config::proxy::fwd_get_dirents_single &&
                   CTX->use_proxy()) {
                        return gkfs::rpc::forward_get_dirents_single_proxy(path,
                                                                           i);
                    return gkfs::rpc::forward_get_dirents_single_proxy(path, i);
                } else {
                    return gkfs::rpc::forward_get_dirents_single(path, i);
                }
@@ -1553,8 +1555,7 @@ gkfs_do_write(gkfs::filemap::OpenFile& file, const char* buf, size_t count,
                LOG(DEBUG, "name: {} type: {} size: {} ctime: {}",
                    get<0>(dentry), get<1>(dentry), get<2>(dentry),
                    get<3>(dentry));
                    auto ftype = get<1>(dentry)
                                         ? gkfs::filemap::FileType::directory
                auto ftype = get<1>(dentry) ? gkfs::filemap::FileType::directory
                                            : gkfs::filemap::FileType::regular;
                // filename, is_dir, size, ctime
                ret.second->add(get<0>(dentry), ftype);
@@ -1566,8 +1567,7 @@ gkfs_do_write(gkfs::filemap::OpenFile& file, const char* buf, size_t count,
            }
            ret.first = res.first;
        }
            LOG(DEBUG,
                "{}() Unpacked dirents for path '{}' counted '{}' entries",
        LOG(DEBUG, "{}() Unpacked dirents for path '{}' counted '{}' entries",
            __func__, path, cnt);
    } else {
        ret = gkfs::rpc::forward_get_dirents(path);
@@ -1587,7 +1587,8 @@ gkfs_do_write(gkfs::filemap::OpenFile& file, const char* buf, size_t count,
 * @param path
 * @return 0 on success or -1 on error
 */
    int gkfs_rmdir(const std::string& path) {
int
gkfs_rmdir(const std::string& path) {
    int err;
    // check that directory is empty if a strict dir hierarchy is
    // enforced
@@ -1595,8 +1596,7 @@ gkfs_do_write(gkfs::filemap::OpenFile& file, const char* buf, size_t count,
#if GKFS_CREATE_CHECK_PARENTS
    auto md = gkfs::utils::get_metadata(path);
    if(!md) {
            LOG(DEBUG, "Error: Path '{}' err code '{}' ", path,
                strerror(errno));
        LOG(DEBUG, "Error: Path '{}' err code '{}' ", path, strerror(errno));
        return -1;
    }
    if(!S_ISDIR(md->mode())) {
@@ -1637,8 +1637,8 @@ gkfs_do_write(gkfs::filemap::OpenFile& file, const char* buf, size_t count,
 * @param count
 * @return 0 on success or -1 on error
 */
    int gkfs_getdents(unsigned int fd, struct linux_dirent* dirp,
                      unsigned int count) {
int
gkfs_getdents(unsigned int fd, struct linux_dirent* dirp, unsigned int count) {
    // Get opendir object (content was downloaded with opendir() call)
    auto open_dir = CTX->file_map()->get_dir(fd);
    if(open_dir == nullptr) {
@@ -1658,8 +1658,7 @@ gkfs_do_write(gkfs::filemap::OpenFile& file, const char* buf, size_t count,
    while(pos < open_dir->size()) {
        // get dentry fir current position
        auto de = open_dir->getdent(pos);
            if(CTX->protect_files_consumer() or
               CTX->protect_files_generator()) {
        if(CTX->protect_files_consumer() or CTX->protect_files_generator()) {
            // if de.name ends with lockgekko jump to the next file
            if(de.name().size() >= 10 &&
               de.name().substr(de.name().size() - 10) == ".lockgekko") {
@@ -1687,8 +1686,8 @@ gkfs_do_write(gkfs::filemap::OpenFile& file, const char* buf, size_t count,
        }
        current_dirp = reinterpret_cast<struct linux_dirent*>(
                reinterpret_cast<char*>(dirp) + written);
            current_dirp->d_ino = std::hash<std::string>()(open_dir->path() +
                                                           "/" + de.name());
        current_dirp->d_ino =
                std::hash<std::string>()(open_dir->path() + "/" + de.name());

        current_dirp->d_reclen = total_size;

@@ -1720,7 +1719,8 @@ gkfs_do_write(gkfs::filemap::OpenFile& file, const char* buf, size_t count,
 * @param count
 * @return 0 on success or -1 on error
 */
    int gkfs_getdents64(unsigned int fd, struct linux_dirent64* dirp,
int
gkfs_getdents64(unsigned int fd, struct linux_dirent64* dirp,
                unsigned int count) {
    auto open_dir = CTX->file_map()->get_dir(fd);
    if(open_dir == nullptr) {
@@ -1736,8 +1736,7 @@ gkfs_do_write(gkfs::filemap::OpenFile& file, const char* buf, size_t count,
    struct linux_dirent64* current_dirp = nullptr;
    while(pos < open_dir->size()) {
        auto de = open_dir->getdent(pos);
            if(CTX->protect_files_consumer() or
               CTX->protect_files_generator()) {
        if(CTX->protect_files_consumer() or CTX->protect_files_generator()) {
            // if de.name ends with lockgekko jump to the next file
            if(de.name().size() >= 10 &&
               de.name().substr(de.name().size() - 10) == ".lockgekko") {
@@ -1769,8 +1768,8 @@ gkfs_do_write(gkfs::filemap::OpenFile& file, const char* buf, size_t count,
        }
        current_dirp = reinterpret_cast<struct linux_dirent64*>(
                reinterpret_cast<char*>(dirp) + written);
            current_dirp->d_ino = std::hash<std::string>()(open_dir->path() +
                                                           "/" + de.name());
        current_dirp->d_ino =
                std::hash<std::string>()(open_dir->path() + "/" + de.name());

        current_dirp->d_reclen = total_size;
        current_dirp->d_type =
@@ -1792,7 +1791,8 @@ gkfs_do_write(gkfs::filemap::OpenFile& file, const char* buf, size_t count,
    return written;
}

    int gkfs_fsync(unsigned int fd) {
int
gkfs_fsync(unsigned int fd) {
    auto file = CTX->file_map()->get(fd);
    if(!file) {
        errno = 0;
@@ -1802,8 +1802,8 @@ gkfs_do_write(gkfs::filemap::OpenFile& file, const char* buf, size_t count,
    if(CTX->use_write_size_cache()) {
        auto err = CTX->write_size_cache()->flush(file->path(), true).first;
        if(err) {
                LOG(ERROR, "{}() write_size_cache() failed with err '{}'",
                    __func__, err);
            LOG(ERROR, "{}() write_size_cache() failed with err '{}'", __func__,
                err);
            errno = err;
            return -1;
        }
@@ -1818,14 +1818,13 @@ gkfs_do_write(gkfs::filemap::OpenFile& file, const char* buf, size_t count,
 * @param fd
 * @return int
 */
    int gkfs_close(unsigned int fd) {
int
gkfs_close(unsigned int fd) {
    auto file = CTX->file_map()->get(fd);
    if(file) {
        // flush write size cache to be server consistent
        if(CTX->use_write_size_cache()) {
                auto err = CTX->write_size_cache()
                                   ->flush(file->path(), true)
                                   .first;
            auto err = CTX->write_size_cache()->flush(file->path(), true).first;
            if(err) {
                LOG(ERROR, "{}() write_size_cache() failed with err '{}'",
                    __func__, err);
@@ -1874,8 +1873,8 @@ gkfs_do_write(gkfs::filemap::OpenFile& file, const char* buf, size_t count,
 * @param target_path
 * @return 0 on success or -1 on error
 */
    int gkfs_mk_symlink(const std::string& path,
                        const std::string& target_path) {
int
gkfs_mk_symlink(const std::string& path, const std::string& target_path) {
    /* The following check is not POSIX compliant.
     * In POSIX the target is not checked at all.
     *  Here if the target is a directory we raise a NOTSUP error.
@@ -1929,7 +1928,8 @@ gkfs_do_write(gkfs::filemap::OpenFile& file, const char* buf, size_t count,
 * @param bufsize
 * @return 0 on success or -1 on error
 */
    int gkfs_readlink(const std::string& path, char* buf, int bufsize) {
int
gkfs_readlink(const std::string& path, char* buf, int bufsize) {
    auto md = gkfs::utils::get_metadata(path, false);
    if(!md) {
        LOG(DEBUG, "Named link doesn't exist");
@@ -1955,7 +1955,8 @@ gkfs_do_write(gkfs::filemap::OpenFile& file, const char* buf, size_t count,
#endif


    std::vector<std::string> gkfs_get_file_list(const std::string& path) {
std::vector<std::string>
gkfs_get_file_list(const std::string& path) {
    auto ret = gkfs::rpc::forward_get_dirents(path);
    auto err = ret.first;
    if(err) {
@@ -1970,8 +1971,7 @@ gkfs_do_write(gkfs::filemap::OpenFile& file, const char* buf, size_t count,

    while(pos < open_dir->size()) {
        auto de = open_dir->getdent(pos++);
            if(CTX->protect_files_consumer() or
               CTX->protect_files_generator()) {
        if(CTX->protect_files_consumer() or CTX->protect_files_generator()) {
            // if de.name ends with lockgekko jump to the next file
            if(de.name().size() >= 10 &&
               de.name().substr(de.name().size() - 10) == ".lockgekko") {
@@ -1984,7 +1984,8 @@ gkfs_do_write(gkfs::filemap::OpenFile& file, const char* buf, size_t count,
}


    void* gkfs_mmap(void* addr, size_t length, int prot, int flags, int fd,
void*
gkfs_mmap(void* addr, size_t length, int prot, int flags, int fd,
          off_t offset) {
    void* ptr = malloc(length);
    if(ptr == nullptr) {
@@ -1996,7 +1997,8 @@ gkfs_do_write(gkfs::filemap::OpenFile& file, const char* buf, size_t count,
    return ptr;
}

    int gkfs_msync(void* addr, size_t length, int flags) {
int
gkfs_msync(void* addr, size_t length, int flags) {
    // check if addr is from gekkofs (mmap_set)
    // if so, get the fd and offset
    // pwrite length to the original offset
@@ -2013,7 +2015,8 @@ gkfs_do_write(gkfs::filemap::OpenFile& file, const char* buf, size_t count,
}


    int gkfs_munmap(void* addr, size_t length) {
int
gkfs_munmap(void* addr, size_t length) {
    // check if addr is from gekkofs (mmap_set)
    // if so, get the fd and offset
    // pwrite length to the original offset