Commit 68994d32 authored by Ramon Nou's avatar Ramon Nou Committed by Ramon Nou
Browse files

Added ECC Distribution in client

Parity calculation (WIP)

Reed Solomon  coding (WIP)
parent b4a5589e
Loading
Loading
Loading
Loading
+6 −0
Original line number Diff line number Diff line
@@ -59,6 +59,12 @@ forward_truncate(const std::string& path, size_t current_size, size_t new_size,

std::pair<int, ChunkStat>
forward_get_chunk_stat();
#define GKFS_USE_ECC_DISTRIBUTION 1
#ifdef GKFS_USE_ECC_DISTRIBUTION
std::pair <uint64_t, uint64_t > calc_op_chunks(const std::string& path, const bool append_flag,
              const off64_t in_offset, const size_t write_size,
              const int64_t updated_metadentry_size);
#endif

} // namespace gkfs::rpc

+1 −1
Original line number Diff line number Diff line
@@ -36,7 +36,7 @@
#include <unordered_map>
#include <fstream>
#include <map>

#define GKFS_USE_ECC_DISTRIBUTION 1
namespace gkfs::rpc {

using chunkid_t = unsigned int;
+80 −10
Original line number Diff line number Diff line
@@ -49,6 +49,8 @@ extern "C" {
#include <sys/statvfs.h>
}

#include <jerasure.h>
#include <reed_sol.h>
using namespace std;

/*
@@ -915,7 +917,82 @@ gkfs_do_write(gkfs::filemap::OpenFile& file, const char* buf, size_t count,
    auto ret_write = gkfs::rpc::forward_write(*path, buf, offset, count, 0);
    err = ret_write.first;
    write_size = ret_write.second;
#define GKFS_USE_ECC_DISTRIBUTION 1
#ifdef GKFS_USE_ECC_DISTRIBUTION
    // Process ECC calculation

    // 0 - Involved chunks:

    auto chunks = gkfs::rpc::calc_op_chunks(*path, append_flag, offset, count,
                                            updated_size);

    std::set<uint64_t> chunk_set;

    // For each chunk we will have a set of chunks involved on that calculation
    // [0] [1] [2] [3] [4] [n-p] [p1] [p2]
    // [n-p+1] ....
    // i.e. : [0] -> 1,2,3,4,n-p
    // i.e : [4] -> 0,1,2,3,n-p
    // i.e : [n-p+1] ->
    // 3 data serv
    // (chunk / data_servers)*data_servers --> Initial row chunk
    // Involved : From initial to ... initial + data_servers
    if((uint64_t) updated_size >=
       (uint64_t) CTX->hosts().size() * gkfs::config::rpc::chunksize) {
        auto data_servers = CTX->hosts().size() - CTX->get_replicas();
        for(auto i = chunks.first; i <= chunks.second; ++i) {
            auto initial_row_chunk = (i / data_servers) * data_servers;
            chunk_set.insert(initial_row_chunk);
        }
        // Parity Stored in : parity1 .. parity2, as name =
        // [PARITY][Path][Initial row chunk]

        // 1 - Read data from the other chunks
        std::vector<char*> buffers(
                CTX->hosts().size(),
                (char*) malloc(gkfs::config::rpc::chunksize));
        std::cout << "OPERATION "
                  << " --- Size : " << updated_size
                  << " Chunks Range:" << chunks.first << " -- " << chunks.second
                  << " Data + Repliscas " << data_servers << " -- "
                  << CTX->get_replicas() << std::endl;
        // TODO : This could be optimised, with a single read loop
        for(auto i : chunk_set) {
            std::cout << i << " --- Size : " << updated_size << std::endl;
            for(auto j = i; j < i + data_servers; ++j) {
                std::set<int8_t> failed;
                std::cout << " Reading chunk "
                          << " [" << i << "] --> " << j << std::endl;
                auto out = gkfs::rpc::forward_read(
                        *path, buffers[j - i], j * gkfs::config::rpc::chunksize,
                        gkfs::config::rpc::chunksize, 0, failed);
                std::cout << " Read Success " << out.first << " -- "
                          << out.second << std::endl;
            }

            // We have all the data to process a EC

            std::vector<char*> coding(
                    CTX->get_replicas(),
                    (char*) malloc(gkfs::config::rpc::chunksize));

            auto matrix = reed_sol_vandermonde_coding_matrix(
                    data_servers, CTX->get_replicas(), 8);
            jerasure_matrix_encode(data_servers, CTX->get_replicas(), 8, matrix,
                                   buffers.data(), coding.data(),
                                   gkfs::config::rpc::chunksize);

            std::cout << " Parity computation done " << std::endl;
        }
    } else {
        std::cout << "No EC in small files" << std::endl;
    }
    // 2 - Calc Erasure codes

    // 3 - Write erasure codes


#else
    if(num_replicas > 0) {

        auto ret_write_repl = gkfs::rpc::forward_write(*path, buf, offset,
@@ -923,7 +1000,7 @@ gkfs_do_write(gkfs::filemap::OpenFile& file, const char* buf, size_t count,
            write_size = ret_write_repl.second;
        }
    }

#endif
    if(err) {
        LOG(WARNING, "gkfs::rpc::forward_write() failed with err '{}'", err);
        errno = err;
@@ -1087,17 +1164,10 @@ gkfs_do_read(const gkfs::filemap::OpenFile& file, char* buf, size_t count,
    std::set<int8_t> failed; // set with failed targets.
    if(CTX->get_replicas() != 0) {

<<<<<<< HEAD
        ret = gkfs::rpc::forward_read(file.path(), buf, offset, count,
                                      CTX->get_replicas(), failed);
        while(ret.first == EIO) {
            ret = gkfs::rpc::forward_read(file.path(), buf, offset, count,
=======
        ret = gkfs::rpc::forward_read(file->path(), buf, offset, count,
                                      CTX->get_replicas(), failed);
        ret = gkfs::rpc::forward_read(file->path(), buf, offset, count, 0,
                                      failed);
        while(ret.first == EIO) {
            ret = gkfs::rpc::forward_read(file->path(), buf, offset, count,
>>>>>>> 9ff73051 (Changelog change and branch)
                                          CTX->get_replicas(), failed);
            LOG(WARNING, "gkfs::rpc::forward_read() failed with ret '{}'",
                ret.first);
+5 −2
Original line number Diff line number Diff line
@@ -228,6 +228,9 @@ init_environment() {
#ifdef GKFS_USE_GUIDED_DISTRIBUTION
    auto distributor = std::make_shared<gkfs::rpc::GuidedDistributor>(
            CTX->local_host_id(), CTX->hosts().size());
#elif GKFS_USE_ECC_DISTRIBUTION
    auto distributor = std::make_shared<gkfs::rpc::ECCDistributor>(
            CTX->local_host_id(), CTX->hosts().size(), CTX->get_replicas());
#else
        auto distributor = std::make_shared<gkfs::rpc::SimpleHashDistributor>(
                CTX->local_host_id(), CTX->hosts().size());
+30 −0
Original line number Diff line number Diff line
@@ -47,6 +47,36 @@ namespace gkfs::rpc {
 * NOTE: No errno is defined here!
 */

#ifdef GKFS_USE_ECC_DISTRIBUTION
/**
 * @brief Calculate the chunk start and end that will be affected by the operation.
 * 
 * @param path 
 * @param append_flag 
 * @param in_offset 
 * @param write_size 
 * @param updated_metadentry_size 
 * @param num_copies 
 * @return pair<uint64_t, uint64_t> 
 */
std::pair<uint64_t, uint64_t>
calc_op_chunks(const std::string& path, const bool append_flag,
               const off64_t in_offset, const size_t write_size,
               const int64_t updated_metadentry_size) {
    using namespace gkfs::utils::arithmetic;
    off64_t offset =
            append_flag ? in_offset : (updated_metadentry_size - write_size);

    auto chnk_start = block_index(offset, gkfs::config::rpc::chunksize);
    auto chnk_end = block_index((offset + write_size) - 1,
                                gkfs::config::rpc::chunksize);


     return make_pair(chnk_start, chnk_end);
}

#endif

/**
 * Send an RPC request to write from a buffer.
 * There is a bitset of 1024 chunks to tell the server