Commit ef1df73f authored by Tommaso Tocci's avatar Tommaso Tocci Committed by Marc Vef
Browse files

Introduce MergeOperand baseclass

The new MergeOperand base class can be subclassed in order to implement
new merge-operands with their specific parameters and semantics.
parent 0c1beda2
Loading
Loading
Loading
Loading
+22 −6
Original line number Original line Diff line number Diff line
@@ -7,20 +7,36 @@


namespace rdb = rocksdb;
namespace rdb = rocksdb;


enum class OperandID: char {
    increase_size = 's',
    create = 'c'
};


class IncreaseSizeOperand {
class MergeOperand {
    public:
    public:
        const static char separator;
        constexpr static char operand_id_suffix = ':';
        const static char true_char;
        std::string serialize() const;
        const static char false_char;

    protected:
        std::string serialize_id() const;
        virtual std::string serialize_params() const = 0;
        virtual const OperandID id() const = 0;
};

class IncreaseSizeOperand: public MergeOperand {
    public:
        constexpr const static char separator = ',';
        constexpr const static char true_char = 't';
        constexpr const static char false_char = 'f';


        size_t size;
        size_t size;
        bool append;
        bool append;


        IncreaseSizeOperand(const size_t size, const bool append);
        IncreaseSizeOperand(const size_t size, const bool append);
        IncreaseSizeOperand(const std::string& serialized_op);
        IncreaseSizeOperand(const rdb::Slice& serialized_op);


        std::string serialize() const;
        virtual const OperandID id() const override;
        virtual std::string serialize_params() const override;
};
};


class MetadataMergeOperator: public rocksdb::MergeOperator {
class MetadataMergeOperator: public rocksdb::MergeOperator {
+42 −17
Original line number Original line Diff line number Diff line
#include <daemon/db/merge.hpp>
#include <daemon/db/merge.hpp>




const char IncreaseSizeOperand::separator = ',';
std::string MergeOperand::serialize_id() const {
const char IncreaseSizeOperand::true_char = 't';
    std::string s;
const char IncreaseSizeOperand::false_char = 'f';
    s.reserve(2);
    s += (char)id();
    s += operand_id_suffix;
    return s;
}

std::string MergeOperand::serialize() const {
    std::string s = serialize_id();
    s += serialize_params();
    return s;
}



IncreaseSizeOperand::IncreaseSizeOperand(const size_t size, const bool append):
IncreaseSizeOperand::IncreaseSizeOperand(const size_t size, const bool append):
    size(size), append(append) {}
    size(size), append(append) {}


IncreaseSizeOperand::IncreaseSizeOperand(const std::string& serialized_op){
IncreaseSizeOperand::IncreaseSizeOperand(const rdb::Slice& serialized_op){
    size_t chrs_parsed = 0;
    size_t chrs_parsed = 0;
    size_t read = 0;
    size_t read = 0;


    //Parse size
    //Parse size
    size = std::stoul(&serialized_op.at(chrs_parsed), &read);
    size = std::stoul(serialized_op.data() + chrs_parsed, &read);
    chrs_parsed += read + 1;
    chrs_parsed += read + 1;
    assert(serialized_op.at(chrs_parsed - 1) == separator);
    assert(serialized_op[chrs_parsed - 1] == separator);


    //Parse append flag
    //Parse append flag
    assert(serialized_op.at(chrs_parsed) == false_char ||
    assert(serialized_op[chrs_parsed] == false_char ||
           serialized_op.at(chrs_parsed) == true_char);
           serialized_op[chrs_parsed] == true_char);
    append = (serialized_op.at(chrs_parsed) == false_char) ? false : true;
    append = (serialized_op[chrs_parsed] == false_char) ? false : true;
    //check that we consumed all the input string
    //check that we consumed all the input string
    assert(chrs_parsed + 1 == serialized_op.size());
    assert(chrs_parsed + 1 == serialized_op.size());
}
}


std::string IncreaseSizeOperand::serialize() const {
const OperandID IncreaseSizeOperand::id() const {
    return OperandID::increase_size;
}

std::string IncreaseSizeOperand::serialize_params() const {
    std::string s;
    std::string s;
    s.reserve(3);
    s += std::to_string(size);
    s += std::to_string(size);
    s += this->separator;
    s += this->separator;
    s += (append == false)? false_char : true_char;
    s += (append == false)? false_char : true_char;
    return s;
    return s;
}
}



bool MetadataMergeOperator::FullMergeV2(
bool MetadataMergeOperator::FullMergeV2(
        const MergeOperationInput& merge_in,
        const MergeOperationInput& merge_in,
        MergeOperationOutput* merge_out) const {
        MergeOperationOutput* merge_out) const {
@@ -49,14 +66,22 @@ bool MetadataMergeOperator::FullMergeV2(
    Metadata md{merge_in.key.ToString(), merge_in.existing_value->ToString()};
    Metadata md{merge_in.key.ToString(), merge_in.existing_value->ToString()};
    size_t fsize = md.size();
    size_t fsize = md.size();


    for(const auto& operand: merge_in.operand_list){
    for(const auto& serialized_op: merge_in.operand_list){
        auto op = IncreaseSizeOperand(operand.ToString());
        assert(serialized_op.size() >= 2);
        auto operand_id = static_cast<OperandID>(serialized_op[0]);
        auto parameters = rdb::Slice(serialized_op.data() + 2, serialized_op.size() - 2);

        if(operand_id == OperandID::increase_size){
            auto op = IncreaseSizeOperand(parameters);
            if(op.append){
            if(op.append){
                //append mode, just increment file
                //append mode, just increment file
                fsize += op.size;
                fsize += op.size;
            } else {
            } else {
                fsize = std::max(op.size, fsize);
                fsize = std::max(op.size, fsize);
            }
            }
        } else {
            throw std::runtime_error(fmt::format("Unrecognized merge operand ID: {}", (char)operand_id));
        }
    }
    }


    md.size(fsize);
    md.size(fsize);