Verified Commit f37b3e7b authored by Marc Vef's avatar Marc Vef
Browse files

Parallelize opendir when using dir cache

parent b2b74091
Loading
Loading
Loading
Loading
+51 −12
Original line number Diff line number Diff line
@@ -1332,19 +1332,25 @@ gkfs_opendir(const std::string& path) {
    // this is used in get_metadata() later to avoid stat RPCs
    if(CTX->use_cache()) {
        ret.second = make_shared<gkfs::filemap::OpenDir>(path);
        // TODO parallelize

        std::vector<std::future<
                pair<int, unique_ptr<vector<tuple<const basic_string<char>,
                                                  bool, size_t, time_t>>>>>>
                futures;
        // Launch RPC calls asynchronously
        for(uint64_t i = 0; i < CTX->hosts().size(); i++) {
            pair<int, unique_ptr<vector<tuple<const basic_string<char>, bool,
                                              size_t, time_t>>>>
                    res{};
            futures.push_back(std::async(std::launch::async, [&, i]() {
                if(gkfs::config::proxy::fwd_get_dirents_single &&
                   CTX->use_proxy()) {
                res = gkfs::rpc::forward_get_dirents_single_proxy(path, i);
                    return gkfs::rpc::forward_get_dirents_single_proxy(path, i);
                } else {
                res = gkfs::rpc::forward_get_dirents_single(path, i);
                    return gkfs::rpc::forward_get_dirents_single(path, i);
                }
            //            auto res = gkfs::rpc::forward_get_dirents_single(path,
            //            i);
            }));
        }
        // Collect and process results
        for(auto& fut : futures) {
            auto res = fut.get(); // Wait for the RPC result
            auto& open_dir = *res.second;
            for(auto& dentry : open_dir) {
                // type returns as a boolean. true if it is a directory
@@ -1362,6 +1368,39 @@ gkfs_opendir(const std::string& path) {
            }
            ret.first = res.first;
        }
        // TODO parallelize
        //        for(uint64_t i = 0; i < CTX->hosts().size(); i++) {
        //            pair<int, unique_ptr<vector<tuple<const
        //            basic_string<char>, bool,
        //                                              size_t, time_t>>>>
        //                    res{};
        //            if(gkfs::config::proxy::fwd_get_dirents_single &&
        //               CTX->use_proxy()) {
        //                res =
        //                gkfs::rpc::forward_get_dirents_single_proxy(path, i);
        //            } else {
        //                res = gkfs::rpc::forward_get_dirents_single(path, i);
        //            }
        //            auto& open_dir = *res.second;
        //            for(auto& dentry : open_dir) {
        //                // type returns as a boolean. true if it is a
        //                directory LOG(DEBUG, "name: {} type: {} size: {}
        //                ctime: {}",
        //                    get<0>(dentry), get<1>(dentry), get<2>(dentry),
        //                    get<3>(dentry));
        //                auto ftype = get<1>(dentry) ?
        //                gkfs::filemap::FileType::directory
        //                                            :
        //                                            gkfs::filemap::FileType::regular;
        //                // filename, is_dir, size, ctime
        //                ret.second->add(get<0>(dentry), ftype);
        //                CTX->cache()->insert(path, get<0>(dentry),
        //                                     gkfs::cache::cache_entry{ftype,
        //                                                              get<2>(dentry),
        //                                                              get<3>(dentry)});
        //            }
        //            ret.first = res.first;
        //        }
        //        CTX->cache()->dump_cache_to_log(path);
    } else {
        ret = gkfs::rpc::forward_get_dirents(path);