Commit f1e97ed9 authored by sevenuz's avatar sevenuz
Browse files

reverse first commit on this branch and then maybe fix rpc problem for fuse3 multithreading

parent 5eb37db4
Loading
Loading
Loading
Loading
Loading
+25 −0
Original line number Diff line number Diff line
@@ -209,6 +209,31 @@ init_environment() {
                CTX->rpc_protocol(), THALLIUM_CLIENT_MODE, margo_config);
        CTX->rpc_engine(rpc_engine);

        // Pre-register every RPC tag while still single-threaded so that
        // concurrent calls to define() from FUSE worker threads never race
        // inside HG_Core_register() and corrupt Mercury's RPC callback table.
        for(const auto* tag : {gkfs::rpc::tag::write,
                               gkfs::rpc::tag::read,
                               gkfs::rpc::tag::truncate,
                               gkfs::rpc::tag::get_chunk_stat,
                               gkfs::rpc::tag::create,
                               gkfs::rpc::tag::stat,
                               gkfs::rpc::tag::remove_metadata,
                               gkfs::rpc::tag::remove_data,
                               gkfs::rpc::tag::decr_size,
                               gkfs::rpc::tag::update_metadentry,
                               gkfs::rpc::tag::update_metadentry_size,
                               gkfs::rpc::tag::get_metadentry_size,
                               gkfs::rpc::tag::get_dirents,
                               gkfs::rpc::tag::get_dirents_extended,
                               gkfs::rpc::tag::get_dirents_filtered,
                               gkfs::rpc::tag::mk_symlink,
                               gkfs::rpc::tag::rename,
                               gkfs::rpc::tag::write_data_inline,
                               gkfs::rpc::tag::read_data_inline}) {
            rpc_engine->define(tag);
        }

        if(CTX->use_proxy()) {
            auto ipc_engine = std::make_shared<thallium::engine>(
                    "na+sm", THALLIUM_CLIENT_MODE, margo_config);
+2 −42
Original line number Diff line number Diff line
@@ -64,44 +64,6 @@ extern "C" {
#include <syscall.h>
}

namespace {

struct ThreadState {
    bool initialized{false};
    std::shared_ptr<thallium::engine> engine;
    std::vector<thallium::endpoint> hosts;
};

thread_local ThreadState tl_state;

/*
 * Called lazily on the first CTX->rpc_engine() or CTX->hosts() call per
 * thread after global init is complete. Creates an independent Margo engine
 * and re-looks up server endpoints for this thread, eliminating shared-engine
 * contention between FUSE worker threads (or multi-threaded app threads in the
 * LD_PRELOAD case).  The guard `global_hosts.empty()` prevents activation
 * during gkfs_init() itself, where hosts_ is still being populated.
 */
void
ensure_thread_local(const std::string& rpc_protocol,
                    const std::vector<thallium::endpoint>& global_hosts) {
    if(tl_state.initialized || global_hosts.empty())
        return;

    auto margo_config = R"({"use_progress_thread":true,"rpc_thread_count":0})";
    tl_state.engine = std::make_shared<thallium::engine>(
            rpc_protocol, THALLIUM_CLIENT_MODE, margo_config);

    tl_state.hosts.reserve(global_hosts.size());
    for(const auto& ep : global_hosts) {
        tl_state.hosts.emplace_back(
                tl_state.engine->lookup(std::string(ep)));
    }
    tl_state.initialized = true;
}

} // namespace

namespace gkfs {

namespace preload {
@@ -291,8 +253,7 @@ PreloadContext::cwd() const {

const std::vector<thallium::endpoint>&
PreloadContext::hosts() const {
    ensure_thread_local(rpc_protocol_, hosts_);
    return tl_state.initialized ? tl_state.hosts : hosts_;
    return hosts_;
}

void
@@ -805,8 +766,7 @@ PreloadContext::read_metrics() {

std::shared_ptr<thallium::engine>
PreloadContext::rpc_engine() {
    ensure_thread_local(rpc_protocol_, hosts_);
    return tl_state.initialized ? tl_state.engine : rpc_engine_;
    return rpc_engine_;
}

void
+2 −2
Original line number Diff line number Diff line
@@ -210,7 +210,7 @@ forward_write(const string& path, const void* buf, const off64_t offset,

    // Keep track of targets for error reporting

    auto write_rpc = CTX->rpc_engine()->define(gkfs::rpc::tag::write);
    static auto write_rpc = CTX->rpc_engine()->define(gkfs::rpc::tag::write);

    for(std::size_t i = 0; i < targets.size(); ++i) {
        auto target = targets[i];
@@ -423,7 +423,7 @@ forward_read(const string& path, void* buf, const off64_t offset,
    std::vector<uint64_t> waiter_targets; // track targets for error reporting
    waiter_targets.reserve(targets.size());

    auto read_rpc = CTX->rpc_engine()->define(gkfs::rpc::tag::read);
    static auto read_rpc = CTX->rpc_engine()->define(gkfs::rpc::tag::read);

    // Issue non-blocking RPC requests and wait for the result later
    for(std::size_t i = 0; i < targets.size(); ++i) {