Loading src/daemon/backend/data/chunk_storage.cpp +15 −9 Original line number Original line Diff line number Diff line Loading @@ -20,6 +20,8 @@ #include <boost/filesystem.hpp> #include <boost/filesystem.hpp> #include <spdlog/spdlog.h> #include <spdlog/spdlog.h> #include "leaf/handle_errors.hpp" extern "C" { extern "C" { #include <sys/statfs.h> #include <sys/statfs.h> Loading Loading @@ -98,15 +100,19 @@ ChunkStorage::ChunkStorage(string& path, const size_t chunksize) : */ */ void ChunkStorage::destroy_chunk_space(const string& file_path) const { void ChunkStorage::destroy_chunk_space(const string& file_path) const { auto chunk_dir = absolute(get_chunks_dir(file_path)); auto chunk_dir = absolute(get_chunks_dir(file_path)); try { boost::leaf::try_catch( [&] { // Note: remove_all does not throw an error when path doesn't exist. // Note: remove_all does not throw an error when path doesn't exist. auto n = bfs::remove_all(chunk_dir); auto n = bfs::remove_all(chunk_dir); log_->debug("{}() Removed '{}' files from '{}'", __func__, n, chunk_dir); log_->debug("{}() Removed '{}' files from '{}'", __func__, n, chunk_dir); } catch (const bfs::filesystem_error& e) { }, [&](const bfs::filesystem_error& e) { auto err_str = fmt::format("{}() Failed to remove chunk directory. Path: '{}', Error: '{}'", __func__, auto err_str = fmt::format("{}() Failed to remove chunk directory. Path: '{}', Error: '{}'", __func__, chunk_dir, e.what()); chunk_dir, e.what()); throw ChunkStorageException(e.code().value(), err_str); throw ChunkStorageException(e.code().value(), err_str); } }); } } /** /** Loading src/daemon/daemon.cpp +105 −59 Original line number Original line Diff line number Diff line Loading @@ -37,6 +37,7 @@ #include <fstream> #include <fstream> #include <csignal> #include <csignal> #include <condition_variable> #include <condition_variable> #include "leaf/handle_errors.hpp" extern "C" { extern "C" { #include <unistd.h> #include <unistd.h> Loading Loading @@ -153,12 +154,16 @@ void init_environment() { // Initialize metadata db // Initialize metadata db std::string metadata_path = GKFS_DATA->metadir() + "/rocksdb"s; std::string metadata_path = GKFS_DATA->metadir() + "/rocksdb"s; GKFS_DATA->spdlogger()->debug("{}() Initializing metadata DB: '{}'", __func__, metadata_path); GKFS_DATA->spdlogger()->debug("{}() Initializing metadata DB: '{}'", __func__, metadata_path); try { boost::leaf::try_catch( [&] { GKFS_DATA->mdb(std::make_shared<gkfs::metadata::MetadataDB>(metadata_path)); GKFS_DATA->mdb(std::make_shared<gkfs::metadata::MetadataDB>(metadata_path)); } catch (const std::exception& e) { }, [&](const std::exception &e) { GKFS_DATA->spdlogger()->error("{}() Failed to initialize metadata DB: {}", __func__, e.what()); GKFS_DATA->spdlogger()->error("{}() Failed to initialize metadata DB: {}", __func__, e.what()); throw; throw; } }); #ifdef GKFS_ENABLE_FORWARDING #ifdef GKFS_ENABLE_FORWARDING GKFS_DATA->spdlogger()->debug("{}() Enable I/O forwarding mode", __func__); GKFS_DATA->spdlogger()->debug("{}() Enable I/O forwarding mode", __func__); Loading @@ -178,31 +183,47 @@ void init_environment() { std::string chunk_storage_path = GKFS_DATA->rootdir() + "/data/chunks"s; std::string chunk_storage_path = GKFS_DATA->rootdir() + "/data/chunks"s; GKFS_DATA->spdlogger()->debug("{}() Initializing storage backend: '{}'", __func__, chunk_storage_path); GKFS_DATA->spdlogger()->debug("{}() Initializing storage backend: '{}'", __func__, chunk_storage_path); bfs::create_directories(chunk_storage_path); bfs::create_directories(chunk_storage_path); try { boost::leaf::try_catch( [&] { GKFS_DATA->storage( GKFS_DATA->storage( std::make_shared<gkfs::data::ChunkStorage>(chunk_storage_path, gkfs::config::rpc::chunksize)); std::make_shared<gkfs::data::ChunkStorage>(chunk_storage_path, gkfs::config::rpc::chunksize)); } catch (const std::exception& e) { }, [&](const std::exception &e) { GKFS_DATA->spdlogger()->error("{}() Failed to initialize storage backend: {}", __func__, e.what()); GKFS_DATA->spdlogger()->error("{}() Failed to initialize storage backend: {}", __func__, e.what()); throw; throw; } }); // Init margo for RPC // Init margo for RPC GKFS_DATA->spdlogger()->debug("{}() Initializing RPC server: '{}'", __func__, GKFS_DATA->bind_addr()); GKFS_DATA->spdlogger()->debug("{}() Initializing RPC server: '{}'", try { __func__, GKFS_DATA->bind_addr()); boost::leaf::try_catch( [&] { init_rpc_server(); init_rpc_server(); } catch (const std::exception& e) { }, [&](const std::exception &e) { GKFS_DATA->spdlogger()->error("{}() Failed to initialize RPC server: {}", __func__, e.what()); GKFS_DATA->spdlogger()->error("{}() Failed to initialize RPC server: {}", __func__, e.what()); throw; throw; } }); // Init Argobots ESs to drive IO // Init Argobots ESs to drive IO try { boost::leaf::try_catch( [&] { GKFS_DATA->spdlogger()->debug("{}() Initializing I/O pool", __func__); GKFS_DATA->spdlogger()->debug("{}() Initializing I/O pool", __func__); init_io_tasklet_pool(); init_io_tasklet_pool(); } catch (const std::exception& e) { }, [&](const std::exception &e) { GKFS_DATA->spdlogger()->error("{}() Failed to initialize Argobots pool for I/O: {}", __func__, e.what()); GKFS_DATA->spdlogger()->error("{}() Failed to initialize Argobots pool for I/O: {}", __func__, e.what()); throw; throw; } }); // TODO set metadata configurations. these have to go into a user configurable file that is parsed here // TODO set metadata configurations. these have to go into a user configurable file that is parsed here GKFS_DATA->atime_state(gkfs::config::metadata::use_atime); GKFS_DATA->atime_state(gkfs::config::metadata::use_atime); Loading @@ -212,11 +233,17 @@ void init_environment() { GKFS_DATA->blocks_state(gkfs::config::metadata::use_blocks); GKFS_DATA->blocks_state(gkfs::config::metadata::use_blocks); // Create metadentry for root directory // Create metadentry for root directory gkfs::metadata::Metadata root_md{S_IFDIR | S_IRWXU | S_IRWXG | S_IRWXO}; gkfs::metadata::Metadata root_md{S_IFDIR | S_IRWXU | S_IRWXG | S_IRWXO}; try { boost::leaf::try_catch( [&] { gkfs::metadata::create("/", root_md); gkfs::metadata::create("/", root_md); } catch (const std::exception& e) { }, [&](const std::exception &e) { throw runtime_error("Failed to write root metadentry to KV store: "s + e.what()); throw runtime_error("Failed to write root metadentry to KV store: "s + e.what()); } }); // setup hostfile to let clients know that a daemon is running on this host // setup hostfile to let clients know that a daemon is running on this host if (!GKFS_DATA->hosts_file().empty()) { if (!GKFS_DATA->hosts_file().empty()) { gkfs::util::populate_hosts_file(); gkfs::util::populate_hosts_file(); Loading Loading @@ -256,11 +283,16 @@ void destroy_enviroment() { if (!GKFS_DATA->hosts_file().empty()) { if (!GKFS_DATA->hosts_file().empty()) { GKFS_DATA->spdlogger()->debug("{}() Removing hosts file", __func__); GKFS_DATA->spdlogger()->debug("{}() Removing hosts file", __func__); try { boost::leaf::try_catch( [&] { gkfs::util::destroy_hosts_file(); gkfs::util::destroy_hosts_file(); } catch (const bfs::filesystem_error& e) { }, [&](const bfs::filesystem_error& e) { GKFS_DATA->spdlogger()->debug("{}() hosts file not found", __func__); GKFS_DATA->spdlogger()->debug("{}() hosts file not found", __func__); } }); } } if (RPC_DATA->server_rpc_mid() != nullptr) { if (RPC_DATA->server_rpc_mid() != nullptr) { Loading Loading @@ -457,38 +489,52 @@ int main(int argc, const char* argv[]) { return 0; return 0; } } try { boost::leaf::try_catch( [&] { po::notify(vm); po::notify(vm); } catch (po::required_option& e) { }, [&](po::required_option& e) { std::cerr << "Error: " << e.what() << "\n"; std::cerr << "Error: " << e.what() << "\n"; return 1; return 1; } }); // intitialize logging framework // intitialize logging framework initialize_loggers(); initialize_loggers(); GKFS_DATA->spdlogger(spdlog::get("main")); GKFS_DATA->spdlogger(spdlog::get("main")); // parse all input parameters and populate singleton structures // parse all input parameters and populate singleton structures try { boost::leaf::try_catch( [&] { parse_input(vm); parse_input(vm); } catch (const std::exception& e) { }, [&](const std::exception& e) { cerr << fmt::format("Parsing arguments failed: '{}'. Exiting.", e.what()); cerr << fmt::format("Parsing arguments failed: '{}'. Exiting.", e.what()); exit(EXIT_FAILURE); exit(EXIT_FAILURE); } }); /* /* * Initialize environment and start daemon. Wait until signaled to cancel before shutting down * Initialize environment and start daemon. Wait until signaled to cancel before shutting down */ */ try { boost::leaf::try_catch( [&] { GKFS_DATA->spdlogger()->info("{}() Initializing environment", __func__); GKFS_DATA->spdlogger()->info("{}() Initializing environment", __func__); init_environment(); init_environment(); } catch (const std::exception& e) { }, [&](const std::exception& e) { auto emsg = fmt::format("Failed to initialize environment: {}", e.what()); auto emsg = fmt::format("Failed to initialize environment: {}", e.what()); GKFS_DATA->spdlogger()->error(emsg); GKFS_DATA->spdlogger()->error(emsg); cerr << emsg << endl; cerr << emsg << endl; destroy_enviroment(); destroy_enviroment(); exit(EXIT_FAILURE); exit(EXIT_FAILURE); } }); signal(SIGINT, shutdown_handler); signal(SIGINT, shutdown_handler); signal(SIGTERM, shutdown_handler); signal(SIGTERM, shutdown_handler); Loading src/daemon/handler/srv_data.cpp +70 −39 Original line number Original line Diff line number Diff line Loading @@ -21,6 +21,7 @@ #include <global/rpc/rpc_types.hpp> #include <global/rpc/rpc_types.hpp> #include <global/rpc/distributor.hpp> #include <global/rpc/distributor.hpp> #include <global/chunk_calc_util.hpp> #include <global/chunk_calc_util.hpp> #include "leaf/handle_errors.hpp" #ifdef GKFS_ENABLE_AGIOS #ifdef GKFS_ENABLE_AGIOS #include <daemon/scheduler/agios.hpp> #include <daemon/scheduler/agios.hpp> Loading Loading @@ -213,15 +214,22 @@ hg_return_t rpc_srv_write(hg_handle_t handle) { chnk_ptr += transfer_size; chnk_ptr += transfer_size; chnk_size_left_host -= transfer_size; chnk_size_left_host -= transfer_size; } } try { // start tasklet for writing chunk int err = boost::leaf::try_catch( [&]() -> int { chunk_op.write_nonblock(chnk_id_curr, chnk_ids_host[chnk_id_curr], bulk_buf_ptrs[chnk_id_curr], chunk_op.write_nonblock(chnk_id_curr, chnk_ids_host[chnk_id_curr], bulk_buf_ptrs[chnk_id_curr], chnk_sizes[chnk_id_curr], (chnk_id_file == in.chunk_start) ? in.offset : 0); chnk_sizes[chnk_id_curr], (chnk_id_file == in.chunk_start) ? in.offset : 0); } catch (const gkfs::data::ChunkWriteOpException& e) { return 0; }, [&](const gkfs::data::ChunkWriteOpException& e) { // This exception is caused by setup of Argobots variables. If this fails, something is really wrong // This exception is caused by setup of Argobots variables. If this fails, something is really wrong GKFS_DATA->spdlogger()->error("{}() while write_nonblock err '{}'", __func__, e.what()); GKFS_DATA->spdlogger()->error("{}() while write_nonblock err '{}'", __func__, e.what()); return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle); return -1; } }); if(err == -1) return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle); // next chunk // next chunk chnk_id_curr++; chnk_id_curr++; Loading Loading @@ -403,17 +411,26 @@ hg_return_t rpc_srv_read(hg_handle_t handle) { chnk_ptr += transfer_size; chnk_ptr += transfer_size; chnk_size_left_host -= transfer_size; chnk_size_left_host -= transfer_size; } } try { bool err = boost::leaf::try_catch( [&]() -> bool { // start tasklet for read operation // start tasklet for read operation chunk_read_op.read_nonblock(chnk_id_curr, chnk_ids_host[chnk_id_curr], bulk_buf_ptrs[chnk_id_curr], chunk_read_op.read_nonblock(chnk_id_curr, chnk_ids_host[chnk_id_curr], bulk_buf_ptrs[chnk_id_curr], chnk_sizes[chnk_id_curr], (chnk_id_file == in.chunk_start) ? in.offset : 0); chnk_sizes[chnk_id_curr], (chnk_id_file == in.chunk_start) ? in.offset : 0); } catch (const gkfs::data::ChunkReadOpException& e) { return false; }, [&](const gkfs::data::ChunkReadOpException& e) { // This exception is caused by setup of Argobots variables. If this fails, something is really wrong // This exception is caused by setup of Argobots variables. If this fails, something is really wrong GKFS_DATA->spdlogger()->error("{}() while read_nonblock err '{}'", __func__, e.what()); GKFS_DATA->spdlogger()->error("{}() while read_nonblock err '{}'", __func__, e.what()); return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle); return true; } }); if(err) return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle); chnk_id_curr++; chnk_id_curr++; } } // Sanity check that all chunks where detected in previous loop // Sanity check that all chunks where detected in previous loop // TODO error out. If we continue this will crash the server when sending results back that don't exist. // TODO error out. If we continue this will crash the server when sending results back that don't exist. if (chnk_size_left_host != 0) if (chnk_size_left_host != 0) Loading Loading @@ -461,14 +478,22 @@ hg_return_t rpc_srv_truncate(hg_handle_t handle) { GKFS_DATA->spdlogger()->debug("{}() path: '{}', length: '{}'", __func__, in.path, in.length); GKFS_DATA->spdlogger()->debug("{}() path: '{}', length: '{}'", __func__, in.path, in.length); gkfs::data::ChunkTruncateOperation chunk_op{in.path}; gkfs::data::ChunkTruncateOperation chunk_op{in.path}; try { int err = boost::leaf::try_catch( [&]() -> int { // start tasklet for truncate operation // start tasklet for truncate operation chunk_op.truncate(in.length); chunk_op.truncate(in.length); } catch (const gkfs::data::ChunkMetaOpException& e) { return 0; }, [&] (const gkfs::data::ChunkMetaOpException& e) { // This exception is caused by setup of Argobots variables. If this fails, something is really wrong // This exception is caused by setup of Argobots variables. If this fails, something is really wrong GKFS_DATA->spdlogger()->error("{}() while truncate err '{}'", __func__, e.what()); GKFS_DATA->spdlogger()->error("{}() while truncate err '{}'", __func__, e.what()); return gkfs::rpc::cleanup_respond(&handle, &in, &out); return -1; } }); if (err == -1) return gkfs::rpc::cleanup_respond(&handle, &in, &out); // wait and get output // wait and get output out.err = chunk_op.wait_for_task(); out.err = chunk_op.wait_for_task(); Loading @@ -487,19 +512,25 @@ hg_return_t rpc_srv_get_chunk_stat(hg_handle_t handle) { GKFS_DATA->spdlogger()->debug("{}() enter", __func__); GKFS_DATA->spdlogger()->debug("{}() enter", __func__); rpc_chunk_stat_out_t out{}; rpc_chunk_stat_out_t out{}; out.err = EIO; out.err = EIO; try { boost::leaf::try_catch( [&] { auto chk_stat = GKFS_DATA->storage()->chunk_stat(); auto chk_stat = GKFS_DATA->storage()->chunk_stat(); out.chunk_size = chk_stat.chunk_size; out.chunk_size = chk_stat.chunk_size; out.chunk_total = chk_stat.chunk_total; out.chunk_total = chk_stat.chunk_total; out.chunk_free = chk_stat.chunk_free; out.chunk_free = chk_stat.chunk_free; out.err = 0; out.err = 0; } catch (const gkfs::data::ChunkStorageException& err) { }, [&](const gkfs::data::ChunkStorageException& err) { GKFS_DATA->spdlogger()->error("{}() {}", __func__, err.what()); GKFS_DATA->spdlogger()->error("{}() {}", __func__, err.what()); out.err = err.code().value(); out.err = err.code().value(); } catch (const ::exception& err) { }, [&](const ::exception& err) { GKFS_DATA->spdlogger()->error("{}() Unexpected error when chunk stat '{}'", __func__, err.what()); GKFS_DATA->spdlogger()->error("{}() Unexpected error when chunk stat '{}'", __func__, err.what()); out.err = EAGAIN; out.err = EAGAIN; } }); // Create output and send it back // Create output and send it back return gkfs::rpc::cleanup_respond(&handle, &out); return gkfs::rpc::cleanup_respond(&handle, &out); Loading src/daemon/handler/srv_metadata.cpp +118 −95 Original line number Original line Diff line number Diff line Loading @@ -20,6 +20,15 @@ #include <global/rpc/rpc_types.hpp> #include <global/rpc/rpc_types.hpp> #include <daemon/backend/data/chunk_storage.hpp> #include <daemon/backend/data/chunk_storage.hpp> #include "leaf/exception.hpp" #include "leaf/handle_errors.hpp" #include "leaf/pred.hpp" #include "leaf/on_error.hpp" #include "leaf/common.hpp" #include "leaf/result.hpp" #include "leaf/error.hpp" using namespace std; using namespace std; /* /* Loading @@ -38,14 +47,20 @@ hg_return_t rpc_srv_create(hg_handle_t handle) { assert(ret == HG_SUCCESS); assert(ret == HG_SUCCESS); GKFS_DATA->spdlogger()->debug("{}() Got RPC with path '{}'", __func__, in.path); GKFS_DATA->spdlogger()->debug("{}() Got RPC with path '{}'", __func__, in.path); gkfs::metadata::Metadata md(in.mode); gkfs::metadata::Metadata md(in.mode); try { boost::leaf::try_catch( [&] { // create metadentry // create metadentry gkfs::metadata::create(in.path, md); gkfs::metadata::create(in.path, md); out.err = 0; out.err = 0; } catch (const std::exception& e) { }, [&](std::exception &e) { GKFS_DATA->spdlogger()->error("{}() Failed to create metadentry: '{}'", __func__, e.what()); GKFS_DATA->spdlogger()->error("{}() Failed to create metadentry: '{}'", __func__, e.what()); out.err = -1; out.err = -1; } }); GKFS_DATA->spdlogger()->debug("{}() Sending output err '{}'", __func__, out.err); GKFS_DATA->spdlogger()->debug("{}() Sending output err '{}'", __func__, out.err); auto hret = margo_respond(handle, &out); auto hret = margo_respond(handle, &out); if (hret != HG_SUCCESS) { if (hret != HG_SUCCESS) { Loading @@ -69,19 +84,21 @@ hg_return_t rpc_srv_stat(hg_handle_t handle) { GKFS_DATA->spdlogger()->debug("{}() path: '{}'", __func__, in.path); GKFS_DATA->spdlogger()->debug("{}() path: '{}'", __func__, in.path); std::string val; std::string val; try { boost::leaf::try_catch( [&]{ // get the metadata // get the metadata val = gkfs::metadata::get_str(in.path); val = gkfs::metadata::get_str(in.path); out.db_val = val.c_str(); out.db_val = val.c_str(); out.err = 0; out.err = 0; GKFS_DATA->spdlogger()->debug("{}() Sending output mode '{}'", __func__, out.db_val); GKFS_DATA->spdlogger()->debug("{}() Sending output mode '{}'", __func__, out.db_val); } catch (const gkfs::metadata::NotFoundException& e) { },[&](gkfs::metadata::NotFoundException& e){ GKFS_DATA->spdlogger()->debug("{}() Entry not found: '{}'", __func__, in.path); GKFS_DATA->spdlogger()->debug("{}() Entry not found: '{}'", __func__, in.path); out.err = ENOENT; out.err = ENOENT; } catch (const std::exception& e) { },[&](std::exception& e) { GKFS_DATA->spdlogger()->error("{}() Failed to get metadentry from DB: '{}'", __func__, e.what()); GKFS_DATA->spdlogger()->error("{}() Failed to get metadentry from DB: '{}'", __func__, e.what()); out.err = EBUSY; out.err = EBUSY; } }); auto hret = margo_respond(handle, &out); auto hret = margo_respond(handle, &out); if (hret != HG_SUCCESS) { if (hret != HG_SUCCESS) { Loading @@ -107,13 +124,14 @@ hg_return_t rpc_srv_decr_size(hg_handle_t handle) { GKFS_DATA->spdlogger()->debug("{}() path: '{}', length: '{}'", __func__, in.path, in.length); GKFS_DATA->spdlogger()->debug("{}() path: '{}', length: '{}'", __func__, in.path, in.length); try { boost::leaf::try_catch( [&]{ GKFS_DATA->mdb()->decrease_size(in.path, in.length); GKFS_DATA->mdb()->decrease_size(in.path, in.length); out.err = 0; out.err = 0; } catch (const std::exception& e) { }, [&] (std::exception& e) { GKFS_DATA->spdlogger()->error("{}() Failed to decrease size: '{}'", __func__, e.what()); GKFS_DATA->spdlogger()->error("{}() Failed to decrease size: '{}'", __func__, e.what()); out.err = EIO; out.err = EIO; } }); GKFS_DATA->spdlogger()->debug("{}() Sending output '{}'", __func__, out.err); GKFS_DATA->spdlogger()->debug("{}() Sending output '{}'", __func__, out.err); auto hret = margo_respond(handle, &out); auto hret = margo_respond(handle, &out); Loading @@ -139,20 +157,21 @@ hg_return_t rpc_srv_remove(hg_handle_t handle) { GKFS_DATA->spdlogger()->debug("{}() Got remove node RPC with path '{}'", __func__, in.path); GKFS_DATA->spdlogger()->debug("{}() Got remove node RPC with path '{}'", __func__, in.path); // Remove metadentry if exists on the node and remove all chunks for that file // Remove metadentry if exists on the node and remove all chunks for that file try { boost::leaf::try_catch( [&]{ gkfs::metadata::remove(in.path); gkfs::metadata::remove(in.path); out.err = 0; out.err = 0; } catch (const gkfs::metadata::DBException& e) { }, [&] (gkfs::metadata::DBException& e) { GKFS_DATA->spdlogger()->error("{}(): path '{}' message '{}'", __func__, in.path, e.what()); GKFS_DATA->spdlogger()->error("{}(): path '{}' message '{}'", __func__, in.path, e.what()); out.err = EIO; out.err = EIO; } catch (const gkfs::data::ChunkStorageException& e) { }, [&] (gkfs::data::ChunkStorageException& e) { GKFS_DATA->spdlogger()->error("{}(): path '{}' errcode '{}' message '{}'", __func__, in.path, e.code().value(), GKFS_DATA->spdlogger()->error("{}(): path '{}' errcode '{}' message '{}'", __func__, in.path, e.code().value(), e.what()); e.what()); out.err = e.code().value(); out.err = e.code().value(); } catch (const std::exception& e) { }, [&] (std::exception& e) { GKFS_DATA->spdlogger()->error("{}() path '{}' message '{}'", __func__, in.path, e.what()); GKFS_DATA->spdlogger()->error("{}() path '{}' message '{}'", __func__, in.path, e.what()); out.err = EBUSY; out.err = EBUSY; } }); GKFS_DATA->spdlogger()->debug("{}() Sending output '{}'", __func__, out.err); GKFS_DATA->spdlogger()->debug("{}() Sending output '{}'", __func__, out.err); auto hret = margo_respond(handle, &out); auto hret = margo_respond(handle, &out); Loading @@ -178,7 +197,8 @@ hg_return_t rpc_srv_update_metadentry(hg_handle_t handle) { GKFS_DATA->spdlogger()->debug("{}() Got update metadentry RPC with path '{}'", __func__, in.path); GKFS_DATA->spdlogger()->debug("{}() Got update metadentry RPC with path '{}'", __func__, in.path); // do update // do update try { boost::leaf::try_catch( [&]{ gkfs::metadata::Metadata md = gkfs::metadata::get(in.path); gkfs::metadata::Metadata md = gkfs::metadata::get(in.path); if (in.block_flag == HG_TRUE) if (in.block_flag == HG_TRUE) md.blocks(in.blocks); md.blocks(in.blocks); Loading @@ -194,11 +214,11 @@ hg_return_t rpc_srv_update_metadentry(hg_handle_t handle) { md.ctime(in.ctime); md.ctime(in.ctime); gkfs::metadata::update(in.path, md); gkfs::metadata::update(in.path, md); out.err = 0; out.err = 0; } catch (const std::exception& e) { }, [&] (std::exception& e) { //TODO handle NotFoundException //TODO handle NotFoundException GKFS_DATA->spdlogger()->error("{}() Failed to update entry", __func__); GKFS_DATA->spdlogger()->error("{}() Failed to update entry", __func__); out.err = 1; out.err = 1; } }); GKFS_DATA->spdlogger()->debug("{}() Sending output '{}'", __func__, out.err); GKFS_DATA->spdlogger()->debug("{}() Sending output '{}'", __func__, out.err); auto hret = margo_respond(handle, &out); auto hret = margo_respond(handle, &out); Loading @@ -225,19 +245,20 @@ hg_return_t rpc_srv_update_metadentry_size(hg_handle_t handle) { GKFS_DATA->spdlogger()->debug("{}() path: '{}', size: '{}', offset: '{}', append: '{}'", __func__, in.path, in.size, GKFS_DATA->spdlogger()->debug("{}() path: '{}', size: '{}', offset: '{}', append: '{}'", __func__, in.path, in.size, in.offset, in.append); in.offset, in.append); try { boost::leaf::try_catch( [&]{ gkfs::metadata::update_size(in.path, in.size, in.offset, (in.append == HG_TRUE)); gkfs::metadata::update_size(in.path, in.size, in.offset, (in.append == HG_TRUE)); out.err = 0; out.err = 0; //TODO the actual size of the file could be different after the size update //TODO the actual size of the file could be different after the size update // do to concurrency on size // do to concurrency on size out.ret_size = in.size + in.offset; out.ret_size = in.size + in.offset; } catch (const gkfs::metadata::NotFoundException& e) { }, [&](gkfs::metadata::NotFoundException& e) { GKFS_DATA->spdlogger()->debug("{}() Entry not found: '{}'", __func__, in.path); GKFS_DATA->spdlogger()->debug("{}() Entry not found: '{}'", __func__, in.path); out.err = ENOENT; out.err = ENOENT; } catch (const std::exception& e) { }, [&](std::exception& e) { GKFS_DATA->spdlogger()->error("{}() Failed to update metadentry size on DB: '{}'", __func__, e.what()); GKFS_DATA->spdlogger()->error("{}() Failed to update metadentry size on DB: '{}'", __func__, e.what()); out.err = EBUSY; out.err = EBUSY; } }); GKFS_DATA->spdlogger()->debug("{}() Sending output '{}'", __func__, out.err); GKFS_DATA->spdlogger()->debug("{}() Sending output '{}'", __func__, out.err); auto hret = margo_respond(handle, &out); auto hret = margo_respond(handle, &out); Loading @@ -264,16 +285,17 @@ hg_return_t rpc_srv_get_metadentry_size(hg_handle_t handle) { GKFS_DATA->spdlogger()->debug("{}() Got update metadentry size RPC with path '{}'", __func__, in.path); GKFS_DATA->spdlogger()->debug("{}() Got update metadentry size RPC with path '{}'", __func__, in.path); // do update // do update try { boost::leaf::try_catch( [&]{ out.ret_size = gkfs::metadata::get_size(in.path); out.ret_size = gkfs::metadata::get_size(in.path); out.err = 0; out.err = 0; } catch (const gkfs::metadata::NotFoundException& e) { }, [&](gkfs::metadata::NotFoundException& e) { GKFS_DATA->spdlogger()->debug("{}() Entry not found: '{}'", __func__, in.path); GKFS_DATA->spdlogger()->debug("{}() Entry not found: '{}'", __func__, in.path); out.err = ENOENT; out.err = ENOENT; } catch (const std::exception& e) { }, [&](std::exception& e) { GKFS_DATA->spdlogger()->error("{}() Failed to get metadentry size from DB: '{}'", __func__, e.what()); GKFS_DATA->spdlogger()->error("{}() Failed to get metadentry size from DB: '{}'", __func__, e.what()); out.err = EBUSY; out.err = EBUSY; } }); GKFS_DATA->spdlogger()->debug("{}() Sending output '{}'", __func__, out.err); GKFS_DATA->spdlogger()->debug("{}() Sending output '{}'", __func__, out.err); auto hret = margo_respond(handle, &out); auto hret = margo_respond(handle, &out); Loading Loading @@ -419,15 +441,16 @@ hg_return_t rpc_srv_mk_symlink(hg_handle_t handle) { } } GKFS_DATA->spdlogger()->debug("{}() Got RPC with path '{}'", __func__, in.path); GKFS_DATA->spdlogger()->debug("{}() Got RPC with path '{}'", __func__, in.path); try { boost::leaf::try_catch( [&]{ gkfs::metadata::Metadata md = {gkfs::metadata::LINK_MODE, in.target_path}; gkfs::metadata::Metadata md = {gkfs::metadata::LINK_MODE, in.target_path}; // create metadentry // create metadentry gkfs::metadata::create(in.path, md); gkfs::metadata::create(in.path, md); out.err = 0; out.err = 0; } catch (const std::exception& e) { }, [&](std::exception& e) { GKFS_DATA->spdlogger()->error("{}() Failed to create metadentry: '{}'", __func__, e.what()); GKFS_DATA->spdlogger()->error("{}() Failed to create metadentry: '{}'", __func__, e.what()); out.err = -1; out.err = -1; } }); GKFS_DATA->spdlogger()->debug("{}() Sending output err '{}'", __func__, out.err); GKFS_DATA->spdlogger()->debug("{}() Sending output err '{}'", __func__, out.err); auto hret = margo_respond(handle, &out); auto hret = margo_respond(handle, &out); if (hret != HG_SUCCESS) { if (hret != HG_SUCCESS) { Loading src/daemon/ops/data.cpp +64 −39 Original line number Original line Diff line number Diff line Loading @@ -15,6 +15,7 @@ #include <daemon/backend/data/chunk_storage.hpp> #include <daemon/backend/data/chunk_storage.hpp> #include <global/chunk_calc_util.hpp> #include <global/chunk_calc_util.hpp> #include <utility> #include <utility> #include "leaf/handle_errors.hpp" extern "C" { extern "C" { #include <mercury_types.h> #include <mercury_types.h> Loading Loading @@ -45,7 +46,10 @@ void ChunkTruncateOperation::truncate_abt(void* _arg) { const string& path = *(arg->path); const string& path = *(arg->path); const size_t size = arg->size; const size_t size = arg->size; int err_response = 0; int err_response = 0; try { boost::leaf::try_catch( [&] { // get chunk from where to cut off // get chunk from where to cut off auto chunk_id_start = gkfs::util::chnk_id_for_offset(size, gkfs::config::rpc::chunksize); auto chunk_id_start = gkfs::util::chnk_id_for_offset(size, gkfs::config::rpc::chunksize); // do not last delete chunk if it is in the middle of a chunk // do not last delete chunk if it is in the middle of a chunk Loading @@ -55,14 +59,19 @@ void ChunkTruncateOperation::truncate_abt(void* _arg) { chunk_id_start++; chunk_id_start++; } } GKFS_DATA->storage()->trim_chunk_space(path, chunk_id_start); GKFS_DATA->storage()->trim_chunk_space(path, chunk_id_start); } catch (const ChunkStorageException& err) { }, [&](const ChunkStorageException& err) { GKFS_DATA->spdlogger()->error("{}() {}", __func__, err.what()); GKFS_DATA->spdlogger()->error("{}() {}", __func__, err.what()); err_response = err.code().value(); err_response = err.code().value(); } catch (const ::exception& err) { }, [&](const ::exception& err) { GKFS_DATA->spdlogger()->error("{}() Unexpected error truncating file '{}' to length '{}'", __func__, path, GKFS_DATA->spdlogger()->error("{}() Unexpected error truncating file '{}' to length '{}'", __func__, path, size); size); err_response = EIO; err_response = EIO; } }); ABT_eventual_set(arg->eventual, &err_response, sizeof(err_response)); ABT_eventual_set(arg->eventual, &err_response, sizeof(err_response)); } } Loading Loading @@ -144,16 +153,24 @@ void ChunkWriteOperation::write_file_abt(void* _arg) { auto* arg = static_cast<struct chunk_write_args*>(_arg); auto* arg = static_cast<struct chunk_write_args*>(_arg); const string& path = *(arg->path); const string& path = *(arg->path); ssize_t wrote{0}; ssize_t wrote{0}; try { boost::leaf::try_catch( [&] { wrote = GKFS_DATA->storage()->write_chunk(path, arg->chnk_id, arg->buf, arg->size, arg->off); wrote = GKFS_DATA->storage()->write_chunk(path, arg->chnk_id, arg->buf, arg->size, arg->off); } catch (const ChunkStorageException& err) { }, [&](const ChunkStorageException& err) { GKFS_DATA->spdlogger()->error("{}() {}", __func__, err.what()); GKFS_DATA->spdlogger()->error("{}() {}", __func__, err.what()); wrote = -(err.code().value()); wrote = -(err.code().value()); } catch (const ::exception& err) { }, [&](const ::exception& err) { GKFS_DATA->spdlogger()->error("{}() Unexpected error writing chunk {} of file {}", __func__, arg->chnk_id, GKFS_DATA->spdlogger()->error("{}() Unexpected error writing chunk {} of file {}", __func__, arg->chnk_id, path); path); wrote = -EIO; wrote = -EIO; } }); ABT_eventual_set(arg->eventual, &wrote, sizeof(wrote)); ABT_eventual_set(arg->eventual, &wrote, sizeof(wrote)); } } Loading Loading @@ -267,17 +284,25 @@ void ChunkReadOperation::read_file_abt(void* _arg) { auto* arg = static_cast<struct chunk_read_args*>(_arg); auto* arg = static_cast<struct chunk_read_args*>(_arg); const string& path = *(arg->path); const string& path = *(arg->path); ssize_t read = 0; ssize_t read = 0; try { boost::leaf::try_catch( [&] { // Under expected circumstances (error or no error) read_chunk will signal the eventual // Under expected circumstances (error or no error) read_chunk will signal the eventual read = GKFS_DATA->storage()->read_chunk(path, arg->chnk_id, arg->buf, arg->size, arg->off); read = GKFS_DATA->storage()->read_chunk(path, arg->chnk_id, arg->buf, arg->size, arg->off); } catch (const ChunkStorageException& err) { }, [&](const ChunkStorageException& err) { GKFS_DATA->spdlogger()->error("{}() {}", __func__, err.what()); GKFS_DATA->spdlogger()->error("{}() {}", __func__, err.what()); read = -(err.code().value()); read = -(err.code().value()); } catch (const ::exception& err) { }, [&](const ::exception& err) { GKFS_DATA->spdlogger()->error("{}() Unexpected error reading chunk {} of file {}", __func__, arg->chnk_id, GKFS_DATA->spdlogger()->error("{}() Unexpected error reading chunk {} of file {}", __func__, arg->chnk_id, path); path); read = -EIO; read = -EIO; } }); ABT_eventual_set(arg->eventual, &read, sizeof(read)); ABT_eventual_set(arg->eventual, &read, sizeof(read)); } } Loading Loading
src/daemon/backend/data/chunk_storage.cpp +15 −9 Original line number Original line Diff line number Diff line Loading @@ -20,6 +20,8 @@ #include <boost/filesystem.hpp> #include <boost/filesystem.hpp> #include <spdlog/spdlog.h> #include <spdlog/spdlog.h> #include "leaf/handle_errors.hpp" extern "C" { extern "C" { #include <sys/statfs.h> #include <sys/statfs.h> Loading Loading @@ -98,15 +100,19 @@ ChunkStorage::ChunkStorage(string& path, const size_t chunksize) : */ */ void ChunkStorage::destroy_chunk_space(const string& file_path) const { void ChunkStorage::destroy_chunk_space(const string& file_path) const { auto chunk_dir = absolute(get_chunks_dir(file_path)); auto chunk_dir = absolute(get_chunks_dir(file_path)); try { boost::leaf::try_catch( [&] { // Note: remove_all does not throw an error when path doesn't exist. // Note: remove_all does not throw an error when path doesn't exist. auto n = bfs::remove_all(chunk_dir); auto n = bfs::remove_all(chunk_dir); log_->debug("{}() Removed '{}' files from '{}'", __func__, n, chunk_dir); log_->debug("{}() Removed '{}' files from '{}'", __func__, n, chunk_dir); } catch (const bfs::filesystem_error& e) { }, [&](const bfs::filesystem_error& e) { auto err_str = fmt::format("{}() Failed to remove chunk directory. Path: '{}', Error: '{}'", __func__, auto err_str = fmt::format("{}() Failed to remove chunk directory. Path: '{}', Error: '{}'", __func__, chunk_dir, e.what()); chunk_dir, e.what()); throw ChunkStorageException(e.code().value(), err_str); throw ChunkStorageException(e.code().value(), err_str); } }); } } /** /** Loading
src/daemon/daemon.cpp +105 −59 Original line number Original line Diff line number Diff line Loading @@ -37,6 +37,7 @@ #include <fstream> #include <fstream> #include <csignal> #include <csignal> #include <condition_variable> #include <condition_variable> #include "leaf/handle_errors.hpp" extern "C" { extern "C" { #include <unistd.h> #include <unistd.h> Loading Loading @@ -153,12 +154,16 @@ void init_environment() { // Initialize metadata db // Initialize metadata db std::string metadata_path = GKFS_DATA->metadir() + "/rocksdb"s; std::string metadata_path = GKFS_DATA->metadir() + "/rocksdb"s; GKFS_DATA->spdlogger()->debug("{}() Initializing metadata DB: '{}'", __func__, metadata_path); GKFS_DATA->spdlogger()->debug("{}() Initializing metadata DB: '{}'", __func__, metadata_path); try { boost::leaf::try_catch( [&] { GKFS_DATA->mdb(std::make_shared<gkfs::metadata::MetadataDB>(metadata_path)); GKFS_DATA->mdb(std::make_shared<gkfs::metadata::MetadataDB>(metadata_path)); } catch (const std::exception& e) { }, [&](const std::exception &e) { GKFS_DATA->spdlogger()->error("{}() Failed to initialize metadata DB: {}", __func__, e.what()); GKFS_DATA->spdlogger()->error("{}() Failed to initialize metadata DB: {}", __func__, e.what()); throw; throw; } }); #ifdef GKFS_ENABLE_FORWARDING #ifdef GKFS_ENABLE_FORWARDING GKFS_DATA->spdlogger()->debug("{}() Enable I/O forwarding mode", __func__); GKFS_DATA->spdlogger()->debug("{}() Enable I/O forwarding mode", __func__); Loading @@ -178,31 +183,47 @@ void init_environment() { std::string chunk_storage_path = GKFS_DATA->rootdir() + "/data/chunks"s; std::string chunk_storage_path = GKFS_DATA->rootdir() + "/data/chunks"s; GKFS_DATA->spdlogger()->debug("{}() Initializing storage backend: '{}'", __func__, chunk_storage_path); GKFS_DATA->spdlogger()->debug("{}() Initializing storage backend: '{}'", __func__, chunk_storage_path); bfs::create_directories(chunk_storage_path); bfs::create_directories(chunk_storage_path); try { boost::leaf::try_catch( [&] { GKFS_DATA->storage( GKFS_DATA->storage( std::make_shared<gkfs::data::ChunkStorage>(chunk_storage_path, gkfs::config::rpc::chunksize)); std::make_shared<gkfs::data::ChunkStorage>(chunk_storage_path, gkfs::config::rpc::chunksize)); } catch (const std::exception& e) { }, [&](const std::exception &e) { GKFS_DATA->spdlogger()->error("{}() Failed to initialize storage backend: {}", __func__, e.what()); GKFS_DATA->spdlogger()->error("{}() Failed to initialize storage backend: {}", __func__, e.what()); throw; throw; } }); // Init margo for RPC // Init margo for RPC GKFS_DATA->spdlogger()->debug("{}() Initializing RPC server: '{}'", __func__, GKFS_DATA->bind_addr()); GKFS_DATA->spdlogger()->debug("{}() Initializing RPC server: '{}'", try { __func__, GKFS_DATA->bind_addr()); boost::leaf::try_catch( [&] { init_rpc_server(); init_rpc_server(); } catch (const std::exception& e) { }, [&](const std::exception &e) { GKFS_DATA->spdlogger()->error("{}() Failed to initialize RPC server: {}", __func__, e.what()); GKFS_DATA->spdlogger()->error("{}() Failed to initialize RPC server: {}", __func__, e.what()); throw; throw; } }); // Init Argobots ESs to drive IO // Init Argobots ESs to drive IO try { boost::leaf::try_catch( [&] { GKFS_DATA->spdlogger()->debug("{}() Initializing I/O pool", __func__); GKFS_DATA->spdlogger()->debug("{}() Initializing I/O pool", __func__); init_io_tasklet_pool(); init_io_tasklet_pool(); } catch (const std::exception& e) { }, [&](const std::exception &e) { GKFS_DATA->spdlogger()->error("{}() Failed to initialize Argobots pool for I/O: {}", __func__, e.what()); GKFS_DATA->spdlogger()->error("{}() Failed to initialize Argobots pool for I/O: {}", __func__, e.what()); throw; throw; } }); // TODO set metadata configurations. these have to go into a user configurable file that is parsed here // TODO set metadata configurations. these have to go into a user configurable file that is parsed here GKFS_DATA->atime_state(gkfs::config::metadata::use_atime); GKFS_DATA->atime_state(gkfs::config::metadata::use_atime); Loading @@ -212,11 +233,17 @@ void init_environment() { GKFS_DATA->blocks_state(gkfs::config::metadata::use_blocks); GKFS_DATA->blocks_state(gkfs::config::metadata::use_blocks); // Create metadentry for root directory // Create metadentry for root directory gkfs::metadata::Metadata root_md{S_IFDIR | S_IRWXU | S_IRWXG | S_IRWXO}; gkfs::metadata::Metadata root_md{S_IFDIR | S_IRWXU | S_IRWXG | S_IRWXO}; try { boost::leaf::try_catch( [&] { gkfs::metadata::create("/", root_md); gkfs::metadata::create("/", root_md); } catch (const std::exception& e) { }, [&](const std::exception &e) { throw runtime_error("Failed to write root metadentry to KV store: "s + e.what()); throw runtime_error("Failed to write root metadentry to KV store: "s + e.what()); } }); // setup hostfile to let clients know that a daemon is running on this host // setup hostfile to let clients know that a daemon is running on this host if (!GKFS_DATA->hosts_file().empty()) { if (!GKFS_DATA->hosts_file().empty()) { gkfs::util::populate_hosts_file(); gkfs::util::populate_hosts_file(); Loading Loading @@ -256,11 +283,16 @@ void destroy_enviroment() { if (!GKFS_DATA->hosts_file().empty()) { if (!GKFS_DATA->hosts_file().empty()) { GKFS_DATA->spdlogger()->debug("{}() Removing hosts file", __func__); GKFS_DATA->spdlogger()->debug("{}() Removing hosts file", __func__); try { boost::leaf::try_catch( [&] { gkfs::util::destroy_hosts_file(); gkfs::util::destroy_hosts_file(); } catch (const bfs::filesystem_error& e) { }, [&](const bfs::filesystem_error& e) { GKFS_DATA->spdlogger()->debug("{}() hosts file not found", __func__); GKFS_DATA->spdlogger()->debug("{}() hosts file not found", __func__); } }); } } if (RPC_DATA->server_rpc_mid() != nullptr) { if (RPC_DATA->server_rpc_mid() != nullptr) { Loading Loading @@ -457,38 +489,52 @@ int main(int argc, const char* argv[]) { return 0; return 0; } } try { boost::leaf::try_catch( [&] { po::notify(vm); po::notify(vm); } catch (po::required_option& e) { }, [&](po::required_option& e) { std::cerr << "Error: " << e.what() << "\n"; std::cerr << "Error: " << e.what() << "\n"; return 1; return 1; } }); // intitialize logging framework // intitialize logging framework initialize_loggers(); initialize_loggers(); GKFS_DATA->spdlogger(spdlog::get("main")); GKFS_DATA->spdlogger(spdlog::get("main")); // parse all input parameters and populate singleton structures // parse all input parameters and populate singleton structures try { boost::leaf::try_catch( [&] { parse_input(vm); parse_input(vm); } catch (const std::exception& e) { }, [&](const std::exception& e) { cerr << fmt::format("Parsing arguments failed: '{}'. Exiting.", e.what()); cerr << fmt::format("Parsing arguments failed: '{}'. Exiting.", e.what()); exit(EXIT_FAILURE); exit(EXIT_FAILURE); } }); /* /* * Initialize environment and start daemon. Wait until signaled to cancel before shutting down * Initialize environment and start daemon. Wait until signaled to cancel before shutting down */ */ try { boost::leaf::try_catch( [&] { GKFS_DATA->spdlogger()->info("{}() Initializing environment", __func__); GKFS_DATA->spdlogger()->info("{}() Initializing environment", __func__); init_environment(); init_environment(); } catch (const std::exception& e) { }, [&](const std::exception& e) { auto emsg = fmt::format("Failed to initialize environment: {}", e.what()); auto emsg = fmt::format("Failed to initialize environment: {}", e.what()); GKFS_DATA->spdlogger()->error(emsg); GKFS_DATA->spdlogger()->error(emsg); cerr << emsg << endl; cerr << emsg << endl; destroy_enviroment(); destroy_enviroment(); exit(EXIT_FAILURE); exit(EXIT_FAILURE); } }); signal(SIGINT, shutdown_handler); signal(SIGINT, shutdown_handler); signal(SIGTERM, shutdown_handler); signal(SIGTERM, shutdown_handler); Loading
src/daemon/handler/srv_data.cpp +70 −39 Original line number Original line Diff line number Diff line Loading @@ -21,6 +21,7 @@ #include <global/rpc/rpc_types.hpp> #include <global/rpc/rpc_types.hpp> #include <global/rpc/distributor.hpp> #include <global/rpc/distributor.hpp> #include <global/chunk_calc_util.hpp> #include <global/chunk_calc_util.hpp> #include "leaf/handle_errors.hpp" #ifdef GKFS_ENABLE_AGIOS #ifdef GKFS_ENABLE_AGIOS #include <daemon/scheduler/agios.hpp> #include <daemon/scheduler/agios.hpp> Loading Loading @@ -213,15 +214,22 @@ hg_return_t rpc_srv_write(hg_handle_t handle) { chnk_ptr += transfer_size; chnk_ptr += transfer_size; chnk_size_left_host -= transfer_size; chnk_size_left_host -= transfer_size; } } try { // start tasklet for writing chunk int err = boost::leaf::try_catch( [&]() -> int { chunk_op.write_nonblock(chnk_id_curr, chnk_ids_host[chnk_id_curr], bulk_buf_ptrs[chnk_id_curr], chunk_op.write_nonblock(chnk_id_curr, chnk_ids_host[chnk_id_curr], bulk_buf_ptrs[chnk_id_curr], chnk_sizes[chnk_id_curr], (chnk_id_file == in.chunk_start) ? in.offset : 0); chnk_sizes[chnk_id_curr], (chnk_id_file == in.chunk_start) ? in.offset : 0); } catch (const gkfs::data::ChunkWriteOpException& e) { return 0; }, [&](const gkfs::data::ChunkWriteOpException& e) { // This exception is caused by setup of Argobots variables. If this fails, something is really wrong // This exception is caused by setup of Argobots variables. If this fails, something is really wrong GKFS_DATA->spdlogger()->error("{}() while write_nonblock err '{}'", __func__, e.what()); GKFS_DATA->spdlogger()->error("{}() while write_nonblock err '{}'", __func__, e.what()); return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle); return -1; } }); if(err == -1) return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle); // next chunk // next chunk chnk_id_curr++; chnk_id_curr++; Loading Loading @@ -403,17 +411,26 @@ hg_return_t rpc_srv_read(hg_handle_t handle) { chnk_ptr += transfer_size; chnk_ptr += transfer_size; chnk_size_left_host -= transfer_size; chnk_size_left_host -= transfer_size; } } try { bool err = boost::leaf::try_catch( [&]() -> bool { // start tasklet for read operation // start tasklet for read operation chunk_read_op.read_nonblock(chnk_id_curr, chnk_ids_host[chnk_id_curr], bulk_buf_ptrs[chnk_id_curr], chunk_read_op.read_nonblock(chnk_id_curr, chnk_ids_host[chnk_id_curr], bulk_buf_ptrs[chnk_id_curr], chnk_sizes[chnk_id_curr], (chnk_id_file == in.chunk_start) ? in.offset : 0); chnk_sizes[chnk_id_curr], (chnk_id_file == in.chunk_start) ? in.offset : 0); } catch (const gkfs::data::ChunkReadOpException& e) { return false; }, [&](const gkfs::data::ChunkReadOpException& e) { // This exception is caused by setup of Argobots variables. If this fails, something is really wrong // This exception is caused by setup of Argobots variables. If this fails, something is really wrong GKFS_DATA->spdlogger()->error("{}() while read_nonblock err '{}'", __func__, e.what()); GKFS_DATA->spdlogger()->error("{}() while read_nonblock err '{}'", __func__, e.what()); return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle); return true; } }); if(err) return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle); chnk_id_curr++; chnk_id_curr++; } } // Sanity check that all chunks where detected in previous loop // Sanity check that all chunks where detected in previous loop // TODO error out. If we continue this will crash the server when sending results back that don't exist. // TODO error out. If we continue this will crash the server when sending results back that don't exist. if (chnk_size_left_host != 0) if (chnk_size_left_host != 0) Loading Loading @@ -461,14 +478,22 @@ hg_return_t rpc_srv_truncate(hg_handle_t handle) { GKFS_DATA->spdlogger()->debug("{}() path: '{}', length: '{}'", __func__, in.path, in.length); GKFS_DATA->spdlogger()->debug("{}() path: '{}', length: '{}'", __func__, in.path, in.length); gkfs::data::ChunkTruncateOperation chunk_op{in.path}; gkfs::data::ChunkTruncateOperation chunk_op{in.path}; try { int err = boost::leaf::try_catch( [&]() -> int { // start tasklet for truncate operation // start tasklet for truncate operation chunk_op.truncate(in.length); chunk_op.truncate(in.length); } catch (const gkfs::data::ChunkMetaOpException& e) { return 0; }, [&] (const gkfs::data::ChunkMetaOpException& e) { // This exception is caused by setup of Argobots variables. If this fails, something is really wrong // This exception is caused by setup of Argobots variables. If this fails, something is really wrong GKFS_DATA->spdlogger()->error("{}() while truncate err '{}'", __func__, e.what()); GKFS_DATA->spdlogger()->error("{}() while truncate err '{}'", __func__, e.what()); return gkfs::rpc::cleanup_respond(&handle, &in, &out); return -1; } }); if (err == -1) return gkfs::rpc::cleanup_respond(&handle, &in, &out); // wait and get output // wait and get output out.err = chunk_op.wait_for_task(); out.err = chunk_op.wait_for_task(); Loading @@ -487,19 +512,25 @@ hg_return_t rpc_srv_get_chunk_stat(hg_handle_t handle) { GKFS_DATA->spdlogger()->debug("{}() enter", __func__); GKFS_DATA->spdlogger()->debug("{}() enter", __func__); rpc_chunk_stat_out_t out{}; rpc_chunk_stat_out_t out{}; out.err = EIO; out.err = EIO; try { boost::leaf::try_catch( [&] { auto chk_stat = GKFS_DATA->storage()->chunk_stat(); auto chk_stat = GKFS_DATA->storage()->chunk_stat(); out.chunk_size = chk_stat.chunk_size; out.chunk_size = chk_stat.chunk_size; out.chunk_total = chk_stat.chunk_total; out.chunk_total = chk_stat.chunk_total; out.chunk_free = chk_stat.chunk_free; out.chunk_free = chk_stat.chunk_free; out.err = 0; out.err = 0; } catch (const gkfs::data::ChunkStorageException& err) { }, [&](const gkfs::data::ChunkStorageException& err) { GKFS_DATA->spdlogger()->error("{}() {}", __func__, err.what()); GKFS_DATA->spdlogger()->error("{}() {}", __func__, err.what()); out.err = err.code().value(); out.err = err.code().value(); } catch (const ::exception& err) { }, [&](const ::exception& err) { GKFS_DATA->spdlogger()->error("{}() Unexpected error when chunk stat '{}'", __func__, err.what()); GKFS_DATA->spdlogger()->error("{}() Unexpected error when chunk stat '{}'", __func__, err.what()); out.err = EAGAIN; out.err = EAGAIN; } }); // Create output and send it back // Create output and send it back return gkfs::rpc::cleanup_respond(&handle, &out); return gkfs::rpc::cleanup_respond(&handle, &out); Loading
src/daemon/handler/srv_metadata.cpp +118 −95 Original line number Original line Diff line number Diff line Loading @@ -20,6 +20,15 @@ #include <global/rpc/rpc_types.hpp> #include <global/rpc/rpc_types.hpp> #include <daemon/backend/data/chunk_storage.hpp> #include <daemon/backend/data/chunk_storage.hpp> #include "leaf/exception.hpp" #include "leaf/handle_errors.hpp" #include "leaf/pred.hpp" #include "leaf/on_error.hpp" #include "leaf/common.hpp" #include "leaf/result.hpp" #include "leaf/error.hpp" using namespace std; using namespace std; /* /* Loading @@ -38,14 +47,20 @@ hg_return_t rpc_srv_create(hg_handle_t handle) { assert(ret == HG_SUCCESS); assert(ret == HG_SUCCESS); GKFS_DATA->spdlogger()->debug("{}() Got RPC with path '{}'", __func__, in.path); GKFS_DATA->spdlogger()->debug("{}() Got RPC with path '{}'", __func__, in.path); gkfs::metadata::Metadata md(in.mode); gkfs::metadata::Metadata md(in.mode); try { boost::leaf::try_catch( [&] { // create metadentry // create metadentry gkfs::metadata::create(in.path, md); gkfs::metadata::create(in.path, md); out.err = 0; out.err = 0; } catch (const std::exception& e) { }, [&](std::exception &e) { GKFS_DATA->spdlogger()->error("{}() Failed to create metadentry: '{}'", __func__, e.what()); GKFS_DATA->spdlogger()->error("{}() Failed to create metadentry: '{}'", __func__, e.what()); out.err = -1; out.err = -1; } }); GKFS_DATA->spdlogger()->debug("{}() Sending output err '{}'", __func__, out.err); GKFS_DATA->spdlogger()->debug("{}() Sending output err '{}'", __func__, out.err); auto hret = margo_respond(handle, &out); auto hret = margo_respond(handle, &out); if (hret != HG_SUCCESS) { if (hret != HG_SUCCESS) { Loading @@ -69,19 +84,21 @@ hg_return_t rpc_srv_stat(hg_handle_t handle) { GKFS_DATA->spdlogger()->debug("{}() path: '{}'", __func__, in.path); GKFS_DATA->spdlogger()->debug("{}() path: '{}'", __func__, in.path); std::string val; std::string val; try { boost::leaf::try_catch( [&]{ // get the metadata // get the metadata val = gkfs::metadata::get_str(in.path); val = gkfs::metadata::get_str(in.path); out.db_val = val.c_str(); out.db_val = val.c_str(); out.err = 0; out.err = 0; GKFS_DATA->spdlogger()->debug("{}() Sending output mode '{}'", __func__, out.db_val); GKFS_DATA->spdlogger()->debug("{}() Sending output mode '{}'", __func__, out.db_val); } catch (const gkfs::metadata::NotFoundException& e) { },[&](gkfs::metadata::NotFoundException& e){ GKFS_DATA->spdlogger()->debug("{}() Entry not found: '{}'", __func__, in.path); GKFS_DATA->spdlogger()->debug("{}() Entry not found: '{}'", __func__, in.path); out.err = ENOENT; out.err = ENOENT; } catch (const std::exception& e) { },[&](std::exception& e) { GKFS_DATA->spdlogger()->error("{}() Failed to get metadentry from DB: '{}'", __func__, e.what()); GKFS_DATA->spdlogger()->error("{}() Failed to get metadentry from DB: '{}'", __func__, e.what()); out.err = EBUSY; out.err = EBUSY; } }); auto hret = margo_respond(handle, &out); auto hret = margo_respond(handle, &out); if (hret != HG_SUCCESS) { if (hret != HG_SUCCESS) { Loading @@ -107,13 +124,14 @@ hg_return_t rpc_srv_decr_size(hg_handle_t handle) { GKFS_DATA->spdlogger()->debug("{}() path: '{}', length: '{}'", __func__, in.path, in.length); GKFS_DATA->spdlogger()->debug("{}() path: '{}', length: '{}'", __func__, in.path, in.length); try { boost::leaf::try_catch( [&]{ GKFS_DATA->mdb()->decrease_size(in.path, in.length); GKFS_DATA->mdb()->decrease_size(in.path, in.length); out.err = 0; out.err = 0; } catch (const std::exception& e) { }, [&] (std::exception& e) { GKFS_DATA->spdlogger()->error("{}() Failed to decrease size: '{}'", __func__, e.what()); GKFS_DATA->spdlogger()->error("{}() Failed to decrease size: '{}'", __func__, e.what()); out.err = EIO; out.err = EIO; } }); GKFS_DATA->spdlogger()->debug("{}() Sending output '{}'", __func__, out.err); GKFS_DATA->spdlogger()->debug("{}() Sending output '{}'", __func__, out.err); auto hret = margo_respond(handle, &out); auto hret = margo_respond(handle, &out); Loading @@ -139,20 +157,21 @@ hg_return_t rpc_srv_remove(hg_handle_t handle) { GKFS_DATA->spdlogger()->debug("{}() Got remove node RPC with path '{}'", __func__, in.path); GKFS_DATA->spdlogger()->debug("{}() Got remove node RPC with path '{}'", __func__, in.path); // Remove metadentry if exists on the node and remove all chunks for that file // Remove metadentry if exists on the node and remove all chunks for that file try { boost::leaf::try_catch( [&]{ gkfs::metadata::remove(in.path); gkfs::metadata::remove(in.path); out.err = 0; out.err = 0; } catch (const gkfs::metadata::DBException& e) { }, [&] (gkfs::metadata::DBException& e) { GKFS_DATA->spdlogger()->error("{}(): path '{}' message '{}'", __func__, in.path, e.what()); GKFS_DATA->spdlogger()->error("{}(): path '{}' message '{}'", __func__, in.path, e.what()); out.err = EIO; out.err = EIO; } catch (const gkfs::data::ChunkStorageException& e) { }, [&] (gkfs::data::ChunkStorageException& e) { GKFS_DATA->spdlogger()->error("{}(): path '{}' errcode '{}' message '{}'", __func__, in.path, e.code().value(), GKFS_DATA->spdlogger()->error("{}(): path '{}' errcode '{}' message '{}'", __func__, in.path, e.code().value(), e.what()); e.what()); out.err = e.code().value(); out.err = e.code().value(); } catch (const std::exception& e) { }, [&] (std::exception& e) { GKFS_DATA->spdlogger()->error("{}() path '{}' message '{}'", __func__, in.path, e.what()); GKFS_DATA->spdlogger()->error("{}() path '{}' message '{}'", __func__, in.path, e.what()); out.err = EBUSY; out.err = EBUSY; } }); GKFS_DATA->spdlogger()->debug("{}() Sending output '{}'", __func__, out.err); GKFS_DATA->spdlogger()->debug("{}() Sending output '{}'", __func__, out.err); auto hret = margo_respond(handle, &out); auto hret = margo_respond(handle, &out); Loading @@ -178,7 +197,8 @@ hg_return_t rpc_srv_update_metadentry(hg_handle_t handle) { GKFS_DATA->spdlogger()->debug("{}() Got update metadentry RPC with path '{}'", __func__, in.path); GKFS_DATA->spdlogger()->debug("{}() Got update metadentry RPC with path '{}'", __func__, in.path); // do update // do update try { boost::leaf::try_catch( [&]{ gkfs::metadata::Metadata md = gkfs::metadata::get(in.path); gkfs::metadata::Metadata md = gkfs::metadata::get(in.path); if (in.block_flag == HG_TRUE) if (in.block_flag == HG_TRUE) md.blocks(in.blocks); md.blocks(in.blocks); Loading @@ -194,11 +214,11 @@ hg_return_t rpc_srv_update_metadentry(hg_handle_t handle) { md.ctime(in.ctime); md.ctime(in.ctime); gkfs::metadata::update(in.path, md); gkfs::metadata::update(in.path, md); out.err = 0; out.err = 0; } catch (const std::exception& e) { }, [&] (std::exception& e) { //TODO handle NotFoundException //TODO handle NotFoundException GKFS_DATA->spdlogger()->error("{}() Failed to update entry", __func__); GKFS_DATA->spdlogger()->error("{}() Failed to update entry", __func__); out.err = 1; out.err = 1; } }); GKFS_DATA->spdlogger()->debug("{}() Sending output '{}'", __func__, out.err); GKFS_DATA->spdlogger()->debug("{}() Sending output '{}'", __func__, out.err); auto hret = margo_respond(handle, &out); auto hret = margo_respond(handle, &out); Loading @@ -225,19 +245,20 @@ hg_return_t rpc_srv_update_metadentry_size(hg_handle_t handle) { GKFS_DATA->spdlogger()->debug("{}() path: '{}', size: '{}', offset: '{}', append: '{}'", __func__, in.path, in.size, GKFS_DATA->spdlogger()->debug("{}() path: '{}', size: '{}', offset: '{}', append: '{}'", __func__, in.path, in.size, in.offset, in.append); in.offset, in.append); try { boost::leaf::try_catch( [&]{ gkfs::metadata::update_size(in.path, in.size, in.offset, (in.append == HG_TRUE)); gkfs::metadata::update_size(in.path, in.size, in.offset, (in.append == HG_TRUE)); out.err = 0; out.err = 0; //TODO the actual size of the file could be different after the size update //TODO the actual size of the file could be different after the size update // do to concurrency on size // do to concurrency on size out.ret_size = in.size + in.offset; out.ret_size = in.size + in.offset; } catch (const gkfs::metadata::NotFoundException& e) { }, [&](gkfs::metadata::NotFoundException& e) { GKFS_DATA->spdlogger()->debug("{}() Entry not found: '{}'", __func__, in.path); GKFS_DATA->spdlogger()->debug("{}() Entry not found: '{}'", __func__, in.path); out.err = ENOENT; out.err = ENOENT; } catch (const std::exception& e) { }, [&](std::exception& e) { GKFS_DATA->spdlogger()->error("{}() Failed to update metadentry size on DB: '{}'", __func__, e.what()); GKFS_DATA->spdlogger()->error("{}() Failed to update metadentry size on DB: '{}'", __func__, e.what()); out.err = EBUSY; out.err = EBUSY; } }); GKFS_DATA->spdlogger()->debug("{}() Sending output '{}'", __func__, out.err); GKFS_DATA->spdlogger()->debug("{}() Sending output '{}'", __func__, out.err); auto hret = margo_respond(handle, &out); auto hret = margo_respond(handle, &out); Loading @@ -264,16 +285,17 @@ hg_return_t rpc_srv_get_metadentry_size(hg_handle_t handle) { GKFS_DATA->spdlogger()->debug("{}() Got update metadentry size RPC with path '{}'", __func__, in.path); GKFS_DATA->spdlogger()->debug("{}() Got update metadentry size RPC with path '{}'", __func__, in.path); // do update // do update try { boost::leaf::try_catch( [&]{ out.ret_size = gkfs::metadata::get_size(in.path); out.ret_size = gkfs::metadata::get_size(in.path); out.err = 0; out.err = 0; } catch (const gkfs::metadata::NotFoundException& e) { }, [&](gkfs::metadata::NotFoundException& e) { GKFS_DATA->spdlogger()->debug("{}() Entry not found: '{}'", __func__, in.path); GKFS_DATA->spdlogger()->debug("{}() Entry not found: '{}'", __func__, in.path); out.err = ENOENT; out.err = ENOENT; } catch (const std::exception& e) { }, [&](std::exception& e) { GKFS_DATA->spdlogger()->error("{}() Failed to get metadentry size from DB: '{}'", __func__, e.what()); GKFS_DATA->spdlogger()->error("{}() Failed to get metadentry size from DB: '{}'", __func__, e.what()); out.err = EBUSY; out.err = EBUSY; } }); GKFS_DATA->spdlogger()->debug("{}() Sending output '{}'", __func__, out.err); GKFS_DATA->spdlogger()->debug("{}() Sending output '{}'", __func__, out.err); auto hret = margo_respond(handle, &out); auto hret = margo_respond(handle, &out); Loading Loading @@ -419,15 +441,16 @@ hg_return_t rpc_srv_mk_symlink(hg_handle_t handle) { } } GKFS_DATA->spdlogger()->debug("{}() Got RPC with path '{}'", __func__, in.path); GKFS_DATA->spdlogger()->debug("{}() Got RPC with path '{}'", __func__, in.path); try { boost::leaf::try_catch( [&]{ gkfs::metadata::Metadata md = {gkfs::metadata::LINK_MODE, in.target_path}; gkfs::metadata::Metadata md = {gkfs::metadata::LINK_MODE, in.target_path}; // create metadentry // create metadentry gkfs::metadata::create(in.path, md); gkfs::metadata::create(in.path, md); out.err = 0; out.err = 0; } catch (const std::exception& e) { }, [&](std::exception& e) { GKFS_DATA->spdlogger()->error("{}() Failed to create metadentry: '{}'", __func__, e.what()); GKFS_DATA->spdlogger()->error("{}() Failed to create metadentry: '{}'", __func__, e.what()); out.err = -1; out.err = -1; } }); GKFS_DATA->spdlogger()->debug("{}() Sending output err '{}'", __func__, out.err); GKFS_DATA->spdlogger()->debug("{}() Sending output err '{}'", __func__, out.err); auto hret = margo_respond(handle, &out); auto hret = margo_respond(handle, &out); if (hret != HG_SUCCESS) { if (hret != HG_SUCCESS) { Loading
src/daemon/ops/data.cpp +64 −39 Original line number Original line Diff line number Diff line Loading @@ -15,6 +15,7 @@ #include <daemon/backend/data/chunk_storage.hpp> #include <daemon/backend/data/chunk_storage.hpp> #include <global/chunk_calc_util.hpp> #include <global/chunk_calc_util.hpp> #include <utility> #include <utility> #include "leaf/handle_errors.hpp" extern "C" { extern "C" { #include <mercury_types.h> #include <mercury_types.h> Loading Loading @@ -45,7 +46,10 @@ void ChunkTruncateOperation::truncate_abt(void* _arg) { const string& path = *(arg->path); const string& path = *(arg->path); const size_t size = arg->size; const size_t size = arg->size; int err_response = 0; int err_response = 0; try { boost::leaf::try_catch( [&] { // get chunk from where to cut off // get chunk from where to cut off auto chunk_id_start = gkfs::util::chnk_id_for_offset(size, gkfs::config::rpc::chunksize); auto chunk_id_start = gkfs::util::chnk_id_for_offset(size, gkfs::config::rpc::chunksize); // do not last delete chunk if it is in the middle of a chunk // do not last delete chunk if it is in the middle of a chunk Loading @@ -55,14 +59,19 @@ void ChunkTruncateOperation::truncate_abt(void* _arg) { chunk_id_start++; chunk_id_start++; } } GKFS_DATA->storage()->trim_chunk_space(path, chunk_id_start); GKFS_DATA->storage()->trim_chunk_space(path, chunk_id_start); } catch (const ChunkStorageException& err) { }, [&](const ChunkStorageException& err) { GKFS_DATA->spdlogger()->error("{}() {}", __func__, err.what()); GKFS_DATA->spdlogger()->error("{}() {}", __func__, err.what()); err_response = err.code().value(); err_response = err.code().value(); } catch (const ::exception& err) { }, [&](const ::exception& err) { GKFS_DATA->spdlogger()->error("{}() Unexpected error truncating file '{}' to length '{}'", __func__, path, GKFS_DATA->spdlogger()->error("{}() Unexpected error truncating file '{}' to length '{}'", __func__, path, size); size); err_response = EIO; err_response = EIO; } }); ABT_eventual_set(arg->eventual, &err_response, sizeof(err_response)); ABT_eventual_set(arg->eventual, &err_response, sizeof(err_response)); } } Loading Loading @@ -144,16 +153,24 @@ void ChunkWriteOperation::write_file_abt(void* _arg) { auto* arg = static_cast<struct chunk_write_args*>(_arg); auto* arg = static_cast<struct chunk_write_args*>(_arg); const string& path = *(arg->path); const string& path = *(arg->path); ssize_t wrote{0}; ssize_t wrote{0}; try { boost::leaf::try_catch( [&] { wrote = GKFS_DATA->storage()->write_chunk(path, arg->chnk_id, arg->buf, arg->size, arg->off); wrote = GKFS_DATA->storage()->write_chunk(path, arg->chnk_id, arg->buf, arg->size, arg->off); } catch (const ChunkStorageException& err) { }, [&](const ChunkStorageException& err) { GKFS_DATA->spdlogger()->error("{}() {}", __func__, err.what()); GKFS_DATA->spdlogger()->error("{}() {}", __func__, err.what()); wrote = -(err.code().value()); wrote = -(err.code().value()); } catch (const ::exception& err) { }, [&](const ::exception& err) { GKFS_DATA->spdlogger()->error("{}() Unexpected error writing chunk {} of file {}", __func__, arg->chnk_id, GKFS_DATA->spdlogger()->error("{}() Unexpected error writing chunk {} of file {}", __func__, arg->chnk_id, path); path); wrote = -EIO; wrote = -EIO; } }); ABT_eventual_set(arg->eventual, &wrote, sizeof(wrote)); ABT_eventual_set(arg->eventual, &wrote, sizeof(wrote)); } } Loading Loading @@ -267,17 +284,25 @@ void ChunkReadOperation::read_file_abt(void* _arg) { auto* arg = static_cast<struct chunk_read_args*>(_arg); auto* arg = static_cast<struct chunk_read_args*>(_arg); const string& path = *(arg->path); const string& path = *(arg->path); ssize_t read = 0; ssize_t read = 0; try { boost::leaf::try_catch( [&] { // Under expected circumstances (error or no error) read_chunk will signal the eventual // Under expected circumstances (error or no error) read_chunk will signal the eventual read = GKFS_DATA->storage()->read_chunk(path, arg->chnk_id, arg->buf, arg->size, arg->off); read = GKFS_DATA->storage()->read_chunk(path, arg->chnk_id, arg->buf, arg->size, arg->off); } catch (const ChunkStorageException& err) { }, [&](const ChunkStorageException& err) { GKFS_DATA->spdlogger()->error("{}() {}", __func__, err.what()); GKFS_DATA->spdlogger()->error("{}() {}", __func__, err.what()); read = -(err.code().value()); read = -(err.code().value()); } catch (const ::exception& err) { }, [&](const ::exception& err) { GKFS_DATA->spdlogger()->error("{}() Unexpected error reading chunk {} of file {}", __func__, arg->chnk_id, GKFS_DATA->spdlogger()->error("{}() Unexpected error reading chunk {} of file {}", __func__, arg->chnk_id, path); path); read = -EIO; read = -EIO; } }); ABT_eventual_set(arg->eventual, &read, sizeof(read)); ABT_eventual_set(arg->eventual, &read, sizeof(read)); } } Loading