Commit fc80c695 authored by Marc Vef's avatar Marc Vef
Browse files

namespace usage unification for rpc_send

parent 7897d84f
Loading
Loading
Loading
Loading
+3 −3
Original line number Diff line number Diff line
@@ -18,15 +18,15 @@

namespace rpc_send {

    ssize_t write(const std::string& path, const void* buf, bool append_flag, off64_t in_offset,
                  size_t write_size, int64_t updated_metadentry_size);

    struct ChunkStat {
        unsigned long chunk_size;
        unsigned long chunk_total;
        unsigned long chunk_free;
    };

    ssize_t write(const std::string& path, const void* buf, bool append_flag, off64_t in_offset,
                  size_t write_size, int64_t updated_metadentry_size);

    ssize_t read(const std::string& path, void* buf, off64_t offset, size_t read_size);

    int trunc_data(const std::string& path, size_t current_size, size_t new_size);
+2 −7
Original line number Diff line number Diff line
@@ -14,13 +14,10 @@
#include <hermes.hpp>
#include <client/rpc/hg_rpcs.hpp>

namespace hermes { namespace detail {

//==============================================================================
// register request types so that they can be used by users and the engine
//
void
register_user_request_types() {
void hermes::detail::register_user_request_types() {
    (void) registered_requests().add<gkfs::rpc::fs_config>();
    (void) registered_requests().add<gkfs::rpc::create>();
    (void) registered_requests().add<gkfs::rpc::stat>();
@@ -41,5 +38,3 @@ register_user_request_types() {
    (void) registered_requests().add<gkfs::rpc::chunk_stat>();

}
 No newline at end of file

}} // namespace hermes::detail
+359 −372
Original line number Diff line number Diff line
@@ -13,14 +13,13 @@

#include <client/preload_util.hpp>
#include <client/rpc/ld_rpc_data_ws.hpp>
#include <global/rpc/distributor.hpp>
#include <global/chunk_calc_util.hpp>
#include <client/rpc/hg_rpcs.hpp>
#include <client/logging.hpp>

#include <unordered_set>
#include <global/rpc/distributor.hpp>
#include <global/chunk_calc_util.hpp>

namespace rpc_send {
#include <unordered_set>

using namespace std;

@@ -30,7 +29,7 @@ namespace rpc_send {
/**
 * Sends an RPC request to a specific node to pull all chunks that belong to him
 */
    ssize_t write(const string& path, const void* buf, const bool append_flag,
ssize_t rpc_send::write(const string& path, const void* buf, const bool append_flag,
                        const off64_t in_offset, const size_t write_size,
                        const int64_t updated_metadentry_size) {

@@ -38,9 +37,7 @@ namespace rpc_send {

    // Calculate chunkid boundaries and numbers so that daemons know in
    // which interval to look for chunks
        off64_t offset = append_flag ?
                         in_offset :
                         (updated_metadentry_size - write_size);
    off64_t offset = append_flag ? in_offset : (updated_metadentry_size - write_size);

    auto chnk_start = chnk_id_for_offset(offset, gkfs_config::rpc::chunksize);
    auto chnk_end = chnk_id_for_offset((offset + write_size) - 1, gkfs_config::rpc::chunksize);
@@ -60,8 +57,7 @@ namespace rpc_send {
        auto target = CTX->distributor()->locate_data(path, chnk_id);

        if (target_chnks.count(target) == 0) {
                target_chnks.insert(
                        std::make_pair(target, std::vector<uint64_t>{chnk_id}));
            target_chnks.insert(std::make_pair(target, std::vector<uint64_t>{chnk_id}));
            targets.push_back(target);
        } else {
            target_chnks[target].push_back(chnk_id);
@@ -87,8 +83,7 @@ namespace rpc_send {
    hermes::exposed_memory local_buffers;

    try {
            local_buffers =
                    ld_network_service->expose(bufseq, hermes::access_mode::read_only);
        local_buffers = ld_network_service->expose(bufseq, hermes::access_mode::read_only);

    } catch (const std::exception& ex) {
        LOG(ERROR, "Failed to expose buffers for RMA");
@@ -146,8 +141,7 @@ namespace rpc_send {
            // TODO(amiranda): hermes will eventually provide a post(endpoint)
            // returning one result and a broadcast(endpoint_set) returning a
            // result_set. When that happens we can remove the .at(0) :/
                handles.emplace_back(
                        ld_network_service->post<gkfs::rpc::write_data>(endp, in));
            handles.emplace_back(ld_network_service->post<gkfs::rpc::write_data>(endp, in));

            LOG(DEBUG, "host: {}, path: \"{}\", chunks: {}, size: {}, offset: {}",
                target, path, in.chunk_n(), total_chunk_size, in.offset());
@@ -197,7 +191,7 @@ namespace rpc_send {
/**
 * Sends an RPC request to a specific node to push all chunks that belong to him
 */
    ssize_t read(const string& path, void* buf, const off64_t offset, const size_t read_size) {
ssize_t rpc_send::read(const string& path, void* buf, const off64_t offset, const size_t read_size) {

    // Calculate chunkid boundaries and numbers so that daemons know in which
    // interval to look for chunks
@@ -219,8 +213,7 @@ namespace rpc_send {
        auto target = CTX->distributor()->locate_data(path, chnk_id);

        if (target_chnks.count(target) == 0) {
                target_chnks.insert(
                        std::make_pair(target, std::vector<uint64_t>{chnk_id}));
            target_chnks.insert(std::make_pair(target, std::vector<uint64_t>{chnk_id}));
            targets.push_back(target);
        } else {
            target_chnks[target].push_back(chnk_id);
@@ -246,8 +239,7 @@ namespace rpc_send {
    hermes::exposed_memory local_buffers;

    try {
            local_buffers =
                    ld_network_service->expose(bufseq, hermes::access_mode::write_only);
        local_buffers = ld_network_service->expose(bufseq, hermes::access_mode::write_only);

    } catch (const std::exception& ex) {
        LOG(ERROR, "Failed to expose buffers for RMA");
@@ -353,7 +345,7 @@ namespace rpc_send {
    return error ? -1 : out_size;
}

    int trunc_data(const std::string& path, size_t current_size, size_t new_size) {
int rpc_send::trunc_data(const std::string& path, size_t current_size, size_t new_size) {

    assert(current_size > new_size);
    bool error = false;
@@ -361,8 +353,7 @@ namespace rpc_send {
    // Find out which data servers need to delete data chunks in order to
    // contact only them
    const unsigned int chunk_start = chnk_id_for_offset(new_size, gkfs_config::rpc::chunksize);
        const unsigned int chunk_end =
                chnk_id_for_offset(current_size - new_size - 1, gkfs_config::rpc::chunksize);
    const unsigned int chunk_end = chnk_id_for_offset(current_size - new_size - 1, gkfs_config::rpc::chunksize);

    std::unordered_set<unsigned int> hosts;
    for (unsigned int chunk_id = chunk_start; chunk_id <= chunk_end; ++chunk_id) {
@@ -385,8 +376,7 @@ namespace rpc_send {
            // TODO(amiranda): hermes will eventually provide a post(endpoint)
            // returning one result and a broadcast(endpoint_set) returning a
            // result_set. When that happens we can remove the .at(0) :/
                handles.emplace_back(
                        ld_network_service->post<gkfs::rpc::trunc_data>(endp, in));
            handles.emplace_back(ld_network_service->post<gkfs::rpc::trunc_data>(endp, in));

        } catch (const std::exception& ex) {
            // TODO(amiranda): we should cancel all previously posted requests
@@ -421,7 +411,7 @@ namespace rpc_send {
    return error ? -1 : 0;
}

    ChunkStat chunk_stat() {
rpc_send::ChunkStat rpc_send::chunk_stat() {

    std::vector<hermes::rpc_handle<gkfs::rpc::chunk_stat>> handles;

@@ -436,8 +426,7 @@ namespace rpc_send {
            // TODO(amiranda): hermes will eventually provide a post(endpoint)
            // returning one result and a broadcast(endpoint_set) returning a
            // result_set. When that happens we can remove the .at(0) :/
                handles.emplace_back(
                        ld_network_service->post<gkfs::rpc::chunk_stat>(endp, in));
            handles.emplace_back(ld_network_service->post<gkfs::rpc::chunk_stat>(endp, in));

        } catch (const std::exception& ex) {
            // TODO(amiranda): we should cancel all previously posted requests
@@ -473,5 +462,3 @@ namespace rpc_send {

    return {chunk_size, chunk_total, chunk_free};
}

} // end namespace rpc_send
+33 −39
Original line number Diff line number Diff line
@@ -18,14 +18,11 @@

#include <boost/token_functions.hpp>


namespace rpc_send {

/**
* Gets fs configuration information from the running daemon and transfers it to the memory of the library
* @return
*/
    bool get_fs_config() {
bool rpc_send::get_fs_config() {

    auto endp = CTX->hosts().at(CTX->local_host_id());
    gkfs::rpc::fs_config::output out;
@@ -59,6 +56,3 @@ namespace rpc_send {

    return true;
}


}
+327 −362
Original line number Diff line number Diff line
@@ -22,11 +22,9 @@
#include <global/rpc/distributor.hpp>
#include <global/rpc/rpc_types.hpp>

namespace rpc_send {

using namespace std;

    int mk_node(const std::string& path, const mode_t mode) {
int rpc_send::mk_node(const std::string& path, const mode_t mode) {

    int err = EUNKNOWN;
    auto endp = CTX->hosts().at(
@@ -39,8 +37,7 @@ namespace rpc_send {
        // TODO(amiranda): hermes will eventually provide a post(endpoint)
        // returning one result and a broadcast(endpoint_set) returning a
        // result_set. When that happens we can remove the .at(0) :/
            auto out =
                    ld_network_service->post<gkfs::rpc::create>(endp, path, mode).get().at(0);
        auto out = ld_network_service->post<gkfs::rpc::create>(endp, path, mode).get().at(0);
        err = out.err();
        LOG(DEBUG, "Got response success: {}", err);

@@ -58,10 +55,9 @@ namespace rpc_send {
    return err;
}

    int stat(const std::string& path, string& attr) {
int rpc_send::stat(const std::string& path, string& attr) {

        auto endp = CTX->hosts().at(
                CTX->distributor()->locate_file_metadata(path));
    auto endp = CTX->hosts().at(CTX->distributor()->locate_file_metadata(path));

    try {
        LOG(DEBUG, "Sending RPC ...");
@@ -70,8 +66,7 @@ namespace rpc_send {
        // TODO(amiranda): hermes will eventually provide a post(endpoint)
        // returning one result and a broadcast(endpoint_set) returning a
        // result_set. When that happens we can remove the .at(0) :/
            auto out =
                    ld_network_service->post<gkfs::rpc::stat>(endp, path).get().at(0);
        auto out = ld_network_service->post<gkfs::rpc::stat>(endp, path).get().at(0);
        LOG(DEBUG, "Got response success: {}", out.err());

        if (out.err() != 0) {
@@ -91,10 +86,9 @@ namespace rpc_send {
    return 0;
}

    int decr_size(const std::string& path, size_t length) {
int rpc_send::decr_size(const std::string& path, size_t length) {

        auto endp = CTX->hosts().at(
                CTX->distributor()->locate_file_metadata(path));
    auto endp = CTX->hosts().at(CTX->distributor()->locate_file_metadata(path));

    try {

@@ -104,9 +98,7 @@ namespace rpc_send {
        // TODO(amiranda): hermes will eventually provide a post(endpoint)
        // returning one result and a broadcast(endpoint_set) returning a
        // result_set. When that happens we can remove the .at(0) :/
            auto out =
                    ld_network_service->post<gkfs::rpc::decr_size>(
                            endp, path, length).get().at(0);
        auto out = ld_network_service->post<gkfs::rpc::decr_size>(endp, path, length).get().at(0);

        LOG(DEBUG, "Got response success: {}", out.err());

@@ -124,15 +116,14 @@ namespace rpc_send {
    }
}

    int rm_node(const std::string& path, const bool remove_metadentry_only, const ssize_t size) {
int rpc_send::rm_node(const std::string& path, const bool remove_metadentry_only, const ssize_t size) {

    // if only the metadentry should be removed, send one rpc to the
    // metadentry's responsible node to remove the metadata
    // else, send an rpc to all hosts and thus broadcast chunk_removal.
    if (remove_metadentry_only) {

            auto endp = CTX->hosts().at(
                    CTX->distributor()->locate_file_metadata(path));
        auto endp = CTX->hosts().at(CTX->distributor()->locate_file_metadata(path));

        try {

@@ -142,8 +133,7 @@ namespace rpc_send {
            // TODO(amiranda): hermes will eventually provide a post(endpoint)
            // returning one result and a broadcast(endpoint_set) returning a
            // result_set. When that happens we can remove the .at(0) :/
                auto out =
                        ld_network_service->post<gkfs::rpc::remove>(endp, path).get().at(0);
            auto out = ld_network_service->post<gkfs::rpc::remove>(endp, path).get().at(0);

            LOG(DEBUG, "Got response success: {}", out.err());

@@ -168,14 +158,12 @@ namespace rpc_send {
    // Small files
    if (static_cast<std::size_t>(size / gkfs_config::rpc::chunksize) < CTX->hosts().size()) {

            auto endp = CTX->hosts().at(
                    CTX->distributor()->locate_file_metadata(path));
        auto endp = CTX->hosts().at(CTX->distributor()->locate_file_metadata(path));

        try {
            LOG(DEBUG, "Sending RPC to host: {}", endp.to_string());
            gkfs::rpc::remove::input in(path);
                handles.emplace_back(
                        ld_network_service->post<gkfs::rpc::remove>(endp, in));
            handles.emplace_back(ld_network_service->post<gkfs::rpc::remove>(endp, in));

            uint64_t chnk_start = 0;
            uint64_t chnk_end = size / gkfs_config::rpc::chunksize;
@@ -186,8 +174,7 @@ namespace rpc_send {

                LOG(DEBUG, "Sending RPC to host: {}", target.to_string());

                    handles.emplace_back(
                            ld_network_service->post<gkfs::rpc::remove>(target, in));
                handles.emplace_back(ld_network_service->post<gkfs::rpc::remove>(target, in));
            }
        } catch (const std::exception& ex) {
            LOG(ERROR, "Failed to send reduced remove requests");
@@ -209,8 +196,7 @@ namespace rpc_send {
                //
                //

                    handles.emplace_back(
                            ld_network_service->post<gkfs::rpc::remove>(endp, in));
                handles.emplace_back(ld_network_service->post<gkfs::rpc::remove>(endp, in));

            } catch (const std::exception& ex) {
                // TODO(amiranda): we should cancel all previously posted requests
@@ -248,10 +234,9 @@ namespace rpc_send {
}


    int update_metadentry(const string& path, const Metadata& md, const MetadentryUpdateFlags& md_flags) {
int rpc_send::update_metadentry(const string& path, const Metadata& md, const MetadentryUpdateFlags& md_flags) {

        auto endp = CTX->hosts().at(
                CTX->distributor()->locate_file_metadata(path));
    auto endp = CTX->hosts().at(CTX->distributor()->locate_file_metadata(path));

    try {

@@ -261,8 +246,7 @@ namespace rpc_send {
        // TODO(amiranda): hermes will eventually provide a post(endpoint)
        // returning one result and a broadcast(endpoint_set) returning a
        // result_set. When that happens we can remove the .at(0) :/
            auto out =
                    ld_network_service->post<gkfs::rpc::update_metadentry>(
        auto out = ld_network_service->post<gkfs::rpc::update_metadentry>(
                endp,
                path,
                (md_flags.link_count ? md.link_count() : 0),
@@ -298,11 +282,11 @@ namespace rpc_send {
    }
}

    int update_metadentry_size(const string& path, const size_t size, const off64_t offset, const bool append_flag,
int
rpc_send::update_metadentry_size(const string& path, const size_t size, const off64_t offset, const bool append_flag,
                                 off64_t& ret_size) {

        auto endp = CTX->hosts().at(
                CTX->distributor()->locate_file_metadata(path));
    auto endp = CTX->hosts().at(CTX->distributor()->locate_file_metadata(path));

    try {

@@ -312,8 +296,7 @@ namespace rpc_send {
        // TODO(amiranda): hermes will eventually provide a post(endpoint)
        // returning one result and a broadcast(endpoint_set) returning a
        // result_set. When that happens we can remove the .at(0) :/
            auto out =
                    ld_network_service->post<gkfs::rpc::update_metadentry_size>(
        auto out = ld_network_service->post<gkfs::rpc::update_metadentry_size>(
                endp, path, size, offset,
                bool_to_merc_bool(append_flag)).get().at(0);

@@ -337,10 +320,9 @@ namespace rpc_send {
    }
}

    int get_metadentry_size(const std::string& path, off64_t& ret_size) {
int rpc_send::get_metadentry_size(const std::string& path, off64_t& ret_size) {

        auto endp = CTX->hosts().at(
                CTX->distributor()->locate_file_metadata(path));
    auto endp = CTX->hosts().at(CTX->distributor()->locate_file_metadata(path));

    try {

@@ -350,9 +332,7 @@ namespace rpc_send {
        // TODO(amiranda): hermes will eventually provide a post(endpoint)
        // returning one result and a broadcast(endpoint_set) returning a
        // result_set. When that happens we can remove the .at(0) :/
            auto out =
                    ld_network_service->post<gkfs::rpc::get_metadentry_size>(
                            endp, path).get().at(0);
        auto out = ld_network_service->post<gkfs::rpc::get_metadentry_size>(endp, path).get().at(0);

        LOG(DEBUG, "Got response success: {}", out.err());

@@ -370,11 +350,10 @@ namespace rpc_send {
/**
 * Sends an RPC request to a specific node to push all chunks that belong to him
 */
    void get_dirents(OpenDir& open_dir) {
void rpc_send::get_dirents(OpenDir& open_dir) {

    auto const root_dir = open_dir.path();
        auto const targets =
                CTX->distributor()->locate_directory_metadata(root_dir);
    auto const targets = CTX->distributor()->locate_directory_metadata(root_dir);

    /* preallocate receiving buffer. The actual size is not known yet.
     *
@@ -382,12 +361,10 @@ namespace rpc_send {
     * It turns out that this operation is increadibly slow for such a big
     * buffer. Moreover we don't need a zeroed buffer here.
     */
        auto large_buffer =
                std::unique_ptr<char[]>(new char[gkfs_config::rpc::dirents_buff_size]);
    auto large_buffer = std::unique_ptr<char[]>(new char[gkfs_config::rpc::dirents_buff_size]);

    //XXX there is a rounding error here depending on the number of targets...
        const std::size_t per_host_buff_size =
                gkfs_config::rpc::dirents_buff_size / targets.size();
    const std::size_t per_host_buff_size = gkfs_config::rpc::dirents_buff_size / targets.size();

    // expose local buffers for RMA from servers
    std::vector<hermes::exposed_memory> exposed_buffers;
@@ -395,8 +372,7 @@ namespace rpc_send {

    for (std::size_t i = 0; i < targets.size(); ++i) {
        try {
                exposed_buffers.emplace_back(
                        ld_network_service->expose(
            exposed_buffers.emplace_back(ld_network_service->expose(
                    std::vector<hermes::mutable_buffer>{
                            hermes::mutable_buffer{
                                    large_buffer.get() + (i * per_host_buff_size),
@@ -424,8 +400,7 @@ namespace rpc_send {
        try {

            LOG(DEBUG, "Sending RPC to host: {}", targets[i]);
                handles.emplace_back(
                        ld_network_service->post<gkfs::rpc::get_dirents>(endp, in));
            handles.emplace_back(ld_network_service->post<gkfs::rpc::get_dirents>(endp, in));
        } catch (const std::exception& ex) {
            LOG(ERROR, "Unable to send non-blocking get_dirents() "
                       "on {} [peer: {}]", root_dir, targets[i]);
@@ -467,17 +442,12 @@ namespace rpc_send {

        for (std::size_t j = 0; j < out.dirents_size(); j++) {

                FileType ftype = (*bool_ptr) ?
                                 FileType::directory :
                                 FileType::regular;
            FileType ftype = (*bool_ptr) ? FileType::directory : FileType::regular;
            bool_ptr++;

            // Check that we are not outside the recv_buff for this specific host
            assert((names_ptr - reinterpret_cast<char*>(base_ptr)) > 0);
                assert(
                        static_cast<unsigned long int>(
                                names_ptr - reinterpret_cast<char*>(base_ptr)) <
                        per_host_buff_size);
            assert(static_cast<unsigned long int>(names_ptr - reinterpret_cast<char*>(base_ptr)) < per_host_buff_size);

            auto name = std::string(names_ptr);
            names_ptr += name.size() + 1;
@@ -489,10 +459,9 @@ namespace rpc_send {

#ifdef HAS_SYMLINKS

    int mk_symlink(const std::string& path, const std::string& target_path) {
int rpc_send::mk_symlink(const std::string& path, const std::string& target_path) {

        auto endp = CTX->hosts().at(
                CTX->distributor()->locate_file_metadata(path));
    auto endp = CTX->hosts().at(CTX->distributor()->locate_file_metadata(path));

    try {

@@ -502,9 +471,7 @@ namespace rpc_send {
        // TODO(amiranda): hermes will eventually provide a post(endpoint)
        // returning one result and a broadcast(endpoint_set) returning a
        // result_set. When that happens we can remove the .at(0) :/
            auto out =
                    ld_network_service->post<gkfs::rpc::mk_symlink>(
                            endp, path, target_path).get().at(0);
        auto out = ld_network_service->post<gkfs::rpc::mk_symlink>(endp, path, target_path).get().at(0);

        LOG(DEBUG, "Got response success: {}", out.err());

@@ -523,5 +490,3 @@ namespace rpc_send {
}

#endif

} //end namespace rpc_send