Skip to content
Commits on Source (29)
Subproject commit a25a82d401b4a0d986088c6802a08fd79185a278
Subproject commit 0af45bfa667f7ff9c78167ef94d975bffbd879f0
......@@ -2092,11 +2092,13 @@ struct chunk_stat {
public:
output() :
m_err(),
m_chunk_size(),
m_chunk_total(),
m_chunk_free() {}
output(uint64_t chunk_size, uint64_t chunk_total, uint64_t chunk_free) :
output(int32_t err, uint64_t chunk_size, uint64_t chunk_total, uint64_t chunk_free) :
m_err(err),
m_chunk_size(chunk_size),
m_chunk_total(chunk_total),
m_chunk_free(chunk_free) {}
......@@ -2111,11 +2113,17 @@ struct chunk_stat {
explicit
output(const rpc_chunk_stat_out_t& out) {
m_err = out.err;
m_chunk_size = out.chunk_size;
m_chunk_total = out.chunk_total;
m_chunk_free = out.chunk_free;
}
int32_t
err() const {
return m_err;
}
uint64_t
chunk_size() const {
return m_chunk_size;
......@@ -2132,6 +2140,7 @@ struct chunk_stat {
}
private:
int32_t m_err;
uint64_t m_chunk_size;
uint64_t m_chunk_total;
uint64_t m_chunk_free;
......
......@@ -14,17 +14,16 @@
#ifndef GEKKOFS_CHUNK_STORAGE_HPP
#define GEKKOFS_CHUNK_STORAGE_HPP
extern "C" {
#include <abt.h>
}
#include <global/global_defs.hpp>
#include <limits>
#include <string>
#include <memory>
#include <system_error>
/* Forward declarations */
namespace spdlog {
class logger;
class logger;
}
namespace gkfs {
......@@ -36,42 +35,43 @@ struct ChunkStat {
unsigned long chunk_free;
};
class ChunkStorageException : public std::system_error {
public:
ChunkStorageException(const int err_code, const std::string& s) : std::system_error(err_code,
std::generic_category(), s) {};
};
class ChunkStorage {
private:
static constexpr const char* LOGGER_NAME = "ChunkStorage";
std::shared_ptr<spdlog::logger> log;
std::shared_ptr<spdlog::logger> log_;
std::string root_path;
size_t chunksize;
std::string root_path_;
size_t chunksize_;
inline std::string absolute(const std::string& internal_path) const;
static inline std::string get_chunks_dir(const std::string& file_path);
static inline std::string get_chunk_path(const std::string& file_path, unsigned int chunk_id);
static inline std::string get_chunk_path(const std::string& file_path, gkfs::rpc::chnk_id_t chunk_id);
void init_chunk_space(const std::string& file_path) const;
public:
ChunkStorage(const std::string& path, size_t chunksize);
ChunkStorage(std::string& path, size_t chunksize);
void write_chunk(const std::string& file_path, unsigned int chunk_id,
const char* buff, size_t size, off64_t offset,
ABT_eventual& eventual) const;
void read_chunk(const std::string& file_path, unsigned int chunk_id,
char* buff, size_t size, off64_t offset,
ABT_eventual& eventual) const;
void destroy_chunk_space(const std::string& file_path) const;
void trim_chunk_space(const std::string& file_path, unsigned int chunk_start,
unsigned int chunk_end = std::numeric_limits<unsigned int>::max());
ssize_t
write_chunk(const std::string& file_path, gkfs::rpc::chnk_id_t chunk_id, const char* buf, size_t size,
off64_t offset) const;
void delete_chunk(const std::string& file_path, unsigned int chunk_id);
ssize_t read_chunk(const std::string& file_path, gkfs::rpc::chnk_id_t chunk_id, char* buf, size_t size,
off64_t offset) const;
void truncate_chunk(const std::string& file_path, unsigned int chunk_id, off_t length);
void trim_chunk_space(const std::string& file_path, gkfs::rpc::chnk_id_t chunk_start);
void destroy_chunk_space(const std::string& file_path) const;
void truncate_chunk_file(const std::string& file_path, gkfs::rpc::chnk_id_t chunk_id, off_t length);
ChunkStat chunk_stat() const;
};
......
/*
Copyright 2018-2020, Barcelona Supercomputing Center (BSC), Spain
Copyright 2015-2020, Johannes Gutenberg Universitaet Mainz, Germany
This software was partially supported by the
EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu).
This software was partially supported by the
ADA-FS project under the SPPEXA project funded by the DFG.
SPDX-License-Identifier: MIT
*/
#ifndef GEKKOFS_DAEMON_DATA_LOGGING_HPP
#define GEKKOFS_DAEMON_DATA_LOGGING_HPP
#include <spdlog/spdlog.h>
namespace gkfs {
namespace data {
class DataModule {
private:
DataModule() {}
std::shared_ptr<spdlog::logger> log_;
public:
static constexpr const char* LOGGER_NAME = "DataModule";
static DataModule* getInstance() {
static DataModule instance;
return &instance;
}
DataModule(DataModule const&) = delete;
void operator=(DataModule const&) = delete;
const std::shared_ptr<spdlog::logger>& log() const;
void log(const std::shared_ptr<spdlog::logger>& log);
};
#define GKFS_DATA_MOD (static_cast<gkfs::data::DataModule*>(gkfs::data::DataModule::getInstance()))
} // namespace data
} // namespace gkfs
#endif //GEKKOFS_DAEMON_DATA_LOGGING_HPP
/*
Copyright 2018-2020, Barcelona Supercomputing Center (BSC), Spain
Copyright 2015-2020, Johannes Gutenberg Universitaet Mainz, Germany
This software was partially supported by the
EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu).
This software was partially supported by the
ADA-FS project under the SPPEXA project funded by the DFG.
SPDX-License-Identifier: MIT
*/
#ifndef GEKKOFS_DAEMON_FILE_HANDLE_HPP
#define GEKKOFS_DAEMON_FILE_HANDLE_HPP
#include <daemon/backend/data/data_module.hpp>
extern "C" {
#include <unistd.h>
}
namespace gkfs {
namespace data {
/**
* File handle to encapsulate a file descriptor, allowing RAII closing of the file descriptor
*/
class FileHandle {
private:
constexpr static const int init_value{-1};
int fd_{init_value};
std::string path_{};
public:
FileHandle() = default;
explicit FileHandle(int fd, std::string path) noexcept :
fd_(fd) {}
FileHandle(FileHandle&& rhs) = default;
FileHandle(const FileHandle& other) = delete;
FileHandle& operator=(FileHandle&& rhs) = default;
FileHandle& operator=(const FileHandle& other) = delete;
explicit operator bool() const noexcept {
return valid();
}
bool operator!() const noexcept {
return !valid();
}
bool
valid() const noexcept {
return fd_ != init_value;
}
int
native() const noexcept {
return fd_;
}
/**
* Closes file descriptor and resets it to initial value
* @return
*/
bool close() noexcept {
if (fd_ != init_value) {
if (::close(fd_) < 0) {
GKFS_DATA_MOD->log()->warn("{}() Failed to close file descriptor '{}' path '{}' errno '{}'", __func__,
fd_, path_, ::strerror(errno));
return false;
}
}
fd_ = init_value;
return true;
}
~FileHandle() {
if (fd_ != init_value)
close();
}
};
} // namespace data
} // namespace gkfs
#endif //GEKKOFS_DAEMON_FILE_HANDLE_HPP
......@@ -17,14 +17,20 @@
#include <string>
#include <stdexcept>
namespace gkfs {
namespace metadata {
class DBException : public std::runtime_error {
public:
DBException(const std::string& s) : std::runtime_error(s) {};
explicit DBException(const std::string& s) : std::runtime_error(s) {};
};
class NotFoundException : public DBException {
public:
NotFoundException(const std::string& s) : DBException(s) {};
explicit NotFoundException(const std::string& s) : DBException(s) {};
};
} // namespace metadata
} // namespace gkfs
#endif //GEKKOFS_DB_EXCEPTIONS_HPP
......@@ -25,8 +25,8 @@ extern "C" {
namespace gkfs {
namespace rpc {
template<typename I, typename O>
inline hg_return_t cleanup(hg_handle_t* handle, I* input, O* output, hg_bulk_t* bulk_handle) {
template<typename InputType, typename OutputType>
inline hg_return_t cleanup(hg_handle_t* handle, InputType* input, OutputType* output, hg_bulk_t* bulk_handle) {
auto ret = HG_SUCCESS;
if (bulk_handle) {
ret = margo_bulk_free(*bulk_handle);
......@@ -51,16 +51,41 @@ inline hg_return_t cleanup(hg_handle_t* handle, I* input, O* output, hg_bulk_t*
return ret;
}
template<typename I, typename O>
inline hg_return_t cleanup_respond(hg_handle_t* handle, I* input, O* output, hg_bulk_t* bulk_handle) {
template<typename OutputType>
inline hg_return_t respond(hg_handle_t* handle, OutputType* output) {
auto ret = HG_SUCCESS;
if (output && handle) {
ret = margo_respond(*handle, output);
if (ret != HG_SUCCESS)
return ret;
}
return cleanup(handle, input, static_cast<O*>(nullptr), bulk_handle);
return ret;
}
template<typename InputType, typename OutputType>
inline hg_return_t cleanup_respond(hg_handle_t* handle, InputType* input, OutputType* output, hg_bulk_t* bulk_handle) {
auto ret = respond(handle, output);
if (ret != HG_SUCCESS)
return ret;
return cleanup(handle, input, static_cast<OutputType*>(nullptr), bulk_handle);
}
template<typename InputType, typename OutputType>
inline hg_return_t cleanup_respond(hg_handle_t* handle, InputType* input, OutputType* output) {
return cleanup_respond(handle, input, output, nullptr);
}
template<typename OutputType>
inline hg_return_t cleanup_respond(hg_handle_t* handle, OutputType* output) {
auto ret = respond(handle, output);
if (ret != HG_SUCCESS)
return ret;
if (handle) {
ret = margo_destroy(*handle);
if (ret != HG_SUCCESS)
return ret;
}
return ret;
}
} // namespace rpc
......
/*
Copyright 2018-2020, Barcelona Supercomputing Center (BSC), Spain
Copyright 2015-2020, Johannes Gutenberg Universitaet Mainz, Germany
This software was partially supported by the
EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu).
This software was partially supported by the
ADA-FS project under the SPPEXA project funded by the DFG.
SPDX-License-Identifier: MIT
*/
#ifndef GEKKOFS_DAEMON_DATA_HPP
#define GEKKOFS_DAEMON_DATA_HPP
#include <daemon/daemon.hpp>
#include <global/global_defs.hpp>
#include <string>
#include <vector>
extern "C" {
#include <abt.h>
#include <margo.h>
}
namespace gkfs {
namespace data {
class ChunkOpException : public std::runtime_error {
public:
explicit ChunkOpException(const std::string& s) : std::runtime_error(s) {};
};
class ChunkWriteOpException : public ChunkOpException {
public:
explicit ChunkWriteOpException(const std::string& s) : ChunkOpException(s) {};
};
class ChunkReadOpException : public ChunkOpException {
public:
explicit ChunkReadOpException(const std::string& s) : ChunkOpException(s) {};
};
class ChunkMetaOpException : public ChunkOpException {
public:
explicit ChunkMetaOpException(const std::string& s) : ChunkOpException(s) {};
};
/**
* Classes to encapsulate asynchronous chunk operations.
* All operations on chunk files must go through the Argobots' task queues.
* Otherwise operations may overtake operations in the queues.
* This applies to write, read, and truncate which may modify the middle of a chunk, essentially a write operation.
*
* Note: This class is not thread-safe.
*
* In the future, this class may be used to provide failure tolerance for IO tasks
*
* Base class using the CRTP idiom
*/
template<class OperationType>
class ChunkOperation {
protected:
const std::string path_;
std::vector<ABT_task> abt_tasks_;
std::vector<ABT_eventual> task_eventuals_;
public:
explicit ChunkOperation(const std::string& path) : ChunkOperation(path, 1) {};
ChunkOperation(std::string path, size_t n) : path_(std::move(path)) {
// Knowing n beforehand is important and cannot be dynamic. Otherwise eventuals cause seg faults
abt_tasks_.resize(n);
task_eventuals_.resize(n);
};
~ChunkOperation() {
cancel_all_tasks();
}
/**
* Cleans up and cancels all tasks in flight
*/
void cancel_all_tasks() {
GKFS_DATA->spdlogger()->trace("{}() enter", __func__);
for (auto& task : abt_tasks_) {
if (task) {
ABT_task_cancel(task);
ABT_task_free(&task);
}
}
for (auto& eventual : task_eventuals_) {
if (eventual) {
ABT_eventual_reset(eventual);
ABT_eventual_free(&eventual);
}
}
abt_tasks_.clear();
task_eventuals_.clear();
static_cast<OperationType*>(this)->clear_task_args();
}
};
class ChunkTruncateOperation : public ChunkOperation<ChunkTruncateOperation> {
friend class ChunkOperation<ChunkTruncateOperation>;
private:
struct chunk_truncate_args {
const std::string* path;
size_t size;
ABT_eventual eventual;
};
struct chunk_truncate_args task_arg_{};
static void truncate_abt(void* _arg);
void clear_task_args();
public:
explicit ChunkTruncateOperation(const std::string& path);
~ChunkTruncateOperation() = default;
void truncate(size_t size);
int wait_for_task();
};
class ChunkWriteOperation : public ChunkOperation<ChunkWriteOperation> {
friend class ChunkOperation<ChunkWriteOperation>;
private:
struct chunk_write_args {
const std::string* path;
const char* buf;
gkfs::rpc::chnk_id_t chnk_id;
size_t size;
off64_t off;
ABT_eventual eventual;
};
std::vector<struct chunk_write_args> task_args_;
static void write_file_abt(void* _arg);
void clear_task_args();
public:
ChunkWriteOperation(const std::string& path, size_t n);
~ChunkWriteOperation() = default;
void write_nonblock(size_t idx, uint64_t chunk_id, const char* bulk_buf_ptr, size_t size, off64_t offset);
std::pair<int, size_t> wait_for_tasks();
};
class ChunkReadOperation : public ChunkOperation<ChunkReadOperation> {
friend class ChunkOperation<ChunkReadOperation>;
private:
struct chunk_read_args {
const std::string* path;
char* buf;
gkfs::rpc::chnk_id_t chnk_id;
size_t size;
off64_t off;
ABT_eventual eventual;
};
std::vector<struct chunk_read_args> task_args_;
static void read_file_abt(void* _arg);
void clear_task_args();
public:
struct bulk_args {
margo_instance_id mid;
hg_addr_t origin_addr;
hg_bulk_t origin_bulk_handle;
std::vector<size_t>* origin_offsets;
hg_bulk_t local_bulk_handle;
std::vector<size_t>* local_offsets;
std::vector<uint64_t>* chunk_ids;
};
ChunkReadOperation(const std::string& path, size_t n);
~ChunkReadOperation() = default;
void read_nonblock(size_t idx, uint64_t chunk_id, char* bulk_buf_ptr, size_t size, off64_t offset);
std::pair<int, size_t>
wait_for_tasks_and_push_back(const bulk_args& args);
};
} // namespace data
} // namespace gkfs
#endif //GEKKOFS_DAEMON_DATA_HPP
......@@ -35,7 +35,7 @@ void update(const std::string& path, Metadata& md);
void update_size(const std::string& path, size_t io_size, off_t offset, bool append);
void remove_node(const std::string& path);
void remove(const std::string& path);
} // namespace metadata
} // namespace gkfs
......
......@@ -18,6 +18,9 @@ namespace gkfs {
// These constexpr set the RPC's identity and which handler the receiver end should use
namespace rpc {
using chnk_id_t = unsigned long;
namespace tag {
constexpr auto fs_config = "rpc_srv_fs_config";
......@@ -46,10 +49,6 @@ constexpr auto ofi_verbs = "ofi+verbs";
} // namespace protocol
} // namespace rpc
namespace types {
// typedefs
typedef unsigned long rpc_chnk_id_t;
} // namespace types
} // namespace gkfs
#endif //GEKKOFS_GLOBAL_DEFS_HPP
......@@ -23,88 +23,95 @@ extern "C" {
/* visible API for RPC data types used in RPCS */
// misc generic rpc types
MERCURY_GEN_PROC(rpc_err_out_t, ((hg_int32_t) (err)))
MERCURY_GEN_PROC(rpc_err_out_t,
((hg_int32_t) (err)))
// Metadentry
MERCURY_GEN_PROC(rpc_mk_node_in_t,
((hg_const_string_t) (path))\
((uint32_t) (mode)))
((hg_const_string_t) (path))
((uint32_t) (mode)))
MERCURY_GEN_PROC(rpc_path_only_in_t, ((hg_const_string_t) (path)))
MERCURY_GEN_PROC(rpc_path_only_in_t,
((hg_const_string_t) (path)))
MERCURY_GEN_PROC(rpc_stat_out_t, ((hg_int32_t) (err))
((hg_const_string_t) (db_val)))
MERCURY_GEN_PROC(rpc_stat_out_t,
((hg_int32_t) (err))
((hg_const_string_t) (db_val)))
MERCURY_GEN_PROC(rpc_rm_node_in_t, ((hg_const_string_t) (path)))
MERCURY_GEN_PROC(rpc_rm_node_in_t,
((hg_const_string_t) (path)))
MERCURY_GEN_PROC(rpc_trunc_in_t,
((hg_const_string_t) (path)) \
((hg_uint64_t) (length)))
((hg_const_string_t) (path))
((hg_uint64_t) (length)))
MERCURY_GEN_PROC(rpc_update_metadentry_in_t,
((hg_const_string_t) (path))\
((uint64_t) (nlink))\
((hg_uint32_t) (mode))\
((hg_uint32_t) (uid))\
((hg_uint32_t) (gid))\
((hg_int64_t) (size))\
((hg_int64_t) (blocks))\
((hg_int64_t) (atime))\
((hg_int64_t) (mtime))\
((hg_int64_t) (ctime))\
((hg_bool_t) (nlink_flag))\
((hg_bool_t) (mode_flag))\
((hg_bool_t) (size_flag))\
((hg_bool_t) (block_flag))\
((hg_bool_t) (atime_flag))\
((hg_bool_t) (mtime_flag))\
((hg_bool_t) (ctime_flag)))
MERCURY_GEN_PROC(rpc_update_metadentry_size_in_t, ((hg_const_string_t) (path))
((hg_uint64_t) (size))
((hg_int64_t) (offset))
((hg_bool_t) (append)))
MERCURY_GEN_PROC(rpc_update_metadentry_size_out_t, ((hg_int32_t) (err))
((hg_int64_t) (ret_size)))
MERCURY_GEN_PROC(rpc_get_metadentry_size_out_t, ((hg_int32_t) (err))
((hg_int64_t) (ret_size)))
((hg_const_string_t) (path))
((uint64_t) (nlink))
((hg_uint32_t) (mode))
((hg_uint32_t) (uid))
((hg_uint32_t) (gid))
((hg_int64_t) (size))
((hg_int64_t) (blocks))
((hg_int64_t) (atime))
((hg_int64_t) (mtime))
((hg_int64_t) (ctime))
((hg_bool_t) (nlink_flag))
((hg_bool_t) (mode_flag))
((hg_bool_t) (size_flag))
((hg_bool_t) (block_flag))
((hg_bool_t) (atime_flag))
((hg_bool_t) (mtime_flag))
((hg_bool_t) (ctime_flag)))
MERCURY_GEN_PROC(rpc_update_metadentry_size_in_t,
((hg_const_string_t) (path))
((hg_uint64_t) (size))
((hg_int64_t) (offset))
((hg_bool_t) (append)))
MERCURY_GEN_PROC(rpc_update_metadentry_size_out_t,
((hg_int32_t) (err))
((hg_int64_t) (ret_size)))
MERCURY_GEN_PROC(rpc_get_metadentry_size_out_t,
((hg_int32_t) (err))
((hg_int64_t) (ret_size)))
#ifdef HAS_SYMLINKS
MERCURY_GEN_PROC(rpc_mk_symlink_in_t,
((hg_const_string_t) (path))\
((hg_const_string_t) (target_path))
((hg_const_string_t) (path))
((hg_const_string_t) (target_path))
)
#endif
// data
MERCURY_GEN_PROC(rpc_read_data_in_t,
((hg_const_string_t) (path))\
((int64_t) (offset))\
((hg_uint64_t) (host_id))\
((hg_uint64_t) (host_size))\
((hg_uint64_t) (chunk_n))\
((hg_uint64_t) (chunk_start))\
((hg_uint64_t) (chunk_end))\
((hg_uint64_t) (total_chunk_size))\
((hg_bulk_t) (bulk_handle)))
((hg_const_string_t) (path))
((int64_t) (offset))
((hg_uint64_t) (host_id))
((hg_uint64_t) (host_size))
((hg_uint64_t) (chunk_n))
((hg_uint64_t) (chunk_start))
((hg_uint64_t) (chunk_end))
((hg_uint64_t) (total_chunk_size))
((hg_bulk_t) (bulk_handle)))
MERCURY_GEN_PROC(rpc_data_out_t,
((int32_t) (err))\
((hg_size_t) (io_size)))
((int32_t) (err))
((hg_size_t) (io_size)))
MERCURY_GEN_PROC(rpc_write_data_in_t,
((hg_const_string_t) (path))\
((int64_t) (offset))\
((hg_uint64_t) (host_id))\
((hg_uint64_t) (host_size))\
((hg_uint64_t) (chunk_n))\
((hg_uint64_t) (chunk_start))\
((hg_uint64_t) (chunk_end))\
((hg_uint64_t) (total_chunk_size))\
((hg_bulk_t) (bulk_handle)))
((hg_const_string_t) (path))
((int64_t) (offset))
((hg_uint64_t) (host_id))
((hg_uint64_t) (host_size))
((hg_uint64_t) (chunk_n))
((hg_uint64_t) (chunk_start))
((hg_uint64_t) (chunk_end))
((hg_uint64_t) (total_chunk_size))
((hg_bulk_t) (bulk_handle)))
MERCURY_GEN_PROC(rpc_get_dirents_in_t,
((hg_const_string_t) (path))
......@@ -117,15 +124,16 @@ MERCURY_GEN_PROC(rpc_get_dirents_out_t,
)
MERCURY_GEN_PROC(rpc_config_out_t, ((hg_const_string_t) (mountdir))
((hg_const_string_t) (rootdir)) \
((hg_bool_t) (atime_state)) \
((hg_bool_t) (mtime_state)) \
((hg_bool_t) (ctime_state)) \
((hg_bool_t) (link_cnt_state)) \
((hg_bool_t) (blocks_state)) \
((hg_uint32_t) (uid)) \
((hg_uint32_t) (gid)) \
MERCURY_GEN_PROC(rpc_config_out_t,
((hg_const_string_t) (mountdir))
((hg_const_string_t) (rootdir))
((hg_bool_t) (atime_state))
((hg_bool_t) (mtime_state))
((hg_bool_t) (ctime_state))
((hg_bool_t) (link_cnt_state))
((hg_bool_t) (blocks_state))
((hg_uint32_t) (uid))
((hg_uint32_t) (gid))
)
......@@ -134,7 +142,8 @@ MERCURY_GEN_PROC(rpc_chunk_stat_in_t,
)
MERCURY_GEN_PROC(rpc_chunk_stat_out_t,
((hg_uint64_t) (chunk_size))
((hg_int32_t) (err))
((hg_uint64_t) (chunk_size))
((hg_uint64_t) (chunk_total))
((hg_uint64_t) (chunk_free))
)
......
......@@ -22,10 +22,16 @@ extern "C" {
#include <string>
namespace gkfs {
namespace rpc {
hg_bool_t bool_to_merc_bool(bool state);
std::string get_my_hostname(bool short_hostname = false);
std::string get_host_by_name(const std::string& hostname);
} // namespace rpc
} // namespace gkfs
#endif //GEKKOFS_GLOBAL_RPC_UTILS_HPP
......@@ -9,7 +9,7 @@ SOURCE=""
INSTALL=""
DEP_CONFIG=""
VALID_DEP_OPTIONS="mogon2 mogon1 direct all"
VALID_DEP_OPTIONS="mogon2 mogon1 ngio direct all"
MOGON1_DEPS=(
"zstd" "lz4" "snappy" "capstone" "ofi" "mercury" "argobots" "margo" "rocksdb"
......@@ -18,7 +18,12 @@ MOGON1_DEPS=(
MOGON2_DEPS=(
"zstd" "lz4" "snappy" "capstone" "ofi" "mercury" "argobots" "margo" "rocksdb"
"syscall_intercept" "date"
"syscall_intercept" "date" "psm2"
)
NGIO_DEPS=(
"zstd" "lz4" "snappy" "capstone" "ofi" "mercury" "argobots" "margo" "rocksdb"
"syscall_intercept" "date" "psm2"
)
DIRECT_DEPS=(
......@@ -57,7 +62,7 @@ optional arguments:
defaults to 'all'
-c <CONFIG>, --config <CONFIG>
allows additional configurations, e.g., for specific clusters
supported values: {mogon1, mogon2, direct, all}
supported values: {mogon1, mogon2, ngio, direct, all}
defaults to 'direct'
-d <DEPENDENCY>, --dependency <DEPENDENCY>
download a specific dependency and ignore --config setting. If unspecified
......@@ -78,19 +83,27 @@ list_dependencies() {
for d in "${MOGON1_DEPS[@]}"; do
echo -n "$d "
done
echo
echo -n " Mogon 2: "
for d in "${MOGON2_DEPS[@]}"; do
echo -n "$d "
done
echo
echo -n " NGIO: "
for d in "${NGIO_DEPS[@]}"; do
echo -n "$d "
done
echo
echo -n " Direct GekkoFS dependencies: "
for d in "${DIRECT_DEPS[@]}"; do
echo -n "$d "
done
echo
echo -n " All: "
for d in "${ALL_DEPS[@]}"; do
echo -n "$d "
done
echo ""
echo
}
check_dependency() {
......@@ -233,6 +246,10 @@ mogon2)
DEP_CONFIG=("${MOGON2_DEPS[@]}")
echo "'Mogon2' dependencies are compiled"
;;
ngio)
DEP_CONFIG=("${NGIO_DEPS[@]}")
echo "'NGIO' dependencies are compiled"
;;
all)
DEP_CONFIG=("${ALL_DEPS[@]}")
echo "'All' dependencies are compiled"
......@@ -260,6 +277,7 @@ set -e
export CPATH="${CPATH}:${INSTALL}/include"
export LIBRARY_PATH="${LIBRARY_PATH}:${INSTALL}/lib:${INSTALL}/lib64"
export PKG_CONFIG_PATH="${INSTALL}/lib/pkgconfig:${PKG_CONFIG_PATH}"
## Third party dependencies
......@@ -331,10 +349,16 @@ if check_dependency "ofi" "${DEP_CONFIG[@]}"; then
#libfabric
CURR=${SOURCE}/libfabric
prepare_build_dir ${CURR}
cd ${CURR}
./autogen.sh
cd ${CURR}/build
OFI_CONFIG="../configure --prefix=${INSTALL} --enable-tcp=yes"
if check_dependency "verbs" "${DEP_CONFIG[@]}"; then
OFI_CONFIG="${OFI_CONFIG} --enable-verbs=yes"
elif check_dependency "psm2" "${DEP_CONFIG[@]}"; then
OFI_CONFIG="${OFI_CONFIG} --enable-psm2=yes --with-psm2-src=${SOURCE}/psm2"
elif check_dependency "psm2-system" "${DEP_CONFIG[@]}"; then
OFI_CONFIG="${OFI_CONFIG} --enable-psm2=yes"
fi
${OFI_CONFIG}
make -j${CORES}
......
......@@ -9,7 +9,7 @@ NA_LAYER=""
DEP_CONFIG=""
VERBOSE=false
VALID_DEP_OPTIONS="mogon2 mogon1 direct all"
VALID_DEP_OPTIONS="mogon2 mogon1 ngio direct all"
MOGON1_DEPS=(
"zstd" "lz4" "snappy" "capstone" "ofi-verbs" "mercury" "argobots" "margo" "rocksdb"
......@@ -17,10 +17,15 @@ MOGON1_DEPS=(
)
MOGON2_DEPS=(
"zstd" "lz4" "snappy" "capstone" "ofi" "mercury" "argobots" "margo" "rocksdb"
"syscall_intercept" "date"
"zstd" "lz4" "snappy" "capstone" "ofi-experimental" "mercury" "argobots" "margo" "rocksdb-experimental"
"syscall_intercept-glibc3" "date" "psm2"
)
NGIO_DEPS=(
"zstd" "lz4" "snappy" "capstone" "ofi-experimental" "mercury" "argobots" "margo" "rocksdb"
"syscall_intercept" "date" "psm2"
)
DIRECT_DEPS=(
"ofi" "mercury" "argobots" "margo" "rocksdb" "syscall_intercept" "date"
)
......@@ -57,19 +62,27 @@ list_dependencies() {
for d in "${MOGON1_DEPS[@]}"; do
echo -n "$d "
done
echo
echo -n " Mogon 2: "
for d in "${MOGON2_DEPS[@]}"; do
echo -n "$d "
done
echo
echo -n " NGIO: "
for d in "${NGIO_DEPS[@]}"; do
echo -n "$d "
done
echo
echo -n " Direct GekkoFS dependencies: "
for d in "${DIRECT_DEPS[@]}"; do
echo -n "$d "
done
echo
echo -n " All: "
for d in "${ALL_DEPS[@]}"; do
echo -n "$d "
done
echo ""
echo
}
check_dependency() {
......@@ -82,7 +95,6 @@ check_dependency() {
if echo "${DEPENDENCY}" | grep -q "${DEP}"; then
return
fi
# [[ "${DEPENDENCY}" == "${DEP}" ]] && return
else
# if not check if dependency is part of dependency config
for e in "${DEP_CONFIG[@]}"; do
......@@ -119,7 +131,7 @@ clonedeps() {
fi
# fix the version
cd "${SOURCE}/${FOLDER}" && git checkout -qf ${COMMIT}
echo "${ACTION} ${FOLDER} [$COMMIT]"
echo "${ACTION} '${REPO}' to '${FOLDER}' with commit '[${COMMIT}]' and flags '${GIT_FLAGS}'"
# apply patch if provided
if [[ -n "${PATCH}" ]]; then
......@@ -150,7 +162,7 @@ wgetdeps() {
curl ${COMMON_CURL_FLAGS} "$URL" || error_exit "Failed to download ${URL}" $?
tar -xf "$FILENAME" --directory "${SOURCE}/${FOLDER}" --strip-components=1
rm -f "$FILENAME"
echo "Downloaded ${FOLDER}"
echo "Downloaded '${URL}' to '${FOLDER}'"
}
usage_short() {
......@@ -179,7 +191,7 @@ optional arguments:
defaults to 'ofi'
-c <CONFIG>, --config <CONFIG>
allows additional configurations, e.g., for specific clusters
supported values: {mogon2, direct, all}
supported values: {mogon2, mogon1, ngio, direct, all}
defaults to 'direct'
-d <DEPENDENCY>, --dependency <DEPENDENCY>
download a specific dependency and ignore --config setting. If unspecified
......@@ -265,6 +277,10 @@ mogon2)
DEP_CONFIG=("${MOGON2_DEPS[@]}")
[[ -z "${DEPENDENCY}" ]] && echo "'Mogon2' dependencies are downloaded"
;;
ngio)
DEP_CONFIG=("${NGIO_DEPS[@]}")
[[ -z "${DEPENDENCY}" ]] && echo "'NGIO' dependencies are downloaded"
;;
all)
DEP_CONFIG=("${ALL_DEPS[@]}")
[[ -z "${DEPENDENCY}" ]] && echo "'All' dependencies are downloaded"
......@@ -323,19 +339,23 @@ fi
# get libfabric
if [ "${NA_LAYER}" == "ofi" ] || [ "${NA_LAYER}" == "all" ]; then
if check_dependency "ofi-experimental" "${DEP_CONFIG[@]}"; then
wgetdeps "libfabric" "https://github.com/ofiwg/libfabric/releases/download/v1.9.1/libfabric-1.9.1.tar.bz2" &
clonedeps "libfabric" "https://github.com/ofiwg/libfabric.git" "" "-b v1.9.1" &
elif check_dependency "ofi-verbs" "${DEP_CONFIG[@]}"; then
# libibverbs 1.2.1-1 used on mogon 1i (installed on system) which is linked to libfabric
# libfabric 1.8 random RPCs fail to be send. 1.9 RPC client cannot be started when in an MPI environment
wgetdeps "libfabric" "https://github.com/ofiwg/libfabric/releases/download/v1.7.2/libfabric-1.7.2.tar.gz" &
clonedeps "libfabric" "https://github.com/ofiwg/libfabric.git" "" "-b v1.7.2" &
elif check_dependency "ofi" "${DEP_CONFIG[@]}"; then
wgetdeps "libfabric" "https://github.com/ofiwg/libfabric/releases/download/v1.8.1/libfabric-1.8.1.tar.bz2" &
clonedeps "libfabric" "https://github.com/ofiwg/libfabric.git" "" "-b v1.8.1" &
fi
fi
if check_dependency "psm2" "${DEP_CONFIG[@]}"; then
wgetdeps "psm2" "https://github.com/intel/opa-psm2/archive/PSM2_11.2.86.tar.gz" &
fi
# get Mercury
if check_dependency "mercury" "${DEP_CONFIG[@]}"; then
clonedeps "mercury" "https://github.com/mercury-hpc/mercury" "fd410dfb9852b2b98d21113531f3058f45bfcd64" "--recurse-submodules" &
clonedeps "mercury" "https://github.com/mercury-hpc/mercury" "41caa143a07ed179a3149cac4af0dc7aa3f946fd" "--recurse-submodules" &
fi
# get Argobots
......@@ -345,19 +365,25 @@ fi
# get Margo
if check_dependency "margo" "${DEP_CONFIG[@]}"; then
clonedeps "margo" "https://xgitlab.cels.anl.gov/sds/margo.git" "016dbdce22da3fe4f97b46c20a53bced9370a217" &
clonedeps "margo" "https://xgitlab.cels.anl.gov/sds/margo.git" "v0.6.3" &
fi
# get rocksdb
if check_dependency "rocksdb" "${DEP_CONFIG[@]}"; then
wgetdeps "rocksdb" "https://github.com/facebook/rocksdb/archive/v6.2.2.tar.gz" &
elif check_dependency "rocksdb-experimental" "${DEP_CONFIG[@]}"; then
wgetdeps "rocksdb" "https://github.com/facebook/rocksdb/archive/v6.7.3.tar.gz" &
if check_dependency "rocksdb-experimental" "${DEP_CONFIG[@]}"; then
wgetdeps "rocksdb" "https://github.com/facebook/rocksdb/archive/v6.11.4.tar.gz" &
else
wgetdeps "rocksdb" "https://github.com/facebook/rocksdb/archive/v6.2.2.tar.gz" &
fi
fi
# get syscall_intercept
if check_dependency "syscall_intercept" "${DEP_CONFIG[@]}"; then
clonedeps "syscall_intercept" "https://github.com/pmem/syscall_intercept.git" "cc3412a2ad39f2e26cc307d5b155232811d7408e" "" "syscall_intercept.patch" &
if check_dependency "syscall_intercept-glibc3" "${DEP_CONFIG[@]}"; then
clonedeps "syscall_intercept" "https://github.com/GBuella/syscall_intercept" "ea124fb4ab9eb56bc22a0e94f2b90928c7a88e8c" "-b add_endbr64_and_lea" "syscall_intercept.patch" &
else
clonedeps "syscall_intercept" "https://github.com/pmem/syscall_intercept.git" "cc3412a2ad39f2e26cc307d5b155232811d7408e" "" "syscall_intercept.patch" &
fi
fi
# get date
......
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import print_function
import argparse
import time
import os
from util import util
__author__ = "Marc-Andre Vef"
__email__ = "vef@uni-mainz.de"
global PRETEND
global PSSH_PATH
global WAITTIME
global PSSH_HOSTFILE_PATH
def check_dependencies():
global PSSH_PATH
"""Check if pssh is installed"""
pssh_path = os.popen('which pssh').read().strip()
if pssh_path != '':
PSSH_PATH = pssh_path
return
pssh_path = os.popen('which parallel-ssh').read().strip()
if pssh_path != '':
PSSH_PATH = pssh_path
return
print('[ERR] parallel-ssh/pssh executable cannot be found. Please add it to the parameter list')
exit(1)
def shutdown_system(daemon_pid_path, nodelist, sigkill):
"""Shuts down GekkoFS on specified nodes.
Args:
daemon_pid_path (str): Path to daemon pid file
nodelist (str): Comma-separated list of nodes where daemons need to be launched
sigkill (bool): If true force kills daemons
"""
global PSSH_PATH
global PRETEND
global WAITTIME
global PSSH_HOSTFILE_PATH
# get absolute paths
daemon_pid_path = os.path.realpath(os.path.expanduser(daemon_pid_path))
pssh_nodelist = ''
nodefile = False
if os.path.exists(nodelist):
nodefile = True
if not util.create_pssh_hostfile(nodelist, PSSH_HOSTFILE_PATH):
exit(1)
if PSSH_PATH is '':
check_dependencies()
# set pssh arguments
if nodefile:
pssh = '%s -O StrictHostKeyChecking=no -i -h "%s"' % (PSSH_PATH, PSSH_HOSTFILE_PATH)
else:
pssh = '%s -O StrictHostKeyChecking=no -i -H "%s"' % (PSSH_PATH, nodelist.replace(',', ' '))
if sigkill:
cmd_str = '%s "pkill -SIGKILL --pidfile \"%s\""' % (pssh, daemon_pid_path)
else:
cmd_str = '%s "pkill -SIGTERM --pidfile \"%s\""' % (pssh, daemon_pid_path)
if PRETEND:
print('Pretending: {}'.format(cmd_str))
else:
print('Running: {}'.format(cmd_str))
pssh_ret = util.exec_shell(cmd_str, True)
err = False
for line in pssh_ret:
if 'FAILURE' in line.strip()[:30]:
err = True
print('------------------------- ERROR pssh -- Host "{}" -------------------------'.format(\
line[line.find('FAILURE'):].strip().split(' ')[1]))
print(line)
if not err:
if sigkill:
print('pssh daemon launch successfully executed. FS daemons have been force killed ...')
exit(1)
else:
print('pssh daemon launch successfully executed. Checking for FS shutdown errors ...\n')
else:
print('[ERR] with pssh. Aborting...')
exit(1)
if not PRETEND:
print('Give it some time ({} second) to finish up ...'.format(WAITTIME))
for i in range(WAITTIME):
print('{}\r'.format(WAITTIME - i))
time.sleep(1)
print('Checking logs ...\n')
cmd_chk_str = '%s "tail -4 /tmp/gkfs_daemon.log"' % pssh
if PRETEND:
print('Pretending: {}'.format(cmd_chk_str))
else:
print('Running: {}'.format(cmd_chk_str))
pssh_ret = util.exec_shell(cmd_chk_str, True)
err = False
fs_err = False
for line in pssh_ret:
if line == '':
continue
if 'Failure' in line.strip()[:30]:
err = True
print('------------------------- ERROR pssh -- Host "{}" -------------------------'.format(\
line[line.find('FAILURE'):].strip().split(' ')[1]))
print(line)
else:
# check for errors in log
if not 'All services shut down.' in line[line.strip().find('\n') + 1:]:
fs_err = True
print('------------------------- WARN pssh -- Host "{}" -------------------------'.format(\
line.strip().split(' ')[3].split('\n')[0]))
print('{}'.format(line[line.find('\n') + 1:]))
if not err and not fs_err:
print('pssh logging check successfully executed. Looks prime.')
else:
print('[WARN] while checking fs logs. Something might went wrong when shutting down')
exit(1)
if __name__ == "__main__":
# Init parser
parser = argparse.ArgumentParser(description='This script stops GekkoFS on multiple nodes',
formatter_class=argparse.RawTextHelpFormatter)
# positional arguments
parser.add_argument('daemonpidpath', type=str,
help='path to the daemon pid file')
parser.add_argument('nodelist', type=str,
help='''list of nodes where the file system is launched. This can be a comma-separated list
or a path to a nodefile (one node per line)''')
# optional arguments
parser.add_argument('-p', '--pretend', action='store_true',
help='Output launch command and do not actually execute it')
parser.add_argument('-9', '--sigkill', action='store_true',
help='Force kill daemons')
parser.add_argument('-P', '--pssh', metavar='<PSSH_PATH>', type=str, default='',
help='Path to parallel-ssh/pssh. Defaults to /usr/bin/{parallel-ssh,pssh}')
parser.add_argument('-J', '--jobid', metavar='<JOBID>', type=str, default='',
help='Jobid for cluster batch system. Used for a unique hostfile used for pssh.')
parser.add_argument('-H', '--pssh_hostfile', metavar='<pssh_hostfile>', type=str, default='/tmp/hostfile_pssh',
help='''This script creates a hostfile to pass to MPI. This variable defines the path.
Defaults to /tmp/hostfile_pssh''')
args = parser.parse_args()
if args.pretend is True:
PRETEND = True
else:
PRETEND = False
if args.jobid == '':
PSSH_HOSTFILE_PATH = args.pssh_hostfile
else:
PSSH_HOSTFILE_PATH = '%s_%s' % (args.pssh_hostfile, args.jobid)
PSSH_PATH = args.pssh
WAITTIME = 5
shutdown_system(args.daemonpidpath, args.nodelist, args.sigkill)
print('\nNothing left to do; exiting. :)')
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import print_function
import argparse
import time
import os
from util import util
__author__ = "Marc-Andre Vef"
__email__ = "vef@uni-mainz.de"
global PRETEND
global PSSH_PATH
global WAITTIME
global PSSH_HOSTFILE_PATH
def check_dependencies():
global PSSH_PATH
"""Check if pssh is installed"""
pssh_path = os.popen('which pssh').read().strip()
if pssh_path != '':
PSSH_PATH = pssh_path
return
pssh_path = os.popen('which parallel-ssh').read().strip()
if pssh_path != '':
PSSH_PATH = pssh_path
return
print('[ERR] parallel-ssh/pssh executable cannot be found. Please add it to the parameter list')
exit(1)
def init_system(daemon_path, rootdir, metadir, mountdir, nodelist, cleanroot, numactl):
"""Initializes GekkoFS on specified nodes.
Args:
daemon_path (str): Path to daemon executable
rootdir (str): Path to root directory for fs data
metadir (str): Path to metadata directory where metadata is stored
mountdir (str): Path to mount directory where is used in
nodelist (str): Comma-separated list of nodes where daemons need to be launched
cleanroot (bool): if True, root and metadir is cleaned before daemon init
numactl (str): numactl arguments for daemon init
"""
global PSSH_PATH
global PRETEND
global PSSH_HOSTFILE_PATH
# get absolute paths
daemon_path = os.path.realpath(os.path.expanduser(daemon_path))
mountdir = os.path.realpath(os.path.expanduser(mountdir))
rootdir = os.path.realpath(os.path.expanduser(rootdir))
# Replace metadir with rootdir if only rootdir is given
if len(metadir) == 0:
metadir = rootdir
else:
metadir = os.path.realpath(os.path.expanduser(metadir))
pssh_nodelist = ''
nodefile = False
if os.path.exists(nodelist):
nodefile = True
if not util.create_pssh_hostfile(nodelist, PSSH_HOSTFILE_PATH):
exit(1)
if PSSH_PATH is '':
check_dependencies()
# set pssh arguments
if nodefile:
pssh = '%s -O StrictHostKeyChecking=no -i -h "%s"' % (PSSH_PATH, PSSH_HOSTFILE_PATH)
else:
pssh = '%s -O StrictHostKeyChecking=no -i -H "%s"' % (PSSH_PATH, nodelist.replace(',', ' '))
# clean root and metadata dir if needed
if cleanroot:
cmd_rm_str = '%s "rm -rf %s/* %s/* && truncate -s 0 /tmp/gkfs_daemon.log /tmp/gkfs_preload.log"' % (pssh, rootdir, metadir)
if PRETEND:
print('Pretending: {}'.format(cmd_rm_str))
else:
print('Running: {}'.format(cmd_rm_str))
pssh_ret = util.exec_shell(cmd_rm_str, True)
err = False
for line in pssh_ret:
if 'FAILURE' in line.strip()[:30]:
err = True
print('------------------------- ERROR pssh -- Host "{}" -------------------------'.format(\
line[line.find('FAILURE'):].strip().split(' ')[1]))
print(line)
if not err:
print('pssh daemon launch successfully executed. Root and Metadata dir are cleaned.\n')
else:
print('[ERR] with pssh. Aborting!')
exit(1)
# Start deamons
if nodefile:
if len(numactl) == 0:
cmd_str = '%s "nohup %s -r %s -i %s -m %s --hostfile %s > /tmp/gkfs_daemon.log 2>&1 &"' \
% (pssh, daemon_path, rootdir, metadir, mountdir, nodelist)
else:
cmd_str = '%s "nohup numactl %s %s -r %s -i %s -m %s --hostfile %s > /tmp/gkfs_daemon.log 2>&1 &"' \
% (pssh, numactl, daemon_path, rootdir, metadir, mountdir, nodelist)
else:
if len(numactl) == 0:
cmd_str = '%s "nohup %s -r %s -i %s -m %s --hosts %s > /tmp/gkfs_daemon.log 2>&1 &"' \
% (pssh, daemon_path, rootdir, metadir, mountdir, nodelist)
else:
cmd_str = '%s "nohup numactl %s %s -r %s -i %s -m %s --hosts %s > /tmp/gkfs_daemon.log 2>&1 &"' \
% (pssh, numactl, daemon_path, rootdir, metadir, mountdir, nodelist)
if PRETEND:
print('Pretending: {}'.format(cmd_str))
else:
print('Running: {}'.format(cmd_str))
pssh_ret = util.exec_shell(cmd_str, True)
err = False
for line in pssh_ret:
if 'FAILURE' in line.strip()[:30]:
err = True
print('------------------------- ERROR pssh -- Host "{}" -------------------------'.format(\
line[line.find('FAILURE'):].strip().split(' ')[1]))
print(line)
if not err:
print('pssh daemon launch successfully executed. Checking for FS startup errors ...\n')
else:
print('[ERR] with pssh. Aborting. Please run shutdown_gkfs.py to shut down orphan daemons!')
exit(1)
if not PRETEND:
print('Give it some time ({} second) to startup ...'.format(WAITTIME))
for i in range(WAITTIME):
print('{}\r'.format(WAITTIME - i)),
time.sleep(1)
# Check logs for errors
cmd_chk_str = '%s "head -5 /tmp/gkfs_daemon.log"' % pssh
if PRETEND:
print('Pretending: {}'.format(cmd_chk_str))
else:
print('Running: {}'.format(cmd_chk_str))
pssh_ret = util.exec_shell(cmd_chk_str, True)
err = False
fs_err = False
for line in pssh_ret:
if 'Failure' in line.strip()[:30]:
err = True
print('------------------------- ERROR pssh -- Host "{}" -------------------------'.format(\
line[line.find('FAILURE'):].strip().split(' ')[1]))
print(line)
else:
# check for errors in log
if '[E]' in line[line.strip().find('\n') + 1:] or 'Assertion `err\'' in line[
line.strip().find('\n') + 1:]:
fs_err = True
print('------------------------- ERROR pssh -- Host "{}" -------------------------'.format(\
line.strip().split(' ')[3].split('\n')[0]))
print('{}'.format(line[line.find('\n') + 1:]))
if not err and not fs_err:
print('pssh logging check successfully executed. Looks prime.')
else:
print('[ERR] while checking fs logs. Aborting. Please run shutdown_gkfs.py to shut down orphan daemons!')
exit(1)
if __name__ == "__main__":
# Init parser
parser = argparse.ArgumentParser(description='This script launches GekkoFS on multiple nodes',
formatter_class=argparse.RawTextHelpFormatter)
# positional arguments
parser.add_argument('daemonpath', type=str,
help='path to the daemon executable')
parser.add_argument('rootdir', type=str,
help='path to the root directory where all data will be stored')
parser.add_argument('mountdir', type=str,
help='path to the mount directory of the file system')
parser.add_argument('nodelist', type=str,
help='''list of nodes where the file system is launched. This can be a comma-separated list
or a path to a nodefile (one node per line)''')
# optional arguments
parser.add_argument('-i', '--metadir', metavar='<METADIR_PATH>', type=str, default='',
help='''Path to separate metadir directory where metadata is stored.
If not set, rootdir will be used instead.''')
parser.add_argument('-p', '--pretend', action='store_true',
help='Output launch command and do not actually execute it')
parser.add_argument('-P', '--pssh', metavar='<PSSH_PATH>', type=str, default='',
help='Path to parallel-ssh/pssh. Defaults to /usr/bin/{parallel-ssh,pssh}')
parser.add_argument('-J', '--jobid', metavar='<JOBID>', type=str, default='',
help='Jobid for cluster batch system. Used for a unique hostfile used for pssh.')
parser.add_argument('-c', '--cleanroot', action='store_true',
help='Removes contents of root and metadata directory before starting daemon. Be careful!')
parser.add_argument('-n', '--numactl', metavar='<numactl_args>', type=str, default='',
help='If daemon should be pinned to certain cores, set numactl arguments here.')
parser.add_argument('-H', '--pssh_hostfile', metavar='<pssh_hostfile>', type=str, default='/tmp/hostfile_pssh',
help='''This script creates a hostfile to pass to MPI. This variable defines the path.
Defaults to /tmp/hostfile_pssh''')
args = parser.parse_args()
if args.pretend:
PRETEND = True
else:
PRETEND = False
if args.jobid == '':
PSSH_HOSTFILE_PATH = args.pssh_hostfile
else:
PSSH_HOSTFILE_PATH = '%s_%s' % (args.pssh_hostfile, args.jobid)
PSSH_PATH = args.pssh
WAITTIME = 5
init_system(args.daemonpath, args.rootdir, args.metadir, args.mountdir, args.nodelist, args.cleanroot, args.numactl)
print('\nNothing left to do; exiting. :)')
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Script contains different useful functions
# Marc-Andre Vef
# Version 0.2 (06/19/2015)
from __future__ import print_function
import collections
import shutil
import sys
import time
from subprocess import Popen, PIPE
import os
def rm_trailing_slash(path):
"""Removes the trailing slash from a given path"""
return path[:-1] if path[-1] == '/' else path
def create_dir(path):
"""creates a directory at the paths location. Must be an absolute path"""
try:
if not os.path.exists(path):
os.makedirs(path)
except OSError as e:
print('Error: Output directory could not be created.')
print(e.strerror)
sys.exit(1)
def rm_rf(path):
"""rm -rf path"""
try:
shutil.rmtree(path)
except shutil.Error as e:
print('Warning: Could not delete path {}'.format(path))
print(e.strerror)
def rm_file(path):
"""remove a file"""
try:
os.remove(path)
except OSError as e:
print('Warning: Could not delete file {}'.format(path))
print(e.strerror)
def tprint(toprint, nobreak=False):
"""Function adds current time to input and prints it to the terminal
Args:
toprint (str): The string to be printed.
nobreak (bool): True if no line break should occur after print. Defaults to False.
"""
curr_time = time.strftime('[%H:%M:%S]')
if nobreak:
print('{}\t{}'.format(curr_time, toprint))
else:
print('{}\t{}'.format(curr_time, toprint))
def exec_shell(cmd, suppress_output=False):
"""Function executes a cmd on the shell
Args:
cmd (str): The command to be executed.
suppress_output (bool): Suppresses command line output. Defaults to False.
Returns:
(namedtuple(shell_out[err=(str), output=(str))): Executed cmd output in output
Err is filled if a cmd error is encountered else ''
Raises:
OSError: If command could not be executed.
"""
try:
if cmd == '':
raise OSError("Command string is empty.")
# simulate change directory to where mdtest executable is located
p = Popen(cmd, shell=True, stdout=PIPE, stderr=PIPE, bufsize=1)
shell_out = ''
# Poll process for new output until finished
for line in iter(p.stdout.readline, ''):
if not suppress_output:
curr_time = time.strftime('[%H:%M:%S] ')
sys.stdout.write('%s%s' % (curr_time, line))
sys.stdout.flush()
shell_out += line
stdout, stderr = p.communicate()
out_tuple = collections.namedtuple('shell_out', ['err', 'output'])
return out_tuple(err=stderr.strip(), output=shell_out.strip())
except OSError as e:
print('ERR when executing shell command')
print(e.strerror)
def check_shell_out(msg):
"""Function looks for an error in namedtuple of exec_shell and returns the output string only if valid
Args:
msg((namedtuple(shell_out[err=(str), output=(str)))): The message to be printed
Returns:
(str): Returns the msg output if no error
Raises:
OSError: if shell_out contains an error
"""
if msg.err != '':
raise OSError('Shell command failed with\n\t%s' % msg.err)
return msg.output
def create_pssh_hostfile(hostfile, hostfile_pssh):
"""Function creates a pssh compatible hostfile
Args:
hostfile(str): Path to source hostfile
hostfile_pssh(str): Path to pssh compatible hostfile (contents will be created first)
Returns:
(bool): Returns true if successful
"""
# truncate pssh hostfile
try:
open(hostfile_pssh, 'w').close()
# make nodefile pssh compatible
with open(hostfile, 'r') as rf:
for line in rf.readlines():
# skip commented lines
if line.startswith('#'):
continue
with open(hostfile_pssh, 'a') as wf:
wf.write(line.strip().split(' ')[0] + '\n')
except IOError as e:
print('ERR while creating pssh compatible hostfile')
print(e.strerror)
return False
return True
......@@ -271,7 +271,13 @@ int gkfs_statx(int dirfs, const std::string& path, int flags, unsigned int mask,
#endif
int gkfs_statfs(struct statfs* buf) {
auto blk_stat = gkfs::rpc::forward_get_chunk_stat();
gkfs::rpc::ChunkStat blk_stat{};
try {
blk_stat = gkfs::rpc::forward_get_chunk_stat();
} catch (const std::exception& e) {
LOG(ERROR, "{}() Failure with error: '{}'", e.what());
return -1;
}
buf->f_type = 0;
buf->f_bsize = blk_stat.chunk_size;
buf->f_blocks = blk_stat.chunk_total;
......@@ -288,8 +294,13 @@ int gkfs_statfs(struct statfs* buf) {
}
int gkfs_statvfs(struct statvfs* buf) {
gkfs::preload::init_ld_env_if_needed();
auto blk_stat = gkfs::rpc::forward_get_chunk_stat();
gkfs::rpc::ChunkStat blk_stat{};
try {
blk_stat = gkfs::rpc::forward_get_chunk_stat();
} catch (const std::exception& e) {
LOG(ERROR, "{}() Failure with error: '{}'", e.what());
return -1;
}
buf->f_bsize = blk_stat.chunk_size;
buf->f_blocks = blk_stat.chunk_total;
buf->f_bfree = blk_stat.chunk_free;
......
......@@ -19,6 +19,7 @@
#include <client/intercept.hpp>
#include <global/rpc/distributor.hpp>
#include <global/global_defs.hpp>
#include <fstream>
......@@ -64,6 +65,9 @@ bool init_hermes_client(const std::string& transport_prefix) {
#if USE_SHM
opts |= hermes::use_auto_sm;
#endif
if (gkfs::rpc::protocol::ofi_psm2 == string(RPC_PROTOCOL)) {
opts |= hermes::force_no_block_progress;
}
ld_network_service =
std::make_unique<hermes::async_engine>(
......