Verified Commit ebd1173e authored by Alberto Miranda's avatar Alberto Miranda ♨️
Browse files

Fix bug in rm_node due to async_engine::broadcast

parent 139fcc1f
Loading
Loading
Loading
Loading
Loading
+38 −17
Original line number Diff line number Diff line
@@ -159,36 +159,57 @@ int rm_node(const std::string& path, const bool remove_metadentry_only) {

    std::vector<hermes::rpc_handle<gkfs::rpc::remove>> handles;

    hermes::endpoint_set endps;

    std::copy(CTX->hosts().begin(), 
              CTX->hosts().end(), 
              std::back_inserter(endps));

    for (const auto& endp : CTX->hosts()) {
        try {
            CTX->log()->trace("{}() Sending RPC to host: {}", 
                              __func__, endp.to_string());

        auto output_set = 
            ld_network_service->broadcast<gkfs::rpc::remove>(endps, path).get();
            gkfs::rpc::remove::input in(path);

        // Wait for RPC responses and then get response
        for (const auto& out : output_set) {
            CTX->log()->debug("{}() Got response success: {}", __func__, out.err());
            // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that
            // we can retry for RPC_TRIES (see old commits with margo)
            // TODO(amiranda): hermes will eventually provide a post(endpoint) 
            // returning one result and a broadcast(endpoint_set) returning a 
            // result_set. When that happens we can remove the .at(0) :/
            handles.emplace_back(
                ld_network_service->post<gkfs::rpc::remove>(endp, in));

            if(out.err() != 0) {
                errno = out.err();
                return -1;
        } catch (const std::exception& ex) {
            // TODO(amiranda): we should cancel all previously posted requests 
            // here, unfortunately, Hermes does not support it yet :/
            CTX->log()->error("{}() Failed to send request to host: {}", 
                              __func__, endp.to_string());
            throw std::runtime_error("Failed to forward non-blocking rpc request");
        }
    }

        return 0;
    // wait for RPC responses
    bool got_error = false;

    for(const auto& h : handles) {

        try {
            // XXX We might need a timeout here to not wait forever for an
            // output that never comes?
            auto out = h.get().at(0);

            if(out.err() != 0) {
                CTX->log()->error("{}() received error response: {}", 
                        __func__, out.err());
                got_error = true;
                errno = out.err();
            }
        } catch(const std::exception& ex) {
            CTX->log()->error("{}() while getting rpc output", __func__);
            got_error = true;
            errno = EBUSY;
        return -1;
        }
    }

    return got_error ? -1 : 0;

}


int update_metadentry(const string& path, const Metadata& md, const MetadentryUpdateFlags& md_flags) {

+1 −1
Original line number Diff line number Diff line
@@ -132,7 +132,7 @@ static hg_return_t rpc_srv_rm_node(hg_handle_t handle) {
    if (ret != HG_SUCCESS)
        ADAFS_DATA->spdlogger()->error("{}() Failed to retrieve input from handle", __func__);
    assert(ret == HG_SUCCESS);
    ADAFS_DATA->spdlogger()->debug("Got remove node RPC with path {}", in.path);
    ADAFS_DATA->spdlogger()->debug("Got remove node RPC with path '{}'", in.path);

    try {
        // Remove metadentry if exists on the node