Line data Source code
1 : /* 2 : Copyright 2018-2024, Barcelona Supercomputing Center (BSC), Spain 3 : Copyright 2015-2024, Johannes Gutenberg Universitaet Mainz, Germany 4 : 5 : This software was partially supported by the 6 : EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu). 7 : 8 : This software was partially supported by the 9 : ADA-FS project under the SPPEXA project funded by the DFG. 10 : 11 : This file is part of GekkoFS. 12 : 13 : GekkoFS is free software: you can redistribute it and/or modify 14 : it under the terms of the GNU General Public License as published by 15 : the Free Software Foundation, either version 3 of the License, or 16 : (at your option) any later version. 17 : 18 : GekkoFS is distributed in the hope that it will be useful, 19 : but WITHOUT ANY WARRANTY; without even the implied warranty of 20 : MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 21 : GNU General Public License for more details. 22 : 23 : You should have received a copy of the GNU General Public License 24 : along with GekkoFS. If not, see <https://www.gnu.org/licenses/>. 25 : 26 : SPDX-License-Identifier: GPL-3.0-or-later 27 : */ 28 : 29 : #include <daemon/backend/metadata/merge.hpp> 30 : #include <stdexcept> 31 : 32 : using namespace std; 33 : 34 : namespace gkfs::metadata { 35 : 36 : string 37 1141 : MergeOperand::serialize_id() const { 38 1141 : string s; 39 1141 : s.reserve(2); 40 1141 : s += (char) id(); // TODO check if static_cast can be used 41 1141 : s += operand_id_suffix; 42 1141 : return s; 43 : } 44 : 45 : string 46 1141 : MergeOperand::serialize() const { 47 1141 : string s = serialize_id(); 48 2282 : s += serialize_params(); 49 1141 : return s; 50 : } 51 : 52 : OperandID 53 2473 : MergeOperand::get_id(const rdb::Slice& serialized_op) { 54 2473 : return static_cast<OperandID>(serialized_op[0]); 55 : } 56 : 57 : rdb::Slice 58 2473 : MergeOperand::get_params(const rdb::Slice& serialized_op) { 59 2473 : assert(serialized_op[1] == operand_id_suffix); 60 2473 : return {serialized_op.data() + 2, serialized_op.size() - 2}; 61 : } 62 : 63 38 : IncreaseSizeOperand::IncreaseSizeOperand(const size_t size) 64 38 : : size_(size), merge_id_(0), append_(false) {} 65 : 66 3 : IncreaseSizeOperand::IncreaseSizeOperand(const size_t size, 67 : const uint16_t merge_id, 68 3 : const bool append) 69 3 : : size_(size), merge_id_(merge_id), append_(append) {} 70 : 71 116 : IncreaseSizeOperand::IncreaseSizeOperand(const rdb::Slice& serialized_op) { 72 116 : size_t read = 0; 73 : // Parse size 74 232 : size_ = std::stoul(serialized_op.data(), &read); 75 131 : if(read + 1 == serialized_op.size() || 76 15 : serialized_op[read] == serialize_end) { 77 101 : merge_id_ = 0; 78 101 : append_ = false; 79 101 : return; 80 : } 81 15 : assert(serialized_op[read] == serialize_sep); 82 : // Parse merge id 83 15 : merge_id_ = static_cast<uint16_t>( 84 30 : std::stoul(serialized_op.data() + read + 1, nullptr)); 85 15 : append_ = true; 86 : } 87 : 88 : OperandID 89 41 : IncreaseSizeOperand::id() const { 90 41 : return OperandID::increase_size; 91 : } 92 : 93 : string 94 41 : IncreaseSizeOperand::serialize_params() const { 95 : // serialize_end avoids rogue characters in the serialized string 96 41 : if(append_) 97 3 : return fmt::format("{}{}{}{}", size_, serialize_sep, merge_id_, 98 6 : serialize_end); 99 : else { 100 76 : return fmt::format("{}{}", size_, serialize_end); 101 : } 102 : } 103 : 104 : 105 3 : DecreaseSizeOperand::DecreaseSizeOperand(const size_t size) : size_(size) {} 106 : 107 10 : DecreaseSizeOperand::DecreaseSizeOperand(const rdb::Slice& serialized_op) { 108 : // Parse size 109 10 : size_t read = 0; 110 : // we need to convert serialized_op to a string because it doesn't contain 111 : // the leading slash needed by stoul 112 20 : size_ = ::stoul(serialized_op.ToString(), &read); 113 : // check that we consumed all the input string 114 10 : assert(read == serialized_op.size()); 115 10 : } 116 : 117 : OperandID 118 3 : DecreaseSizeOperand::id() const { 119 3 : return OperandID::decrease_size; 120 : } 121 : 122 : string 123 3 : DecreaseSizeOperand::serialize_params() const { 124 3 : return ::to_string(size_); 125 : } 126 : 127 : 128 1097 : CreateOperand::CreateOperand(const string& metadata) : metadata(metadata) {} 129 : 130 : OperandID 131 1097 : CreateOperand::id() const { 132 1097 : return OperandID::create; 133 : } 134 : 135 : string 136 1097 : CreateOperand::serialize_params() const { 137 1097 : return metadata; 138 : } 139 : 140 : /** 141 : * @internal 142 : * Merges all operands in chronological order for the same key. 143 : * 144 : * This is called before each Get() operation, among others. Therefore, it is 145 : * not possible to return a result for a specific merge operand. The return as 146 : * well as merge_out->new_value is for RocksDB internals The new value is the 147 : * merged value of multiple value that is written to one key. 148 : * 149 : * Append operations receive special treatment here as the corresponding write 150 : * function that triggered the size update needs the starting offset. In 151 : * parallel append operations this is crucial. This is done by accessing a mutex 152 : * protected std::map which may incur performance overheads for append 153 : * operations. 154 : * @endinternal 155 : */ 156 : bool 157 2353 : MetadataMergeOperator::FullMergeV2(const MergeOperationInput& merge_in, 158 : MergeOperationOutput* merge_out) const { 159 : 160 2353 : string prev_md_value; 161 2353 : auto ops_it = merge_in.operand_list.cbegin(); 162 2353 : if(merge_in.existing_value == nullptr) { 163 : // The key to operate on doesn't exists in DB 164 2347 : if(MergeOperand::get_id(ops_it[0]) != OperandID::create) { 165 0 : throw ::runtime_error( 166 0 : "Merge operation failed: key do not exists and first operand is not a creation"); 167 : } 168 2347 : prev_md_value = MergeOperand::get_params(ops_it[0]).ToString(); 169 2347 : ops_it++; 170 : } else { 171 6 : prev_md_value = merge_in.existing_value->ToString(); 172 : } 173 : 174 4706 : Metadata md{prev_md_value}; 175 : 176 2353 : size_t fsize = md.size(); 177 : 178 2479 : for(; ops_it != merge_in.operand_list.cend(); ++ops_it) { 179 126 : const rdb::Slice& serialized_op = *ops_it; 180 126 : assert(serialized_op.size() >= 2); 181 126 : auto operand_id = MergeOperand::get_id(serialized_op); 182 126 : auto parameters = MergeOperand::get_params(serialized_op); 183 : 184 126 : if constexpr(gkfs::config::metadata::use_mtime) { 185 126 : md.update_mtime_now(); 186 : } 187 : 188 126 : if(operand_id == OperandID::increase_size) { 189 116 : auto op = IncreaseSizeOperand(parameters); 190 116 : if(op.append()) { 191 15 : auto curr_offset = fsize; 192 : // append mode, just increment file size 193 15 : fsize += op.size(); 194 : // save the offset where this append operation should start 195 : // it is retrieved later in RocksDBBackend::increase_size_impl() 196 15 : GKFS_METADATA_MOD->append_offset_reserve_put(op.merge_id(), 197 : curr_offset); 198 : } else { 199 101 : fsize = ::max(op.size(), fsize); 200 : } 201 10 : } else if(operand_id == OperandID::decrease_size) { 202 10 : auto op = DecreaseSizeOperand(parameters); 203 10 : assert(op.size() < fsize); // we assume no concurrency here 204 10 : fsize = op.size(); 205 0 : } else if(operand_id == OperandID::create) { 206 0 : continue; 207 : } else { 208 0 : throw ::runtime_error("Unrecognized merge operand ID: " + 209 0 : (char) operand_id); 210 : } 211 : } 212 : 213 2353 : md.size(fsize); 214 2353 : merge_out->new_value = md.serialize(); 215 4706 : return true; 216 : } 217 : 218 : bool 219 1074 : MetadataMergeOperator::PartialMergeMulti( 220 : const rdb::Slice& key, const ::deque<rdb::Slice>& operand_list, 221 : string* new_value, rdb::Logger* logger) const { 222 1074 : return false; 223 : } 224 : 225 : const char* 226 132 : MetadataMergeOperator::Name() const { 227 132 : return "MetadataMergeOperator"; 228 : } 229 : 230 : bool 231 33 : MetadataMergeOperator::AllowSingleOperand() const { 232 33 : return true; 233 : } 234 : 235 : } // namespace gkfs::metadata