......@@ -65,6 +65,8 @@ PreloadContext::PreloadContext()
char host[255];
gethostname(host, 255);
hostname = host;
PreloadContext::set_replicas(
std::stoi(gkfs::env::get_var(gkfs::env::NUM_REPL, "0")));
}
void
......@@ -452,5 +454,15 @@ PreloadContext::get_hostname() {
return hostname;
}
void
PreloadContext::set_replicas(const int repl) {
replicas_ = repl;
}
int
PreloadContext::get_replicas() {
return replicas_;
}
} // namespace preload
} // namespace gkfs
......@@ -200,16 +200,26 @@ namespace gkfs::utils {
optional<gkfs::metadata::Metadata>
get_metadata(const string& path, bool follow_links) {
std::string attr;
auto err = gkfs::rpc::forward_stat(path, attr);
auto err = gkfs::rpc::forward_stat(path, attr, 0);
// TODO: retry on failure
if(err) {
errno = err;
return {};
auto copy = 1;
while(copy < CTX->get_replicas() + 1 && err) {
LOG(ERROR, "Retrying Stat on replica {} {}", copy, follow_links);
err = gkfs::rpc::forward_stat(path, attr, copy);
copy++;
}
if(err) {
errno = err;
return {};
}
}
#ifdef HAS_SYMLINKS
if(follow_links) {
gkfs::metadata::Metadata md{attr};
while(md.is_link()) {
err = gkfs::rpc::forward_stat(md.target_path(), attr);
err = gkfs::rpc::forward_stat(md.target_path(), attr, 0);
if(err) {
errno = err;
return {};
......
......@@ -34,6 +34,7 @@
#include <common/rpc/distributor.hpp>
#include <common/arithmetic/arithmetic.hpp>
#include <common/rpc/rpc_util.hpp>
#include <unordered_set>
......@@ -42,21 +43,26 @@ using namespace std;
namespace gkfs::rpc {
/*
* This file includes all metadata RPC calls.
* This file includes all data RPC calls.
* NOTE: No errno is defined here!
*/
/**
* Send an RPC request to write from a buffer.
* There is a bitset of 1024 chunks to tell the server
* which chunks to process. Exceeding this value will work without
* replication. Another way is to leverage mercury segments.
* TODO: Decide how to manage a write to a replica that doesn't exist
* @param path
* @param buf
* @param append_flag
* @param write_size
* @param num_copies number of replicas
* @return pair<error code, written size>
*/
pair<int, ssize_t>
forward_write(const string& path, const void* buf, const off64_t offset,
const size_t write_size) {
const size_t write_size, const int8_t num_copies) {
// import pow2-optimized arithmetic functions
using namespace gkfs::utils::arithmetic;
......@@ -69,35 +75,50 @@ forward_write(const string& path, const void* buf, const off64_t offset,
auto chnk_end = block_index((offset + write_size) - 1,
gkfs::config::rpc::chunksize);
auto chnk_total = (chnk_end - chnk_start) + 1;
// Collect all chunk ids within count that have the same destination so
// that those are send in one rpc bulk transfer
std::map<uint64_t, std::vector<uint64_t>> target_chnks{};
// contains the target ids, used to access the target_chnks map.
// First idx is chunk with potential offset
std::vector<uint64_t> targets{};
// targets for the first and last chunk as they need special treatment
uint64_t chnk_start_target = 0;
uint64_t chnk_end_target = 0;
// We need a set to manage replicas.
std::set<uint64_t> chnk_start_target{};
std::set<uint64_t> chnk_end_target{};
for(uint64_t chnk_id = chnk_start; chnk_id <= chnk_end; chnk_id++) {
auto target = CTX->distributor()->locate_data(path, chnk_id);
std::unordered_map<uint64_t, std::vector<uint8_t>> write_ops_vect;
if(target_chnks.count(target) == 0) {
target_chnks.insert(
std::make_pair(target, std::vector<uint64_t>{chnk_id}));
targets.push_back(target);
} else {
target_chnks[target].push_back(chnk_id);
}
// If num_copies is 0, we do the normal write operation. Otherwise
// we process all the replicas.
for(uint64_t chnk_id = chnk_start; chnk_id <= chnk_end; chnk_id++) {
for(auto copy = num_copies ? 1 : 0; copy < num_copies + 1; copy++) {
auto target = CTX->distributor()->locate_data(path, chnk_id, copy);
if(write_ops_vect.find(target) == write_ops_vect.end())
write_ops_vect[target] =
std::vector<uint8_t>(((chnk_total + 7) / 8));
gkfs::rpc::set_bitset(write_ops_vect[target], chnk_id - chnk_start);
if(target_chnks.count(target) == 0) {
target_chnks.insert(
std::make_pair(target, std::vector<uint64_t>{chnk_id}));
targets.push_back(target);
} else {
target_chnks[target].push_back(chnk_id);
}
// set first and last chnk targets
if(chnk_id == chnk_start) {
chnk_start_target = target;
}
// set first and last chnk targets
if(chnk_id == chnk_start) {
chnk_start_target.insert(target);
}
if(chnk_id == chnk_end) {
chnk_end_target = target;
if(chnk_id == chnk_end) {
chnk_end_target.insert(target);
}
}
}
......@@ -133,13 +154,13 @@ forward_write(const string& path, const void* buf, const off64_t offset,
target_chnks[target].size() * gkfs::config::rpc::chunksize;
// receiver of first chunk must subtract the offset from first chunk
if(target == chnk_start_target) {
if(chnk_start_target.end() != chnk_start_target.find(target)) {
total_chunk_size -=
block_overrun(offset, gkfs::config::rpc::chunksize);
}
// receiver of last chunk must subtract
if(target == chnk_end_target &&
if(chnk_end_target.end() != chnk_end_target.find(target) &&
!is_aligned(offset + write_size, gkfs::config::rpc::chunksize)) {
total_chunk_size -= block_underrun(offset + write_size,
gkfs::config::rpc::chunksize);
......@@ -148,7 +169,6 @@ forward_write(const string& path, const void* buf, const off64_t offset,
auto endp = CTX->hosts().at(target);
try {
LOG(DEBUG, "Sending RPC ...");
gkfs::rpc::write_data::input in(
......@@ -158,6 +178,7 @@ forward_write(const string& path, const void* buf, const off64_t offset,
block_overrun(offset, gkfs::config::rpc::chunksize), target,
CTX->hosts().size(),
// number of chunks handled by that destination
gkfs::rpc::compress_bitset(write_ops_vect[target]),
target_chnks[target].size(),
// chunk start id of this write
chnk_start,
......@@ -175,25 +196,26 @@ forward_write(const string& path, const void* buf, const off64_t offset,
ld_network_service->post<gkfs::rpc::write_data>(endp, in));
LOG(DEBUG,
"host: {}, path: \"{}\", chunks: {}, size: {}, offset: {}",
target, path, in.chunk_n(), total_chunk_size, in.offset());
"host: {}, path: \"{}\", chunk_start: {}, chunk_end: {}, chunks: {}, size: {}, offset: {}",
target, path, chnk_start, chnk_end, in.chunk_n(),
total_chunk_size, in.offset());
} catch(const std::exception& ex) {
LOG(ERROR,
"Unable to send non-blocking rpc for "
"path \"{}\" [peer: {}]",
path, target);
return make_pair(EBUSY, 0);
if(num_copies == 0)
return make_pair(EBUSY, 0);
}
}
// Wait for RPC responses and then get response and add it to out_size
// which is the written size All potential outputs are served to free
// resources regardless of errors, although an errorcode is set.
auto err = 0;
ssize_t out_size = 0;
std::size_t idx = 0;
#ifdef REPLICA_CHECK
std::vector<uint8_t> fill(chnk_total);
auto write_ops = write_ops_vect.begin();
#endif
for(const auto& h : handles) {
try {
// XXX We might need a timeout here to not wait forever for an
......@@ -203,18 +225,52 @@ forward_write(const string& path, const void* buf, const off64_t offset,
if(out.err() != 0) {
LOG(ERROR, "Daemon reported error: {}", out.err());
err = out.err();
} else {
out_size += static_cast<size_t>(out.io_size());
#ifdef REPLICA_CHECK
if(num_copies) {
if(fill.size() == 0) {
fill = write_ops->second;
} else {
for(size_t i = 0; i < fill.size(); i++) {
fill[i] |= write_ops->second[i];
}
}
}
write_ops++;
#endif
}
out_size += static_cast<size_t>(out.io_size());
} catch(const std::exception& ex) {
LOG(ERROR, "Failed to get rpc output for path \"{}\" [peer: {}]",
path, targets[idx]);
err = EIO;
}
idx++;
}
// As servers can fail (and we cannot know if the total data is written), we
// send the updated size but check that at least one copy of all chunks are
// processed.
if(num_copies) {
// A bit-wise or should show that all the chunks are written (255)
out_size = write_size;
#ifdef REPLICA_CHECK
for(size_t i = 0; i < fill.size() - 1; i++) {
if(fill[i] != 255) {
err = EIO;
break;
}
}
// Process the leftover bytes
for(uint64_t chnk_id = (chnk_start + (fill.size() - 1) * 8);
chnk_id <= chnk_end; chnk_id++) {
if(!(fill[(chnk_id - chnk_start) / 8] &
(1 << ((chnk_id - chnk_start) % 8)))) {
err = EIO;
break;
}
}
#endif
}
/*
* Typically file systems return the size even if only a part of it was
* written. In our case, we do not keep track which daemon fully wrote its
......@@ -232,11 +288,14 @@ forward_write(const string& path, const void* buf, const off64_t offset,
* @param buf
* @param offset
* @param read_size
* @param num_copies number of copies available (0 is no replication)
* @param failed nodes failed that should not be used
* @return pair<error code, read size>
*/
pair<int, ssize_t>
forward_read(const string& path, void* buf, const off64_t offset,
const size_t read_size) {
const size_t read_size, const int8_t num_copies,
std::set<int8_t>& failed) {
// import pow2-optimized arithmetic functions
using namespace gkfs::utils::arithmetic;
......@@ -246,19 +305,35 @@ forward_read(const string& path, void* buf, const off64_t offset,
auto chnk_start = block_index(offset, gkfs::config::rpc::chunksize);
auto chnk_end =
block_index((offset + read_size - 1), gkfs::config::rpc::chunksize);
auto chnk_total = (chnk_end - chnk_start) + 1;
// Collect all chunk ids within count that have the same destination so
// that those are send in one rpc bulk transfer
std::map<uint64_t, std::vector<uint64_t>> target_chnks{};
// contains the recipient ids, used to access the target_chnks map.
// First idx is chunk with potential offset
std::vector<uint64_t> targets{};
// targets for the first and last chunk as they need special treatment
uint64_t chnk_start_target = 0;
uint64_t chnk_end_target = 0;
std::unordered_map<uint64_t, std::vector<uint8_t>> read_bitset_vect;
for(uint64_t chnk_id = chnk_start; chnk_id <= chnk_end; chnk_id++) {
auto target = CTX->distributor()->locate_data(path, chnk_id);
auto target = CTX->distributor()->locate_data(path, chnk_id, 0);
if(num_copies > 0) {
// If we have some failures we select another copy (randomly).
while(failed.find(target) != failed.end()) {
LOG(DEBUG, "Selecting another node, target: {} down", target);
target = CTX->distributor()->locate_data(path, chnk_id,
rand() % num_copies);
}
}
if(read_bitset_vect.find(target) == read_bitset_vect.end())
read_bitset_vect[target] =
std::vector<uint8_t>(((chnk_total + 7) / 8));
read_bitset_vect[target][(chnk_id - chnk_start) / 8] |=
1 << ((chnk_id - chnk_start) % 8); // set
if(target_chnks.count(target) == 0) {
target_chnks.insert(
......@@ -303,6 +378,7 @@ forward_read(const string& path, void* buf, const off64_t offset,
// TODO(amiranda): This could be simplified by adding a vector of inputs
// to async_engine::broadcast(). This would allow us to avoid manually
// looping over handles as we do below
for(const auto& target : targets) {
// total chunk_size for target
......@@ -334,6 +410,7 @@ forward_read(const string& path, void* buf, const off64_t offset,
// a potential offset
block_overrun(offset, gkfs::config::rpc::chunksize), target,
CTX->hosts().size(),
gkfs::rpc::compress_bitset(read_bitset_vect[target]),
// number of chunks handled by that destination
target_chnks[target].size(),
// chunk start id of this write
......@@ -343,11 +420,12 @@ forward_read(const string& path, void* buf, const off64_t offset,
// total size to write
total_chunk_size, local_buffers);
// TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that
// we can retry for RPC_TRIES (see old commits with margo)
// TODO(amiranda): hermes will eventually provide a post(endpoint)
// returning one result and a broadcast(endpoint_set) returning a
// result_set. When that happens we can remove the .at(0) :/
// TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so
// that we can retry for RPC_TRIES (see old commits with margo)
// TODO(amiranda): hermes will eventually provide a
// post(endpoint) returning one result and a
// broadcast(endpoint_set) returning a result_set. When that
// happens we can remove the .at(0) :/
handles.emplace_back(
ld_network_service->post<gkfs::rpc::read_data>(endp, in));
......@@ -394,9 +472,15 @@ forward_read(const string& path, void* buf, const off64_t offset,
LOG(ERROR, "Failed to get rpc output for path \"{}\" [peer: {}]",
path, targets[idx]);
err = EIO;
// We should get targets[idx] and remove from the list of peers
failed.insert(targets[idx]);
// Then repeat the read with another peer (We repear the full
// read, this can be optimised but it is a cornercase)
}
idx++;
}
/*
* Typically file systems return the size even if only a part of it was
* read. In our case, we do not keep track which daemon fully read its
......@@ -413,11 +497,12 @@ forward_read(const string& path, void* buf, const off64_t offset,
* @param path
* @param current_size
* @param new_size
* @param num_copies Number of replicas
* @return error code
*/
int
forward_truncate(const std::string& path, size_t current_size,
size_t new_size) {
forward_truncate(const std::string& path, size_t current_size, size_t new_size,
const int8_t num_copies) {
// import pow2-optimized arithmetic functions
using namespace gkfs::utils::arithmetic;
......@@ -434,7 +519,9 @@ forward_truncate(const std::string& path, size_t current_size,
std::unordered_set<unsigned int> hosts;
for(unsigned int chunk_id = chunk_start; chunk_id <= chunk_end;
++chunk_id) {
hosts.insert(CTX->distributor()->locate_data(path, chunk_id));
for(auto copy = 0; copy < (num_copies + 1); ++copy) {
hosts.insert(CTX->distributor()->locate_data(path, chunk_id, copy));
}
}
std::vector<hermes::rpc_handle<gkfs::rpc::trunc_data>> handles;
......@@ -450,20 +537,23 @@ forward_truncate(const std::string& path, size_t current_size,
gkfs::rpc::trunc_data::input in(path, new_size);
// TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that
// we can retry for RPC_TRIES (see old commits with margo)
// TODO(amiranda): hermes will eventually provide a post(endpoint)
// returning one result and a broadcast(endpoint_set) returning a
// result_set. When that happens we can remove the .at(0) :/
// TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so
// that we can retry for RPC_TRIES (see old commits with margo)
// TODO(amiranda): hermes will eventually provide a
// post(endpoint) returning one result and a
// broadcast(endpoint_set) returning a result_set. When that
// happens we can remove the .at(0) :/
handles.emplace_back(
ld_network_service->post<gkfs::rpc::trunc_data>(endp, in));
} catch(const std::exception& ex) {
// TODO(amiranda): we should cancel all previously posted requests
// here, unfortunately, Hermes does not support it yet :/
// TODO(amiranda): we should cancel all previously posted
// requests here, unfortunately, Hermes does not support it yet
// :/
LOG(ERROR, "Failed to send request to host: {}", host);
err = EIO;
break; // We need to gather all responses so we can't return here
break; // We need to gather all responses so we can't return
// here
}
}
......@@ -503,20 +593,23 @@ forward_get_chunk_stat() {
gkfs::rpc::chunk_stat::input in(0);
// TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that
// we can retry for RPC_TRIES (see old commits with margo)
// TODO(amiranda): hermes will eventually provide a post(endpoint)
// returning one result and a broadcast(endpoint_set) returning a
// result_set. When that happens we can remove the .at(0) :/
// TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so
// that we can retry for RPC_TRIES (see old commits with margo)
// TODO(amiranda): hermes will eventually provide a
// post(endpoint) returning one result and a
// broadcast(endpoint_set) returning a result_set. When that
// happens we can remove the .at(0) :/
handles.emplace_back(
ld_network_service->post<gkfs::rpc::chunk_stat>(endp, in));
} catch(const std::exception& ex) {
// TODO(amiranda): we should cancel all previously posted requests
// here, unfortunately, Hermes does not support it yet :/
// TODO(amiranda): we should cancel all previously posted
// requests here, unfortunately, Hermes does not support it yet
// :/
LOG(ERROR, "Failed to send request to host: {}", endp.to_string());
err = EBUSY;
break; // We need to gather all responses so we can't return here
break; // We need to gather all responses so we can't return
// here
}
}
......@@ -547,9 +640,11 @@ forward_get_chunk_stat() {
chunk_free += out.chunk_free();
} catch(const std::exception& ex) {
LOG(ERROR, "Failed to get RPC output from host: {}", i);
err = EBUSY;
// Avoid setting err if a server fails.
// err = EBUSY;
}
}
if(err)
return make_pair(err, ChunkStat{});
else
......
......@@ -45,19 +45,30 @@ forward_get_fs_config() {
auto endp = CTX->hosts().at(CTX->local_host_id());
gkfs::rpc::fs_config::output out;
try {
LOG(DEBUG, "Retrieving file system configurations from daemon");
// TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we
// can retry for RPC_TRIES (see old commits with margo)
// TODO(amiranda): hermes will eventually provide a post(endpoint)
// returning one result and a broadcast(endpoint_set) returning a
// result_set. When that happens we can remove the .at(0) :/
out = ld_network_service->post<gkfs::rpc::fs_config>(endp).get().at(0);
} catch(const std::exception& ex) {
LOG(ERROR, "Retrieving fs configurations from daemon");
return false;
bool found = false;
size_t idx = 0;
while(!found && idx <= CTX->hosts().size()) {
try {
LOG(DEBUG, "Retrieving file system configurations from daemon");
// TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that
// we can retry for RPC_TRIES (see old commits with margo)
// TODO(amiranda): hermes will eventually provide a post(endpoint)
// returning one result and a broadcast(endpoint_set) returning a
// result_set. When that happens we can remove the .at(0) :/
out = ld_network_service->post<gkfs::rpc::fs_config>(endp).get().at(
0);
found = true;
} catch(const std::exception& ex) {
LOG(ERROR,
"Retrieving fs configurations from daemon, possible reattempt at peer: {}",
idx);
endp = CTX->hosts().at(idx++);
}
}
if(!found)
return false;
CTX->mountdir(out.mountdir());
LOG(INFO, "Mountdir: '{}'", CTX->mountdir());
......
......@@ -51,12 +51,14 @@ namespace gkfs::rpc {
* Send an RPC for a create request
* @param path
* @param mode
* @param copy Number of replica to create
* @return error code
*/
int
forward_create(const std::string& path, const mode_t mode) {
forward_create(const std::string& path, const mode_t mode, const int copy) {
auto endp = CTX->hosts().at(CTX->distributor()->locate_file_metadata(path));
auto endp = CTX->hosts().at(
CTX->distributor()->locate_file_metadata(path, copy));
try {
LOG(DEBUG, "Sending RPC ...");
......@@ -81,12 +83,14 @@ forward_create(const std::string& path, const mode_t mode) {
* Send an RPC for a stat request
* @param path
* @param attr
* @param copy metadata replica to read from
* @return error code
*/
int
forward_stat(const std::string& path, string& attr) {
forward_stat(const std::string& path, string& attr, const int copy) {
auto endp = CTX->hosts().at(CTX->distributor()->locate_file_metadata(path));
auto endp = CTX->hosts().at(
CTX->distributor()->locate_file_metadata(path, copy));
try {
LOG(DEBUG, "Sending RPC ...");
......@@ -121,40 +125,44 @@ forward_stat(const std::string& path, string& attr) {
* This function only attempts data removal if data exists (determined when
* metadata is removed)
* @param path
* @param num_copies Replication scenarios with many replicas
* @return error code
*/
int
forward_remove(const std::string& path) {
auto endp = CTX->hosts().at(CTX->distributor()->locate_file_metadata(path));
forward_remove(const std::string& path, const int8_t num_copies) {
int64_t size = 0;
uint32_t mode = 0;
/*
* Send one RPC to metadata destination and remove metadata while retrieving
* size and mode to determine if data needs to removed too
*/
try {
LOG(DEBUG, "Sending RPC ...");
// TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we
// can retry for RPC_TRIES (see old commits with margo)
// TODO(amiranda): hermes will eventually provide a post(endpoint)
// returning one result and a broadcast(endpoint_set) returning a
// result_set. When that happens we can remove the .at(0) :/
auto out =
ld_network_service->post<gkfs::rpc::remove_metadata>(endp, path)
.get()
.at(0);
LOG(DEBUG, "Got response success: {}", out.err());
for(auto copy = 0; copy < (num_copies + 1); copy++) {
auto endp = CTX->hosts().at(
CTX->distributor()->locate_file_metadata(path, copy));
if(out.err())
return out.err();
size = out.size();
mode = out.mode();
} catch(const std::exception& ex) {
LOG(ERROR, "while getting rpc output");
return EBUSY;
/*
* Send one RPC to metadata destination and remove metadata while
* retrieving size and mode to determine if data needs to removed too
*/
try {
LOG(DEBUG, "Sending RPC ...");
// TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that
// we can retry for RPC_TRIES (see old commits with margo)
// TODO(amiranda): hermes will eventually provide a post(endpoint)
// returning one result and a broadcast(endpoint_set) returning a
// result_set. When that happens we can remove the .at(0) :/
auto out = ld_network_service
->post<gkfs::rpc::remove_metadata>(endp, path)
.get()
.at(0);
LOG(DEBUG, "Got response success: {}", out.err());
if(out.err())
return out.err();
size = out.size();
mode = out.mode();
} catch(const std::exception& ex) {
LOG(ERROR, "while getting rpc output");
return EBUSY;
}
}
// if file is not a regular file and it's size is 0, data does not need to
// be removed, thus, we exit
......@@ -167,44 +175,54 @@ forward_remove(const std::string& path) {
// Small files
if(static_cast<std::size_t>(size / gkfs::config::rpc::chunksize) <
CTX->hosts().size()) {
const auto metadata_host_id =
CTX->distributor()->locate_file_metadata(path);
const auto endp_metadata = CTX->hosts().at(metadata_host_id);
try {
LOG(DEBUG, "Sending RPC to host: {}", endp_metadata.to_string());
gkfs::rpc::remove_data::input in(path);
handles.emplace_back(
ld_network_service->post<gkfs::rpc::remove_data>(
endp_metadata, in));
uint64_t chnk_start = 0;
uint64_t chnk_end = size / gkfs::config::rpc::chunksize;
for(uint64_t chnk_id = chnk_start; chnk_id <= chnk_end; chnk_id++) {
const auto chnk_host_id =
CTX->distributor()->locate_data(path, chnk_id);
if constexpr(gkfs::config::metadata::implicit_data_removal) {
/*
* If the chnk host matches the metadata host the remove
* request as already been sent as part of the metadata
* remove request.
*/
if(chnk_host_id == metadata_host_id)
continue;
}
const auto endp_chnk = CTX->hosts().at(chnk_host_id);
LOG(DEBUG, "Sending RPC to host: {}", endp_chnk.to_string());
for(auto copymd = 0; copymd < (num_copies + 1); copymd++) {
const auto metadata_host_id =
CTX->distributor()->locate_file_metadata(path, copymd);
const auto endp_metadata = CTX->hosts().at(metadata_host_id);
try {
LOG(DEBUG, "Sending RPC to host: {}",
endp_metadata.to_string());
gkfs::rpc::remove_data::input in(path);
handles.emplace_back(
ld_network_service->post<gkfs::rpc::remove_data>(
endp_chnk, in));
endp_metadata, in));
uint64_t chnk_start = 0;
uint64_t chnk_end = size / gkfs::config::rpc::chunksize;
for(uint64_t chnk_id = chnk_start; chnk_id <= chnk_end;
chnk_id++) {
for(auto copy = 0; copy < (num_copies + 1); copy++) {
const auto chnk_host_id =
CTX->distributor()->locate_data(path, chnk_id,
copy);
if constexpr(gkfs::config::metadata::
implicit_data_removal) {
/*
* If the chnk host matches the metadata host the
* remove request as already been sent as part of
* the metadata remove request.
*/
if(chnk_host_id == metadata_host_id)
continue;
}
const auto endp_chnk = CTX->hosts().at(chnk_host_id);
LOG(DEBUG, "Sending RPC to host: {}",
endp_chnk.to_string());
handles.emplace_back(
ld_network_service
->post<gkfs::rpc::remove_data>(
endp_chnk, in));
}
}
} catch(const std::exception& ex) {
LOG(ERROR,
"Failed to forward non-blocking rpc request reduced remove requests");
return EBUSY;
}
} catch(const std::exception& ex) {
LOG(ERROR,
"Failed to forward non-blocking rpc request reduced remove requests");
return EBUSY;
}
} else { // "Big" files
for(const auto& endp : CTX->hosts()) {
......@@ -260,12 +278,14 @@ forward_remove(const std::string& path) {
* during a truncate() call.
* @param path
* @param length
* @param copy Target replica (0 original)
* @return error code
*/
int
forward_decr_size(const std::string& path, size_t length) {
forward_decr_size(const std::string& path, size_t length, const int copy) {
auto endp = CTX->hosts().at(CTX->distributor()->locate_file_metadata(path));
auto endp = CTX->hosts().at(
CTX->distributor()->locate_file_metadata(path, copy));
try {
LOG(DEBUG, "Sending RPC ...");
......@@ -295,14 +315,17 @@ forward_decr_size(const std::string& path, size_t length) {
* @param path
* @param md
* @param md_flags
* @param copy Target replica (0 original)
* @return error code
*/
int
forward_update_metadentry(
const string& path, const gkfs::metadata::Metadata& md,
const gkfs::metadata::MetadentryUpdateFlags& md_flags) {
forward_update_metadentry(const string& path,
const gkfs::metadata::Metadata& md,
const gkfs::metadata::MetadentryUpdateFlags& md_flags,
const int copy) {
auto endp = CTX->hosts().at(CTX->distributor()->locate_file_metadata(path));
auto endp = CTX->hosts().at(
CTX->distributor()->locate_file_metadata(path, copy));
try {
LOG(DEBUG, "Sending RPC ...");
......@@ -348,6 +371,7 @@ forward_update_metadentry(
* This marks that this file doesn't have to be accessed directly
* Create a new md with the new name, which should have as value the old name
* All operations should check blockcnt and extract a NOTEXISTS
* The operations does not support replication
* @param oldpath
* @param newpath
* @param md
......@@ -358,8 +382,8 @@ int
forward_rename(const string& oldpath, const string& newpath,
const gkfs::metadata::Metadata& md) {
auto endp =
CTX->hosts().at(CTX->distributor()->locate_file_metadata(oldpath));
auto endp = CTX->hosts().at(
CTX->distributor()->locate_file_metadata(oldpath, 0));
try {
LOG(DEBUG, "Sending RPC ...");
......@@ -405,8 +429,8 @@ forward_rename(const string& oldpath, const string& newpath,
// TODO(amiranda): hermes will eventually provide a post(endpoint)
// returning one result and a broadcast(endpoint_set) returning a
// result_set. When that happens we can remove the .at(0) :/
auto endp2 =
CTX->hosts().at(CTX->distributor()->locate_file_metadata(newpath));
auto endp2 = CTX->hosts().at(
CTX->distributor()->locate_file_metadata(newpath, 0));
try {
LOG(DEBUG, "Sending RPC ...");
......@@ -479,53 +503,85 @@ forward_rename(const string& oldpath, const string& newpath,
/**
* Send an RPC request for an update to the file size.
* This is called during a write() call or similar
* A single correct call is needed only to progress.
* @param path
* @param size
* @param offset
* @param append_flag
* @param num_copies number of replicas
* @return pair<error code, size after update>
*/
pair<int, off64_t>
forward_update_metadentry_size(const string& path, const size_t size,
const off64_t offset, const bool append_flag) {
const off64_t offset, const bool append_flag,
const int num_copies) {
auto endp = CTX->hosts().at(CTX->distributor()->locate_file_metadata(path));
try {
LOG(DEBUG, "Sending RPC ...");
// TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we
// can retry for RPC_TRIES (see old commits with margo)
// TODO(amiranda): hermes will eventually provide a post(endpoint)
// returning one result and a broadcast(endpoint_set) returning a
// result_set. When that happens we can remove the .at(0) :/
auto out = ld_network_service
->post<gkfs::rpc::update_metadentry_size>(
endp, path, size, offset,
bool_to_merc_bool(append_flag))
.get()
.at(0);
std::vector<hermes::rpc_handle<gkfs::rpc::update_metadentry_size>> handles;
LOG(DEBUG, "Got response success: {}", out.err());
for(auto copy = 0; copy < num_copies + 1; copy++) {
auto endp = CTX->hosts().at(
CTX->distributor()->locate_file_metadata(path, copy));
try {
LOG(DEBUG, "Sending RPC ...");
// TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that
// we can retry for RPC_TRIES (see old commits with margo)
// TODO(amiranda): hermes will eventually provide a post(endpoint)
// returning one result and a broadcast(endpoint_set) returning a
// result_set. When that happens we can remove the .at(0) :/
handles.emplace_back(
ld_network_service->post<gkfs::rpc::update_metadentry_size>(
endp, path, size, offset,
bool_to_merc_bool(append_flag)));
} catch(const std::exception& ex) {
LOG(ERROR, "while getting rpc output");
return make_pair(EBUSY, 0);
}
}
auto err = 0;
ssize_t out_size = 0;
auto idx = 0;
bool valid = false;
for(const auto& h : handles) {
try {
// XXX We might need a timeout here to not wait forever for an
// output that never comes?
auto out = h.get().at(0);
if(out.err())
return make_pair(out.err(), 0);
else
return make_pair(0, out.ret_size());
} catch(const std::exception& ex) {
LOG(ERROR, "while getting rpc output");
return make_pair(EBUSY, 0);
if(out.err() != 0) {
LOG(ERROR, "Daemon {} reported error: {}", idx, out.err());
} else {
valid = true;
out_size = out.ret_size();
}
} catch(const std::exception& ex) {
LOG(ERROR, "Failed to get rpc output");
if(!valid) {
err = EIO;
}
}
idx++;
}
if(!valid)
return make_pair(err, 0);
else
return make_pair(0, out_size);
}
/**
* Send an RPC request to get the current file size.
* This is called during a lseek() call
* @param path
* @param copy Target replica (0 original)
* @return pair<error code, file size>
*/
pair<int, off64_t>
forward_get_metadentry_size(const std::string& path) {
forward_get_metadentry_size(const std::string& path, const int copy) {
auto endp = CTX->hosts().at(CTX->distributor()->locate_file_metadata(path));
auto endp = CTX->hosts().at(
CTX->distributor()->locate_file_metadata(path, copy));
try {
LOG(DEBUG, "Sending RPC ...");
......@@ -831,7 +887,8 @@ forward_get_dirents_single(const string& path, int server) {
int
forward_mk_symlink(const std::string& path, const std::string& target_path) {
auto endp = CTX->hosts().at(CTX->distributor()->locate_file_metadata(path));
auto endp =
CTX->hosts().at(CTX->distributor()->locate_file_metadata(path, 0));
try {
LOG(DEBUG, "Sending RPC ...");
......
......@@ -47,27 +47,34 @@ SimpleHashDistributor::localhost() const {
return localhost_;
}
unsigned int
SimpleHashDistributor::hosts_size() const {
return hosts_size_;
}
host_t
SimpleHashDistributor::locate_data(const string& path,
const chunkid_t& chnk_id) const {
return str_hash(path + ::to_string(chnk_id)) % hosts_size_;
SimpleHashDistributor::locate_data(const string& path, const chunkid_t& chnk_id,
const int num_copy) const {
return (str_hash(path + ::to_string(chnk_id)) + num_copy) % hosts_size_;
}
host_t
SimpleHashDistributor::locate_data(const string& path, const chunkid_t& chnk_id,
unsigned int hosts_size) {
unsigned int hosts_size,
const int num_copy) {
if(hosts_size_ != hosts_size) {
hosts_size_ = hosts_size;
all_hosts_ = std::vector<unsigned int>(hosts_size);
::iota(all_hosts_.begin(), all_hosts_.end(), 0);
}
return str_hash(path + ::to_string(chnk_id)) % hosts_size_;
return (str_hash(path + ::to_string(chnk_id)) + num_copy) % hosts_size_;
}
host_t
SimpleHashDistributor::locate_file_metadata(const string& path) const {
return str_hash(path) % hosts_size_;
SimpleHashDistributor::locate_file_metadata(const string& path,
const int num_copy) const {
return (str_hash(path) + num_copy) % hosts_size_;
}
::vector<host_t>
......@@ -83,14 +90,20 @@ LocalOnlyDistributor::localhost() const {
return localhost_;
}
unsigned int
LocalOnlyDistributor::hosts_size() const {
return hosts_size_;
}
host_t
LocalOnlyDistributor::locate_data(const string& path,
const chunkid_t& chnk_id) const {
LocalOnlyDistributor::locate_data(const string& path, const chunkid_t& chnk_id,
const int num_copy) const {
return localhost_;
}
/**
 * Local-only placement: metadata always resides on the local host,
 * independent of the requested replica.
 */
host_t
LocalOnlyDistributor::locate_file_metadata(const string& path,
                                           const int num_copy) const {
    return localhost_;
}
......@@ -110,24 +123,32 @@ ForwarderDistributor::localhost() const {
return fwd_host_;
}
// Number of hosts participating in forwarding-mode placement.
unsigned int
ForwarderDistributor::hosts_size() const {
    return hosts_size_;
}
/**
 * Forwarding mode: all data I/O is routed through the designated forwarder
 * host, so path, chunk id, and replica index are ignored.
 */
host_t
ForwarderDistributor::locate_data(const std::string& path,
                                  const chunkid_t& chnk_id,
                                  const int num_copy) const {
    return fwd_host_;
}
/**
 * Forwarding mode (host-count-aware overload): still always routes to the
 * designated forwarder host; all parameters are ignored.
 */
host_t
ForwarderDistributor::locate_data(const std::string& path,
                                  const chunkid_t& chnk_id,
                                  unsigned int host_size, const int num_copy) {
    return fwd_host_;
}
/**
 * Metadata placement in forwarding mode is still hash-distributed across
 * all hosts; @p num_copy shifts the placement for the num_copy-th replica.
 */
host_t
ForwarderDistributor::locate_file_metadata(const std::string& path,
                                           const int num_copy) const {
    return (str_hash(path) + num_copy) % hosts_size_;
}
std::vector<host_t>
ForwarderDistributor::locate_directory_metadata(const std::string& path) const {
return all_hosts_;
......@@ -213,21 +234,26 @@ GuidedDistributor::localhost() const {
return localhost_;
}
// Number of hosts this guided distributor spreads chunks/metadata over.
unsigned int
GuidedDistributor::hosts_size() const {
    return hosts_size_;
}
/**
 * Host-count-aware entry point: refreshes the cached host list when the
 * host count changed, then defers to the guided lookup. @p num_copy
 * selects the num_copy-th replica.
 */
host_t
GuidedDistributor::locate_data(const string& path, const chunkid_t& chnk_id,
                               unsigned int hosts_size, const int num_copy) {
    if(hosts_size_ != hosts_size) {
        // host count changed: rebuild the cached 0..hosts_size-1 host list
        hosts_size_ = hosts_size;
        all_hosts_ = std::vector<unsigned int>(hosts_size);
        ::iota(all_hosts_.begin(), all_hosts_.end(), 0);
    }
    return (locate_data(path, chnk_id, num_copy));
}
host_t
GuidedDistributor::locate_data(const string& path,
const chunkid_t& chnk_id) const {
GuidedDistributor::locate_data(const string& path, const chunkid_t& chnk_id,
const int num_copy) const {
auto it = map_interval.find(path);
if(it != map_interval.end()) {
auto it_f = it->second.first.IsInsideInterval(chnk_id);
......@@ -245,14 +271,16 @@ GuidedDistributor::locate_data(const string& path,
}
auto locate = path + ::to_string(chnk_id);
return str_hash(locate) % hosts_size_;
return (str_hash(locate) + num_copy) % hosts_size_;
}
/**
 * Metadata placement is not guided: hash the path across all hosts and
 * shift by @p num_copy to address the num_copy-th replica.
 */
host_t
GuidedDistributor::locate_file_metadata(const string& path,
                                        const int num_copy) const {
    return (str_hash(path) + num_copy) % hosts_size_;
}
::vector<host_t>
GuidedDistributor::locate_directory_metadata(const string& path) const {
return all_hosts_;
......
......@@ -36,6 +36,7 @@ extern "C" {
#include <system_error>
using namespace std;
namespace gkfs::rpc {
......@@ -104,4 +105,111 @@ get_host_by_name(const string& hostname) {
}
#endif
/**
 * @brief Test whether the bit at @p position in a bit vector is set.
 *
 * @param data bit vector, 8 bits per byte, LSB-first within each byte
 * @param position zero-based index of the bit to query
 * @return true if the bit is set, false otherwise
 */
bool
get_bitset(const std::vector<uint8_t>& data, const uint16_t position) {
    const auto byte_index = position / 8;
    const auto bit_mask = 1u << (position % 8);
    return (data[byte_index] & bit_mask) != 0;
}
/**
 * @brief Set the bit at @p position in a bit vector to 1.
 *
 * @param data bit vector, 8 bits per byte, LSB-first within each byte;
 *             must already span at least position / 8 + 1 bytes
 * @param position zero-based index of the bit to set
 */
void
set_bitset(std::vector<uint8_t>& data, const uint16_t position) {
    data[(position) / 8] |= 1 << ((position) % 8); // set
}
/**
 * @brief Encode raw bytes as base64 text (RFC 4648 alphabet, '=' padded).
 *
 * @param data bytes to encode
 * @return base64 representation of @p data ("" for empty input)
 */
std::string
base64_encode(const std::vector<uint8_t>& data) {
    static const std::string base64_chars =
            "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

    std::string result;
    uint32_t bit_buf = 0;
    int bit_count = 0;

    for(const auto byte : data) {
        // Accumulate 8 input bits, then emit as many 6-bit symbols as fit.
        bit_buf = (bit_buf << 8) | byte;
        bit_count += 8;
        while(bit_count >= 6) {
            bit_count -= 6;
            result.push_back(base64_chars[(bit_buf >> bit_count) & 0x3F]);
        }
    }
    if(bit_count > 0) {
        // Left-align the leftover bits into one final symbol.
        bit_buf <<= 6 - bit_count;
        result.push_back(base64_chars[bit_buf & 0x3F]);
    }
    // Pad to a multiple of four characters as base64 requires.
    while(result.size() % 4 != 0)
        result.push_back('=');

    return result;
}
/**
 * @brief Decode base64 text (RFC 4648 alphabet) back into raw bytes.
 *
 * Characters outside the alphabet are skipped; a '=' terminates the
 * payload. Leftover bits after the last full byte are the encoder's zero
 * padding and are discarded. The previous implementation wrongly emitted
 * those padding bits as an extra trailing byte for '='-padded input
 * (e.g. "TWE=" decoded to three bytes instead of two).
 *
 * @param encoded base64 text
 * @return decoded bytes; empty vector if @p encoded holds no full byte
 */
std::vector<uint8_t>
base64_decode(const std::string& encoded) {
    static const std::string base64_chars =
            "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

    std::vector<uint8_t> data;
    uint32_t buffer = 0;
    int bits = 0;

    for(char c : encoded) {
        if(c == '=')
            break; // padding marks the end of the payload
        std::size_t value = base64_chars.find(c);
        if(value == std::string::npos)
            continue; // ignore characters outside the alphabet
        buffer = (buffer << 6) | static_cast<uint32_t>(value);
        bits += 6;
        if(bits >= 8) {
            bits -= 8;
            data.push_back(static_cast<uint8_t>((buffer >> bits) & 0xFF));
        }
    }
    // bits is now 0, 2, or 4; those leftover bits carry no payload.
    return data;
}
/**
 * @brief Serialize a chunk bitset for transport in an RPC string field.
 *
 * Currently just base64-encodes the bytes; no actual compression is done.
 *
 * @param bytes raw bitset bytes
 * @return base64 representation of @p bytes
 */
std::string
compress_bitset(const std::vector<uint8_t>& bytes) {
    return base64_encode(bytes);
}
/**
 * @brief Reconstruct a chunk bitset from its RPC string representation.
 *
 * Inverse of compress_bitset(); currently just base64-decodes the string.
 *
 * @param compressedString base64 text produced by compress_bitset()
 * @return raw bitset bytes
 */
std::vector<uint8_t>
decompress_bitset(const std::string& compressedString) {
    return base64_decode(compressedString);
}
} // namespace gkfs::rpc
\ No newline at end of file
......@@ -40,6 +40,7 @@
#include <daemon/ops/data.hpp>
#include <common/rpc/rpc_types.hpp>
#include <common/rpc/rpc_util.hpp>
#include <common/rpc/distributor.hpp>
#include <common/arithmetic/arithmetic.hpp>
#include <common/statistics/stats.hpp>
......@@ -51,9 +52,9 @@
#define AGIOS_WRITE 1
#define AGIOS_SERVER_ID_IGNORE 0
#endif
using namespace std;
namespace {
/**
......@@ -115,6 +116,8 @@ rpc_srv_write(hg_handle_t handle) {
__func__, in.path, in.chunk_start, in.chunk_end, in.chunk_n,
in.total_chunk_size, bulk_size, in.offset);
std::vector<uint8_t> write_ops_vect =
gkfs::rpc::decompress_bitset(in.wbitset);
#ifdef GKFS_ENABLE_AGIOS
int* data;
......@@ -228,9 +231,9 @@ rpc_srv_write(hg_handle_t handle) {
chnk_id_file <= in.chunk_end && chnk_id_curr < in.chunk_n;
chnk_id_file++) {
// Continue if chunk does not hash to this host
#ifndef GKFS_ENABLE_FORWARDING
if(RPC_DATA->distributor()->locate_data(in.path, chnk_id_file,
host_size) != host_id) {
if(!(gkfs::rpc::get_bitset(write_ops_vect,
chnk_id_file - in.chunk_start))) {
GKFS_DATA->spdlogger()->trace(
"{}() chunkid '{}' ignored as it does not match to this host with id '{}'. chnk_id_curr '{}'",
__func__, chnk_id_file, host_id, chnk_id_curr);
......@@ -240,7 +243,6 @@ rpc_srv_write(hg_handle_t handle) {
if(GKFS_DATA->enable_chunkstats()) {
GKFS_DATA->stats()->add_write(in.path, chnk_id_file);
}
#endif
chnk_ids_host[chnk_id_curr] =
chnk_id_file; // save this id to host chunk list
......@@ -417,7 +419,8 @@ rpc_srv_read(hg_handle_t handle) {
"{}() path: '{}' chunk_start '{}' chunk_end '{}' chunk_n '{}' total_chunk_size '{}' bulk_size: '{}' offset: '{}'",
__func__, in.path, in.chunk_start, in.chunk_end, in.chunk_n,
in.total_chunk_size, bulk_size, in.offset);
std::vector<uint8_t> read_bitset_vect =
gkfs::rpc::decompress_bitset(in.wbitset);
#ifdef GKFS_ENABLE_AGIOS
int* data;
ABT_eventual eventual = ABT_EVENTUAL_NULL;
......@@ -485,10 +488,9 @@ rpc_srv_read(hg_handle_t handle) {
__func__);
return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle);
}
#ifndef GKFS_ENABLE_FORWARDING
auto const host_id = in.host_id;
auto const host_size = in.host_size;
#endif
auto path = make_shared<string>(in.path);
// chnk_ids used by this host
vector<uint64_t> chnk_ids_host(in.chunk_n);
......@@ -519,9 +521,11 @@ rpc_srv_read(hg_handle_t handle) {
chnk_id_file <= in.chunk_end && chnk_id_curr < in.chunk_n;
chnk_id_file++) {
// Continue if chunk does not hash to this host
#ifndef GKFS_ENABLE_FORWARDING
if(RPC_DATA->distributor()->locate_data(in.path, chnk_id_file,
host_size) != host_id) {
// We only check if we are not using replicas
if(!(gkfs::rpc::get_bitset(read_bitset_vect,
chnk_id_file - in.chunk_start))) {
GKFS_DATA->spdlogger()->trace(
"{}() chunkid '{}' ignored as it does not match to this host with id '{}'. chnk_id_curr '{}'",
__func__, chnk_id_file, host_id, chnk_id_curr);
......@@ -530,7 +534,7 @@ rpc_srv_read(hg_handle_t handle) {
if(GKFS_DATA->enable_chunkstats()) {
GKFS_DATA->stats()->add_read(in.path, chnk_id_file);
}
#endif
chnk_ids_host[chnk_id_curr] =
chnk_id_file; // save this id to host chunk list
......@@ -597,6 +601,10 @@ rpc_srv_read(hg_handle_t handle) {
GKFS_DATA->spdlogger()->warn(
"{}() Not all chunks were detected!!! Size left {}", __func__,
chnk_size_left_host);
if(chnk_size_left_host == in.total_chunk_size)
return HG_CANCELED;
/*
* 4. Read task results and accumulate in out.io_size
*/
......
......@@ -39,23 +39,23 @@ TEST_CASE( "Guided distributor Testing", "[Distributor]" ) {
// The distributor should return 3 for all the tested files
auto d = gkfs::rpc::GuidedDistributor();
REQUIRE( d.locate_data("/t.c01",1,10) == 3 );
REQUIRE( d.locate_data("/t.c02",1,10) == 3 );
REQUIRE( d.locate_data("/t.c03",1,10) == 3 );
REQUIRE( d.locate_data("/t.c04",1,10) == 3 );
REQUIRE( d.locate_data("/t.c05",1,10) == 3 );
REQUIRE( d.locate_data("/t.c06",1,10) == 3 );
REQUIRE( d.locate_data("/t.c07",1,10) == 3 );
REQUIRE( d.locate_data("/t.c01",1,10,0) == 3 );
REQUIRE( d.locate_data("/t.c02",1,10,0) == 3 );
REQUIRE( d.locate_data("/t.c03",1,10,0) == 3 );
REQUIRE( d.locate_data("/t.c04",1,10,0) == 3 );
REQUIRE( d.locate_data("/t.c05",1,10,0) == 3 );
REQUIRE( d.locate_data("/t.c06",1,10,0) == 3 );
REQUIRE( d.locate_data("/t.c07",1,10,0) == 3 );
// Next result is random, but with the same seed is consistent
// We ask for chunk 5 that is distributed randomly between the
// 10 servers.
REQUIRE ( (d.locate_data("/t.c01",5,10) +
d.locate_data("/t.c02",5,10) +
d.locate_data("/t.c03",5,10) +
d.locate_data("/t.c04",5,10) +
d.locate_data("/t.c05",5,10) +
d.locate_data("/t.c06",5,10) +
d.locate_data("/t.c07",5,10) ) == 42);
REQUIRE ( (d.locate_data("/t.c01",5,10,0) +
d.locate_data("/t.c02",5,10,0) +
d.locate_data("/t.c03",5,10,0) +
d.locate_data("/t.c04",5,10,0) +
d.locate_data("/t.c05",5,10,0) +
d.locate_data("/t.c06",5,10,0) +
d.locate_data("/t.c07",5,10,0) ) == 42);
}
}