Commit d305eaad authored by Ramon Nou's avatar Ramon Nou
Browse files

sharding openmap and remove chunk issue

parent c5f8c5b1
Loading
Loading
Loading
Loading
+4 −3
Original line number Diff line number Diff line
@@ -33,7 +33,7 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
    - Added tests for FUSE support
  - Optimized and fixed fuse ([!293](https://storage.bsc.es/gitlab/hpc/gekkofs/-/merge_requests/293))
    - New optimization options
    - Less contention with threading
    - Less contention with threading using sharding
  
 

@@ -51,6 +51,7 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
  - Fix pytorch mmap ([!291](https://storage.bsc.es/gitlab/hpc/gekkofs/-/merge_requests/291))
  - Fix cuda in syscall ([!292](https://storage.bsc.es/gitlab/hpc/gekkofs/-/merge_requests/292))
    - mmap and dangling fd issues
  - Fix remove chunk bug ([!294](https://storage.bsc.es/gitlab/hpc/gekkofs/-/merge_requests/294))
    

## [0.9.5] - 2025-08
+14 −3
Original line number Diff line number Diff line
@@ -138,10 +138,18 @@ private:


class OpenFileMap {

private:
    std::map<int, std::shared_ptr<OpenFile>> files_;
    std::recursive_mutex files_mutex_;
    struct MapShard {
        std::map<int, std::shared_ptr<OpenFile>> files;
        mutable std::recursive_mutex mutex;
    };

    static constexpr size_t num_shards = 32;
    std::array<MapShard, num_shards> shards_;
    std::atomic<size_t> total_files_{0};

    MapShard&
    get_shard(int fd);

    int
    safe_generate_fd_idx_();
@@ -194,6 +202,9 @@ public:

    std::vector<int>
    get_range(unsigned int first, unsigned int last);

    bool
    empty() const;
};

} // namespace gkfs::filemap
+56 −23
Original line number Diff line number Diff line
@@ -145,11 +145,22 @@ OpenFile::inline_data_size(size_t size) {
    OpenFile::inline_data_size_ = size;
}

OpenFileMap::MapShard&
OpenFileMap::get_shard(int fd) {
    if(fd < 0) {
        return shards_[0]; // Default shard for invalid FDs, or we could throw
    }
    return shards_[fd % num_shards];
}

shared_ptr<OpenFile>
OpenFileMap::get(int fd) {
    lock_guard<recursive_mutex> lock(files_mutex_);
    auto f = files_.find(fd);
    if(f == files_.end()) {
    if(fd < 0)
        return nullptr;
    auto& shard = get_shard(fd);
    lock_guard<recursive_mutex> lock(shard.mutex);
    auto f = shard.files.find(fd);
    if(f == shard.files.end()) {
        return nullptr;
    } else {
        return f->second;
@@ -167,9 +178,12 @@ OpenFileMap::get_dir(int dirfd) {

bool
OpenFileMap::exist(const int fd) {
    lock_guard<recursive_mutex> lock(files_mutex_);
    auto f = files_.find(fd);
    return !(f == files_.end());
    if(fd < 0)
        return false;
    auto& shard = get_shard(fd);
    lock_guard<recursive_mutex> lock(shard.mutex);
    auto f = shard.files.find(fd);
    return !(f == shard.files.end());
}

int
@@ -224,19 +238,27 @@ OpenFileMap::safe_generate_fd_idx_() {
int
OpenFileMap::add(std::shared_ptr<OpenFile> open_file) {
    auto fd = safe_generate_fd_idx_();
    lock_guard<recursive_mutex> lock(files_mutex_);
    files_[fd] = open_file;
    if(fd < 0)
        return fd;
    auto& shard = get_shard(fd);
    lock_guard<recursive_mutex> lock(shard.mutex);
    shard.files[fd] = open_file;
    total_files_++;
    return fd;
}

bool
OpenFileMap::remove(const int fd) {
    lock_guard<recursive_mutex> lock(files_mutex_);
    auto f = files_.find(fd);
    if(f == files_.end()) {
    if(fd < 0)
        return false;
    auto& shard = get_shard(fd);
    lock_guard<recursive_mutex> lock(shard.mutex);
    auto f = shard.files.find(fd);
    if(f == shard.files.end()) {
        return false;
    }
    files_.erase(fd);
    shard.files.erase(fd);
    total_files_--;

    if(!CTX->protect_fds()) {
        if(!CTX->range_fd()) {
@@ -245,7 +267,7 @@ OpenFileMap::remove(const int fd) {
            return true;
        }
    }
    if(fd_validation_needed && files_.empty()) {
    if(fd_validation_needed && empty()) {
        fd_validation_needed = false;
        LOG(DEBUG, "fd_validation flag reset");
    }
@@ -254,20 +276,20 @@ OpenFileMap::remove(const int fd) {

int
OpenFileMap::dup(const int oldfd) {
    lock_guard<recursive_mutex> lock(files_mutex_);
    auto open_file = get(oldfd);
    if(open_file == nullptr) {
        errno = EBADF;
        return -1;
    }
    auto newfd = safe_generate_fd_idx_();
    files_.insert(make_pair(newfd, open_file));
    auto& shard = get_shard(newfd);
    lock_guard<recursive_mutex> lock(shard.mutex);
    shard.files.insert(make_pair(newfd, open_file));
    return newfd;
}

int
OpenFileMap::dup2(const int oldfd, const int newfd) {
    lock_guard<recursive_mutex> lock(files_mutex_);
    auto open_file = get(oldfd);
    if(open_file == nullptr) {
        errno = EBADF;
@@ -283,7 +305,10 @@ OpenFileMap::dup2(const int oldfd, const int newfd) {
    // by os streams that we do not overwrite
    if(get_fd_idx() < newfd && newfd != 0 && newfd != 1 && newfd != 2)
        fd_validation_needed = true;
    files_.insert(make_pair(newfd, open_file));

    auto& shard = get_shard(newfd);
    lock_guard<recursive_mutex> lock(shard.mutex);
    shard.files.insert(make_pair(newfd, open_file));
    return newfd;
}

@@ -318,15 +343,23 @@ OpenFileMap::get_fd_idx() {

std::vector<int>
OpenFileMap::get_range(unsigned int first, unsigned int last) {
    std::lock_guard<std::recursive_mutex> lock(files_mutex_);
    std::vector<int> result;
    // files_ is a sorted std::map, so lower_bound gives us an efficient start
    auto it = files_.lower_bound(static_cast<int>(first));
    while(it != files_.end() && static_cast<unsigned int>(it->first) <= last) {
    for(auto& shard : shards_) {
        lock_guard<recursive_mutex> lock(shard.mutex);
        auto it = shard.files.lower_bound(static_cast<int>(first));
        while(it != shard.files.end() &&
              static_cast<unsigned int>(it->first) <= last) {
            result.push_back(it->first);
            ++it;
        }
    }
    std::sort(result.begin(), result.end());
    return result;
}

bool
OpenFileMap::empty() const {
    return total_files_ == 0;
}

} // namespace gkfs::filemap
 No newline at end of file
+7 −3
Original line number Diff line number Diff line
@@ -385,12 +385,16 @@ init_preload() {
#ifdef ENABLE_USER
    return;
#endif
    static std::recursive_mutex init_mutex;
    std::lock_guard<std::recursive_mutex> lock(init_mutex);
    if(init) {
        return;
    }
    // The original errno value will be restored after initialization to not
    // leak internal error codes
    auto oerrno = errno;
    if(atomic_exchange(&init, 1) == 0) {
    init = true;
    pthread_atfork(&at_fork, &at_parent, &at_child);
    }

#ifndef BYPASS_SYSCALL
    CTX->enable_interception();
+31 −0
Original line number Diff line number Diff line
@@ -118,6 +118,7 @@ int
forward_remove(const std::string& path, bool rm_dir, int8_t num_copies) {
    int err = 0;

    // Step 1: Remove metadata from the responsible server(s)
    // We iterate over replicas
    for(auto copy = 0; copy < num_copies + 1; copy++) {
        auto endp = CTX->hosts().at(
@@ -135,6 +136,36 @@ forward_remove(const std::string& path, bool rm_dir, int8_t num_copies) {
            err = out.err;
        }
    }

    // If metadata removal failed, bail out early
    if(err != 0)
        return err;

    // Step 2: Broadcast remove_data to ALL daemons so that every host can
    // clean up whatever chunks it holds for this file.
    // NOTE: When using the proxy, the proxy handles this broadcast itself, so
    // we skip it here (CTX->use_proxy() is checked by the caller).
    // We send remove_data unconditionally for non-directory removes.
    // Daemons handle the no-op case safely (empty chunk dir == nothing to do).
    if(!rm_dir) {
        auto rpc_remove_data =
                CTX->rpc_engine()->define(gkfs::rpc::tag::remove_data);

        gkfs::rpc::rpc_rm_node_in_t rm_in;
        rm_in.path = path;
        rm_in.rm_dir = true; // tells daemon to remove the whole chunk directory

        for(std::size_t i = 0; i < CTX->hosts().size(); i++) {
            try {
                rpc_remove_data.on(CTX->hosts().at(i)).async(rm_in);
            } catch(const std::exception& e) {
                LOG(WARNING,
                    "{}() Failed to send remove_data RPC to host {}: {}",
                    __func__, i, e.what());
            }
        }
    }

    return err;
}

Loading