Commit 9dd6ebef authored by Marc Vef

Merge branch 'marc/proxy_dev' into 'master'

New feature: GekkoFS Proxy

The GekkoFS proxy is an additional, optional component that runs on each client node and acts as a gateway between the
client and the daemons. It can improve network stability, e.g., for opa-psm2, and provides a basis for future
asynchronous I/O as well as client caching techniques to control file system semantics.

The `gkfs` script fully supports the GekkoFS proxy, and an example can be found in `scripts/run`. When using the proxy
manually, additional arguments are required on the daemon side to specify which network interface and protocol should
be used:

```bash
<daemon args> --proxy-listen eno1 --proxy-protocol ofi+sockets
```

The proxy is started thereafter:

```bash
./gkfs_proxy -H ./gkfs_hostfile --pid-path ./vef_gkfs_proxy.pid -p ofi+sockets
```

The shared hostfile is generated by the daemons, whereas the pid file (`--pid-path`) is local to each machine and is
detected by clients. The pid path defaults to `/tmp/gkfs_proxy.pid`.
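
Because the proxy reads the hostfile that the daemons generate, a manual launch may want to wait for that file before
starting the proxy. The following is a minimal sketch, assuming a hypothetical helper `wait_for_file` that is not part
of GekkoFS. The proxy invocation is left commented out and reuses the paths and flags from the example above:

```bash
#!/usr/bin/env bash
# Hypothetical helper (not part of GekkoFS): wait until a file exists and is
# non-empty, checking once per second, and give up after <timeout> seconds.
wait_for_file() {
    local path="$1" timeout="${2:-30}" waited=0
    while [ ! -s "$path" ]; do
        if [ "$waited" -ge "$timeout" ]; then
            return 1
        fi
        sleep 1
        waited=$((waited + 1))
    done
    return 0
}

# Example usage, assuming the daemons write ./gkfs_hostfile:
# wait_for_file ./gkfs_hostfile 30 && \
#     ./gkfs_proxy -H ./gkfs_hostfile --pid-path ./vef_gkfs_proxy.pid -p ofi+sockets
```

The helper returns a non-zero status on timeout, so the proxy is only started once the hostfile is actually present.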

Under default operation, clients automatically detect whether to use the proxy. If a non-default pid file path is used,
the environment variable `LIBGKFS_PROXY_PID_FILE` can be set for the clients.
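
The lookup order described above can be illustrated with a small shell helper. This is only a sketch of that order
(environment variable first, then the default path). `proxy_pid_file` is a hypothetical name, and the actual
client-side detection logic may differ:

```bash
#!/usr/bin/env bash
# Hypothetical helper: print the pid file path a client would be pointed at.
# Uses LIBGKFS_PROXY_PID_FILE if it is set and non-empty, otherwise the
# documented default /tmp/gkfs_proxy.pid.
proxy_pid_file() {
    echo "${LIBGKFS_PROXY_PID_FILE:-/tmp/gkfs_proxy.pid}"
}

# Report whether a pid file is present at the resolved location.
if [ -e "$(proxy_pid_file)" ]; then
    echo "proxy pid file found: $(proxy_pid_file)"
else
    echo "no proxy pid file at: $(proxy_pid_file)"
fi
```

For the manual example above, the clients would need `LIBGKFS_PROXY_PID_FILE` exported with the same path that was
passed to `--pid-path`. A relative path is resolved against each process's working directory, so an absolute path is
likely the safer choice.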

Alternatively, the `gkfs` script automatically sets all required arguments:

```bash
scripts/run/gkfs -c scripts/run/gkfs.conf -f start --proxy
* [gkfs] Starting GekkoFS daemons (1 nodes) ...
* [gkfs] GekkoFS daemons running
* [gkfs] Startup time: 2.013 seconds
* [gkfs] Starting GekkoFS proxies (1 nodes) ...
* [gkfs] GekkoFS proxies running
* [gkfs] Startup time: 5.002 seconds
Press 'q' to exit
```

Please consult `include/config.hpp` for additional configuration options. Note that the GekkoFS proxy does not support
replication.

Closes #114

See merge request !191
parents e83e86d2 64915a03
+7 −0
@@ -8,6 +8,13 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]
### New

- Added the GekkoFS proxy as an optional gateway between client and daemon. The proxy is started on each compute node
  that houses clients ([!191](https://storage.bsc.es/gitlab/hpc/gekkofs/-/merge_request/191)).
  - Additional options for the GekkoFS daemon were added to integrate the GekkoFS proxy.
  - The proxy introduced a new executable: `gkfs_proxy`.
  - The `gkfs` run script has been significantly reworked to accommodate the proxy and a number of additional features,
    e.g., CPU socket pinning.
  - The environment variable `LIBGKFS_PROXY_PID_FILE` was added for clients when a non-default pid file path is in use.
- Added client-side metrics including the periodic export to a file or ZeroMQ sink via the TCP
  protocol ([!176](https://storage.bsc.es/gitlab/hpc/gekkofs/-/merge_request/176)).
  - CMake option added to enable this optional feature `-DGKFS_ENABLE_CLIENT_METRICS=ON`
+44 −1
@@ -37,7 +37,8 @@ GekkoFS testing support: `python38-devel` (**>Python-3.6 required**)
      execute the following command from the root of the source directory: `git submodule update --init`
3. Set up the necessary environment variables where the compiled direct GekkoFS dependencies will be installed at (we
   assume the path `/home/foo/gekkofs_deps/install` in the following)
    - `export LD_LIBRARY_PATH=${LD_LIBRARY_PATH}:/home/foo/gekkofs_deps/install/lib:/home/foo/gekkofs_deps/install/lib64`
    -
   `export LD_LIBRARY_PATH=${LD_LIBRARY_PATH}:/home/foo/gekkofs_deps/install/lib:/home/foo/gekkofs_deps/install/lib64`
4. Download and compile the direct dependencies, e.g.,
    - Download example: `gekkofs/scripts/dl_dep.sh /home/foo/gekkofs_deps/git`
    - Compilation example: `gekkofs/scripts/compile_dep.sh /home/foo/gekkofs_deps/git /home/foo/gekkofs_deps/install`
@@ -372,6 +373,48 @@ total_bytes: 1802366
total_iops: 4
```

### GekkoFS proxy

The GekkoFS proxy is an additional, optional component that runs on each client node and acts as a gateway between the
client and the daemons. It can improve network stability, e.g., for opa-psm2, and provides a basis for future
asynchronous I/O as well as client caching techniques to control file system semantics.

The `gkfs` script fully supports the GekkoFS proxy, and an example can be found in `scripts/run`. When using the proxy
manually, additional arguments are required on the daemon side to specify which network interface and protocol should
be used:

```bash
<daemon args> --proxy-listen eno1 --proxy-protocol ofi+sockets
```

The proxy is started thereafter:

```bash
./gkfs_proxy -H ./gkfs_hostfile --pid-path ./vef_gkfs_proxy.pid -p ofi+sockets
```

The shared hostfile is generated by the daemons, whereas the pid file (`--pid-path`) is local to each machine and is
detected by clients. The pid path defaults to `/tmp/gkfs_proxy.pid`.

Under default operation, clients automatically detect whether to use the proxy. If a non-default pid file path is used,
the environment variable `LIBGKFS_PROXY_PID_FILE` can be set for the clients.

Alternatively, the `gkfs` script automatically sets all required arguments:

```bash
scripts/run/gkfs -c scripts/run/gkfs.conf -f start --proxy
* [gkfs] Starting GekkoFS daemons (1 nodes) ...
* [gkfs] GekkoFS daemons running
* [gkfs] Startup time: 2.013 seconds
* [gkfs] Starting GekkoFS proxies (1 nodes) ...
* [gkfs] GekkoFS proxies running
* [gkfs] Startup time: 5.002 seconds
Press 'q' to exit
```

Please consult `include/config.hpp` for additional configuration options. Note that the GekkoFS proxy does not support
replication.

## Acknowledgment

This software was partially supported by the EC H2020 funded NEXTGenIO project (Project ID: 671951, www.nextgenio.eu).
+2 −2
###
#  Copyright 2018-2020, Barcelona Supercomputing Center (BSC), Spain
#  Copyright 2015-2020, Johannes Gutenberg Universitaet Mainz, Germany
#  Copyright 2018-2024, Barcelona Supercomputing Center (BSC), Spain
#  Copyright 2015-2024, Johannes Gutenberg Universitaet Mainz, Germany

#  This software was partially supported by the
#  EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu).
+19 −2
@@ -27,10 +27,27 @@
################################################################################

set (CMAKE_CXX_STANDARD 17)
add_executable(sfind sfind.cpp)

add_executable(sfind sfind.cpp)
set_property(TARGET sfind PROPERTY POSITION_INDEPENDENT_CODE ON)
if(GKFS_INSTALL_TESTS)
    install(TARGETS sfind
        DESTINATION ${CMAKE_INSTALL_BINDIR}
    )
endif()

find_package(MPI)
if (MPI_FOUND)
    message(STATUS "[gekkofs] MPI was found. Building gfind example")
    add_executable(gfind gfind.cpp)
    set_property(TARGET gfind PROPERTY POSITION_INDEPENDENT_CODE ON)
    target_link_libraries(gfind
        PUBLIC
        MPI::MPI_CXX
    )
    if(GKFS_INSTALL_TESTS)
        install(TARGETS gfind
            DESTINATION ${CMAKE_INSTALL_BINDIR}
        )
    endif()
endif()
+424 −411
@@ -58,10 +58,9 @@ struct dirent_extended {

/* Function exported from GekkoFS LD_PRELOAD, code needs to be compiled with
 * -fPIC, if not will segfault */
extern "C" int gkfs_getsingleserverdir(const char *path,
                                       struct dirent_extended *dirp,
                                       unsigned int count, int server)
    __attribute__((weak));
extern "C" int
gkfs_getsingleserverdir(const char* path, struct dirent_extended* dirp,
                        unsigned int count, int server) __attribute__((weak));

/* PFIND OPTIONS EXTENDED We need to add the GekkoFS mount dir and the number of
 * servers */
@@ -85,7 +84,8 @@ typedef struct {
    int max_entries_per_iter;
    int steal_from_next;            // if true, then steal from the next process
    int parallel_single_dir_access; // if 1, use hashing to parallelize single
                                  // directory access, if 2 sequential increment
                                    // directory access, if 2 sequential
                                    // increment

    int verbosity;
} pfind_options_t;
@@ -103,12 +103,14 @@ int pfind_rank;

static pfind_options_t* opt;

void pfind_abort(const string str) {
void
pfind_abort(const string str) {
    printf("%s", str.c_str());
    exit(1);
}

static void pfind_print_help(pfind_options_t *res) {
static void
pfind_print_help(pfind_options_t* res) {
    printf("pfind \nSynopsis:\n"
           "pfind <workdir> [-newer <timestamp file>] [-size <size>c] [-name "
           "<substr>] [-regex <regex>] [-S <numserver>] [-M <mountdir>]\n"
@@ -120,13 +122,13 @@ static void pfind_print_help(pfind_options_t *res) {
           "Optional flags\n"
           "\t-h: prints the help\n"
           "\t--help: prints the help without initializing MPI\n",
         res->workdir, res->timestamp_file, res->name_pattern, res->num_servers,
         res->mountdir);
           res->workdir, res->timestamp_file, res->name_pattern,
           res->num_servers, res->mountdir);
}
MPI_Comm pfind_com;
int pfind_size;
pfind_options_t *pfind_parse_args(int argc, char **argv, int force_print_help,
                                  MPI_Comm com) {
pfind_options_t*
pfind_parse_args(int argc, char** argv, int force_print_help, MPI_Comm com) {
    MPI_Comm_rank(com, &pfind_rank);
    MPI_Comm_size(com, &pfind_size);
    pfind_com = com;
@@ -200,6 +202,14 @@ pfind_options_t *pfind_parse_args(int argc, char **argv, int force_print_help,
            }
            argv[i][0] = 0;
            argv[++i][0] = 0;
        } else if(strcmp(argv[i], "-M") == 0) {
            res->mountdir = strdup(argv[i + 1]);
            argv[i][0] = 0;
            argv[++i][0] = 0;
        } else if(strcmp(argv[i], "-S") == 0) {
            res->num_servers = atoi(argv[i + 1]);
            argv[i][0] = 0;
            argv[++i][0] = 0;
        } else if(!firstarg) {
            firstarg = strdup(argv[i]);
            argv[i][0] = 0;
@@ -223,7 +233,8 @@ pfind_options_t *pfind_parse_args(int argc, char **argv, int force_print_help,
                res->steal_from_next = 1;
                break;
            case 'x':
      /* ignore fake arg that we added when we processed the extra args */
                /* ignore fake arg that we added when we processed the extra
                 * args */
                break;
            case 'P':
                res->print_by_process = 1;
@@ -254,12 +265,6 @@ pfind_options_t *pfind_parse_args(int argc, char **argv, int force_print_help,
            case 's':
                res->stonewall_timer = atol(optarg);
                break;
    case 'S':
      res->num_servers = atoi(optarg);
      break;
    case 'M':
      res->mountdir = strdup(optarg);
      break;
            case 'v':
                res->verbosity++;
                break;
@@ -291,21 +296,23 @@ pfind_options_t *pfind_parse_args(int argc, char **argv, int force_print_help,
}

/* Master send a new path to the workers */
void send_newPath(string path) {
void
send_newPath(string path) {
    auto count = path.size() + 1;
    MPI_Bcast(&count, 1, MPI_INT, 0, MPI_COMM_WORLD);
    MPI_Bcast((void*) path.c_str(), count, MPI_CHAR, 0, MPI_COMM_WORLD);
}

/* Clients get a new path, getting a "0" size char means there is no new path*/
string recv_newPath() {
string
recv_newPath() {
    int count;
    MPI_Bcast(&count, 1, MPI_INT, 0, MPI_COMM_WORLD);
    if(count == 0)
        return "Terminate";
  char buf[count];
  MPI_Bcast(buf, count, MPI_CHAR, 0, MPI_COMM_WORLD);
  return buf;
    std::vector<char> buf(count);
    MPI_Bcast(buf.data(), count, MPI_CHAR, 0, MPI_COMM_WORLD);
    return std::string(buf.begin(), buf.end());
}

/* Client Processing a path.
@@ -315,15 +322,16 @@ string recv_newPath() {
 * server, which is enough for most cases
 *
 */
void dirProcess(const string path, unsigned long long &checked,
void
dirProcess(const string path, unsigned long long& checked,
           unsigned long long& found, queue<string>& dirs,
           unsigned int world_rank, unsigned int world_size,
           pfind_options_t* opt) {
    struct dirent_extended* getdir = (struct dirent_extended*) malloc(
            (sizeof(struct dirent_extended) + 255) * 1024 * 100);
    memset(getdir, 0, (sizeof(struct dirent_extended) + 255) * 1024 * 100);
  // cout << "PROCESSING " << world_rank << "/"<< world_size << " = " << path <<
  // endl;
    // cout << "PROCESSING " << world_rank << "/"<< world_size << " = " << path
    // << endl;


    int servers_per_node = ceil(opt->num_servers / (world_size - 1));
@@ -331,7 +339,7 @@ void dirProcess(const string path, unsigned long long &checked,
        servers_per_node++;
    for(int it = 0; it < servers_per_node; it++) {
        auto server = (world_rank - 1) * servers_per_node + it;
      if (server >= opt->num_servers)
        if(server >= (unsigned int) opt->num_servers)
            break;

        unsigned long long total_size = 0;
@@ -340,7 +348,7 @@ void dirProcess(const string path, unsigned long long &checked,
                (sizeof(struct dirent_extended) + 255) * 1024 * 100, server);
        struct dirent_extended* temp = getdir;

      while (total_size < n) {
        while(total_size < (unsigned long long) n) {
            if(strlen(temp->d_name) == 0)
                break;
            total_size += temp->d_reclen;
@@ -351,8 +359,8 @@ void dirProcess(const string path, unsigned long long &checked,
                    slash = "/";
                checked++;
                dirs.push(path + slash + temp->d_name);
          temp = reinterpret_cast<dirent_extended *>(reinterpret_cast<char *>(temp) +
                                                     temp->d_reclen);
                temp = reinterpret_cast<dirent_extended*>(
                        reinterpret_cast<char*>(temp) + temp->d_reclen);
                continue;
            }
            /* Find filtering */
@@ -361,24 +369,25 @@ void dirProcess(const string path, unsigned long long &checked,
                if((uint64_t) temp->ctime < runtime.ctime_min)
                    timeOK = false;
            }
        if (timeOK and (temp->size == opt->size or opt->size == std::numeric_limits<uint64_t>::max()))
            if(timeOK and (temp->size == opt->size or
                           opt->size == std::numeric_limits<uint64_t>::max()))
                if(!(opt->name_pattern &&
                     regexec(&opt->name_regex, temp->d_name, 0, nullptr, 0)))
                    found++;
            checked++;
        temp =
            reinterpret_cast<dirent_extended *>(reinterpret_cast<char *>(temp) + temp->d_reclen);
            temp = reinterpret_cast<dirent_extended*>(
                    reinterpret_cast<char*>(temp) + temp->d_reclen);
        }
    }
}

int process(char *processor_name, int world_rank, int world_size,
int
process(char* processor_name, int world_rank, int world_size,
        pfind_options_t* opt) {
    // Print off a hello world message

    // INIT PFIND
    memset(&runtime, 0, sizeof(pfind_runtime_options_t));
  int ret;
    /* Get timestamp file */
    if(opt->timestamp_file) {
        if(pfind_rank == 0) {
@@ -393,7 +402,6 @@ int process(char *processor_name, int world_rank, int world_size,
        MPI_Bcast(&runtime.ctime_min, 1, MPI_INT, 0, pfind_com);
    }

  auto iterations = 0;
    if(world_rank == 0) {
        queue<string> dirs;
        string workdir = opt->workdir;
@@ -403,45 +411,47 @@ int process(char *processor_name, int world_rank, int world_size,
        dirs.push(workdir);

        do {

      string processpath = dirs.front();
            std::string processpath = dirs.front();
            dirs.pop();
      // DISTRIBUTE WORK
            send_newPath(processpath);

            auto received_strings = true;
      // We need to gather new directories found (we use send-recv)

            for(auto i = 1; i < world_size; i++) {
                received_strings = true;
                while(received_strings) {
                    received_strings = false;
          //	cout << " Checking from " << i << endl;

                    MPI_Status mpistatus;
                    MPI_Probe(i, 0, MPI_COMM_WORLD, &mpistatus);

                    int count;
                    MPI_Get_count(&mpistatus, MPI_CHAR, &count);
          char buf[count];
          MPI_Recv(&buf, count, MPI_CHAR, i, 0, MPI_COMM_WORLD, &mpistatus);

                    std::vector<char> buf(count);
                    MPI_Recv(buf.data(), count, MPI_CHAR, i, 0, MPI_COMM_WORLD,
                             &mpistatus);

                    if(count == 0) {
                        continue;
                    }
          // cout << " Receiving from " << i << " ---- " << buf << endl;
          string s = buf;
                    std::string s(buf.begin(), buf.end());
                    dirs.push(s);
                    received_strings = true;
                }
            }
      // cout << "NO more paths " << dirs.size() << endl;
        } while(!dirs.empty());


        auto count = 0;
        MPI_Bcast(&count, 1, MPI_INT, 0, MPI_COMM_WORLD);

        MPI_Barrier(MPI_COMM_WORLD);

    unsigned long long *Array_checked =
        (unsigned long long *)malloc(sizeof(unsigned long long) * world_size);
    unsigned long long *Array_found =
        (unsigned long long *)malloc(sizeof(unsigned long long) * world_size);
        unsigned long long* Array_checked = (unsigned long long*) malloc(
                sizeof(unsigned long long) * world_size);
        unsigned long long* Array_found = (unsigned long long*) malloc(
                sizeof(unsigned long long) * world_size);
        unsigned long long checked = 0;
        unsigned long long found = 0;

@@ -467,10 +477,12 @@ int process(char *processor_name, int world_rank, int world_size,
            if(toProcess == "Terminate") {
                break;
            }
      // cout << "REceived " << toProcess << " --- " << world_rank << endl;
            // cout << "REceived " << toProcess << " --- " << world_rank <<
            // endl;
            queue<string> dirs;

      dirProcess(toProcess, checked, found, dirs, world_rank, world_size, opt);
            dirProcess(toProcess, checked, found, dirs, world_rank, world_size,
                       opt);
            // Send NEW DIRS to master
            while(!dirs.empty()) {
                string s = dirs.front();
@@ -493,7 +505,8 @@ int process(char *processor_name, int world_rank, int world_size,
    return 0;
}

int main(int argc, char **argv) {
int
main(int argc, char** argv) {

    for(int i = 0; i < argc; i++) {
        if(strcmp(argv[i], "--help") == 0) {