Loading src/scord/adhoc_storage_manager.hpp +10 −8 Original line number Diff line number Diff line Loading @@ -113,8 +113,8 @@ struct adhoc_storage_manager { if(const auto it = m_adhoc_storages.find(id); it != m_adhoc_storages.end()) { const auto current_adhoc_info = it->second; current_adhoc_info->update(std::move(new_resources)); const auto adhoc_metadata_ptr = it->second; adhoc_metadata_ptr->update(std::move(new_resources)); return scord::error_code::success; } Loading Loading @@ -156,12 +156,14 @@ struct adhoc_storage_manager { } scord::error_code add_client_info(std::uint64_t adhoc_id, std::shared_ptr<scord::internal::job_metadata> job_info) { add_client_info( std::uint64_t adhoc_id, std::shared_ptr<scord::internal::job_metadata> job_metadata_ptr) { if(auto am_result = find(adhoc_id); am_result.has_value()) { const auto adhoc_storage_info = am_result.value(); return adhoc_storage_info->add_client_info(std::move(job_info)); const auto adhoc_metadata_ptr = am_result.value(); return adhoc_metadata_ptr->add_client_info( std::move(job_metadata_ptr)); } return scord::error_code::no_such_entity; Loading @@ -170,8 +172,8 @@ struct adhoc_storage_manager { scord::error_code remove_client_info(std::uint64_t adhoc_id) { if(auto am_result = find(adhoc_id); am_result.has_value()) { const auto adhoc_storage_info = *am_result; adhoc_storage_info->remove_client_info(); const auto adhoc_metadata_ptr = *am_result; adhoc_metadata_ptr->remove_client_info(); return scord::error_code::success; } Loading src/scord/internal_types.cpp +5 −5 Original line number Diff line number Diff line Loading @@ -78,9 +78,9 @@ adhoc_storage_metadata::update(scord::adhoc_storage::resources new_resources) { scord::error_code adhoc_storage_metadata::add_client_info( std::shared_ptr<scord::internal::job_metadata> job_info) { std::shared_ptr<scord::internal::job_metadata> job_metadata_ptr) { scord::abt::unique_lock lock(m_info_mutex); scord::abt::unique_lock lock(m_mutex); if(m_client_info) { LOGGER_ERROR("adhoc storage {} already has a client", Loading @@ -88,20 +88,20 @@ adhoc_storage_metadata::add_client_info( return error_code::adhoc_in_use; } m_client_info = std::move(job_info); m_client_info = std::move(job_metadata_ptr); return error_code::success; } void adhoc_storage_metadata::remove_client_info() { scord::abt::unique_lock lock(m_info_mutex); scord::abt::unique_lock lock(m_mutex); m_client_info.reset(); } std::shared_ptr<scord::internal::job_metadata> adhoc_storage_metadata::client_info() const { scord::abt::shared_lock lock(m_info_mutex); scord::abt::shared_lock lock(m_mutex); return m_client_info; } Loading src/scord/internal_types.hpp +3 −2 Original line number Diff line number Diff line Loading @@ -74,7 +74,8 @@ struct adhoc_storage_metadata { update(scord::adhoc_storage::resources new_resources); scord::error_code add_client_info(std::shared_ptr<scord::internal::job_metadata> job_info); add_client_info( std::shared_ptr<scord::internal::job_metadata> job_metadata_ptr); void remove_client_info(); Loading @@ -85,7 +86,7 @@ struct adhoc_storage_metadata { std::string m_uuid; scord::adhoc_storage m_adhoc_storage; std::shared_ptr<scord::internal::job_metadata> m_client_info; mutable scord::abt::shared_mutex m_info_mutex; mutable scord::abt::shared_mutex m_mutex; }; struct pfs_storage_metadata { Loading src/scord/pfs_storage_manager.hpp +2 −2 Original line number Diff line number Diff line Loading @@ -73,8 +73,8 @@ struct pfs_storage_manager { if(const auto it = m_pfs_storages.find(id); it != m_pfs_storages.end()) { const auto current_pfs_info = it->second; current_pfs_info->update(std::move(new_ctx)); const auto pfs_metadata_ptr = it->second; pfs_metadata_ptr->update(std::move(new_ctx)); return scord::error_code::success; } Loading src/scord/rpc_server.cpp +27 −25 Original line number Diff line number Diff line Loading @@ -105,13 +105,13 @@ rpc_server::query(const network::request& req, slurm_job_id job_id) { .or_else([&](auto&& ec) { LOGGER_ERROR("Error retrieving job metadata: {}", ec); }) .and_then([&](auto&& job_metadata) .and_then([&](auto&& job_metadata_ptr) -> tl::expected<job_info, error_code> { if(!job_metadata->resources()) { if(!job_metadata_ptr->resources()) { return tl::make_unexpected( error_code::no_resources); } return job_info{job_metadata->io_procs()}; return job_info{job_metadata_ptr->io_procs()}; }); const response_type resp = Loading Loading @@ -148,20 +148,20 @@ rpc_server::register_job(const network::request& req, m_job_manager.create(slurm_id, job_resources, job_requirements); jm_result.has_value()) { const auto& job_metadata = jm_result.value(); const auto& job_metadata_ptr = jm_result.value(); // if the job requires an adhoc storage instance, inform the appropriate // adhoc_storage instance (if registered) if(job_requirements.adhoc_storage()) { const auto adhoc_id = job_requirements.adhoc_storage()->id(); ec = m_adhoc_manager.add_client_info(adhoc_id, job_metadata); ec = m_adhoc_manager.add_client_info(adhoc_id, job_metadata_ptr); if(!ec) { goto respond; } } job_id = job_metadata->job().id(); job_id = job_metadata_ptr->job().id(); } else { LOGGER_ERROR("rpc id: {} error_msg: \"Error creating job: {}\"", rpc.id(), jm_result.error()); Loading Loading @@ -220,10 +220,10 @@ rpc_server::remove_job(const network::request& req, scord::job_id job_id) { if(jm_result) { // if the job was using an adhoc storage instance, inform the // appropriate adhoc_storage that the job is no longer its client const auto& job_metadata = jm_result.value(); const auto& job_metadata_ptr = jm_result.value(); if(const auto adhoc_storage = job_metadata->requirements()->adhoc_storage(); job_metadata_ptr->requirements()->adhoc_storage(); adhoc_storage.has_value()) { ec = m_adhoc_manager.remove_client_info(adhoc_storage->id()); } Loading Loading @@ -263,8 +263,8 @@ rpc_server::register_adhoc_storage( if(const auto am_result = m_adhoc_manager.create(type, name, ctx, resources); am_result.has_value()) { const auto& adhoc_storage_info = am_result.value(); adhoc_id = adhoc_storage_info->adhoc_storage().id(); const auto& adhoc_metadata_ptr = am_result.value(); adhoc_id = adhoc_metadata_ptr->adhoc_storage().id(); } else { LOGGER_ERROR("rpc id: {} error_msg: \"Error creating adhoc_storage: " "{}\"", Loading Loading @@ -356,10 +356,10 @@ rpc_server::deploy_adhoc_storage(const network::request& req, * information about the instance to deploy. * @return */ const auto deploy_helper = [&](const auto& adhoc_metadata) const auto deploy_helper = [&](const auto& adhoc_metadata_ptr) -> tl::expected<std::filesystem::path, error_code> { assert(adhoc_metadata); const auto adhoc_storage = adhoc_metadata->adhoc_storage(); assert(adhoc_metadata_ptr); const auto adhoc_storage = adhoc_metadata_ptr->adhoc_storage(); const auto endp = lookup(adhoc_storage.context().controller_address()); if(!endp) { Loading @@ -371,11 +371,11 @@ rpc_server::deploy_adhoc_storage(const network::request& req, rpc.add_child(adhoc_storage.context().controller_address()); LOGGER_INFO("rpc {:<} body: {{uuid: {}, type: {}, resources: {}}}", child_rpc, std::quoted(adhoc_metadata->uuid()), child_rpc, std::quoted(adhoc_metadata_ptr->uuid()), adhoc_storage.type(), adhoc_storage.get_resources()); if(const auto call_rv = endp->call(rpc.name(), adhoc_metadata->uuid(), adhoc_storage.type(), if(const auto call_rv = endp->call( rpc.name(), adhoc_metadata_ptr->uuid(), adhoc_storage.type(), adhoc_storage.get_resources()); call_rv.has_value()) { Loading Loading @@ -436,9 +436,9 @@ rpc_server::terminate_adhoc_storage(const network::request& req, * @return */ const auto terminate_helper = [&](const auto& adhoc_metadata) -> error_code { assert(adhoc_metadata); const auto adhoc_storage = adhoc_metadata->adhoc_storage(); [&](const auto& adhoc_metadata_ptr) -> error_code { assert(adhoc_metadata_ptr); const auto adhoc_storage = adhoc_metadata_ptr->adhoc_storage(); const auto endp = lookup(adhoc_storage.context().controller_address()); if(!endp) { Loading @@ -450,9 +450,11 @@ rpc_server::terminate_adhoc_storage(const network::request& req, rpc.add_child(adhoc_storage.context().controller_address()); LOGGER_INFO("rpc {:<} body: {{uuid: {}, type: {}}}", child_rpc, std::quoted(adhoc_metadata->uuid()), adhoc_storage.type()); std::quoted(adhoc_metadata_ptr->uuid()), adhoc_storage.type()); if(const auto call_rv = endp->call(rpc.name(), adhoc_metadata->uuid(), if(const auto call_rv = endp->call(rpc.name(), adhoc_metadata_ptr->uuid(), adhoc_storage.type()); call_rv.has_value()) { Loading Loading @@ -503,8 +505,8 @@ rpc_server::register_pfs_storage(const network::request& req, if(const auto pm_result = m_pfs_manager.create(type, name, ctx); pm_result.has_value()) { const auto& adhoc_storage_info = pm_result.value(); pfs_id = adhoc_storage_info->pfs_storage().id(); const auto& adhoc_metadata_ptr = pm_result.value(); pfs_id = adhoc_metadata_ptr->pfs_storage().id(); } else { LOGGER_ERROR("rpc id: {} error_msg: \"Error creating pfs_storage: {}\"", rpc.id(), pm_result.error()); Loading Loading
src/scord/adhoc_storage_manager.hpp +10 −8 Original line number Diff line number Diff line Loading @@ -113,8 +113,8 @@ struct adhoc_storage_manager { if(const auto it = m_adhoc_storages.find(id); it != m_adhoc_storages.end()) { const auto current_adhoc_info = it->second; current_adhoc_info->update(std::move(new_resources)); const auto adhoc_metadata_ptr = it->second; adhoc_metadata_ptr->update(std::move(new_resources)); return scord::error_code::success; } Loading Loading @@ -156,12 +156,14 @@ struct adhoc_storage_manager { } scord::error_code add_client_info(std::uint64_t adhoc_id, std::shared_ptr<scord::internal::job_metadata> job_info) { add_client_info( std::uint64_t adhoc_id, std::shared_ptr<scord::internal::job_metadata> job_metadata_ptr) { if(auto am_result = find(adhoc_id); am_result.has_value()) { const auto adhoc_storage_info = am_result.value(); return adhoc_storage_info->add_client_info(std::move(job_info)); const auto adhoc_metadata_ptr = am_result.value(); return adhoc_metadata_ptr->add_client_info( std::move(job_metadata_ptr)); } return scord::error_code::no_such_entity; Loading @@ -170,8 +172,8 @@ struct adhoc_storage_manager { scord::error_code remove_client_info(std::uint64_t adhoc_id) { if(auto am_result = find(adhoc_id); am_result.has_value()) { const auto adhoc_storage_info = *am_result; adhoc_storage_info->remove_client_info(); const auto adhoc_metadata_ptr = *am_result; adhoc_metadata_ptr->remove_client_info(); return scord::error_code::success; } Loading
src/scord/internal_types.cpp +5 −5 Original line number Diff line number Diff line Loading @@ -78,9 +78,9 @@ adhoc_storage_metadata::update(scord::adhoc_storage::resources new_resources) { scord::error_code adhoc_storage_metadata::add_client_info( std::shared_ptr<scord::internal::job_metadata> job_info) { std::shared_ptr<scord::internal::job_metadata> job_metadata_ptr) { scord::abt::unique_lock lock(m_info_mutex); scord::abt::unique_lock lock(m_mutex); if(m_client_info) { LOGGER_ERROR("adhoc storage {} already has a client", Loading @@ -88,20 +88,20 @@ adhoc_storage_metadata::add_client_info( return error_code::adhoc_in_use; } m_client_info = std::move(job_info); m_client_info = std::move(job_metadata_ptr); return error_code::success; } void adhoc_storage_metadata::remove_client_info() { scord::abt::unique_lock lock(m_info_mutex); scord::abt::unique_lock lock(m_mutex); m_client_info.reset(); } std::shared_ptr<scord::internal::job_metadata> adhoc_storage_metadata::client_info() const { scord::abt::shared_lock lock(m_info_mutex); scord::abt::shared_lock lock(m_mutex); return m_client_info; } Loading
src/scord/internal_types.hpp +3 −2 Original line number Diff line number Diff line Loading @@ -74,7 +74,8 @@ struct adhoc_storage_metadata { update(scord::adhoc_storage::resources new_resources); scord::error_code add_client_info(std::shared_ptr<scord::internal::job_metadata> job_info); add_client_info( std::shared_ptr<scord::internal::job_metadata> job_metadata_ptr); void remove_client_info(); Loading @@ -85,7 +86,7 @@ struct adhoc_storage_metadata { std::string m_uuid; scord::adhoc_storage m_adhoc_storage; std::shared_ptr<scord::internal::job_metadata> m_client_info; mutable scord::abt::shared_mutex m_info_mutex; mutable scord::abt::shared_mutex m_mutex; }; struct pfs_storage_metadata { Loading
src/scord/pfs_storage_manager.hpp +2 −2 Original line number Diff line number Diff line Loading @@ -73,8 +73,8 @@ struct pfs_storage_manager { if(const auto it = m_pfs_storages.find(id); it != m_pfs_storages.end()) { const auto current_pfs_info = it->second; current_pfs_info->update(std::move(new_ctx)); const auto pfs_metadata_ptr = it->second; pfs_metadata_ptr->update(std::move(new_ctx)); return scord::error_code::success; } Loading
src/scord/rpc_server.cpp +27 −25 Original line number Diff line number Diff line Loading @@ -105,13 +105,13 @@ rpc_server::query(const network::request& req, slurm_job_id job_id) { .or_else([&](auto&& ec) { LOGGER_ERROR("Error retrieving job metadata: {}", ec); }) .and_then([&](auto&& job_metadata) .and_then([&](auto&& job_metadata_ptr) -> tl::expected<job_info, error_code> { if(!job_metadata->resources()) { if(!job_metadata_ptr->resources()) { return tl::make_unexpected( error_code::no_resources); } return job_info{job_metadata->io_procs()}; return job_info{job_metadata_ptr->io_procs()}; }); const response_type resp = Loading Loading @@ -148,20 +148,20 @@ rpc_server::register_job(const network::request& req, m_job_manager.create(slurm_id, job_resources, job_requirements); jm_result.has_value()) { const auto& job_metadata = jm_result.value(); const auto& job_metadata_ptr = jm_result.value(); // if the job requires an adhoc storage instance, inform the appropriate // adhoc_storage instance (if registered) if(job_requirements.adhoc_storage()) { const auto adhoc_id = job_requirements.adhoc_storage()->id(); ec = m_adhoc_manager.add_client_info(adhoc_id, job_metadata); ec = m_adhoc_manager.add_client_info(adhoc_id, job_metadata_ptr); if(!ec) { goto respond; } } job_id = job_metadata->job().id(); job_id = job_metadata_ptr->job().id(); } else { LOGGER_ERROR("rpc id: {} error_msg: \"Error creating job: {}\"", rpc.id(), jm_result.error()); Loading Loading @@ -220,10 +220,10 @@ rpc_server::remove_job(const network::request& req, scord::job_id job_id) { if(jm_result) { // if the job was using an adhoc storage instance, inform the // appropriate adhoc_storage that the job is no longer its client const auto& job_metadata = jm_result.value(); const auto& job_metadata_ptr = jm_result.value(); if(const auto adhoc_storage = job_metadata->requirements()->adhoc_storage(); job_metadata_ptr->requirements()->adhoc_storage(); adhoc_storage.has_value()) { ec = m_adhoc_manager.remove_client_info(adhoc_storage->id()); } Loading Loading @@ -263,8 +263,8 @@ rpc_server::register_adhoc_storage( if(const auto am_result = m_adhoc_manager.create(type, name, ctx, resources); am_result.has_value()) { const auto& adhoc_storage_info = am_result.value(); adhoc_id = adhoc_storage_info->adhoc_storage().id(); const auto& adhoc_metadata_ptr = am_result.value(); adhoc_id = adhoc_metadata_ptr->adhoc_storage().id(); } else { LOGGER_ERROR("rpc id: {} error_msg: \"Error creating adhoc_storage: " "{}\"", Loading Loading @@ -356,10 +356,10 @@ rpc_server::deploy_adhoc_storage(const network::request& req, * information about the instance to deploy. * @return */ const auto deploy_helper = [&](const auto& adhoc_metadata) const auto deploy_helper = [&](const auto& adhoc_metadata_ptr) -> tl::expected<std::filesystem::path, error_code> { assert(adhoc_metadata); const auto adhoc_storage = adhoc_metadata->adhoc_storage(); assert(adhoc_metadata_ptr); const auto adhoc_storage = adhoc_metadata_ptr->adhoc_storage(); const auto endp = lookup(adhoc_storage.context().controller_address()); if(!endp) { Loading @@ -371,11 +371,11 @@ rpc_server::deploy_adhoc_storage(const network::request& req, rpc.add_child(adhoc_storage.context().controller_address()); LOGGER_INFO("rpc {:<} body: {{uuid: {}, type: {}, resources: {}}}", child_rpc, std::quoted(adhoc_metadata->uuid()), child_rpc, std::quoted(adhoc_metadata_ptr->uuid()), adhoc_storage.type(), adhoc_storage.get_resources()); if(const auto call_rv = endp->call(rpc.name(), adhoc_metadata->uuid(), adhoc_storage.type(), if(const auto call_rv = endp->call( rpc.name(), adhoc_metadata_ptr->uuid(), adhoc_storage.type(), adhoc_storage.get_resources()); call_rv.has_value()) { Loading Loading @@ -436,9 +436,9 @@ rpc_server::terminate_adhoc_storage(const network::request& req, * @return */ const auto terminate_helper = [&](const auto& adhoc_metadata) -> error_code { assert(adhoc_metadata); const auto adhoc_storage = adhoc_metadata->adhoc_storage(); [&](const auto& adhoc_metadata_ptr) -> error_code { assert(adhoc_metadata_ptr); const auto adhoc_storage = adhoc_metadata_ptr->adhoc_storage(); const auto endp = lookup(adhoc_storage.context().controller_address()); if(!endp) { Loading @@ -450,9 +450,11 @@ rpc_server::terminate_adhoc_storage(const network::request& req, rpc.add_child(adhoc_storage.context().controller_address()); LOGGER_INFO("rpc {:<} body: {{uuid: {}, type: {}}}", child_rpc, std::quoted(adhoc_metadata->uuid()), adhoc_storage.type()); std::quoted(adhoc_metadata_ptr->uuid()), adhoc_storage.type()); if(const auto call_rv = endp->call(rpc.name(), adhoc_metadata->uuid(), if(const auto call_rv = endp->call(rpc.name(), adhoc_metadata_ptr->uuid(), adhoc_storage.type()); call_rv.has_value()) { Loading Loading @@ -503,8 +505,8 @@ rpc_server::register_pfs_storage(const network::request& req, if(const auto pm_result = m_pfs_manager.create(type, name, ctx); pm_result.has_value()) { const auto& adhoc_storage_info = pm_result.value(); pfs_id = adhoc_storage_info->pfs_storage().id(); const auto& adhoc_metadata_ptr = pm_result.value(); pfs_id = adhoc_metadata_ptr->pfs_storage().id(); } else { LOGGER_ERROR("rpc id: {} error_msg: \"Error creating pfs_storage: {}\"", rpc.id(), pm_result.error()); Loading