Commit 4e43dfac authored by Ramon Nou's avatar Ramon Nou Committed by Ramon Nou
Browse files

Updated Jerasure installation, fixing includes

WIP

Missing write with char**
parent 1c61574a
Loading
Loading
Loading
Loading
+2 −7
Original line number Diff line number Diff line
@@ -34,13 +34,8 @@
# Jerasure_LIBRARIES
#

# - Try to find galois as Jerasure.h is installed in the root include
find_path(Jerasure_INCLUDE_DIR
    NAMES jerasure.h
    )

find_path(Jerasure2_INCLUDE_DIR
    NAMES galois.h
    NAMES jerasure/jerasure.h
    )

find_library(Jerasure_LIBRARY
+8 −3
Original line number Diff line number Diff line
@@ -43,10 +43,15 @@ struct ChunkStat {

// TODO once we have LEAF, remove all the error code returns and throw them as
// an exception.

std::pair<int, ssize_t>
ecc_forward_write(const std::string& path, const void* buf, const size_t write_size,
ecc_forward_read(const std::string& path, void* buf, const size_t read_size,
                 const int8_t server);

std::pair<int, ssize_t>
ecc_forward_write(const std::string& path, const void* buf,
                  const size_t write_size, const int8_t server);

std::pair<int, ssize_t>
forward_write(const std::string& path, const void* buf, off64_t offset,
              size_t write_size, const int8_t num_copy = 0);
+1 −0
Original line number Diff line number Diff line
@@ -51,6 +51,7 @@ pkg_install() {
    ./configure --prefix="${INSTALL_DIR}" 
    make -j"${CORES}"
    make install
    mv ${INSTALL_DIR}/include/jerasure.h ${INSTALL_DIR}/include/jerasure/jerasure.h
}

pkg_check() {
+4 −4
Original line number Diff line number Diff line
@@ -50,7 +50,7 @@ extern "C" {
}
#define GKFS_ENABLE_EC 1
#ifdef GKFS_ENABLE_EC
#include <jerasure.h>
#include <jerasure/jerasure.h>
#include <jerasure/reed_sol.h>
#endif

@@ -936,7 +936,7 @@ gkfs_ecc_write(std::shared_ptr<gkfs::filemap::OpenFile> file, size_t count,

            // Write erasure
            std::string ecc_path = file->path() + "_ecc_" + to_string(i) + "_" +
                                   to_string(i + data_servers);
                                   to_string(i + data_servers - 1);
            for(int i = 0; i < CTX->get_replicas(); i++) {
                auto ecc_write = gkfs::rpc::ecc_forward_write(
                        ecc_path, coding[i], gkfs::config::rpc::chunksize,
@@ -1185,7 +1185,7 @@ gkfs_do_read(const gkfs::filemap::OpenFile& file, char* buf, size_t count,
    std::set<int8_t> failed; // set with failed targets.
    if(CTX->get_replicas() != 0) {

        ret = gkfs::rpc::forward_read(file->path(), buf, offset, count, 0,
        ret = gkfs::rpc::forward_read(file->path(), buf, offset, count, CTX->get_replicas(),
                                      failed);
        while(ret.first == EIO) {
#ifdef GKFS_ENABLE_EC
+200 −31
Original line number Diff line number Diff line
@@ -40,7 +40,7 @@

#define GKFS_ENABLE_EC 1
#ifdef GKFS_ENABLE_EC
#include <jerasure.h>
#include <jerasure/jerasure.h>
#include <jerasure/reed_sol.h>
#endif

@@ -210,6 +210,130 @@ ecc_forward_write(const string& path, const void* buf, const size_t write_size,
    else
        return make_pair(0, out_size);
}
/**
 * Send an RPC request to read to a buffer.
 * @param path
 * @param buf
 * @param offset
 * @param read_size
 * @param num_copies number of copies available (0 is no replication)
 * @param failed nodes failed that should not be used
 * @return pair<error code, read size>
 */
pair<int, ssize_t>
ecc_forward_read(const string& path, void* buf, const size_t read_size,
                 const int8_t server) {


    std::vector<uint8_t> read_bitset_vect(8, 0);
    gkfs::rpc::set_bitset(read_bitset_vect, 0);

    // some helper variables for async RPCs
    std::vector<hermes::mutable_buffer> bufseq{
            hermes::mutable_buffer{buf, read_size},
    };

    // expose user buffers so that they can serve as RDMA data targets
    // (these are automatically "unexposed" when the destructor is called)
    hermes::exposed_memory local_buffers;

    try {
        local_buffers = ld_network_service->expose(
                bufseq, hermes::access_mode::write_only);

    } catch(const std::exception& ex) {
        LOG(ERROR, "Failed to expose buffers for RMA");
        return make_pair(EBUSY, 0);
    }

    std::vector<hermes::rpc_handle<gkfs::rpc::read_data>> handles;

    // Issue non-blocking RPC requests and wait for the result later
    //
    // TODO(amiranda): This could be simplified by adding a vector of inputs
    // to async_engine::broadcast(). This would allow us to avoid manually
    // looping over handles as we do below

    auto target = server;

    auto endp = CTX->hosts().at(target);

    try {

        LOG(DEBUG, "Sending RPC ...");

        gkfs::rpc::read_data::input in(
                path,
                // first offset in targets is the chunk with
                // a potential offset
                0, target, CTX->hosts().size(),
                gkfs::rpc::compressBitset(read_bitset_vect),
                // number of chunks handled by that destination
                1,
                // chunk start id of this write
                0,
                // chunk end id of this write
                0,
                // total size to write
                read_size, local_buffers);

        // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so
        // that we can retry for RPC_TRIES (see old commits with margo)
        // TODO(amiranda): hermes will eventually provide a
        // post(endpoint) returning one result and a
        // broadcast(endpoint_set) returning a result_set. When that
        // happens we can remove the .at(0) :/
        handles.emplace_back(
                ld_network_service->post<gkfs::rpc::read_data>(endp, in));

    } catch(const std::exception& ex) {
        LOG(ERROR,
            "Unable to send non-blocking rpc for path \"{}\" "
            "[peer: {}]",
            path, target);
        return make_pair(EBUSY, 0);
    }

    // Wait for RPC responses and then get response and add it to out_size
    // which is the read size. All potential outputs are served to free
    // resources regardless of errors, although an errorcode is set.
    auto err = 0;
    ssize_t out_size = 0;
    std::size_t idx = 0;

    for(const auto& h : handles) {
        try {
            // XXX We might need a timeout here to not wait forever for an
            // output that never comes?
            auto out = h.get().at(0);

            if(out.err() != 0) {
                LOG(ERROR, "Daemon reported error: {}", out.err());
                err = out.err();
            }

            out_size += static_cast<size_t>(out.io_size());

        } catch(const std::exception& ex) {
            LOG(ERROR, "Failed to get rpc output for path \"{}\" [peer: {}]",
                path, target);
            err = EIO;
        }
        idx++;
    }


    /*
     * Typically file systems return the size even if only a part of it was
     * read. In our case, we do not keep track which daemon fully read its
     * workload. Thus, we always return size 0 on error.
     */
    if(err)
        return make_pair(err, 0);
    else
        return make_pair(0, out_size);
}

// #endif

/**
@@ -456,11 +580,19 @@ bool
gkfs_ecc_recover(const std::string& path, void* buffer_recover,
                 uint64_t chunk_candidate, uint64_t failed_server) {

    std::vector<char*> buffers(CTX->hosts().size(),
                               (char*) malloc(gkfs::config::rpc::chunksize));

    auto data_servers = CTX->hosts().size() - CTX->get_replicas();

    char** data = (char**) malloc(sizeof(char*) * data_servers);
    char** coding = (char**) malloc(sizeof(char*) * CTX->get_replicas());

    for(auto i = 0; i < data_servers; ++i) {
        data[i] = (char*) malloc(gkfs::config::rpc::chunksize);
    }
    for(auto i = 0; i < CTX->get_replicas(); ++i) {
        coding[i] = (char*) malloc(gkfs::config::rpc::chunksize);
    }

    auto initial_row_chunk = (chunk_candidate / data_servers) * data_servers;


@@ -472,55 +604,86 @@ gkfs_ecc_recover(const std::string& path, void* buffer_recover,
    LOG(DEBUG, "Operation Size - Range {} - data_servers {} replica_servers {}",
        initial_row_chunk, data_servers, CTX->get_replicas());

    vector<int> erased(CTX->hosts().size(), 1);
    vector<int> erased;

    auto i = initial_row_chunk;

    for(auto j = i; j < i + data_servers; ++j) {
    for(uint64_t j = 0; j < data_servers; ++j) {
        std::set<int8_t> failed;
        LOG(DEBUG, "Reading Chunk {} -> {}, from server {} ", i, j, j - i);
        LOG(DEBUG, "Reading Chunk {} -> {}, from server {}", i, i + j, j);

        auto out = gkfs::rpc::forward_read(
                path, buffers[j - i], j * gkfs::config::rpc::chunksize,
                path, data[j], (j + i) * gkfs::config::rpc::chunksize,
                gkfs::config::rpc::chunksize, 0, failed);
        if(out.first != 0) {
            LOG(ERROR, "Read Parity Error: {}", out.first);
            erased[j - i] = 0;
            erased.push_back(j);
        }
    }
    {
        uint64_t md5 = 0;
        for(auto k = 0; k < gkfs::config::rpc::chunksize; k++) {
            md5 += data[failed_server][k];
        }
        std::cout << "Content of the failed server? " << failed_server << " --> "
                  << md5 << std::endl;
    }

    std::string ecc_path =
            path + "_ecc_" + to_string(i) + "_" + to_string(i + data_servers);
   // memset(data[failed_server], 3, gkfs::config::rpc::chunksize);
    std::string ecc_path = path + "_ecc_" + to_string(i) + "_" +
                           to_string(i + data_servers - 1);

    for(auto j = i + data_servers; j < i + CTX->hosts().size(); ++j) {
        std::set<int8_t> failed;
        LOG(DEBUG, "Reading EC Chunk {} {} -> {}, from server {} ", ecc_path,
            i + data_servers, j, j - i + data_servers);
    for(auto ecc_num = 0; ecc_num < CTX->get_replicas(); ++ecc_num) {
        LOG(DEBUG, "Reading EC Chunk {} from server {} ", ecc_path,
            ecc_num + data_servers);
        auto out = gkfs::rpc::ecc_forward_read(ecc_path, coding[ecc_num],
                                               gkfs::config::rpc::chunksize,
                                               ecc_num + data_servers);

        auto out = gkfs::rpc::forward_read(
                ecc_path, buffers[j - i + data_servers],
                j * gkfs::config::rpc::chunksize, gkfs::config::rpc::chunksize,
                0, failed);
        if(out.first != 0) {
            LOG(ERROR, "Read Parity Error: {}", out.first);
            erased[j - i + data_servers] = 0;
            erased.push_back(ecc_num + data_servers);
        } else {
            LOG(DEBUG, "Read EC Success");
        }
    }

    // We force a failure
    erased[failed_server] = 1;
    erased.push_back(failed_server);
    erased.push_back(-1);


    int res = 0;

    // We have all the data to recover the buffer
    auto matrix = reed_sol_vandermonde_coding_matrix(data_servers,
                                                     CTX->get_replicas(), 8);
    jerasure_matrix_decode(data_servers, CTX->get_replicas(), 8, matrix, 0,
                           erased.data(), buffers.data(),
                           buffers.data() +
                                   gkfs::config::rpc::chunksize * data_servers,
                           gkfs::config::rpc::chunksize);

    memcpy(buffer_recover,
           buffers.data() + erased.front() * gkfs::config::rpc::chunksize,
    res = jerasure_matrix_decode(data_servers, CTX->get_replicas(), 8, matrix,
                                 1, erased.data(), data, coding,
                                 gkfs::config::rpc::chunksize);

    std::cout << "recovered? Fails? " << failed_server << " -- " << res
              << std::endl;

{
        uint64_t md5 = 0;
        for(auto k = 0; k < gkfs::config::rpc::chunksize; k++) {
            md5 += data[failed_server][k];
        }
        std::cout << "Content of the recovered server? " << failed_server << " --> "
                  << md5 << std::endl;
    }

    memcpy(buffer_recover, data[failed_server], gkfs::config::rpc::chunksize);

    {
        uint64_t md5 = 0;
        for(auto i = 0; i < gkfs::config::rpc::chunksize; i++) {
            md5 += ((char*) buffer_recover)[i];
        }
        std::cout << "md5 recovered? " << md5 << std::endl;
    }
    LOG(DEBUG, "EC computation finished");

    return true;
@@ -709,7 +872,7 @@ forward_read(const string& path, void* buf, const off64_t offset,
                LOG(ERROR, "Daemon reported error: {}", out.err());
                err = out.err();
            }
            if(rand() % 2 == 0) {
            if(rand() % 2 == 0 and num_copies > 0) {
                throw std::exception();
            }
            out_size += static_cast<size_t>(out.io_size());
@@ -747,7 +910,8 @@ forward_read(const string& path, void* buf, const off64_t offset,
                // We have a chunk to recover
                // We don't need to worry about offset etc... just use the chunk
                // number
                void* recovered_chunk = malloc(gkfs::config::rpc::chunksize);
                char* recovered_chunk =
                        (char*) malloc(gkfs::config::rpc::chunksize);
                gkfs::rpc::gkfs_ecc_recover(path, recovered_chunk, chnk_id_file,
                                            failed_server);

@@ -779,10 +943,15 @@ forward_read(const string& path, void* buf, const off64_t offset,
                LOG(DEBUG,
                    "Recovered chunk : Start Offset {}/OffsetChunk {} - Size {}",
                    recover_offt, recover_offt_chunk, recover_size);
                std::cout << "Recovered " << recover_offt << " -- "
                          << recover_offt_chunk << " --- size " << recover_size
                          << std::endl;
                memcpy((char*) buf + recover_offt,
                       (char*) recovered_chunk + recover_offt_chunk,
                       recover_size);
                free(recovered_chunk);
            }

#endif
        }
        idx++;