Commit 29fbfd15 authored by Jean Bez's avatar Jean Bez
Browse files

Updates on forwarding root location and mapping thread shutdown

parent 91844106
Loading
Loading
Loading
Loading
Loading
+4 −0
Original line number Diff line number Diff line
@@ -52,7 +52,11 @@ constexpr auto use_blocks = false;
} // namespace metadata

namespace rpc {
#ifdef GKFS_ENABLE_FORWARDING
constexpr auto chunksize = 107374182400; // in bytes (e.g., 107374182400 == 100GB)
#else
constexpr auto chunksize = 524288; // in bytes (e.g., 524288 == 512KB)
#endif
//size of preallocated buffer to hold directory entries in rpc call
constexpr auto dirents_buff_size = (8 * 1024 * 1024); // 8 mega
/*
+1 −1
Original line number Diff line number Diff line
@@ -72,8 +72,8 @@ public:
class ForwarderDistributor : public Distributor {
private:
    host_t fwd_host_;
    std::vector<host_t> all_hosts_;
    unsigned int hosts_size_;
    std::vector<host_t> all_hosts_;
    std::hash<std::string> str_hash;
public:
    ForwarderDistributor(host_t fwhost, unsigned int hosts_size);
+13 −1
Original line number Diff line number Diff line
@@ -39,6 +39,9 @@ pthread_once_t init_env_thread = PTHREAD_ONCE_INIT;
#ifdef GKFS_ENABLE_FORWARDING
pthread_t mapper;
bool forwarding_running;

pthread_mutex_t remap_mutex;
pthread_cond_t remap_signal;
#endif

inline void exit_error_msg(int errcode, const string& msg) {
@@ -130,6 +133,10 @@ void init_ld_environment_() {

#ifdef GKFS_ENABLE_FORWARDING
void *forwarding_mapper(void* p) {
    struct timespec timeout;
    clock_gettime(CLOCK_REALTIME, &timeout);
    timeout.tv_sec += 10; // 10 seconds

    while (forwarding_running) {
        try {
            gkfs::util::load_forwarding_map();
@@ -140,7 +147,10 @@ void *forwarding_mapper(void* p) {
        }

        // Sleeps for 10 seconds
        sleep(10);
        // sleep(10);
        pthread_mutex_lock(&remap_mutex);
        pthread_cond_timedwait(&remap_signal, &remap_mutex, &timeout);
        pthread_mutex_unlock(&remap_mutex);
    }

    return nullptr;
@@ -159,6 +169,8 @@ void init_forwarding_mapper() {
void destroy_forwarding_mapper() {
    forwarding_running = false;

    pthread_cond_signal(&remap_signal);

    pthread_join(mapper, NULL);
}
#endif
+5 −0
Original line number Diff line number Diff line
@@ -403,7 +403,12 @@ int main(int argc, const char* argv[]) {

    assert(vm.count("rootdir"));
    auto rootdir = vm["rootdir"].as<string>();
    #ifdef GKFS_ENABLE_FORWARDING
    // In forwarding mode, the backend is shared
    auto rootdir_path = bfs::path(rootdir);
    #else
    auto rootdir_path = bfs::path(rootdir) / fmt::format_int(getpid()).str();
    #endif
    GKFS_DATA->spdlogger()->debug("{}() Root directory: '{}'",
                                  __func__, rootdir_path.native());
    bfs::create_directories(rootdir_path);
+3 −2
Original line number Diff line number Diff line
@@ -72,8 +72,9 @@ ForwarderDistributor::
ForwarderDistributor(host_t fwhost, unsigned int hosts_size) : 
    fwd_host_(fwhost),
    hosts_size_(hosts_size),
    all_hosts_(hosts_size)
{}
    all_hosts_(hosts_size) {
    ::iota(all_hosts_.begin(), all_hosts_.end(), 0);
}

host_t ForwarderDistributor::
localhost() const {