Commit a3ba1f97 authored by Tommaso Tocci's avatar Tommaso Tocci Committed by Marc Vef
Browse files

Atomic file size update

In order to make the file-size-update operation atomic, it is necessary
to make the following sequence of actions atomic:

 - READ old size value from rocksDB
 - COMPUTE new size based on the write operation
 - WRITE the new value back to rocksDB

We use the rocksDB merge operation [1] to implement the atomic update.

[1]: https://github.com/facebook/rocksdb/wiki/Merge-Operator
parent 716bc755
Loading
Loading
Loading
Loading
+2 −0
Original line number Diff line number Diff line
@@ -33,6 +33,8 @@ public:

    std::string to_KVentry();

    void serialize(std::string& s);

    //Getter and Setter
    time_t atime() const;

+3 −0
Original line number Diff line number Diff line
@@ -16,6 +16,9 @@ bool db_is_dir_entry(const std::string& dir_path);

bool db_update_metadentry(const std::string& old_key, const std::string& new_key, const std::string& val);

/**
 * Updates the size recorded in the metadata entry stored under `key`.
 *
 * @param key    database key of the metadata entry
 * @param size   size argument of the write operation
 * @param offset offset argument of the write operation
 * @param append append flag of the write operation
 * @return a success flag; callers treat `false` as a failed update.
 *         NOTE(review): how size/offset/append are combined into the new
 *         file size is defined by the implementation, not by this header — confirm there.
 */
bool db_update_metadentry_size(const std::string& key,
        size_t size, off64_t offset, bool append);

void db_iterate_all_entries();

#endif //IFS_DB_OPS_HPP
+44 −0
Original line number Diff line number Diff line
#ifndef DB_MERGE_HPP
#define DB_MERGE_HPP


#include "rocksdb/merge_operator.h"
#include <daemon/classes/metadata.hpp>

namespace rdb = rocksdb;


/**
 * Value object bundling the three parameters of a size update
 * (size, offset, append flag) together with a string (de)serialization
 * interface.
 *
 * Only declarations live here; the static constants, both constructors and
 * serialize() are defined out of line.
 * NOTE(review): the names separator/true_char/false_char suggest a
 * delimiter-separated text encoding — confirm against the definitions.
 */
class UpdateSizeOperand {
public:
    // Characters used by the serialized representation (defined out of line).
    static const char separator;
    static const char true_char;
    static const char false_char;

    // Parameters of the update carried by this operand.
    size_t size;
    off64_t offset;
    bool append;

    // Builds an operand directly from its three components.
    UpdateSizeOperand(size_t size, off64_t offset, bool append);

    // Builds an operand from a string, presumably one produced by serialize().
    // Intentionally left non-explicit so existing implicit conversions keep working.
    UpdateSizeOperand(const std::string& serialized_op);

    // Returns the string representation of this operand.
    std::string serialize() const;
};

class MetadataMergeOperator: public rocksdb::MergeOperator {
    public:
        MetadataMergeOperator(){};
        virtual ~MetadataMergeOperator(){};
        virtual bool FullMergeV2(const MergeOperationInput& merge_in,
                MergeOperationOutput* merge_out) const override;

        virtual bool PartialMergeMulti(const rdb::Slice& key,
                const std::deque<rdb::Slice>& operand_list,
                std::string* new_value, rdb::Logger* logger) const override;

        virtual const char* Name() const override;

        virtual bool AllowSingleOperand() const override;
};


#endif // DB_MERGE_HPP
+2 −0
Original line number Diff line number Diff line
@@ -9,6 +9,7 @@ set(DAEMON_SRC
    classes/metadata.cpp
    db/db_util.cpp
    db/db_ops.cpp
    db/merge.cpp
    handler/h_metadentry.cpp
    handler/h_data.cpp
    handler/h_preload.cpp
@@ -29,6 +30,7 @@ set(DAEMON_HEADERS
    ../../include/daemon/classes/rpc_data.hpp
    ../../include/daemon/db/db_ops.hpp
    ../../include/daemon/db/db_util.hpp
    ../../include/daemon/db/merge.hpp
    ../../include/daemon/handler/rpc_defs.hpp
    )
add_executable(adafs_daemon ${DAEMON_SRC} ${DAEMON_HEADERS})
+6 −19
Original line number Diff line number Diff line
@@ -108,31 +108,18 @@ int get_metadentry_size(const string& path, size_t& ret_size) {
 * @return the updated size
 */
int update_metadentry_size(const string& path, size_t io_size, off64_t offset, bool append, size_t& read_size) {
    // The whole size update is delegated to db_update_metadentry_size(); this
    // function no longer reads the entry, recomputes the size and writes it
    // back as three separate steps.
#ifdef LOG_TRACE
    db_iterate_all_entries();
#endif
    // The database call reports success as a bool, so a false result is the failure case.
    const bool updated = db_update_metadentry_size(path, io_size, offset, append);
    if (!updated) {
        return EBUSY;
    }
#ifdef LOG_TRACE
    db_iterate_all_entries();
#endif
    //XXX This breaks append writes, needs to be fixed
    // The update call does not hand back the resulting size, so there is no
    // value to report here; read_size is reset to 0 on success.
    read_size = 0;
    return 0;
}

Loading