Loading src/common/utils/CMakeLists.txt +1 −1 Original line number Diff line number Diff line Loading @@ -26,4 +26,4 @@ add_library(_utils STATIC) set_property(TARGET _utils PROPERTY POSITION_INDEPENDENT_CODE ON) target_sources(_utils PRIVATE utils.hpp utils.cpp signal_listener.hpp) target_sources(_utils PRIVATE utils.hpp utils.cpp signal_listener.hpp c_ptr.hpp) src/common/utils/c_ptr.hpp 0 → 100644 +96 −0 Original line number Diff line number Diff line /****************************************************************************** * Copyright 2021-2022, Barcelona Supercomputing Center (BSC), Spain * * This software was partially supported by the EuroHPC-funded project ADMIRE * (Project ID: 956748, https://www.admire-eurohpc.eu). * * This file is part of scord. * * scord is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * scord is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with scord. If not, see <https://www.gnu.org/licenses/>. * * SPDX-License-Identifier: GPL-3.0-or-later *****************************************************************************/ #ifndef SCORD_UTILS_C_PTR_HPP #define SCORD_UTILS_C_PTR_HPP #include <memory> #include <vector> namespace scord::utils { // A manager for a C raw pointer. It allows to automatically delete dynamically // allocated C structs in a RAII manner (provided that there is a deleter // function for the struct available). template <typename T, auto fn> struct deleter { void operator()(T* ptr) { fn(ptr); } }; template <typename T, auto fn> using c_ptr = std::unique_ptr<typename std::remove_pointer<T>::type, deleter<typename std::remove_pointer<T>::type, fn>>; // A manager for a vector of C raw pointers. It allows to automatically // delete the dynamically allocated C structs pointed by each vector elements // in a RAII manner (provided that there is a deleter function for the struct // available). Can also be used to directly pass an array of C pointers to C // APIs by means of the data() function. template <typename T, auto fn> struct c_ptr_vector { c_ptr_vector() = default; ~c_ptr_vector() = default; constexpr void reserve(size_t n) { m_data.reserve(n); m_addrs.reserve(n); } template <typename... Args> constexpr void emplace_back(Args&&... args) { const auto& tmp = m_data.emplace_back(args...); m_addrs.push_back(tmp.get()); } constexpr const T* data() const noexcept { return m_addrs.data(); } constexpr T* data() noexcept { return m_addrs.data(); } constexpr std::size_t size() const noexcept { return m_data.size(); } std::vector<scord::utils::c_ptr<T, fn>> m_data{}; std::vector<T> m_addrs{}; }; } // namespace scord::utils #endif // SCORD_UTILS_C_PTR_HPP src/lib/admire.cpp +58 −2 Original line number Diff line number Diff line Loading @@ -26,6 +26,7 @@ #include <network/engine.hpp> #include <network/proto/rpc_types.h> #include <logger/logger.hpp> #include <utils/c_ptr.hpp> #include "detail/impl.hpp" Loading Loading @@ -171,6 +172,61 @@ rpc_registration_cb(scord::network::rpc_client* client) { namespace admire { job_requirements::job_requirements(std::vector<admire::dataset> inputs, std::vector<admire::dataset> outputs) : m_inputs(std::move(inputs)), m_outputs(std::move(outputs)) {} job_requirements::job_requirements( std::vector<admire::dataset> inputs, std::vector<admire::dataset> outputs, std::optional<storage::adhoc::context> adhoc_context) : m_inputs(std::move(inputs)), m_outputs(std::move(outputs)), m_adhoc_context(adhoc_context) {} job_requirements::job_requirements(ADM_job_requirements_t reqs) { m_inputs.reserve(reqs->r_inputs->l_length); for(size_t i = 0; i < reqs->r_inputs->l_length; ++i) { m_inputs.emplace_back(reqs->r_inputs->l_datasets[i].d_id); } m_outputs.reserve(reqs->r_outputs->l_length); for(size_t i = 0; i < reqs->r_outputs->l_length; ++i) { m_outputs.emplace_back(reqs->r_outputs->l_datasets[i].d_id); } } ADM_job_requirements_t job_requirements::to_rpc_type() const { using scord::utils::c_ptr; using scord::utils::c_ptr_vector; using dataset_vector = c_ptr_vector<adm_dataset, ADM_dataset_destroy>; dataset_vector inputs; inputs.reserve(m_inputs.size()); for(const auto& in : m_inputs) { inputs.emplace_back(ADM_dataset_create(in.m_id.c_str())); } dataset_vector outputs; outputs.reserve(m_outputs.size()); for(const auto& out : m_outputs) { outputs.emplace_back(ADM_dataset_create(out.m_id.c_str())); } if(m_adhoc_context) { } return ADM_job_requirements_create(inputs.data(), inputs.size(), outputs.data(), outputs.size(), nullptr); } void ping(const server& srv) { Loading @@ -182,7 +238,7 @@ ping(const server& srv) { admire::job register_job(const server& srv, ADM_job_requirements_t reqs) { register_job(const server& srv, const job_requirements& reqs) { const auto rv = detail::register_job(srv, reqs); Loading @@ -195,7 +251,7 @@ register_job(const server& srv, ADM_job_requirements_t reqs) { } ADM_return_t update_job(const server& srv, ADM_job_t job, ADM_job_requirements_t reqs) { update_job(const server& srv, ADM_job_t job, const job_requirements& reqs) { (void) srv; (void) job; (void) reqs; Loading src/lib/admire.hpp +55 −2 Original line number Diff line number Diff line Loading @@ -24,8 +24,11 @@ #include <admire.h> #include <tl/expected.hpp> #include <optional> #include <cstdarg> #include <string> #include <utility> #include "network/proto/rpc_types.h" #ifndef SCORD_ADMIRE_HPP #define SCORD_ADMIRE_HPP Loading @@ -43,14 +46,64 @@ struct job { job_id m_id; }; struct dataset { explicit dataset(std::string id) : m_id(std::move(id)) {} std::string m_id; }; namespace storage::adhoc { enum class execution_mode : std::underlying_type<ADM_adhoc_mode_t>::type { in_job_shared = ADM_ADHOC_MODE_IN_JOB_SHARED, in_job_dedicated = ADM_ADHOC_MODE_IN_JOB_DEDICATED, separate_new = ADM_ADHOC_MODE_SEPARATE_NEW, separate_existing = ADM_ADHOC_MODE_SEPARATE_EXISTING }; enum class access_mode : std::underlying_type<ADM_adhoc_mode_t>::type { read_only = ADM_ADHOC_ACCESS_RDONLY, write_only = ADM_ADHOC_ACCESS_WRONLY, read_write = ADM_ADHOC_ACCESS_RDWR, }; struct context { execution_mode m_exec_mode; access_mode m_access_mode; std::uint32_t m_nodes; std::uint32_t m_walltime; bool m_should_flush; }; } // namespace storage::adhoc struct job_requirements { job_requirements(std::vector<admire::dataset> inputs, std::vector<admire::dataset> outputs); job_requirements(std::vector<admire::dataset> inputs, std::vector<admire::dataset> outputs, std::optional<storage::adhoc::context> adhoc_context); explicit job_requirements(ADM_job_requirements_t reqs); ADM_job_requirements_t to_rpc_type() const; std::vector<admire::dataset> m_inputs; std::vector<admire::dataset> m_outputs; std::optional<storage::adhoc::context> m_adhoc_context; }; void ping(const server& srv); admire::job register_job(const server& srv, ADM_job_requirements_t reqs); register_job(const server& srv, const job_requirements& reqs); ADM_return_t update_job(const server& srv, ADM_job_t job, ADM_job_requirements_t reqs); update_job(const server& srv, ADM_job_t job, const job_requirements& reqs); ADM_return_t remove_job(const server& srv, ADM_job_t job); Loading src/lib/c_wrapper.cpp +3 −2 Original line number Diff line number Diff line Loading @@ -641,7 +641,8 @@ ADM_register_job(ADM_server_t server, ADM_job_requirements_t reqs, const admire::server srv{server->s_protocol, server->s_address}; const auto rv = admire::detail::register_job(srv, reqs); const auto rv = admire::detail::register_job(srv, admire::job_requirements{reqs}); if(!rv) { return rv.error(); Loading @@ -664,7 +665,7 @@ ADM_update_job(ADM_server_t server, ADM_job_t job, const admire::server srv{server->s_protocol, server->s_address}; return admire::update_job(srv, job, reqs); return admire::update_job(srv, job, admire::job_requirements{reqs}); } ADM_return_t Loading Loading
src/common/utils/CMakeLists.txt +1 −1 Original line number Diff line number Diff line Loading @@ -26,4 +26,4 @@ add_library(_utils STATIC) set_property(TARGET _utils PROPERTY POSITION_INDEPENDENT_CODE ON) target_sources(_utils PRIVATE utils.hpp utils.cpp signal_listener.hpp) target_sources(_utils PRIVATE utils.hpp utils.cpp signal_listener.hpp c_ptr.hpp)
src/common/utils/c_ptr.hpp 0 → 100644 +96 −0 Original line number Diff line number Diff line /****************************************************************************** * Copyright 2021-2022, Barcelona Supercomputing Center (BSC), Spain * * This software was partially supported by the EuroHPC-funded project ADMIRE * (Project ID: 956748, https://www.admire-eurohpc.eu). * * This file is part of scord. * * scord is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * scord is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with scord. If not, see <https://www.gnu.org/licenses/>. * * SPDX-License-Identifier: GPL-3.0-or-later *****************************************************************************/ #ifndef SCORD_UTILS_C_PTR_HPP #define SCORD_UTILS_C_PTR_HPP #include <memory> #include <vector> namespace scord::utils { // A manager for a C raw pointer. It allows to automatically delete dynamically // allocated C structs in a RAII manner (provided that there is a deleter // function for the struct available). template <typename T, auto fn> struct deleter { void operator()(T* ptr) { fn(ptr); } }; template <typename T, auto fn> using c_ptr = std::unique_ptr<typename std::remove_pointer<T>::type, deleter<typename std::remove_pointer<T>::type, fn>>; // A manager for a vector of C raw pointers. It allows to automatically // delete the dynamically allocated C structs pointed by each vector elements // in a RAII manner (provided that there is a deleter function for the struct // available). Can also be used to directly pass an array of C pointers to C // APIs by means of the data() function. template <typename T, auto fn> struct c_ptr_vector { c_ptr_vector() = default; ~c_ptr_vector() = default; constexpr void reserve(size_t n) { m_data.reserve(n); m_addrs.reserve(n); } template <typename... Args> constexpr void emplace_back(Args&&... args) { const auto& tmp = m_data.emplace_back(args...); m_addrs.push_back(tmp.get()); } constexpr const T* data() const noexcept { return m_addrs.data(); } constexpr T* data() noexcept { return m_addrs.data(); } constexpr std::size_t size() const noexcept { return m_data.size(); } std::vector<scord::utils::c_ptr<T, fn>> m_data{}; std::vector<T> m_addrs{}; }; } // namespace scord::utils #endif // SCORD_UTILS_C_PTR_HPP
src/lib/admire.cpp +58 −2 Original line number Diff line number Diff line Loading @@ -26,6 +26,7 @@ #include <network/engine.hpp> #include <network/proto/rpc_types.h> #include <logger/logger.hpp> #include <utils/c_ptr.hpp> #include "detail/impl.hpp" Loading Loading @@ -171,6 +172,61 @@ rpc_registration_cb(scord::network::rpc_client* client) { namespace admire { job_requirements::job_requirements(std::vector<admire::dataset> inputs, std::vector<admire::dataset> outputs) : m_inputs(std::move(inputs)), m_outputs(std::move(outputs)) {} job_requirements::job_requirements( std::vector<admire::dataset> inputs, std::vector<admire::dataset> outputs, std::optional<storage::adhoc::context> adhoc_context) : m_inputs(std::move(inputs)), m_outputs(std::move(outputs)), m_adhoc_context(adhoc_context) {} job_requirements::job_requirements(ADM_job_requirements_t reqs) { m_inputs.reserve(reqs->r_inputs->l_length); for(size_t i = 0; i < reqs->r_inputs->l_length; ++i) { m_inputs.emplace_back(reqs->r_inputs->l_datasets[i].d_id); } m_outputs.reserve(reqs->r_outputs->l_length); for(size_t i = 0; i < reqs->r_outputs->l_length; ++i) { m_outputs.emplace_back(reqs->r_outputs->l_datasets[i].d_id); } } ADM_job_requirements_t job_requirements::to_rpc_type() const { using scord::utils::c_ptr; using scord::utils::c_ptr_vector; using dataset_vector = c_ptr_vector<adm_dataset, ADM_dataset_destroy>; dataset_vector inputs; inputs.reserve(m_inputs.size()); for(const auto& in : m_inputs) { inputs.emplace_back(ADM_dataset_create(in.m_id.c_str())); } dataset_vector outputs; outputs.reserve(m_outputs.size()); for(const auto& out : m_outputs) { outputs.emplace_back(ADM_dataset_create(out.m_id.c_str())); } if(m_adhoc_context) { } return ADM_job_requirements_create(inputs.data(), inputs.size(), outputs.data(), outputs.size(), nullptr); } void ping(const server& srv) { Loading @@ -182,7 +238,7 @@ ping(const server& srv) { admire::job register_job(const server& srv, ADM_job_requirements_t reqs) { register_job(const server& srv, const job_requirements& reqs) { const auto rv = detail::register_job(srv, reqs); Loading @@ -195,7 +251,7 @@ register_job(const server& srv, ADM_job_requirements_t reqs) { } ADM_return_t update_job(const server& srv, ADM_job_t job, ADM_job_requirements_t reqs) { update_job(const server& srv, ADM_job_t job, const job_requirements& reqs) { (void) srv; (void) job; (void) reqs; Loading
src/lib/admire.hpp +55 −2 Original line number Diff line number Diff line Loading @@ -24,8 +24,11 @@ #include <admire.h> #include <tl/expected.hpp> #include <optional> #include <cstdarg> #include <string> #include <utility> #include "network/proto/rpc_types.h" #ifndef SCORD_ADMIRE_HPP #define SCORD_ADMIRE_HPP Loading @@ -43,14 +46,64 @@ struct job { job_id m_id; }; struct dataset { explicit dataset(std::string id) : m_id(std::move(id)) {} std::string m_id; }; namespace storage::adhoc { enum class execution_mode : std::underlying_type<ADM_adhoc_mode_t>::type { in_job_shared = ADM_ADHOC_MODE_IN_JOB_SHARED, in_job_dedicated = ADM_ADHOC_MODE_IN_JOB_DEDICATED, separate_new = ADM_ADHOC_MODE_SEPARATE_NEW, separate_existing = ADM_ADHOC_MODE_SEPARATE_EXISTING }; enum class access_mode : std::underlying_type<ADM_adhoc_mode_t>::type { read_only = ADM_ADHOC_ACCESS_RDONLY, write_only = ADM_ADHOC_ACCESS_WRONLY, read_write = ADM_ADHOC_ACCESS_RDWR, }; struct context { execution_mode m_exec_mode; access_mode m_access_mode; std::uint32_t m_nodes; std::uint32_t m_walltime; bool m_should_flush; }; } // namespace storage::adhoc struct job_requirements { job_requirements(std::vector<admire::dataset> inputs, std::vector<admire::dataset> outputs); job_requirements(std::vector<admire::dataset> inputs, std::vector<admire::dataset> outputs, std::optional<storage::adhoc::context> adhoc_context); explicit job_requirements(ADM_job_requirements_t reqs); ADM_job_requirements_t to_rpc_type() const; std::vector<admire::dataset> m_inputs; std::vector<admire::dataset> m_outputs; std::optional<storage::adhoc::context> m_adhoc_context; }; void ping(const server& srv); admire::job register_job(const server& srv, ADM_job_requirements_t reqs); register_job(const server& srv, const job_requirements& reqs); ADM_return_t update_job(const server& srv, ADM_job_t job, ADM_job_requirements_t reqs); update_job(const server& srv, ADM_job_t job, const job_requirements& reqs); ADM_return_t remove_job(const server& srv, ADM_job_t job); Loading
src/lib/c_wrapper.cpp +3 −2 Original line number Diff line number Diff line Loading @@ -641,7 +641,8 @@ ADM_register_job(ADM_server_t server, ADM_job_requirements_t reqs, const admire::server srv{server->s_protocol, server->s_address}; const auto rv = admire::detail::register_job(srv, reqs); const auto rv = admire::detail::register_job(srv, admire::job_requirements{reqs}); if(!rv) { return rv.error(); Loading @@ -664,7 +665,7 @@ ADM_update_job(ADM_server_t server, ADM_job_t job, const admire::server srv{server->s_protocol, server->s_address}; return admire::update_job(srv, job, reqs); return admire::update_job(srv, job, admire::job_requirements{reqs}); } ADM_return_t Loading