Loading include/global/rpc/distributor.hpp +1 −0 Original line number Diff line number Diff line Loading @@ -72,6 +72,7 @@ public: class ForwarderDistributor : public Distributor { private: host_t fwd_host_; std::vector<host_t> all_hosts_; unsigned int hosts_size_; std::hash<std::string> str_hash; public: Loading src/global/rpc/distributor.cpp +3 −2 Original line number Diff line number Diff line Loading @@ -71,7 +71,8 @@ locate_directory_metadata(const string& path) const { ForwarderDistributor:: ForwarderDistributor(host_t fwhost, unsigned int hosts_size) : fwd_host_(fwhost), hosts_size_(hosts_size) hosts_size_(hosts_size), all_hosts_(hosts_size) {} host_t ForwarderDistributor:: Loading @@ -91,7 +92,7 @@ locate_file_metadata(const std::string& path) const { std::vector<host_t> ForwarderDistributor:: locate_directory_metadata(const std::string& path) const { return {fwd_host_}; return all_hosts_; } } // namespace rpc } // namespace gkfs tests/integration/forwarding/test_map.py +88 −2 Original line number Diff line number Diff line Loading @@ -18,6 +18,7 @@ import stat import os import time import ctypes import socket import sh import sys import pytest Loading @@ -26,7 +27,7 @@ from harness.logger import logger nonexisting = "nonexisting" def test_write_two_io_nodes(gkfwd_daemon_factory, gkfwd_client_factory): def test_two_io_nodes(gkfwd_daemon_factory, gkfwd_client_factory): """Write files from two clients using two daemons""" d00 = gkfwd_daemon_factory.create() Loading Loading @@ -112,3 +113,88 @@ def test_write_two_io_nodes(gkfwd_daemon_factory, gkfwd_client_factory): assert ret.dirents[1].d_name == 'file-c01' assert ret.dirents[1].d_type == 8 # DT_REG assert ret.errno == 115 #FIXME: Should be 0! with open(c00.log) as f: lines = f.readlines() for line in lines: if 'Forward to' in line: ion = line.split()[-1] assert ion == '0' with open(c01.log) as f: lines = f.readlines() for line in lines: if 'Forward to' in line: ion = line.split()[-1] assert ion == '1' def test_two_io_nodes_remap(gkfwd_daemon_factory, gkfwd_client_factory): """Write files from two clients using two daemons""" d00 = gkfwd_daemon_factory.create() d01 = gkfwd_daemon_factory.create() c00 = gkfwd_client_factory.create('c-0') c01 = gkfwd_client_factory.create('c-1') file = d00.mountdir / "file-c00-1" # create a file in gekkofs ret = c00.open(file, os.O_CREAT | os.O_WRONLY, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) assert ret.retval == 10000 assert ret.errno == 115 #FIXME: Should be 0! # write a buffer we know buf = b'42' ret = c00.write(file, buf, len(buf)) assert ret.retval == len(buf) # Return the number of written bytes assert ret.errno == 115 #FIXME: Should be 0! with open(c00.log) as f: lines = f.readlines() for line in lines: if 'Forward to' in line: ion = line.split()[-1] assert ion == '0' # recreate the mapping so that the server that wrote will now read c00.remap('c-1') # we need to wait for at least the number of seconds between remap calls time.sleep(10) file = d00.mountdir / "file-c00-2" # open the file to write ret = c00.open(file, os.O_CREAT | os.O_WRONLY, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) assert ret.retval == 10000 assert ret.errno == 115 #FIXME: Should be 0! # read the file buf = b'24' ret = c00.write(file, buf, len(buf)) assert ret.retval == len(buf) # Return the number of read bytes assert ret.errno == 115 #FIXME: Should be 0! with open(c00.log) as f: lines = f.readlines() for line in lines: if 'Forward to' in line: ion = line.split()[-1] assert ion == '1' tests/integration/harness/gkfs.py +17 −1 Original line number Diff line number Diff line Loading @@ -751,6 +751,13 @@ class FwdClient: fwd_map_file.write('{} {}\n'.format(socket.gethostname(), int(identifier.split('-')[1]))) fwd_map_file.close() # record the map so we can modify it latter if needed self._map = self.cwd / gkfwd_forwarding_map_file_local # we need to ensure each client will have a distinct log gkfwd_client_log_file_local = '{}-{}'.format(identifier, gkfwd_client_log_file) self._log = self._workspace.logdir / gkfwd_client_log_file_local libdirs = ':'.join( filter(None, [os.environ.get('LD_LIBRARY_PATH', '')] + [str(p) for p in self._workspace.libdirs])) Loading Loading @@ -783,7 +790,7 @@ class FwdClient: 'LIBGKFS_HOSTS_FILE' : self.cwd / gkfwd_hosts_file, 'LIBGKFS_FORWARDING_MAP_FILE' : self.cwd / gkfwd_forwarding_map_file_local, 'LIBGKFS_LOG' : gkfs_client_log_level, 'LIBGKFS_LOG_OUTPUT' : self._workspace.logdir / gkfwd_client_log_file 'LIBGKFS_LOG_OUTPUT' : self._workspace.logdir / gkfwd_client_log_file_local } self._env.update(self._patched_env) Loading Loading @@ -812,6 +819,11 @@ class FwdClient: logger.debug(f"command output: {out.stdout}") return self._parser.parse(cmd, out.stdout) def remap(self, identifier): fwd_map_file = open(self.cwd / self._map, 'w') fwd_map_file.write('{} {}\n'.format(socket.gethostname(), int(identifier.split('-')[1]))) fwd_map_file.close() def __getattr__(self, name): return _proxy_exec(self, name) Loading @@ -819,6 +831,10 @@ class FwdClient: def cwd(self): return self._workspace.twd @property def log(self): return self._log class ShellFwdClient: """ A class to represent a GekkoFS shell client process. Loading Loading
include/global/rpc/distributor.hpp +1 −0 Original line number Diff line number Diff line Loading @@ -72,6 +72,7 @@ public: class ForwarderDistributor : public Distributor { private: host_t fwd_host_; std::vector<host_t> all_hosts_; unsigned int hosts_size_; std::hash<std::string> str_hash; public: Loading
src/global/rpc/distributor.cpp +3 −2 Original line number Diff line number Diff line Loading @@ -71,7 +71,8 @@ locate_directory_metadata(const string& path) const { ForwarderDistributor:: ForwarderDistributor(host_t fwhost, unsigned int hosts_size) : fwd_host_(fwhost), hosts_size_(hosts_size) hosts_size_(hosts_size), all_hosts_(hosts_size) {} host_t ForwarderDistributor:: Loading @@ -91,7 +92,7 @@ locate_file_metadata(const std::string& path) const { std::vector<host_t> ForwarderDistributor:: locate_directory_metadata(const std::string& path) const { return {fwd_host_}; return all_hosts_; } } // namespace rpc } // namespace gkfs
tests/integration/forwarding/test_map.py +88 −2 Original line number Diff line number Diff line Loading @@ -18,6 +18,7 @@ import stat import os import time import ctypes import socket import sh import sys import pytest Loading @@ -26,7 +27,7 @@ from harness.logger import logger nonexisting = "nonexisting" def test_write_two_io_nodes(gkfwd_daemon_factory, gkfwd_client_factory): def test_two_io_nodes(gkfwd_daemon_factory, gkfwd_client_factory): """Write files from two clients using two daemons""" d00 = gkfwd_daemon_factory.create() Loading Loading @@ -112,3 +113,88 @@ def test_write_two_io_nodes(gkfwd_daemon_factory, gkfwd_client_factory): assert ret.dirents[1].d_name == 'file-c01' assert ret.dirents[1].d_type == 8 # DT_REG assert ret.errno == 115 #FIXME: Should be 0! with open(c00.log) as f: lines = f.readlines() for line in lines: if 'Forward to' in line: ion = line.split()[-1] assert ion == '0' with open(c01.log) as f: lines = f.readlines() for line in lines: if 'Forward to' in line: ion = line.split()[-1] assert ion == '1' def test_two_io_nodes_remap(gkfwd_daemon_factory, gkfwd_client_factory): """Write files from two clients using two daemons""" d00 = gkfwd_daemon_factory.create() d01 = gkfwd_daemon_factory.create() c00 = gkfwd_client_factory.create('c-0') c01 = gkfwd_client_factory.create('c-1') file = d00.mountdir / "file-c00-1" # create a file in gekkofs ret = c00.open(file, os.O_CREAT | os.O_WRONLY, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) assert ret.retval == 10000 assert ret.errno == 115 #FIXME: Should be 0! # write a buffer we know buf = b'42' ret = c00.write(file, buf, len(buf)) assert ret.retval == len(buf) # Return the number of written bytes assert ret.errno == 115 #FIXME: Should be 0! with open(c00.log) as f: lines = f.readlines() for line in lines: if 'Forward to' in line: ion = line.split()[-1] assert ion == '0' # recreate the mapping so that the server that wrote will now read c00.remap('c-1') # we need to wait for at least the number of seconds between remap calls time.sleep(10) file = d00.mountdir / "file-c00-2" # open the file to write ret = c00.open(file, os.O_CREAT | os.O_WRONLY, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) assert ret.retval == 10000 assert ret.errno == 115 #FIXME: Should be 0! # read the file buf = b'24' ret = c00.write(file, buf, len(buf)) assert ret.retval == len(buf) # Return the number of read bytes assert ret.errno == 115 #FIXME: Should be 0! with open(c00.log) as f: lines = f.readlines() for line in lines: if 'Forward to' in line: ion = line.split()[-1] assert ion == '1'
tests/integration/harness/gkfs.py +17 −1 Original line number Diff line number Diff line Loading @@ -751,6 +751,13 @@ class FwdClient: fwd_map_file.write('{} {}\n'.format(socket.gethostname(), int(identifier.split('-')[1]))) fwd_map_file.close() # record the map so we can modify it latter if needed self._map = self.cwd / gkfwd_forwarding_map_file_local # we need to ensure each client will have a distinct log gkfwd_client_log_file_local = '{}-{}'.format(identifier, gkfwd_client_log_file) self._log = self._workspace.logdir / gkfwd_client_log_file_local libdirs = ':'.join( filter(None, [os.environ.get('LD_LIBRARY_PATH', '')] + [str(p) for p in self._workspace.libdirs])) Loading Loading @@ -783,7 +790,7 @@ class FwdClient: 'LIBGKFS_HOSTS_FILE' : self.cwd / gkfwd_hosts_file, 'LIBGKFS_FORWARDING_MAP_FILE' : self.cwd / gkfwd_forwarding_map_file_local, 'LIBGKFS_LOG' : gkfs_client_log_level, 'LIBGKFS_LOG_OUTPUT' : self._workspace.logdir / gkfwd_client_log_file 'LIBGKFS_LOG_OUTPUT' : self._workspace.logdir / gkfwd_client_log_file_local } self._env.update(self._patched_env) Loading Loading @@ -812,6 +819,11 @@ class FwdClient: logger.debug(f"command output: {out.stdout}") return self._parser.parse(cmd, out.stdout) def remap(self, identifier): fwd_map_file = open(self.cwd / self._map, 'w') fwd_map_file.write('{} {}\n'.format(socket.gethostname(), int(identifier.split('-')[1]))) fwd_map_file.close() def __getattr__(self, name): return _proxy_exec(self, name) Loading @@ -819,6 +831,10 @@ class FwdClient: def cwd(self): return self._workspace.twd @property def log(self): return self._log class ShellFwdClient: """ A class to represent a GekkoFS shell client process. Loading