Commit 8cd4b745 authored by sevenuz's avatar sevenuz
Browse files

rpc engine fix for FUSE > 3

parent f4006c8d
Loading
Loading
Loading
Loading
+42 −2
Original line number Diff line number Diff line
@@ -64,6 +64,44 @@ extern "C" {
#include <syscall.h>
}

namespace {

struct ThreadState {
    bool initialized{false};
    std::shared_ptr<thallium::engine> engine;
    std::vector<thallium::endpoint> hosts;
};

thread_local ThreadState tl_state;

/*
 * Called lazily on the first CTX->rpc_engine() or CTX->hosts() call per
 * thread after global init is complete. Creates an independent Margo engine
 * and re-looks up server endpoints for this thread, eliminating shared-engine
 * contention between FUSE worker threads (or multi-threaded app threads in the
 * LD_PRELOAD case).  The guard `global_hosts.empty()` prevents activation
 * during gkfs_init() itself, where hosts_ is still being populated.
 */
void
ensure_thread_local(const std::string& rpc_protocol,
                    const std::vector<thallium::endpoint>& global_hosts) {
    if(tl_state.initialized || global_hosts.empty())
        return;

    auto margo_config = R"({"use_progress_thread":true,"rpc_thread_count":0})";
    tl_state.engine = std::make_shared<thallium::engine>(
            rpc_protocol, THALLIUM_CLIENT_MODE, margo_config);

    tl_state.hosts.reserve(global_hosts.size());
    for(const auto& ep : global_hosts) {
        tl_state.hosts.emplace_back(
                tl_state.engine->lookup(std::string(ep)));
    }
    tl_state.initialized = true;
}

} // namespace

namespace gkfs {

namespace preload {
@@ -253,7 +291,8 @@ PreloadContext::cwd() const {

const std::vector<thallium::endpoint>&
PreloadContext::hosts() const {
    return hosts_;
    ensure_thread_local(rpc_protocol_, hosts_);
    return tl_state.initialized ? tl_state.hosts : hosts_;
}

void
@@ -766,7 +805,8 @@ PreloadContext::read_metrics() {

std::shared_ptr<thallium::engine>
PreloadContext::rpc_engine() {
    return rpc_engine_;
    ensure_thread_local(rpc_protocol_, hosts_);
    return tl_state.initialized ? tl_state.engine : rpc_engine_;
}

void