Verified Commit d3d9f170 authored by Marc Vef's avatar Marc Vef
Browse files

Client: Adding append support

parent a2b88702
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -106,7 +106,7 @@ gkfs_readlink(const std::string& path, char* buf, int bufsize);

ssize_t
gkfs_pwrite(std::shared_ptr<gkfs::filemap::OpenFile> file, const char* buf,
            size_t count, off64_t offset);
            size_t count, off64_t offset, bool update_pos = false);

ssize_t
gkfs_pwrite_ws(int fd, const void* buf, size_t count, off64_t offset);
+2 −3
Original line number Diff line number Diff line
@@ -42,9 +42,8 @@ struct ChunkStat {
// an exception.

std::pair<int, ssize_t>
forward_write(const std::string& path, const void* buf, bool append_flag,
              off64_t in_offset, size_t write_size,
              int64_t updated_metadentry_size);
forward_write(const std::string& path, const void* buf, off64_t offset,
              size_t write_size);

std::pair<int, ssize_t>
forward_read(const std::string& path, void* buf, off64_t offset,
+5 −5
Original line number Diff line number Diff line
@@ -1158,10 +1158,10 @@ struct update_metadentry_size {
        hermes::detail::post_to_mercury(ExecutionContext*);

    public:
        output() : m_err(), m_ret_size() {}
        output() : m_err(), m_ret_offset() {}

        output(int32_t err, int64_t ret_size)
            : m_err(err), m_ret_size(ret_size) {}
            : m_err(err), m_ret_offset(ret_size) {}

        output(output&& rhs) = default;

@@ -1175,7 +1175,7 @@ struct update_metadentry_size {

        explicit output(const rpc_update_metadentry_size_out_t& out) {
            m_err = out.err;
            m_ret_size = out.ret_size;
            m_ret_offset = out.ret_offset;
        }

        int32_t
@@ -1185,12 +1185,12 @@ struct update_metadentry_size {

        int64_t
        ret_size() const {
            return m_ret_size;
            return m_ret_offset;
        }

    private:
        int32_t m_err;
        int64_t m_ret_size;
        int64_t m_ret_offset;
    };
};

+35 −26
Original line number Diff line number Diff line
@@ -138,12 +138,6 @@ gkfs_open(const std::string& path, mode_t mode, int flags) {
        return -1;
    }

    if(flags & O_APPEND) {
        LOG(ERROR, "`O_APPEND` flag is not supported");
        errno = ENOTSUP;
        return -1;
    }

    // metadata object filled during create or stat
    gkfs::metadata::Metadata md{};
    if(flags & O_CREAT) {
@@ -855,38 +849,61 @@ gkfs_dup2(const int oldfd, const int newfd) {
 * @param buf
 * @param count
 * @param offset
 * @param update_pos pos should only be updated for some write operations (see
 * man 2 pwrite)
 * @return written size or -1 on error
 */
ssize_t
gkfs_pwrite(std::shared_ptr<gkfs::filemap::OpenFile> file, const char* buf,
            size_t count, off64_t offset) {
            size_t count, off64_t offset, bool update_pos) {
    if(file->type() != gkfs::filemap::FileType::regular) {
        assert(file->type() == gkfs::filemap::FileType::directory);
        LOG(WARNING, "Cannot read from directory");
        LOG(WARNING, "Cannot write to directory");
        errno = EISDIR;
        return -1;
    }
    auto path = make_shared<string>(file->path());
    auto append_flag = file->get_flag(gkfs::filemap::OpenFile_flags::append);
    auto path = make_unique<string>(file->path());
    auto is_append = file->get_flag(gkfs::filemap::OpenFile_flags::append);

    auto ret_update_size = gkfs::rpc::forward_update_metadentry_size(
            *path, count, offset, append_flag);
    auto err = ret_update_size.first;
    auto ret_offset = gkfs::rpc::forward_update_metadentry_size(
            *path, count, offset, is_append);
    auto err = ret_offset.first;
    if(err) {
        LOG(ERROR, "update_metadentry_size() failed with err '{}'", err);
        errno = err;
        return -1;
    }
    auto updated_size = ret_update_size.second;
    if(is_append) {
        // When append is set the EOF is set to the offset
        // forward_update_metadentry_size returns. This is because it is an
        // atomic operation on the server and reserves the space for this append
        if(ret_offset.second == -1) {
            LOG(ERROR,
                "update_metadentry_size() received -1 as starting offset. "
                "This occurs when the staring offset could not be extracted "
                "from RocksDB's merge operations. Inform GekkoFS devs.");
            errno = EIO;
            return -1;
        }
        offset = ret_offset.second;
    }

    auto ret_write = gkfs::rpc::forward_write(*path, buf, append_flag, offset,
                                              count, updated_size);
    auto ret_write = gkfs::rpc::forward_write(*path, buf, offset, count);
    err = ret_write.first;
    if(err) {
        LOG(WARNING, "gkfs::rpc::forward_write() failed with err '{}'", err);
        errno = err;
        return -1;
    }
    if(update_pos) {
        // Update offset in file descriptor in the file map
        file->pos(offset + ret_write.second);
    }
    if(static_cast<size_t>(ret_write.second) != count) {
        LOG(WARNING,
            "gkfs::rpc::forward_write() wrote '{}' bytes instead of '{}'",
            ret_write.second, count);
    }
    return ret_write.second; // return written size
}

@@ -916,17 +933,9 @@ gkfs_pwrite_ws(int fd, const void* buf, size_t count, off64_t offset) {
ssize_t
gkfs_write(int fd, const void* buf, size_t count) {
    auto gkfs_fd = CTX->file_map()->get(fd);
    auto pos = gkfs_fd->pos(); // retrieve the current offset
    if(gkfs_fd->get_flag(gkfs::filemap::OpenFile_flags::append)) {
        gkfs_lseek(gkfs_fd, 0, SEEK_END);
        pos = gkfs_fd->pos(); // Pos should be updated with append
    }
    // call pwrite and update pos
    auto ret = gkfs_pwrite(gkfs_fd, reinterpret_cast<const char*>(buf), count,
                           pos);
    // Update offset in file descriptor in the file map
    if(ret > 0) {
        gkfs_fd->pos(pos + count);
    }
                           gkfs_fd->pos(), true);
    return ret;
}

+2 −11
Original line number Diff line number Diff line
@@ -46,23 +46,17 @@ namespace gkfs::rpc {
 * NOTE: No errno is defined here!
 */

// TODO If we decide to keep this functionality with one segment, the function
// can be merged mostly. Code is mostly redundant

/**
 * Send an RPC request to write from a buffer.
 * @param path
 * @param buf
 * @param append_flag
 * @param in_offset
 * @param write_size
 * @param updated_metadentry_size
 * @return pair<error code, written size>
 */
pair<int, ssize_t>
forward_write(const string& path, const void* buf, const bool append_flag,
              const off64_t in_offset, const size_t write_size,
              const int64_t updated_metadentry_size) {
forward_write(const string& path, const void* buf, const off64_t offset,
              const size_t write_size) {

    // import pow2-optimized arithmetic functions
    using namespace gkfs::utils::arithmetic;
@@ -71,9 +65,6 @@ forward_write(const string& path, const void* buf, const bool append_flag,

    // Calculate chunkid boundaries and numbers so that daemons know in
    // which interval to look for chunks
    off64_t offset =
            append_flag ? in_offset : (updated_metadentry_size - write_size);

    auto chnk_start = block_index(offset, gkfs::config::rpc::chunksize);
    auto chnk_end = block_index((offset + write_size) - 1,
                                gkfs::config::rpc::chunksize);