Commit a7a1ba64 authored by Tommaso Tocci's avatar Tommaso Tocci Committed by Marc Vef
Browse files

bugfix: Atomic fiile creation

The creation was performed through a simple rocksDB PUT operation.

Multiple creation attempts on the same file would override the
previously inserted metadata, in particular the file size.

The creation is now handled with a new MergeOperand.
Now a creation attampt on an already existing file will be converted in
a NOP.
parent deb9d124
Loading
Loading
Loading
Loading
+2 −2
Original line number Diff line number Diff line
@@ -31,9 +31,9 @@ public:

    void update_ACM_time(bool a, bool c, bool m);

    std::string to_KVentry();
    std::string to_KVentry() const;

    void serialize(std::string& s);
    void serialize(std::string& s) const;

    //Getter and Setter
    time_t atime() const;
+11 −0
Original line number Diff line number Diff line
@@ -16,6 +16,8 @@ class MergeOperand {
    public:
        constexpr static char operand_id_suffix = ':';
        std::string serialize() const;
        static OperandID get_id(const rdb::Slice& serialized_op);
        static rdb::Slice get_params(const rdb::Slice& serialized_op);

    protected:
        std::string serialize_id() const;
@@ -39,6 +41,15 @@ class IncreaseSizeOperand: public MergeOperand {
        virtual std::string serialize_params() const override;
};

class CreateOperand: public MergeOperand {
    public:
        std::string metadata;
        CreateOperand(const std::string& metadata);

        virtual const OperandID id() const override;
        virtual std::string serialize_params() const override;
};

class MetadataMergeOperator: public rocksdb::MergeOperator {
    public:
        MetadataMergeOperator(){};
+2 −2
Original line number Diff line number Diff line
@@ -126,13 +126,13 @@ void Metadata::update_ACM_time(bool a, bool c, bool m) {
 * Creates a key value metadentry string that is used as a value in the KV store
 * @return
 */
std::string Metadata::to_KVentry() {
std::string Metadata::to_KVentry() const {
    std::string val;
    this->serialize(val);
    return val;
}

void Metadata::serialize(std::string& s) {
void Metadata::serialize(std::string& s) const {
    // The order is important. don't change.
    s += fmt::FormatInt(mode_).c_str(); // add mandatory mode
    s += dentry_val_delim + fmt::FormatInt(size_).c_str(); // add mandatory size
+6 −1
Original line number Diff line number Diff line
@@ -15,7 +15,12 @@ bool db_get_metadentry(const std::string& key, std::string& val) {

bool db_put_metadentry(const std::string& key, const std::string& val) {
    auto db = ADAFS_DATA->rdb();
    return db->Put(ADAFS_DATA->rdb_write_options(), key, val).ok();
    auto cop = CreateOperand(val);
    auto s = db->Merge(ADAFS_DATA->rdb_write_options(), key, cop.serialize());
    if(!s.ok()){
        ADAFS_DATA->spdlogger()->error("Failed to create metadentry size. RDB error: [{}]", s.ToString());
    }
    return s.ok();
}

bool db_delete_metadentry(const std::string& key) {
+40 −9
Original line number Diff line number Diff line
@@ -15,6 +15,14 @@ std::string MergeOperand::serialize() const {
    return s;
}

OperandID MergeOperand::get_id(const rdb::Slice& serialized_op){
        return static_cast<OperandID>(serialized_op[0]);
}

rdb::Slice MergeOperand::get_params(const rdb::Slice& serialized_op){
    assert(serialized_op[1] == operand_id_suffix);
    return rdb::Slice(serialized_op.data() + 2, serialized_op.size() - 2);
}

IncreaseSizeOperand::IncreaseSizeOperand(const size_t size, const bool append):
    size(size), append(append) {}
@@ -50,26 +58,47 @@ std::string IncreaseSizeOperand::serialize_params() const {
}


CreateOperand::CreateOperand(const std::string& metadata): metadata(metadata) {}

const OperandID CreateOperand::id() const{
    return OperandID::create;
}

std::string CreateOperand::serialize_params() const {
    return metadata;
}


bool MetadataMergeOperator::FullMergeV2(
        const MergeOperationInput& merge_in,
        MergeOperationOutput* merge_out) const {

    std::string prev_md_value;
    auto ops_it = merge_in.operand_list.cbegin();

    if(merge_in.existing_value == nullptr){
        //The key to operate on doesn't exists in DB

        if(MergeOperand::get_id(ops_it[0]) != OperandID::create){
            throw std::runtime_error("Merge operation failed: key do not exists and first operand is not a creation");
            // TODO use logger to print err info;
            //Log(logger, "Key %s do not exists", existing_value->ToString().c_str());

        return false;
            //return false;
        }
        prev_md_value = MergeOperand::get_params(ops_it[0]).ToString();
        ops_it++;
    } else {
        prev_md_value = merge_in.existing_value->ToString();
    }

    Metadata md{merge_in.key.ToString(), merge_in.existing_value->ToString()};
    Metadata md{merge_in.key.ToString(), prev_md_value};

    size_t fsize = md.size();

    for(const auto& serialized_op: merge_in.operand_list){
    for (; ops_it != merge_in.operand_list.cend(); ++ops_it){
        const rdb::Slice& serialized_op = *ops_it;
        assert(serialized_op.size() >= 2);
        auto operand_id = static_cast<OperandID>(serialized_op[0]);
        auto parameters = rdb::Slice(serialized_op.data() + 2, serialized_op.size() - 2);
        auto operand_id = MergeOperand::get_id(serialized_op);
        auto parameters = MergeOperand::get_params(serialized_op);

        if(operand_id == OperandID::increase_size){
            auto op = IncreaseSizeOperand(parameters);
@@ -79,6 +108,8 @@ bool MetadataMergeOperator::FullMergeV2(
            } else {
                fsize = std::max(op.size, fsize);
            }
        } else if(operand_id == OperandID::create){
            continue;
        } else {
            throw std::runtime_error(fmt::format("Unrecognized merge operand ID: {}", (char)operand_id));
        }