Skip to content
Snippets Groups Projects
Verified Commit 68b08afe authored by ANA MANZANO RODRIGUEZ's avatar ANA MANZANO RODRIGUEZ Committed by Alberto Miranda
Browse files

Formatting and cleanup

parent 73cdd8c2
No related branches found
No related tags found
1 merge request!26Resolve "Refactor library RPC implementation of `admire::register_adhoc_storage`"
......@@ -45,8 +45,6 @@ main(int argc, char* argv[]) {
const auto inputs = prepare_datasets("input-dataset-{}", NINPUTS);
const auto outputs = prepare_datasets("output-dataset-{}", NOUTPUTS);
//const auto server_id = admire::adhoc_storage::impl::generate_id();
auto p = std::make_unique<admire::adhoc_storage>(
admire::storage::type::gekkofs, "foobar",
admire::adhoc_storage::execution_mode::separate_new,
......@@ -55,13 +53,15 @@ main(int argc, char* argv[]) {
admire::job_requirements reqs(inputs, outputs, std::move(p));
std::string id;
//ADM_storage_t adhoc_storage{};
const auto adhoc_storage_ctx = admire::adhoc_storage::ctx{admire::adhoc_storage::execution_mode::separate_new, admire::adhoc_storage::access_type::read_write, 42, 100, false};
const auto adhoc_storage_ctx = admire::adhoc_storage::ctx{
admire::adhoc_storage::execution_mode::separate_new,
admire::adhoc_storage::access_type::read_write, 42, 100, false};
ADM_return_t ret = ADM_SUCCESS;
try {
[[maybe_unused]] const auto job = admire::register_job(server, reqs);
const auto adhoc_storage = admire::register_adhoc_storage(server, job, id, adhoc_storage_ctx);
const auto adhoc_storage = admire::register_adhoc_storage(
server, job, id, adhoc_storage_ctx);
} catch(const std::exception& e) {
fmt::print(stderr, "FATAL: ADM_register_adhoc_storage() failed: {}\n",
e.what());
......
......@@ -44,7 +44,7 @@ main(int argc, char* argv[]) {
const auto outputs = prepare_datasets("output-dataset-{}", NOUTPUTS);
auto p = std::make_unique<admire::adhoc_storage>(
admire::storage::type::gekkofs, "foobar",
admire::storage::type::gekkofs, "foobar",
admire::adhoc_storage::execution_mode::separate_new,
admire::adhoc_storage::access_type::read_write, 42, 100, false);
......
......@@ -44,12 +44,10 @@ main(int argc, char* argv[]) {
const auto outputs = prepare_datasets("output-dataset-{}", NOUTPUTS);
auto p = std::make_unique<admire::adhoc_storage>(
admire::storage::type::gekkofs, "foobar",
admire::storage::type::gekkofs, "foobar",
admire::adhoc_storage::execution_mode::separate_new,
admire::adhoc_storage::access_type::read_write, 42, 100, false);
//const auto adhoc_storage = admire::register_adhoc_storage(server, job, "foobar", adhoc_storage_ctx);
admire::job_requirements reqs{inputs, outputs, std::move(p)};
......@@ -58,7 +56,7 @@ main(int argc, char* argv[]) {
prepare_datasets("output-new-dataset-{}", NOUTPUTS);
auto p2 = std::make_unique<admire::adhoc_storage>(
admire::storage::type::gekkofs, "foobar",
admire::storage::type::gekkofs, "foobar",
admire::adhoc_storage::execution_mode::separate_new,
admire::adhoc_storage::access_type::read_write, 42, 100, false);
......
......@@ -236,7 +236,7 @@ struct storage {
virtual ~ctx() = default;
};
storage(storage::type type, std::string id);//std::uint64_t server_id);
storage(storage::type type, std::string id);
virtual ~storage() = default;
......@@ -252,7 +252,6 @@ struct storage {
protected:
std::string m_id;
enum type m_type;
//std::uint64_t m_server_id;
};
struct adhoc_storage : public storage {
......@@ -295,21 +294,17 @@ struct adhoc_storage : public storage {
std::uint32_t m_walltime;
bool m_should_flush;
};
adhoc_storage(enum storage::type type, std::string id,
adhoc_storage(enum storage::type type, std::string id,
execution_mode exec_mode, access_type access_type,
std::uint32_t nodes, std::uint32_t walltime,
bool should_flush);
adhoc_storage(enum storage::type type, std::string id,
adhoc_storage(enum storage::type type, std::string id,
ADM_adhoc_context_t ctx);
adhoc_storage(const adhoc_storage& other) noexcept;
adhoc_storage(enum storage::type type, std::string id,
const admire::adhoc_storage::ctx& ctx);
//adhoc_storage(enum storage::type type, std::string id, std::uint64_t server_id,
//ADM_adhoc_context_t ctx);
/*adhoc_storage(enum storage::type type, std::string id, std::uint64_t server_id,
const admire::adhoc_storage::ctx& ctx);*/
adhoc_storage(adhoc_storage&&) noexcept = default;
adhoc_storage&
operator=(const adhoc_storage&) noexcept;
......@@ -344,9 +339,9 @@ struct pfs_storage : public storage {
std::filesystem::path m_mount_point;
};
pfs_storage(enum storage::type type, std::string id, //std::uint64_t server_id,
pfs_storage(enum storage::type type, std::string id,
std::filesystem::path mount_point);
pfs_storage(enum storage::type type, std::string id, ADM_pfs_context_t ctx); //std::uint64_t server_id, ADM_pfs_context_t ctx);
pfs_storage(enum storage::type type, std::string id, ADM_pfs_context_t ctx);
pfs_storage(const pfs_storage& other) noexcept;
pfs_storage(pfs_storage&&) noexcept = default;
pfs_storage&
......
......@@ -1060,8 +1060,7 @@ dataset::id() const {
storage::storage(enum storage::type type, std::string id)
//std::uint64_t server_id)
: m_id(std::move(id)), m_type(type) {}//, m_server_id(server_id) {}
: m_id(std::move(id)), m_type(type) {}
std::string
storage::id() const {
......@@ -1114,7 +1113,7 @@ adhoc_storage::ctx::should_flush() const {
class adhoc_storage::impl {
static std::uint64_t
generate_id() {
static std::atomic_uint64_t s_current_server_id = 0;
......@@ -1122,7 +1121,7 @@ class adhoc_storage::impl {
}
public:
explicit impl(adhoc_storage::ctx ctx) //, std::uint64_t counter)
explicit impl(adhoc_storage::ctx ctx)
: m_id(generate_id()), m_ctx(std::move(ctx)) {}
impl(const impl& rhs) = default;
impl(impl&& rhs) = default;
......@@ -1147,34 +1146,25 @@ private:
};
adhoc_storage::adhoc_storage(enum storage::type type, std::string id, //std::uint64_t server_id,
execution_mode exec_mode,
access_type access_type, std::uint32_t nodes,
std::uint32_t walltime, bool should_flush)
: storage(type, std::move(id)),//, server_id),
adhoc_storage::adhoc_storage(enum storage::type type, std::string id,
execution_mode exec_mode, access_type access_type,
std::uint32_t nodes, std::uint32_t walltime,
bool should_flush)
: storage(type, std::move(id)),
m_pimpl(std::make_unique<impl>(adhoc_storage::ctx{
exec_mode, access_type, nodes, walltime, should_flush})) {}
adhoc_storage::adhoc_storage(enum storage::type type, std::string id, //std::uint64_t server_id,
adhoc_storage::adhoc_storage(enum storage::type type, std::string id,
ADM_adhoc_context_t ctx)
: storage(type, std::move(id)), //, server_id),
: storage(type, std::move(id)),
m_pimpl(std::make_unique<impl>(adhoc_storage::ctx{ctx})) {}
/*
adhoc_storage::adhoc_storage(enum storage::type type, std::string id,
std::uint64_t server_id,
const adhoc_storage::ctx& ctx)
: storage(type, std::move(id), server_id),
m_pimpl(std::make_unique<impl>(ctx)) {}*/
adhoc_storage::adhoc_storage(enum storage::type type, std::string id,
//std::uint64_t server_id,
const adhoc_storage::ctx& ctx)
: storage(type, std::move(id)),//, server_id),
m_pimpl(std::make_unique<impl>(ctx)) {} // este es el nuevo añadido
: storage(type, std::move(id)), m_pimpl(std::make_unique<impl>(ctx)) {}
adhoc_storage::adhoc_storage(const adhoc_storage& other) noexcept
: storage(other.m_type, other.m_id), //other.m_server_id),
: storage(other.m_type, other.m_id),
m_pimpl(std::make_unique<impl>(*other.m_pimpl)) {}
adhoc_storage&
......@@ -1202,7 +1192,7 @@ pfs_storage::ctx::ctx(std::filesystem::path mount_point)
pfs_storage::ctx::ctx(ADM_pfs_context_t ctx) : pfs_storage::ctx(ctx->c_mount) {}
pfs_storage::pfs_storage(const pfs_storage& other) noexcept
: storage(other.m_type, other.m_id), //other.m_server_id),
: storage(other.m_type, other.m_id), // other.m_server_id),
m_pimpl(std::make_unique<impl>(*other.m_pimpl)) {}
pfs_storage&
......@@ -1236,15 +1226,17 @@ private:
pfs_storage::ctx m_ctx;
};
pfs_storage::pfs_storage(enum storage::type type, std::string id, //std::uint64_t server_id,
pfs_storage::pfs_storage(enum storage::type type,
std::string id, // std::uint64_t server_id,
std::filesystem::path mount_point)
: storage(type, std::move(id)), //server_id),
: storage(type, std::move(id)), // server_id),
m_pimpl(std::make_unique<impl>(
pfs_storage::ctx{std::move(mount_point)})) {}
pfs_storage::pfs_storage(enum storage::type type, std::string id, //std::uint64_t server_id,
pfs_storage::pfs_storage(enum storage::type type,
std::string id, // std::uint64_t server_id,
ADM_pfs_context_t ctx)
: storage(type, std::move(id)), //server_id),
: storage(type, std::move(id)), // server_id),
m_pimpl(std::make_unique<impl>(pfs_storage::ctx{ctx})) {}
pfs_storage::~pfs_storage() = default;
......@@ -1292,7 +1284,7 @@ public:
static_cast<enum storage::type>(
reqs->r_storage->s_type),
reqs->r_storage->s_id,
//reqs->r_storage->s_server_id,
// reqs->r_storage->s_server_id,
reqs->r_storage->s_adhoc_ctx);
break;
case ADM_STORAGE_LUSTRE:
......@@ -1300,8 +1292,8 @@ public:
m_storage = std::make_unique<pfs_storage>(
static_cast<enum storage::type>(
reqs->r_storage->s_type),
reqs->r_storage->s_id,
//reqs->r_storage->s_server_id,
reqs->r_storage->s_id,
// reqs->r_storage->s_server_id,
reqs->r_storage->s_pfs_ctx);
break;
}
......
......@@ -179,7 +179,7 @@ MERCURY_GEN_STRUCT_PROC(
typedef struct adm_storage {
const char* s_id;
ADM_storage_type_t s_type;
uint64_t s_server_id;
uint64_t s_server_id;
union {
ADM_adhoc_context_t s_adhoc_ctx;
ADM_pfs_context_t s_pfs_ctx;
......@@ -297,9 +297,18 @@ MERCURY_GEN_PROC(
);
/// ADM_register_adhoc_storage
MERCURY_GEN_PROC(ADM_register_adhoc_storage_in_t, ((ADM_job_t) (job))((hg_const_string_t) (id))((ADM_adhoc_context_t)(ctx)));
MERCURY_GEN_PROC(
ADM_register_adhoc_storage_in_t,
((ADM_job_t) (job))
((hg_const_string_t) (id))
((ADM_adhoc_context_t)(ctx))
);
MERCURY_GEN_PROC(ADM_register_adhoc_storage_out_t, ((int32_t) (retval)) ((uint64_t)(server_id)));
MERCURY_GEN_PROC(
ADM_register_adhoc_storage_out_t,
((int32_t) (retval))
((uint64_t)(server_id))
);
/// ADM_update_adhoc_storage
MERCURY_GEN_PROC(ADM_update_adhoc_storage_in_t, ((int32_t) (reqs)))
......
......@@ -235,15 +235,13 @@ remove_job(const server& srv, const job& job) {
admire::adhoc_storage
register_adhoc_storage(const server& srv, const job& job, const std::string& id,
const adhoc_storage::ctx& ctx) {
//return detail::register_adhoc_storage(srv, job, id, ctx);
const auto rv = detail::register_adhoc_storage(srv, job, id, ctx);
if(!rv) {
throw std::runtime_error(fmt::format("ADM_register_adhoc_storage() error: {}",
ADM_strerror(rv.error())));
throw std::runtime_error(
fmt::format("ADM_register_adhoc_storage() error: {}",
ADM_strerror(rv.error())));
}
return rv.value();
......
......@@ -81,18 +81,14 @@ ADM_remove_job(ADM_server_t server, ADM_job_t job) {
ADM_return_t
ADM_register_adhoc_storage(ADM_server_t server, ADM_job_t job, const char* id,
ADM_adhoc_context_t ctx,
ADM_storage_t* adhoc_storage) {
ADM_storage_t* adhoc_storage) {
const admire::server srv{server};
//return admire::detail::register_adhoc_storage(srv, admire::job{job}, id, admire::adhoc_storage::ctx{ctx});
const auto rv = admire::detail::register_adhoc_storage(
srv, admire::job{job}, id, admire::adhoc_storage::ctx{ctx});
const auto rv =
admire::detail::register_adhoc_storage(srv, admire::job{job}, id, admire::adhoc_storage::ctx{ctx});
if(!rv) {
return rv.error();
}
......
......@@ -321,7 +321,6 @@ register_adhoc_storage(const server& srv, const job& job, const std::string& id,
const auto rpc_id = id.c_str();
const auto rpc_ctx = api::convert(ctx);
//ADM_register_adhoc_storage_in_t in{rpc_job.get()};
ADM_register_adhoc_storage_in_t in{rpc_job.get(), rpc_id, rpc_ctx.get()};
ADM_register_adhoc_storage_out_t out;
......@@ -332,7 +331,8 @@ register_adhoc_storage(const server& srv, const job& job, const std::string& id,
return tl::make_unexpected(static_cast<admire::error_code>(out.retval));
}
const auto rpc_adhoc_storage = admire::adhoc_storage{admire::storage::type::gekkofs, id, ctx};
const auto rpc_adhoc_storage =
admire::adhoc_storage{admire::storage::type::gekkofs, id, ctx};
LOGGER_INFO("RPC (ADM_{}) <= {{retval: {}}}", __FUNCTION__, ADM_SUCCESS);
......
......@@ -225,32 +225,20 @@ ADM_register_adhoc_storage(hg_handle_t h) {
const admire::job job(in.job);
const std::string id(in.id);
const admire::adhoc_storage::ctx ctx(in.ctx);
// const admire::job_requirements reqs(&in.reqs);
const auto rpc_id = remote_procedure::new_id();
LOGGER_INFO("RPC ID {} ({}) <= {{job: {{{}}}}}", rpc_id, __FUNCTION__, job);
const auto adhoc_storage = admire::adhoc_storage(
admire::adhoc_storage::type::gekkofs, id, ctx); //ctx.get()
uint64_t server_id = adhoc_storage.id();
LOGGER_WARN("server_id: {}", server_id);
// LOGGER_INFO("RPC ID {} ({}) <= {{server_id: {{{}}}}}", rpc_id, __FUNCTION__, server_id); //new
// admire::adhoc_storage::ctx{admire::adhoc_storage::execution_mode::in_job_shared, admire::adhoc_storage::access_type::write_only, 10, 10, false}
admire::adhoc_storage::type::gekkofs, id, ctx);
admire::error_code rv = ADM_SUCCESS;
out.retval = rv;
out.server_id = adhoc_storage.id();
/*
out.adhoc_storage = admire::api::convert(adhoc_storage).get();
LOGGER_INFO("RPC ID {} ({}) => {{retval: {}, adhoc_storage: {{{}}}}}", rpc_id,
__FUNCTION__, rv, adhoc_storage); */
LOGGER_INFO("RPC ID {} ({}) => {{retval: {}, server_id: {}}}", rpc_id, out.server_id,
__FUNCTION__, rv);
LOGGER_INFO("RPC ID {} ({}) => {{retval: {}, server_id: {}}}", rpc_id,
out.server_id, __FUNCTION__, rv);
ret = margo_respond(h, &out);
assert(ret == HG_SUCCESS);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment