Program Listing for File merge.cpp
↰ Return to documentation for file (src/daemon/backend/metadata/merge.cpp)
/*
Copyright 2018-2024, Barcelona Supercomputing Center (BSC), Spain
Copyright 2015-2024, Johannes Gutenberg Universitaet Mainz, Germany
This software was partially supported by the
EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu).
This software was partially supported by the
ADA-FS project under the SPPEXA project funded by the DFG.
This file is part of GekkoFS.
GekkoFS is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
GekkoFS is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with GekkoFS. If not, see <https://www.gnu.org/licenses/>.
SPDX-License-Identifier: GPL-3.0-or-later
*/
#include <daemon/backend/metadata/merge.hpp>
#include <stdexcept>
using namespace std;
namespace gkfs::metadata {
string
MergeOperand::serialize_id() const {
string s;
s.reserve(2);
s += (char) id(); // TODO check if static_cast can be used
s += operand_id_suffix;
return s;
}
string
MergeOperand::serialize() const {
string s = serialize_id();
s += serialize_params();
return s;
}
/**
 * @brief Decodes the operand identifier of a serialized operand.
 * @param serialized_op Operand as produced by serialize(); must hold at least
 *        one character.
 * @return The OperandID stored in the first character.
 */
OperandID
MergeOperand::get_id(const rdb::Slice& serialized_op) {
    const char id_char = serialized_op[0];
    return static_cast<OperandID>(id_char);
}
/**
 * @brief Returns the parameter part of a serialized operand, i.e. everything
 *        after the two-character prefix written by serialize_id().
 * @param serialized_op Operand as produced by serialize().
 * @return A Slice that points into serialized_op's buffer (no copy is made),
 *         so it is only valid while that buffer is alive.
 */
rdb::Slice
MergeOperand::get_params(const rdb::Slice& serialized_op) {
    // Check the length before touching [1]. On a shorter slice that index is
    // out of bounds and size() - 2 would wrap around to a huge value.
    assert(serialized_op.size() >= 2);
    assert(serialized_op[1] == operand_id_suffix);
    return {serialized_op.data() + 2, serialized_op.size() - 2};
}
/**
 * @brief Creates a plain (non-append) size-increase operand.
 * @param size Candidate file size; when merged, the file size becomes the
 *             maximum of this value and the current size.
 */
IncreaseSizeOperand::IncreaseSizeOperand(const size_t size)
    : IncreaseSizeOperand(size, 0, false) {}
/**
 * @brief Creates a size-increase operand with explicit append settings.
 * @param size     For an append, the number of bytes added to the file size;
 *                 otherwise a candidate file size (the merge keeps the max).
 * @param merge_id Key under which the merge operator records the offset at
 *                 which an append starts.
 * @param append   Whether this operand represents an append.
 */
IncreaseSizeOperand::IncreaseSizeOperand(const size_t size,
                                         const uint16_t merge_id,
                                         const bool append)
    : size_(size),
      merge_id_(merge_id),
      append_(append) {}
/**
 * @brief Reconstructs an operand from the parameters written by
 *        serialize_params().
 *
 * Accepted layouts (serialize_sep / serialize_end are class constants):
 *   "<size><serialize_end>"                          -> plain increase
 *   "<size><serialize_sep><merge_id><serialize_end>" -> append
 * @param serialized_op Parameter slice, e.g. as returned by get_params().
 * @throws std::invalid_argument / std::out_of_range (from std::stoul) if a
 *         numeric field cannot be parsed.
 */
IncreaseSizeOperand::IncreaseSizeOperand(const rdb::Slice& serialized_op) {
    // Parse from a bounded copy of the slice. Feeding serialized_op.data()
    // directly to std::stoul made it scan for a NUL terminator that may lie
    // beyond the slice when the data is malformed.
    const std::string params = serialized_op.ToString();
    size_t read = 0;
    // Parse size
    size_ = std::stoul(params, &read);
    // Nothing (or only the terminator) follows the size: plain increase.
    // Using >= also covers input without a terminator, where the old ==
    // check fell through and indexed one past the end of the slice.
    if(read + 1 >= params.size() || params[read] == serialize_end) {
        merge_id_ = 0;
        append_ = false;
        return;
    }
    assert(params[read] == serialize_sep);
    // Parse merge id
    merge_id_ = static_cast<uint16_t>(std::stoul(params.substr(read + 1)));
    append_ = true;
}
/// @brief Identifies this operand as a size increase.
/// @return OperandID::increase_size.
OperandID
IncreaseSizeOperand::id() const {
    constexpr auto operand_kind = OperandID::increase_size;
    return operand_kind;
}
string
IncreaseSizeOperand::serialize_params() const {
// serialize_end avoids rogue characters in the serialized string
if(append_)
return fmt::format("{}{}{}{}", size_, serialize_sep, merge_id_,
serialize_end);
else {
return fmt::format("{}{}", size_, serialize_end);
}
}
/// @brief Creates a size-decrease operand; when merged, the file size is set
///        to @p size.
DecreaseSizeOperand::DecreaseSizeOperand(const size_t size)
    : size_(size) {}
/**
 * @brief Reconstructs an operand from the parameters written by
 *        serialize_params(), which is the new size as a plain decimal string
 *        with no separator or terminator.
 * @param serialized_op Parameter slice, e.g. as returned by get_params().
 * @throws std::invalid_argument / std::out_of_range (from std::stoul) if the
 *         size cannot be parsed.
 */
DecreaseSizeOperand::DecreaseSizeOperand(const rdb::Slice& serialized_op) {
    size_t read = 0;
    // Slice data is not guaranteed to be NUL-terminated, and std::stoul takes
    // a std::string, so copy the slice first. This keeps parsing within the
    // slice bounds.
    size_ = std::stoul(serialized_op.ToString(), &read);
    // The whole parameter string must consist of the number.
    assert(read == serialized_op.size());
}
/// @brief Identifies this operand as a size decrease.
/// @return OperandID::decrease_size.
OperandID
DecreaseSizeOperand::id() const {
    constexpr auto operand_kind = OperandID::decrease_size;
    return operand_kind;
}
string
DecreaseSizeOperand::serialize_params() const {
return ::to_string(size_);
}
/// @brief Creates a "create" operand. Its parameters are the metadata string
///        that FullMergeV2 uses as the initial value when the key is absent.
CreateOperand::CreateOperand(const std::string& metadata)
    : metadata(metadata) {}
/// @brief Identifies this operand as an entry creation.
/// @return OperandID::create.
OperandID
CreateOperand::id() const {
    constexpr auto operand_kind = OperandID::create;
    return operand_kind;
}
string
CreateOperand::serialize_params() const {
return metadata;
}
/**
 * @brief RocksDB full-merge callback. Folds all pending operands for a key
 *        into one serialized Metadata value.
 *
 * If the key does not exist yet, the first operand must be a create operand;
 * its parameters become the initial metadata. The remaining operands are
 * then applied in order:
 *  - increase_size, append: the size grows by op.size(). The offset where
 *    the append starts is stored via append_offset_reserve_put() under
 *    op.merge_id(), to be retrieved in RocksDBBackend::increase_size_impl().
 *  - increase_size, non-append: size = max(op.size(), size).
 *  - decrease_size: size = op.size().
 *  - create: skipped.
 * If gkfs::config::metadata::use_mtime is enabled, mtime is refreshed for
 * every operand processed in the loop, including skipped create operands.
 *
 * @param merge_in  Existing value (may be null) and the operand list.
 * @param merge_out Receives the new serialized metadata in new_value.
 * @return true once the new value has been written.
 * @throws std::runtime_error if the key does not exist and there is no
 *         leading create operand, or if an operand ID is not recognized.
 */
bool
MetadataMergeOperator::FullMergeV2(const MergeOperationInput& merge_in,
                                   MergeOperationOutput* merge_out) const {
    std::string prev_md_value;
    auto ops_it = merge_in.operand_list.cbegin();
    const auto ops_end = merge_in.operand_list.cend();
    if(merge_in.existing_value == nullptr) {
        // The key to operate on doesn't exist in the DB, so the first operand
        // must be a creation providing the initial metadata. Check that it
        // exists and is large enough to decode before reading it.
        if(ops_it == ops_end || ops_it->size() < 2 ||
           MergeOperand::get_id(*ops_it) != OperandID::create) {
            throw std::runtime_error(
                    "Merge operation failed: key do not exists and first operand is not a creation");
        }
        prev_md_value = MergeOperand::get_params(*ops_it).ToString();
        ++ops_it;
    } else {
        prev_md_value = merge_in.existing_value->ToString();
    }
    Metadata md{prev_md_value};
    size_t fsize = md.size();
    for(; ops_it != ops_end; ++ops_it) {
        const rdb::Slice& serialized_op = *ops_it;
        assert(serialized_op.size() >= 2);
        auto operand_id = MergeOperand::get_id(serialized_op);
        auto parameters = MergeOperand::get_params(serialized_op);
        if constexpr(gkfs::config::metadata::use_mtime) {
            md.update_mtime_now();
        }
        if(operand_id == OperandID::increase_size) {
            auto op = IncreaseSizeOperand(parameters);
            if(op.append()) {
                auto curr_offset = fsize;
                // append mode, just increment file size
                fsize += op.size();
                // save the offset where this append operation should start
                // it is retrieved later in RocksDBBackend::increase_size_impl()
                GKFS_METADATA_MOD->append_offset_reserve_put(op.merge_id(),
                                                             curr_offset);
            } else {
                fsize = std::max(op.size(), fsize);
            }
        } else if(operand_id == OperandID::decrease_size) {
            auto op = DecreaseSizeOperand(parameters);
            assert(op.size() < fsize); // we assume no concurrency here
            fsize = op.size();
        } else if(operand_id == OperandID::create) {
            continue;
        } else {
            // Build a std::string first. `"literal" + char` would offset the
            // literal's pointer by the char value instead of concatenating.
            throw std::runtime_error(
                    std::string("Unrecognized merge operand ID: ") +
                    static_cast<char>(operand_id));
        }
    }
    md.size(fsize);
    merge_out->new_value = md.serialize();
    return true;
}
/**
 * @brief Partial merging of operands is not implemented.
 *
 * Per the RocksDB MergeOperator API, returning false leaves the operands
 * untouched; they are combined later by FullMergeV2().
 * @return Always false.
 */
bool
MetadataMergeOperator::PartialMergeMulti(
        [[maybe_unused]] const rdb::Slice& key,
        [[maybe_unused]] const std::deque<rdb::Slice>& operand_list,
        [[maybe_unused]] std::string* new_value,
        [[maybe_unused]] rdb::Logger* logger) const {
    constexpr bool partial_merge_supported = false;
    return partial_merge_supported;
}
/// @brief Name under which this merge operator is reported to RocksDB.
/// @return The constant string "MetadataMergeOperator".
const char*
MetadataMergeOperator::Name() const {
    constexpr const char* operator_name = "MetadataMergeOperator";
    return operator_name;
}
/**
 * @brief Opts in to RocksDB invoking partial merges with a single operand.
 * @return Always true.
 */
bool
MetadataMergeOperator::AllowSingleOperand() const {
    constexpr bool single_operand_allowed = true;
    return single_operand_allowed;
}
} // namespace gkfs::metadata