Verified Commit 97c53483 authored by Marc Vef's avatar Marc Vef
Browse files

Refactoring readdir

- more error handling
- removed runtime exception from client
- Streamlining error handling in client and making it consistent. Adding comments
- Improved readdir logging
parent 7f577b39
Loading
Loading
Loading
Loading
+4 −4
Original line number Diff line number Diff line
@@ -24,14 +24,14 @@ struct ChunkStat {
    unsigned long chunk_free;
};

ssize_t forward_write(const std::string& path, const void* buf, bool append_flag, off64_t in_offset,
std::pair<int, ssize_t> forward_write(const std::string& path, const void* buf, bool append_flag, off64_t in_offset,
                                      size_t write_size, int64_t updated_metadentry_size);

ssize_t forward_read(const std::string& path, void* buf, off64_t offset, size_t read_size);
std::pair<int, ssize_t> forward_read(const std::string& path, void* buf, off64_t offset, size_t read_size);

int forward_truncate(const std::string& path, size_t current_size, size_t new_size);

ChunkStat forward_get_chunk_stat();
std::pair<int, ChunkStat> forward_get_chunk_stat();

} // namespace rpc
} // namespace gkfs
+4 −4
Original line number Diff line number Diff line
@@ -40,12 +40,12 @@ int forward_decr_size(const std::string& path, size_t length);
int forward_update_metadentry(const std::string& path, const gkfs::metadata::Metadata& md,
                              const gkfs::metadata::MetadentryUpdateFlags& md_flags);

int forward_update_metadentry_size(const std::string& path, size_t size, off64_t offset, bool append_flag,
                                   off64_t& ret_size);
std::pair<int, off64_t>
forward_update_metadentry_size(const std::string& path, size_t size, off64_t offset, bool append_flag);

int forward_get_metadentry_size(const std::string& path, off64_t& ret_size);
std::pair<int, off64_t> forward_get_metadentry_size(const std::string& path);

void forward_get_dirents(gkfs::filemap::OpenDir& open_dir);
int forward_get_dirents(gkfs::filemap::OpenDir& open_dir);

#ifdef HAS_SYMLINKS

+81 −40
Original line number Diff line number Diff line
@@ -197,7 +197,12 @@ int gkfs_create(const std::string& path, mode_t mode) {
    if (check_parent_dir(path)) {
        return -1;
    }
    return gkfs::rpc::forward_create(path, mode);
    auto err = gkfs::rpc::forward_create(path, mode);
    if (err) {
        errno = err;
        return -1;
    }
    return 0;
}

/**
@@ -211,7 +216,12 @@ int gkfs_remove(const std::string& path) {
        return -1;
    }
    bool has_data = S_ISREG(md->mode()) && (md->size() != 0);
    return gkfs::rpc::forward_remove(path, !has_data, md->size());
    auto err = gkfs::rpc::forward_remove(path, !has_data, md->size());
    if (err) {
        errno = err;
        return -1;
    }
    return 0;
}

int gkfs_access(const std::string& path, const int mask, bool follow_links) {
@@ -271,13 +281,15 @@ int gkfs_statx(int dirfs, const std::string& path, int flags, unsigned int mask,
#endif

int gkfs_statfs(struct statfs* buf) {
    gkfs::rpc::ChunkStat blk_stat{};
    try {
        blk_stat = gkfs::rpc::forward_get_chunk_stat();
    } catch (const std::exception& e) {
        LOG(ERROR, "{}() Failure with error: '{}'", e.what());

    auto ret = gkfs::rpc::forward_get_chunk_stat();
    auto err = ret.first;
    if (err) {
        LOG(ERROR, "{}() Failure with error: '{}'", err);
        errno = err;
        return -1;
    }
    auto blk_stat = ret.second;
    buf->f_type = 0;
    buf->f_bsize = blk_stat.chunk_size;
    buf->f_blocks = blk_stat.chunk_total;
@@ -294,13 +306,14 @@ int gkfs_statfs(struct statfs* buf) {
}

int gkfs_statvfs(struct statvfs* buf) {
    gkfs::rpc::ChunkStat blk_stat{};
    try {
        blk_stat = gkfs::rpc::forward_get_chunk_stat();
    } catch (const std::exception& e) {
        LOG(ERROR, "{}() Failure with error: '{}'", e.what());
    auto ret = gkfs::rpc::forward_get_chunk_stat();
    auto err = ret.first;
    if (err) {
        LOG(ERROR, "{}() Failure with error: '{}'", err);
        errno = err;
        return -1;
    }
    auto blk_stat = ret.second;
    buf->f_bsize = blk_stat.chunk_size;
    buf->f_blocks = blk_stat.chunk_total;
    buf->f_bfree = blk_stat.chunk_free;
@@ -329,15 +342,15 @@ off_t gkfs_lseek(shared_ptr<gkfs::filemap::OpenFile> gkfs_fd, off_t offset, unsi
            gkfs_fd->pos(gkfs_fd->pos() + offset);
            break;
        case SEEK_END: {
            off64_t file_size;
            auto err = gkfs::rpc::forward_get_metadentry_size(gkfs_fd->path(), file_size);
          
            if (err < 0) {
                errno = err; // Negative numbers are explicitly for error codes
            auto ret = gkfs::rpc::forward_get_metadentry_size(gkfs_fd->path());
            auto err = ret.first;
            if (err) {
                errno = err;
                return -1;
            }

            if (offset < 0 and file_size < -offset) {
            auto file_size = ret.second;
            if (offset < 0 && file_size < -offset) {
                errno = EINVAL;
                return -1;
            }
@@ -369,14 +382,17 @@ int gkfs_truncate(const std::string& path, off_t old_size, off_t new_size) {
    if (new_size == old_size) {
        return 0;
    }

    if (gkfs::rpc::forward_decr_size(path, new_size)) {
    auto err = gkfs::rpc::forward_decr_size(path, new_size);
    if (err) {
        LOG(DEBUG, "Failed to decrease size");
        errno = err;
        return -1;
    }

    if (gkfs::rpc::forward_truncate(path, old_size, new_size)) {
    err = gkfs::rpc::forward_truncate(path, old_size, new_size);
    if (err) {
        LOG(DEBUG, "Failed to truncate data");
        errno = err;
        return -1;
    }
    return 0;
@@ -427,19 +443,24 @@ ssize_t gkfs_pwrite(std::shared_ptr<gkfs::filemap::OpenFile> file, const char* b
    }
    auto path = make_shared<string>(file->path());
    auto append_flag = file->get_flag(gkfs::filemap::OpenFile_flags::append);
    ssize_t ret = 0;
    long updated_size = 0;

    ret = gkfs::rpc::forward_update_metadentry_size(*path, count, offset, append_flag, updated_size);
    if (ret != 0) {
        LOG(ERROR, "update_metadentry_size() failed with ret {}", ret);
        return ret; // ERR
    auto ret_update_size = gkfs::rpc::forward_update_metadentry_size(*path, count, offset, append_flag);
    auto err = ret_update_size.first;
    if (err) {
        LOG(ERROR, "update_metadentry_size() failed with err '{}'", err);
        errno = err;
        return -1;
    }
    ret = gkfs::rpc::forward_write(*path, buf, append_flag, offset, count, updated_size);
    if (ret < 0) {
        LOG(WARNING, "gkfs::rpc::forward_write() failed with ret {}", ret);
    auto updated_size = ret_update_size.second;

    auto ret_write = gkfs::rpc::forward_write(*path, buf, append_flag, offset, count, updated_size);
    err = ret_write.first;
    if (err) {
        LOG(WARNING, "gkfs::rpc::forward_write() failed with err '{}'", err);
        errno = err;
        return -1;
    }
    return ret; // return written size or -1 as error
    return ret_write.second; // return written size
}

ssize_t gkfs_pwrite_ws(int fd, const void* buf, size_t count, off64_t offset) {
@@ -521,11 +542,14 @@ ssize_t gkfs_pread(std::shared_ptr<gkfs::filemap::OpenFile> file, char* buf, siz
        memset(buf, 0, sizeof(char) * count);
    }
    auto ret = gkfs::rpc::forward_read(file->path(), buf, offset, count);
    if (ret < 0) {
        LOG(WARNING, "gkfs::rpc::forward_read() failed with ret {}", ret);
    auto err = ret.first;
    if (err) {
        LOG(WARNING, "gkfs::rpc::forward_read() failed with ret '{}'", err);
        errno = err;
        return -1;
    }
    // XXX check that we don't try to read past end of the file
    return ret; // return read size or -1 as error
    return ret.second; // return read size
}

ssize_t gkfs_read(int fd, void* buf, size_t count) {
@@ -600,7 +624,11 @@ int gkfs_opendir(const std::string& path) {
    }

    auto open_dir = std::make_shared<gkfs::filemap::OpenDir>(path);
    gkfs::rpc::forward_get_dirents(*open_dir);
    auto err = gkfs::rpc::forward_get_dirents(*open_dir);
    if (err) {
        errno = err;
        return -1;
    }
    return CTX->file_map()->add(open_dir);
}

@@ -618,12 +646,21 @@ int gkfs_rmdir(const std::string& path) {
    }

    auto open_dir = std::make_shared<gkfs::filemap::OpenDir>(path);
    gkfs::rpc::forward_get_dirents(*open_dir);
    auto err = gkfs::rpc::forward_get_dirents(*open_dir);
    if (err) {
        errno = err;
        return -1;
    }
    if (open_dir->size() != 0) {
        errno = ENOTEMPTY;
        return -1;
    }
    return gkfs::rpc::forward_remove(path, true, 0);
    err = gkfs::rpc::forward_remove(path, true, 0);
    if (err) {
        errno = err;
        return -1;
    }
    return 0;
}

int gkfs_getdents(unsigned int fd,
@@ -777,8 +814,12 @@ int gkfs_mk_symlink(const std::string& path, const std::string& target_path) {
        errno = EEXIST;
        return -1;
    }

    return gkfs::rpc::forward_mk_symlink(path, target_path);
    auto err = gkfs::rpc::forward_mk_symlink(path, target_path);
    if (err) {
        errno = err;
        return -1;
    }
    return 0;
}

int gkfs_readlink(const std::string& path, char* buf, int bufsize) {
+2 −0
Original line number Diff line number Diff line
@@ -77,6 +77,7 @@ std::shared_ptr<gkfs::metadata::Metadata> get_metadata(const string& path, bool
    std::string attr;
    auto err = gkfs::rpc::forward_stat(path, attr);
    if (err) {
        errno = err;
        return nullptr;
    }
#ifdef HAS_SYMLINKS
@@ -85,6 +86,7 @@ std::shared_ptr<gkfs::metadata::Metadata> get_metadata(const string& path, bool
        while (md.is_link()) {
            err = gkfs::rpc::forward_stat(md.target_path(), attr);
            if (err) {
                errno = err;
                return nullptr;
            }
            md = gkfs::metadata::Metadata{attr};
+80 −58
Original line number Diff line number Diff line
@@ -26,13 +26,25 @@ using namespace std;
namespace gkfs {
namespace rpc {

/*
 * This file includes all metadata RPC calls.
 * NOTE: No errno is defined here!
 */

// TODO If we decide to keep this functionality with one segment, the function can be merged mostly.
// Code is mostly redundant

/**
 * Sends an RPC request to a specific node to pull all chunks that belong to him
 * Send an RPC request to write from a buffer.
 * @param path
 * @param buf
 * @param append_flag
 * @param in_offset
 * @param write_size
 * @param updated_metadentry_size
 * @return pair<error code, written size>
 */
ssize_t forward_write(const string& path, const void* buf, const bool append_flag,
pair<int, ssize_t> forward_write(const string& path, const void* buf, const bool append_flag,
                      const off64_t in_offset, const size_t write_size,
                      const int64_t updated_metadentry_size) {

@@ -90,8 +102,7 @@ ssize_t forward_write(const string& path, const void* buf, const bool append_fla

    } catch (const std::exception& ex) {
        LOG(ERROR, "Failed to expose buffers for RMA");
        errno = EBUSY;
        return -1;
        return make_pair(EBUSY, 0);
    }

    std::vector<hermes::rpc_handle<gkfs::rpc::write_data>> handles;
@@ -152,15 +163,14 @@ ssize_t forward_write(const string& path, const void* buf, const bool append_fla
        } catch (const std::exception& ex) {
            LOG(ERROR, "Unable to send non-blocking rpc for "
                       "path \"{}\" [peer: {}]", path, target);
            errno = EBUSY;
            return -1;
            return make_pair(EBUSY, 0);
        }
    }

    // Wait for RPC responses and then get response and add it to out_size
    // which is the written size All potential outputs are served to free
    // resources regardless of errors, although an errorcode is set.
    bool error = false;
    auto err = 0;
    ssize_t out_size = 0;
    std::size_t idx = 0;

@@ -172,8 +182,7 @@ ssize_t forward_write(const string& path, const void* buf, const bool append_fla

            if (out.err() != 0) {
                LOG(ERROR, "Daemon reported error: {}", out.err());
                error = true;
                errno = out.err();
                err = out.err();
            }

            out_size += static_cast<size_t>(out.io_size());
@@ -181,20 +190,30 @@ ssize_t forward_write(const string& path, const void* buf, const bool append_fla
        } catch (const std::exception& ex) {
            LOG(ERROR, "Failed to get rpc output for path \"{}\" [peer: {}]",
                path, targets[idx]);
            error = true;
            errno = EIO;
            err = EIO;
        }

        ++idx;
        idx++;
    }

    return error ? -1 : out_size;
    /*
     * Typically file systems return the size even if only a part of it was written.
     * In our case, we do not keep track which daemon fully wrote its workload. Thus, we always return size 0 on error.
     */
    if (err)
        return make_pair(err, 0);
    else
        return make_pair(0, out_size);
}

/**
 * Sends an RPC request to a specific node to push all chunks that belong to him
 * Send an RPC request to read to a buffer.
 * @param path
 * @param buf
 * @param offset
 * @param read_size
 * @return pair<error code, read size>
 */
ssize_t forward_read(const string& path, void* buf, const off64_t offset, const size_t read_size) {
pair<int, ssize_t> forward_read(const string& path, void* buf, const off64_t offset, const size_t read_size) {

    // Calculate chunkid boundaries and numbers so that daemons know in which
    // interval to look for chunks
@@ -246,8 +265,7 @@ ssize_t forward_read(const string& path, void* buf, const off64_t offset, const

    } catch (const std::exception& ex) {
        LOG(ERROR, "Failed to expose buffers for RMA");
        errno = EBUSY;
        return -1;
        return make_pair(EBUSY, 0);
    }

    std::vector<hermes::rpc_handle<gkfs::rpc::read_data>> handles;
@@ -309,15 +327,14 @@ ssize_t forward_read(const string& path, void* buf, const off64_t offset, const
        } catch (const std::exception& ex) {
            LOG(ERROR, "Unable to send non-blocking rpc for path \"{}\" "
                       "[peer: {}]", path, target);
            errno = EBUSY;
            return -1;
            return make_pair(EBUSY, 0);
        }
    }

    // Wait for RPC responses and then get response and add it to out_size
    // which is the read size. All potential outputs are served to free
    // resources regardless of errors, although an errorcode is set.
    bool error = false;
    auto err = 0;
    ssize_t out_size = 0;
    std::size_t idx = 0;

@@ -329,8 +346,7 @@ ssize_t forward_read(const string& path, void* buf, const off64_t offset, const

            if (out.err() != 0) {
                LOG(ERROR, "Daemon reported error: {}", out.err());
                error = true;
                errno = out.err();
                err = out.err();
            }

            out_size += static_cast<size_t>(out.io_size());
@@ -338,20 +354,30 @@ ssize_t forward_read(const string& path, void* buf, const off64_t offset, const
        } catch (const std::exception& ex) {
            LOG(ERROR, "Failed to get rpc output for path \"{}\" [peer: {}]",
                path, targets[idx]);
            error = true;
            errno = EIO;
            err = EIO;
        }

        ++idx;
        idx++;
    }

    return error ? -1 : out_size;
    /*
     * Typically file systems return the size even if only a part of it was read.
     * In our case, we do not keep track which daemon fully read its workload. Thus, we always return size 0 on error.
     */
    if (err)
        return make_pair(err, 0);
    else
        return make_pair(0, out_size);
}

/**
 * Send an RPC request to truncate a file to given new size
 * @param path
 * @param current_size
 * @param new_size
 * @return error code
 */
int forward_truncate(const std::string& path, size_t current_size, size_t new_size) {

    assert(current_size > new_size);
    bool error = false;

    // Find out which data servers need to delete data chunks in order to
    // contact only them
@@ -366,6 +392,8 @@ int forward_truncate(const std::string& path, size_t current_size, size_t new_si

    std::vector<hermes::rpc_handle<gkfs::rpc::trunc_data>> handles;

    auto err = 0;

    for (const auto& host: hosts) {

        auto endp = CTX->hosts().at(host);
@@ -386,44 +414,41 @@ int forward_truncate(const std::string& path, size_t current_size, size_t new_si
            // TODO(amiranda): we should cancel all previously posted requests
            // here, unfortunately, Hermes does not support it yet :/
            LOG(ERROR, "Failed to send request to host: {}", host);
            errno = EIO;
            return -1;
            err = EIO;
            break; // We need to gather all responses so we can't return here
        }

    }

    // Wait for RPC responses and then get response
    for (const auto& h : handles) {

        try {
            // XXX We might need a timeout here to not wait forever for an
            // output that never comes?
            auto out = h.get().at(0);

            if (out.err() != 0) {
            if (out.err()) {
                LOG(ERROR, "received error response: {}", out.err());
                error = true;
                errno = EIO;
                err = EIO;
            }
        } catch (const std::exception& ex) {
            LOG(ERROR, "while getting rpc output");
            error = true;
            errno = EIO;
            err = EIO;
        }
    }

    return error ? -1 : 0;
    return err ? err : 0;
}

/**
 * Performs a chunk stat RPC to all hosts
 * @return rpc::ChunkStat
 * @throws std::runtime_error
 * Send an RPC request to chunk stat all hosts
 * @return pair<error code, rpc::ChunkStat>
 */
ChunkStat forward_get_chunk_stat() {
pair<int, ChunkStat> forward_get_chunk_stat() {

    std::vector<hermes::rpc_handle<gkfs::rpc::chunk_stat>> handles;

    auto err = 0;

    for (const auto& endp : CTX->hosts()) {
        try {
            LOG(DEBUG, "Sending RPC to host: {}", endp.to_string());
@@ -441,7 +466,8 @@ ChunkStat forward_get_chunk_stat() {
            // TODO(amiranda): we should cancel all previously posted requests
            // here, unfortunately, Hermes does not support it yet :/
            LOG(ERROR, "Failed to send request to host: {}", endp.to_string());
            throw std::runtime_error("Failed to forward non-blocking rpc request");
            err = EBUSY;
            break; // We need to gather all responses so we can't return here
        }
    }

@@ -449,8 +475,6 @@ ChunkStat forward_get_chunk_stat() {
    unsigned long chunk_total = 0;
    unsigned long chunk_free = 0;

    int error = 0;

    // wait for RPC responses
    for (std::size_t i = 0; i < handles.size(); ++i) {

@@ -461,10 +485,10 @@ ChunkStat forward_get_chunk_stat() {
            // output that never comes?
            out = handles[i].get().at(0);

            if (out.err() != 0) {
                error = out.err();
            if (out.err()) {
                err = out.err();
                LOG(ERROR, "Host '{}' reported err code '{}' during stat chunk.", CTX->hosts().at(i).to_string(),
                    error);
                    err);
                // we don't break here to ensure all responses are processed
                continue;
            }
@@ -472,16 +496,14 @@ ChunkStat forward_get_chunk_stat() {
            chunk_total += out.chunk_total();
            chunk_free += out.chunk_free();
        } catch (const std::exception& ex) {
            errno = EBUSY;
            throw std::runtime_error(fmt::format("Failed to get RPC output from host: {}", i));
            LOG(ERROR, "Failed to get RPC output from host: {}", i);
            err = EBUSY;
        }
    }
    if (error != 0) {
        errno = error;
        throw std::runtime_error("chunk stat failed on one host");
    }

    return {chunk_size, chunk_total, chunk_free};
    if (err)
        return make_pair(err, ChunkStat{});
    else
        return make_pair(0, ChunkStat{chunk_size, chunk_total, chunk_free});
}

} // namespace rpc
Loading