Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • eu/admire/io-scheduler
1 result
Show changes
...@@ -439,16 +439,16 @@ ADM_dataset_list_destroy(ADM_dataset_list_t list) { ...@@ -439,16 +439,16 @@ ADM_dataset_list_destroy(ADM_dataset_list_t list) {
return ret; return ret;
} }
ADM_storage_t ADM_adhoc_storage_t
ADM_storage_create(const char* name, ADM_storage_type_t type, uint64_t id, ADM_adhoc_storage_create(const char* name, ADM_adhoc_storage_type_t type,
void* ctx) { uint64_t id, ADM_adhoc_context_t adhoc_ctx) {
struct adm_storage* adm_storage = struct adm_adhoc_storage* adm_adhoc_storage =
(struct adm_storage*) malloc(sizeof(*adm_storage)); (struct adm_adhoc_storage*) malloc(sizeof(*adm_adhoc_storage));
const char* error_msg = NULL; const char* error_msg = NULL;
if(!adm_storage) { if(!adm_adhoc_storage) {
LOGGER_ERROR("Could not allocate ADM_storage_t"); LOGGER_ERROR("Could not allocate ADM_adhoc_storage_t");
return NULL; return NULL;
} }
...@@ -457,92 +457,59 @@ ADM_storage_create(const char* name, ADM_storage_type_t type, uint64_t id, ...@@ -457,92 +457,59 @@ ADM_storage_create(const char* name, ADM_storage_type_t type, uint64_t id,
return NULL; return NULL;
} }
if(!ctx) { if(!adhoc_ctx) {
LOGGER_ERROR("Null storage context") LOGGER_ERROR("Null storage context")
return NULL; return NULL;
} }
adm_storage->s_name = (const char*) calloc(strlen(name) + 1, sizeof(char)); adm_adhoc_storage->s_name =
strcpy((char*) adm_storage->s_name, name); (const char*) calloc(strlen(name) + 1, sizeof(char));
adm_storage->s_type = type; strcpy((char*) adm_adhoc_storage->s_name, name);
adm_storage->s_id = id; adm_adhoc_storage->s_type = type;
adm_adhoc_storage->s_id = id;
switch(adm_storage->s_type) {
case ADM_STORAGE_GEKKOFS:
case ADM_STORAGE_DATACLAY:
case ADM_STORAGE_EXPAND:
case ADM_STORAGE_HERCULES:
adm_storage->s_adhoc_ctx =
(ADM_adhoc_context_t) calloc(1, sizeof(adm_adhoc_context));
if(!adm_storage->s_adhoc_ctx) {
error_msg = "Could not allocate ADM_adhoc_context_t";
goto cleanup_on_error;
}
memcpy(adm_storage->s_adhoc_ctx, (ADM_adhoc_context_t) ctx,
sizeof(*(ADM_adhoc_context_t) ctx));
break;
case ADM_STORAGE_LUSTRE:
case ADM_STORAGE_GPFS:
adm_storage->s_pfs_ctx =
(ADM_pfs_context_t) calloc(1, sizeof(adm_pfs_context));
if(!adm_storage->s_pfs_ctx) {
error_msg = "Could not allocate ADM_pfs_context_t";
goto cleanup_on_error;
}
memcpy(adm_storage->s_pfs_ctx, (ADM_pfs_context_t) ctx, adm_adhoc_storage->s_adhoc_ctx =
sizeof(*(ADM_pfs_context_t) ctx)); (ADM_adhoc_context_t) calloc(1, sizeof(adm_adhoc_context));
break; if(!adm_adhoc_storage->s_adhoc_ctx) {
error_msg = "Could not allocate ADM_adhoc_context_t";
goto cleanup_on_error;
} }
return adm_storage; memcpy(adm_adhoc_storage->s_adhoc_ctx, adhoc_ctx, sizeof(*adhoc_ctx));
return adm_adhoc_storage;
cleanup_on_error: cleanup_on_error:
if(error_msg) { if(error_msg) {
LOGGER_ERROR(error_msg); LOGGER_ERROR(error_msg);
} }
[[maybe_unused]] ADM_return_t ret = ADM_storage_destroy(adm_storage); [[maybe_unused]] ADM_return_t ret =
ADM_adhoc_storage_destroy(adm_adhoc_storage);
assert(ret); assert(ret);
return NULL; return NULL;
} }
ADM_return_t ADM_return_t
ADM_storage_destroy(ADM_storage_t storage) { ADM_adhoc_storage_destroy(ADM_adhoc_storage_t adhoc_storage) {
ADM_return_t ret = ADM_SUCCESS; ADM_return_t ret = ADM_SUCCESS;
if(!storage) { if(!adhoc_storage) {
LOGGER_ERROR("Invalid ADM_storage_t") LOGGER_ERROR("Invalid ADM_adhoc_storage_t")
return ADM_EBADARGS; return ADM_EBADARGS;
} }
if(storage->s_name) { if(adhoc_storage->s_name) {
free((void*) storage->s_name); free((void*) adhoc_storage->s_name);
} }
switch(storage->s_type) { if(adhoc_storage->s_adhoc_ctx) {
case ADM_STORAGE_GEKKOFS: free(adhoc_storage->s_adhoc_ctx);
case ADM_STORAGE_DATACLAY:
case ADM_STORAGE_EXPAND:
case ADM_STORAGE_HERCULES:
if(storage->s_adhoc_ctx) {
free(storage->s_adhoc_ctx);
}
break;
case ADM_STORAGE_LUSTRE:
case ADM_STORAGE_GPFS:
if(storage->s_pfs_ctx) {
free(storage->s_pfs_ctx);
}
break;
} }
free(storage); free(adhoc_storage);
return ret; return ret;
} }
...@@ -601,6 +568,80 @@ ADM_adhoc_resources_destroy(ADM_adhoc_resources_t res) { ...@@ -601,6 +568,80 @@ ADM_adhoc_resources_destroy(ADM_adhoc_resources_t res) {
return ret; return ret;
} }
ADM_pfs_storage_t
ADM_pfs_storage_create(const char* name, ADM_pfs_storage_type_t type,
uint64_t id, ADM_pfs_context_t pfs_ctx) {
struct adm_pfs_storage* adm_pfs_storage =
(struct adm_pfs_storage*) malloc(sizeof(*adm_pfs_storage));
const char* error_msg = NULL;
if(!adm_pfs_storage) {
LOGGER_ERROR("Could not allocate ADM_pfs_storage_t");
return NULL;
}
if(!name) {
LOGGER_ERROR("Null storage name")
return NULL;
}
if(!pfs_ctx) {
LOGGER_ERROR("Null storage context")
return NULL;
}
adm_pfs_storage->s_name =
(const char*) calloc(strlen(name) + 1, sizeof(char));
strcpy((char*) adm_pfs_storage->s_name, name);
adm_pfs_storage->s_type = type;
adm_pfs_storage->s_id = id;
adm_pfs_storage->s_pfs_ctx =
(ADM_pfs_context_t) calloc(1, sizeof(adm_pfs_context));
if(!adm_pfs_storage->s_pfs_ctx) {
error_msg = "Could not allocate ADM_pfs_context_t";
goto cleanup_on_error;
}
memcpy(adm_pfs_storage->s_pfs_ctx, pfs_ctx, sizeof(*pfs_ctx));
return adm_pfs_storage;
cleanup_on_error:
if(error_msg) {
LOGGER_ERROR(error_msg);
}
[[maybe_unused]] ADM_return_t ret =
ADM_pfs_storage_destroy(adm_pfs_storage);
assert(ret);
return NULL;
}
ADM_return_t
ADM_pfs_storage_destroy(ADM_pfs_storage_t pfs_storage) {
ADM_return_t ret = ADM_SUCCESS;
if(!pfs_storage) {
LOGGER_ERROR("Invalid ADM_pfs_storage_t")
return ADM_EBADARGS;
}
if(pfs_storage->s_name) {
free((void*) pfs_storage->s_name);
}
if(pfs_storage->s_pfs_ctx) {
free(pfs_storage->s_pfs_ctx);
}
free(pfs_storage);
return ret;
}
ADM_data_operation_t ADM_data_operation_t
ADM_data_operation_create() { ADM_data_operation_create() {
...@@ -755,7 +796,7 @@ ADM_job_resources_destroy(ADM_job_resources_t res) { ...@@ -755,7 +796,7 @@ ADM_job_resources_destroy(ADM_job_resources_t res) {
ADM_job_requirements_t ADM_job_requirements_t
ADM_job_requirements_create(ADM_dataset_t inputs[], size_t inputs_len, ADM_job_requirements_create(ADM_dataset_t inputs[], size_t inputs_len,
ADM_dataset_t outputs[], size_t outputs_len, ADM_dataset_t outputs[], size_t outputs_len,
ADM_storage_t storage) { ADM_adhoc_storage_t adhoc_storage) {
struct adm_job_requirements* adm_job_reqs = struct adm_job_requirements* adm_job_reqs =
(struct adm_job_requirements*) calloc( (struct adm_job_requirements*) calloc(
...@@ -787,21 +828,21 @@ ADM_job_requirements_create(ADM_dataset_t inputs[], size_t inputs_len, ...@@ -787,21 +828,21 @@ ADM_job_requirements_create(ADM_dataset_t inputs[], size_t inputs_len,
adm_job_reqs->r_inputs = inputs_list; adm_job_reqs->r_inputs = inputs_list;
adm_job_reqs->r_outputs = outputs_list; adm_job_reqs->r_outputs = outputs_list;
if(!storage) { if(!adhoc_storage) {
return adm_job_reqs; return adm_job_reqs;
} }
if(storage->s_type != ADM_STORAGE_GEKKOFS && if(adhoc_storage->s_type != ADM_ADHOC_STORAGE_GEKKOFS &&
storage->s_type != ADM_STORAGE_DATACLAY && adhoc_storage->s_type != ADM_ADHOC_STORAGE_DATACLAY &&
storage->s_type != ADM_STORAGE_EXPAND && adhoc_storage->s_type != ADM_ADHOC_STORAGE_EXPAND &&
storage->s_type != ADM_STORAGE_HERCULES) { adhoc_storage->s_type != ADM_ADHOC_STORAGE_HERCULES) {
error_msg = "Invalid adhoc_storage type"; error_msg = "Invalid adhoc_storage type";
goto cleanup_on_error; goto cleanup_on_error;
} }
adm_job_reqs->r_storage = adm_job_reqs->r_adhoc_storage = ADM_adhoc_storage_create(
ADM_storage_create(storage->s_name, storage->s_type, storage->s_id, adhoc_storage->s_name, adhoc_storage->s_type, adhoc_storage->s_id,
storage->s_adhoc_ctx); adhoc_storage->s_adhoc_ctx);
return adm_job_reqs; return adm_job_reqs;
...@@ -835,8 +876,8 @@ ADM_job_requirements_destroy(ADM_job_requirements_t reqs) { ...@@ -835,8 +876,8 @@ ADM_job_requirements_destroy(ADM_job_requirements_t reqs) {
ADM_dataset_list_destroy(reqs->r_outputs); ADM_dataset_list_destroy(reqs->r_outputs);
} }
if(reqs->r_storage) { if(reqs->r_adhoc_storage) {
ADM_storage_destroy(reqs->r_storage); ADM_adhoc_storage_destroy(reqs->r_adhoc_storage);
} }
free(reqs); free(reqs);
...@@ -1279,24 +1320,6 @@ dataset::id() const { ...@@ -1279,24 +1320,6 @@ dataset::id() const {
return m_pimpl->id(); return m_pimpl->id();
} }
storage::storage(enum storage::type type, std::string name, std::uint64_t id)
: m_name(std::move(name)), m_type(type), m_id(id) {}
std::string
storage::name() const {
return m_name;
}
enum storage::type
storage::type() const {
return m_type;
}
std::uint64_t
storage::id() const {
return m_id;
}
adhoc_storage::resources::resources(std::vector<admire::node> nodes) adhoc_storage::resources::resources(std::vector<admire::node> nodes)
: m_nodes(std::move(nodes)) {} : m_nodes(std::move(nodes)) {}
...@@ -1356,7 +1379,10 @@ adhoc_storage::ctx::should_flush() const { ...@@ -1356,7 +1379,10 @@ adhoc_storage::ctx::should_flush() const {
class adhoc_storage::impl { class adhoc_storage::impl {
public: public:
explicit impl(adhoc_storage::ctx ctx) : m_ctx(std::move(ctx)) {} explicit impl(enum adhoc_storage::type type, std::string name,
std::uint64_t id, adhoc_storage::ctx ctx)
: m_type(type), m_name(std::move(name)), m_id(id),
m_ctx(std::move(ctx)) {}
impl(const impl& rhs) = default; impl(const impl& rhs) = default;
impl(impl&& rhs) = default; impl(impl&& rhs) = default;
impl& impl&
...@@ -1365,6 +1391,21 @@ public: ...@@ -1365,6 +1391,21 @@ public:
operator=(impl&&) noexcept = default; operator=(impl&&) noexcept = default;
~impl() = default; ~impl() = default;
std::string
name() const {
return m_name;
}
enum type
type() const {
return m_type;
}
std::uint64_t
id() const {
return m_id;
}
adhoc_storage::ctx adhoc_storage::ctx
context() const { context() const {
return m_ctx; return m_ctx;
...@@ -1376,50 +1417,33 @@ public: ...@@ -1376,50 +1417,33 @@ public:
} }
private: private:
enum type m_type;
std::string m_name;
std::uint64_t m_id;
adhoc_storage::ctx m_ctx; adhoc_storage::ctx m_ctx;
}; };
adhoc_storage::adhoc_storage(enum storage::type type, std::string name, adhoc_storage::adhoc_storage(enum adhoc_storage::type type, std::string name,
std::uint64_t id, execution_mode exec_mode, std::uint64_t id, execution_mode exec_mode,
access_type access_type, access_type access_type,
adhoc_storage::resources res, adhoc_storage::resources res,
std::uint32_t walltime, bool should_flush) std::uint32_t walltime, bool should_flush)
: storage(type, std::move(name), id), : m_pimpl(std::make_unique<impl>(
m_pimpl(std::make_unique<impl>( type, std::move(name), id,
adhoc_storage::ctx{exec_mode, access_type, std::move(res), adhoc_storage::ctx{exec_mode, access_type, std::move(res),
walltime, should_flush})) {} walltime, should_flush})) {}
adhoc_storage::adhoc_storage(ADM_storage_t st) adhoc_storage::adhoc_storage(ADM_adhoc_storage_t st)
: storage(static_cast<enum storage::type>(st->s_type), st->s_name, : m_pimpl(std::make_unique<impl>(
st->s_id) { static_cast<enum adhoc_storage::type>(st->s_type), st->s_name,
st->s_id, adhoc_storage::ctx{st->s_adhoc_ctx})) {}
switch(st->s_type) {
case ADM_STORAGE_LUSTRE:
case ADM_STORAGE_GPFS:
throw std::runtime_error(
fmt::format("Invalid type {} for adhoc_storage",
static_cast<enum storage::type>(st->s_type)));
break;
default:
break;
}
m_pimpl = std::make_unique<impl>(adhoc_storage::ctx{st->s_adhoc_ctx});
}
adhoc_storage::adhoc_storage(enum storage::type type, std::string name,
std::uint64_t id, ADM_adhoc_context_t ctx)
: storage(type, std::move(name), id),
m_pimpl(std::make_unique<impl>(adhoc_storage::ctx{ctx})) {}
adhoc_storage::adhoc_storage(enum storage::type type, std::string name, adhoc_storage::adhoc_storage(enum adhoc_storage::type type, std::string name,
std::uint64_t id, const adhoc_storage::ctx& ctx) std::uint64_t id, const adhoc_storage::ctx& ctx)
: storage(type, std::move(name), id), m_pimpl(std::make_unique<impl>(ctx)) { : m_pimpl(std::make_unique<impl>(type, std::move(name), id, ctx)) {}
}
adhoc_storage::adhoc_storage(const adhoc_storage& other) noexcept adhoc_storage::adhoc_storage(const adhoc_storage& other) noexcept
: storage(other.m_type, other.m_name, other.m_id), : m_pimpl(std::make_unique<impl>(*other.m_pimpl)) {}
m_pimpl(std::make_unique<impl>(*other.m_pimpl)) {}
adhoc_storage::adhoc_storage(adhoc_storage&&) noexcept = default; adhoc_storage::adhoc_storage(adhoc_storage&&) noexcept = default;
...@@ -1432,9 +1456,24 @@ adhoc_storage::operator=(const adhoc_storage& other) noexcept { ...@@ -1432,9 +1456,24 @@ adhoc_storage::operator=(const adhoc_storage& other) noexcept {
adhoc_storage& adhoc_storage&
adhoc_storage::operator=(adhoc_storage&&) noexcept = default; adhoc_storage::operator=(adhoc_storage&&) noexcept = default;
std::shared_ptr<storage::ctx> std::string
adhoc_storage::name() const {
return m_pimpl->name();
}
enum adhoc_storage::type
adhoc_storage::type() const {
return m_pimpl->type();
}
std::uint64_t
adhoc_storage::id() const {
return m_pimpl->id();
}
adhoc_storage::ctx
adhoc_storage::context() const { adhoc_storage::context() const {
return std::make_shared<adhoc_storage::ctx>(m_pimpl->context()); return m_pimpl->context();
} }
void void
...@@ -1450,8 +1489,7 @@ pfs_storage::ctx::ctx(std::filesystem::path mount_point) ...@@ -1450,8 +1489,7 @@ pfs_storage::ctx::ctx(std::filesystem::path mount_point)
pfs_storage::ctx::ctx(ADM_pfs_context_t ctx) : pfs_storage::ctx(ctx->c_mount) {} pfs_storage::ctx::ctx(ADM_pfs_context_t ctx) : pfs_storage::ctx(ctx->c_mount) {}
pfs_storage::pfs_storage(const pfs_storage& other) noexcept pfs_storage::pfs_storage(const pfs_storage& other) noexcept
: storage(other.m_type, other.m_name, other.m_id), : m_pimpl(std::make_unique<impl>(*other.m_pimpl)) {}
m_pimpl(std::make_unique<impl>(*other.m_pimpl)) {}
pfs_storage& pfs_storage&
pfs_storage::operator=(const pfs_storage& other) noexcept { pfs_storage::operator=(const pfs_storage& other) noexcept {
...@@ -1465,9 +1503,11 @@ pfs_storage::ctx::mount_point() const { ...@@ -1465,9 +1503,11 @@ pfs_storage::ctx::mount_point() const {
} }
class pfs_storage::impl { class pfs_storage::impl {
public: public:
explicit impl(pfs_storage::ctx ctx) : m_ctx(std::move(ctx)) {} explicit impl(enum pfs_storage::type type, std::string name,
std::uint64_t id, pfs_storage::ctx ctx)
: m_type(type), m_name(std::move(name)), m_id(id),
m_ctx(std::move(ctx)) {}
impl(const impl& rhs) = default; impl(const impl& rhs) = default;
impl(impl&& rhs) = default; impl(impl&& rhs) = default;
impl& impl&
...@@ -1475,35 +1515,62 @@ public: ...@@ -1475,35 +1515,62 @@ public:
impl& impl&
operator=(impl&&) noexcept = default; operator=(impl&&) noexcept = default;
enum type
type() const {
return m_type;
};
std::string
name() const {
return m_name;
}
std::uint64_t
id() const {
return m_id;
};
pfs_storage::ctx pfs_storage::ctx
context() const { context() const {
return m_ctx; return m_ctx;
} }
private: private:
enum type m_type;
std::string m_name;
std::uint64_t m_id;
pfs_storage::ctx m_ctx; pfs_storage::ctx m_ctx;
}; };
pfs_storage::pfs_storage(enum storage::type type, std::string name, pfs_storage::pfs_storage(enum pfs_storage::type type, std::string name,
std::uint64_t id, std::filesystem::path mount_point) std::uint64_t id, std::filesystem::path mount_point)
: storage(type, std::move(name), id), : m_pimpl(std::make_unique<impl>(
m_pimpl(std::make_unique<impl>( type, std::move(name), id,
pfs_storage::ctx{std::move(mount_point)})) {} pfs_storage::ctx{std::move(mount_point)})) {}
pfs_storage::pfs_storage(enum storage::type type, std::string name,
std::uint64_t id, ADM_pfs_context_t ctx)
: storage(type, std::move(name), id),
m_pimpl(std::make_unique<impl>(pfs_storage::ctx{ctx})) {}
pfs_storage::~pfs_storage() = default; pfs_storage::~pfs_storage() = default;
std::shared_ptr<storage::ctx> std::string
pfs_storage::name() const {
return m_pimpl->name();
}
enum pfs_storage::type
pfs_storage::type() const {
return m_pimpl->type();
}
std::uint64_t
pfs_storage::id() const {
return m_pimpl->id();
}
pfs_storage::ctx
pfs_storage::context() const { pfs_storage::context() const {
return std::make_shared<pfs_storage::ctx>(m_pimpl->context()); return m_pimpl->context();
} }
class job_requirements::impl { class job_requirements::impl {
public: public:
impl(std::vector<admire::dataset> inputs, impl(std::vector<admire::dataset> inputs,
std::vector<admire::dataset> outputs) std::vector<admire::dataset> outputs)
...@@ -1529,12 +1596,8 @@ public: ...@@ -1529,12 +1596,8 @@ public:
m_outputs.emplace_back(reqs->r_outputs->l_datasets[i].d_id); m_outputs.emplace_back(reqs->r_outputs->l_datasets[i].d_id);
} }
if(reqs->r_storage) { if(reqs->r_adhoc_storage) {
// TODO add a conversion constructor m_adhoc_storage = admire::adhoc_storage(reqs->r_adhoc_storage);
m_adhoc_storage = admire::adhoc_storage(
static_cast<enum storage::type>(reqs->r_storage->s_type),
reqs->r_storage->s_name, reqs->r_storage->s_id,
reqs->r_storage->s_adhoc_ctx);
} }
} }
...@@ -1612,7 +1675,6 @@ job_requirements::adhoc_storage() const { ...@@ -1612,7 +1675,6 @@ job_requirements::adhoc_storage() const {
} }
namespace qos { namespace qos {
class entity::impl { class entity::impl {
public: public:
template <typename T> template <typename T>
......
...@@ -24,8 +24,10 @@ ...@@ -24,8 +24,10 @@
#include "rpc_types.h" #include "rpc_types.h"
hg_return_t (*hg_proc_ADM_storage_type_t)(hg_proc_t, hg_return_t (*hg_proc_ADM_adhoc_storage_type_t)(hg_proc_t,
void*) = hg_proc_hg_uint32_t; void*) = hg_proc_hg_uint32_t;
hg_return_t (*hg_proc_ADM_pfs_storage_type_t)(hg_proc_t,
void*) = hg_proc_hg_uint32_t;
hg_return_t (*hg_proc_ADM_qos_scope_t)(hg_proc_t, void*) = hg_proc_hg_uint32_t; hg_return_t (*hg_proc_ADM_qos_scope_t)(hg_proc_t, void*) = hg_proc_hg_uint32_t;
hg_return_t (*hg_proc_ADM_qos_class_t)(hg_proc_t, void*) = hg_proc_hg_uint32_t; hg_return_t (*hg_proc_ADM_qos_class_t)(hg_proc_t, void*) = hg_proc_hg_uint32_t;
...@@ -387,20 +389,19 @@ hg_proc_ADM_dataset_list_t(hg_proc_t proc, void* data) { ...@@ -387,20 +389,19 @@ hg_proc_ADM_dataset_list_t(hg_proc_t proc, void* data) {
hg_return_t hg_return_t
hg_proc_ADM_storage_t(hg_proc_t proc, void* data) { hg_proc_ADM_adhoc_storage_t(hg_proc_t proc, void* data) {
(void) proc;
(void) data;
hg_return_t ret = HG_SUCCESS; hg_return_t ret = HG_SUCCESS;
ADM_storage_t* storage = (ADM_storage_t*) data; ADM_adhoc_storage_t* storage = (ADM_adhoc_storage_t*) data;
ADM_storage_t tmp = NULL; ADM_adhoc_storage_t tmp = NULL;
hg_size_t storage_length = 0; hg_size_t storage_length = 0;
switch(hg_proc_get_op(proc)) { switch(hg_proc_get_op(proc)) {
case HG_ENCODE: case HG_ENCODE:
// find out the length of the adm_storage object we need to send // find out the length of the adm_adhoc_storage object we need to
storage_length = *storage ? sizeof(adm_storage) : 0; // send
storage_length = *storage ? sizeof(adm_adhoc_storage) : 0;
ret = hg_proc_hg_size_t(proc, &storage_length); ret = hg_proc_hg_size_t(proc, &storage_length);
if(ret != HG_SUCCESS) { if(ret != HG_SUCCESS) {
...@@ -411,7 +412,7 @@ hg_proc_ADM_storage_t(hg_proc_t proc, void* data) { ...@@ -411,7 +412,7 @@ hg_proc_ADM_storage_t(hg_proc_t proc, void* data) {
return HG_SUCCESS; return HG_SUCCESS;
} }
// if we actually need to send an adm_storage object, // if we actually need to send an adm_adhoc_storage object,
// write each of its fields to the mercury buffer // write each of its fields to the mercury buffer
tmp = *storage; tmp = *storage;
...@@ -437,23 +438,16 @@ hg_proc_ADM_storage_t(hg_proc_t proc, void* data) { ...@@ -437,23 +438,16 @@ hg_proc_ADM_storage_t(hg_proc_t proc, void* data) {
} }
// 4. the appropriate storage context // 4. the appropriate storage context
switch(tmp->s_type) { ret = hg_proc_ADM_adhoc_context_t(proc, &tmp->s_adhoc_ctx);
case ADM_STORAGE_GEKKOFS:
case ADM_STORAGE_DATACLAY: if(ret != HG_SUCCESS) {
case ADM_STORAGE_EXPAND: break;
case ADM_STORAGE_HERCULES:
ret = hg_proc_ADM_adhoc_context_t(proc, &tmp->s_adhoc_ctx);
break;
case ADM_STORAGE_LUSTRE:
case ADM_STORAGE_GPFS:
ret = hg_proc_ADM_pfs_context_t(proc, &tmp->s_pfs_ctx);
break;
} }
break; break;
case HG_DECODE: case HG_DECODE:
// find out the length of the adm_storage object // find out the length of the adm_adhoc_storage object
ret = hg_proc_hg_size_t(proc, &storage_length); ret = hg_proc_hg_size_t(proc, &storage_length);
if(ret != HG_SUCCESS) { if(ret != HG_SUCCESS) {
...@@ -465,9 +459,9 @@ hg_proc_ADM_storage_t(hg_proc_t proc, void* data) { ...@@ -465,9 +459,9 @@ hg_proc_ADM_storage_t(hg_proc_t proc, void* data) {
break; break;
} }
// if the received adm_storage object was not NULL, read each of // if the received adm_adhoc_storage object was not NULL, read each
// its fields from the mercury buffer // of its fields from the mercury buffer
tmp = (adm_storage*) calloc(1, sizeof(adm_storage)); tmp = (adm_adhoc_storage*) calloc(1, sizeof(adm_adhoc_storage));
// 1. the storage type // 1. the storage type
ret = hg_proc_uint32_t(proc, &tmp->s_type); ret = hg_proc_uint32_t(proc, &tmp->s_type);
...@@ -491,17 +485,10 @@ hg_proc_ADM_storage_t(hg_proc_t proc, void* data) { ...@@ -491,17 +485,10 @@ hg_proc_ADM_storage_t(hg_proc_t proc, void* data) {
} }
// 4. the appropriate storage context // 4. the appropriate storage context
switch(tmp->s_type) { ret = hg_proc_ADM_adhoc_context_t(proc, &tmp->s_adhoc_ctx);
case ADM_STORAGE_GEKKOFS:
case ADM_STORAGE_DATACLAY: if(ret != HG_SUCCESS) {
case ADM_STORAGE_EXPAND: break;
case ADM_STORAGE_HERCULES:
ret = hg_proc_ADM_adhoc_context_t(proc, &tmp->s_adhoc_ctx);
break;
case ADM_STORAGE_LUSTRE:
case ADM_STORAGE_GPFS:
ret = hg_proc_ADM_pfs_context_t(proc, &tmp->s_adhoc_ctx);
break;
} }
// return the newly-created ctx // return the newly-created ctx
...@@ -588,6 +575,116 @@ hg_proc_ADM_adhoc_context_t(hg_proc_t proc, void* data) { ...@@ -588,6 +575,116 @@ hg_proc_ADM_adhoc_context_t(hg_proc_t proc, void* data) {
return ret; return ret;
} }
hg_return_t
hg_proc_ADM_pfs_storage_t(hg_proc_t proc, void* data) {
hg_return_t ret = HG_SUCCESS;
ADM_pfs_storage_t* storage = (ADM_pfs_storage_t*) data;
ADM_pfs_storage_t tmp = NULL;
hg_size_t storage_length = 0;
switch(hg_proc_get_op(proc)) {
case HG_ENCODE:
// find out the length of the adm_pfs_storage object we need to send
storage_length = *storage ? sizeof(adm_pfs_storage) : 0;
ret = hg_proc_hg_size_t(proc, &storage_length);
if(ret != HG_SUCCESS) {
break;
}
if(!storage_length) {
return HG_SUCCESS;
}
// if we actually need to send an adm_pfs_storage object,
// write each of its fields to the mercury buffer
tmp = *storage;
// 1. the storage type
ret = hg_proc_hg_uint32_t(proc, &tmp->s_type);
if(ret != HG_SUCCESS) {
break;
}
// 2. the storage id
ret = hg_proc_hg_const_string_t(proc, &tmp->s_name);
if(ret != HG_SUCCESS) {
break;
}
// 3. the server-assigned id
ret = hg_proc_hg_int64_t(proc, &tmp->s_id);
if(ret != HG_SUCCESS) {
break;
}
// 4. the appropriate storage context
ret = hg_proc_ADM_pfs_context_t(proc, &tmp->s_pfs_ctx);
break;
case HG_DECODE:
// find out the length of the adm_pfs_storage object
ret = hg_proc_hg_size_t(proc, &storage_length);
if(ret != HG_SUCCESS) {
break;
}
if(!storage_length) {
*storage = NULL;
break;
}
// if the received adm_pfs_storage object was not NULL, read each of
// its fields from the mercury buffer
tmp = (adm_pfs_storage*) calloc(1, sizeof(adm_pfs_storage));
// 1. the storage type
ret = hg_proc_uint32_t(proc, &tmp->s_type);
if(ret != HG_SUCCESS) {
break;
}
// 2. the storage id
ret = hg_proc_hg_const_string_t(proc, &tmp->s_name);
if(ret != HG_SUCCESS) {
break;
}
// 3. the server-assigned id
ret = hg_proc_hg_int64_t(proc, &tmp->s_id);
if(ret != HG_SUCCESS) {
break;
}
// 4. the appropriate storage context
ret = hg_proc_ADM_pfs_context_t(proc, &tmp->s_pfs_ctx);
if(ret != HG_SUCCESS) {
break;
}
// return the newly-created ctx
*storage = tmp;
break;
case HG_FREE:
tmp = *storage;
free(tmp);
break;
}
return ret;
}
hg_return_t hg_return_t
hg_proc_ADM_pfs_context_t(hg_proc_t proc, void* data) { hg_proc_ADM_pfs_context_t(hg_proc_t proc, void* data) {
......
...@@ -174,6 +174,7 @@ typedef struct adm_pfs_context { ...@@ -174,6 +174,7 @@ typedef struct adm_pfs_context {
const char* c_mount; const char* c_mount;
} adm_pfs_context; } adm_pfs_context;
// clang-format off // clang-format off
MERCURY_GEN_STRUCT_PROC( MERCURY_GEN_STRUCT_PROC(
adm_pfs_context, // NOLINT adm_pfs_context, // NOLINT
...@@ -181,21 +182,29 @@ MERCURY_GEN_STRUCT_PROC( ...@@ -181,21 +182,29 @@ MERCURY_GEN_STRUCT_PROC(
); );
// clang-format on // clang-format on
extern hg_return_t (*hg_proc_ADM_storage_type_t)(hg_proc_t, void*); extern hg_return_t (*hg_proc_ADM_adhoc_storage_type_t)(hg_proc_t, void*);
typedef struct adm_storage { typedef struct adm_adhoc_storage {
const char* s_name; const char* s_name;
ADM_storage_type_t s_type; ADM_adhoc_storage_type_t s_type;
uint64_t s_id; uint64_t s_id;
union { ADM_adhoc_context_t s_adhoc_ctx;
ADM_adhoc_context_t s_adhoc_ctx; } adm_adhoc_storage;
ADM_pfs_context_t s_pfs_ctx;
};
} adm_storage;
hg_return_t hg_return_t
hg_proc_ADM_storage_t(hg_proc_t proc, void* data); hg_proc_ADM_adhoc_storage_t(hg_proc_t proc, void* data);
extern hg_return_t (*hg_proc_ADM_pfs_storage_type_t)(hg_proc_t, void*);
typedef struct adm_pfs_storage {
const char* s_name;
ADM_pfs_storage_type_t s_type;
uint64_t s_id;
ADM_pfs_context_t s_pfs_ctx;
} adm_pfs_storage;
hg_return_t
hg_proc_ADM_pfs_storage_t(hg_proc_t proc, void* data);
struct adm_node_list { struct adm_node_list {
/** An array of nodes */ /** An array of nodes */
...@@ -255,15 +264,15 @@ typedef struct adm_job_requirements { ...@@ -255,15 +264,15 @@ typedef struct adm_job_requirements {
/** An array of output datasets */ /** An array of output datasets */
ADM_dataset_list_t r_outputs; ADM_dataset_list_t r_outputs;
/** An optional definition for a specific storage instance */ /** An optional definition for a specific storage instance */
ADM_storage_t r_storage; ADM_adhoc_storage_t r_adhoc_storage;
} adm_job_requirements; } adm_job_requirements;
// clang-format off // clang-format off
MERCURY_GEN_STRUCT_PROC( MERCURY_GEN_STRUCT_PROC(
adm_job_requirements, // NOLINT adm_job_requirements, // NOLINT
((ADM_dataset_list_t) (r_inputs)) ((ADM_dataset_list_t) (r_inputs))
((ADM_dataset_list_t) (r_outputs)) ((ADM_dataset_list_t) (r_outputs))
((ADM_storage_t) (r_storage)) ((ADM_adhoc_storage_t) (r_adhoc_storage))
); );
// clang-format on // clang-format on
...@@ -333,9 +342,9 @@ MERCURY_GEN_PROC( ...@@ -333,9 +342,9 @@ MERCURY_GEN_PROC(
/// ADM_register_adhoc_storage /// ADM_register_adhoc_storage
MERCURY_GEN_PROC( MERCURY_GEN_PROC(
ADM_register_adhoc_storage_in_t, ADM_register_adhoc_storage_in_t,
((hg_const_string_t) (name)) ((hg_const_string_t) (name))
((ADM_storage_type_t) (type)) ((ADM_adhoc_storage_type_t) (type))
((ADM_adhoc_context_t) (ctx)) ((ADM_adhoc_context_t) (ctx))
); );
MERCURY_GEN_PROC( MERCURY_GEN_PROC(
...@@ -371,9 +380,12 @@ MERCURY_GEN_PROC( ...@@ -371,9 +380,12 @@ MERCURY_GEN_PROC(
); );
/// ADM_deploy_adhoc_storage /// ADM_deploy_adhoc_storage
MERCURY_GEN_PROC(ADM_deploy_adhoc_storage_in_t, ((int32_t) (reqs))) MERCURY_GEN_PROC(ADM_deploy_adhoc_storage_in_t, ((hg_uint64_t) (id)))
MERCURY_GEN_PROC(ADM_deploy_adhoc_storage_out_t, ((int32_t) (ret))) MERCURY_GEN_PROC(ADM_deploy_adhoc_storage_out_t,
((hg_uint64_t) (op_id))
((hg_int32_t) (retval))
);
/// ADM_register_pfs_storage /// ADM_register_pfs_storage
MERCURY_GEN_PROC(ADM_register_pfs_storage_in_t, ((int32_t) (reqs))) MERCURY_GEN_PROC(ADM_register_pfs_storage_in_t, ((int32_t) (reqs)))
......
...@@ -283,35 +283,20 @@ remove_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage) { ...@@ -283,35 +283,20 @@ remove_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage) {
} }
} }
ADM_return_t void
deploy_adhoc_storage(const server& srv, ADM_storage_t adhoc_storage) { deploy_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage) {
(void) srv; const auto ec = detail::deploy_adhoc_storage(srv, adhoc_storage);
(void) adhoc_storage;
scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb};
auto endp = rpc_client.lookup(srv.address());
LOGGER_INFO("ADM_deploy_adhoc_storage(...)");
ADM_deploy_adhoc_storage_in_t in{};
ADM_deploy_adhoc_storage_out_t out;
const auto rpc = endp.call("ADM_deploy_adhoc_storage", &in, &out);
if(out.ret < 0) { if(!ec) {
LOGGER_ERROR("ADM_deploy_adhoc_storage() = {}", out.ret); throw std::runtime_error(fmt::format(
return static_cast<ADM_return_t>(out.ret); "ADM_deploy_adhoc_storage() error: {}", ec.message()));
} }
LOGGER_INFO("ADM_deploy_adhoc_storage() = {}", ADM_SUCCESS);
return ADM_SUCCESS;
} }
ADM_return_t ADM_return_t
register_pfs_storage(const server& srv, ADM_pfs_context_t ctx, register_pfs_storage(const server& srv, ADM_pfs_context_t ctx,
ADM_storage_t* pfs_storage) { ADM_pfs_storage_t* pfs_storage) {
(void) srv; (void) srv;
(void) ctx; (void) ctx;
(void) pfs_storage; (void) pfs_storage;
...@@ -338,7 +323,7 @@ register_pfs_storage(const server& srv, ADM_pfs_context_t ctx, ...@@ -338,7 +323,7 @@ register_pfs_storage(const server& srv, ADM_pfs_context_t ctx,
ADM_return_t ADM_return_t
update_pfs_storage(const server& srv, ADM_pfs_context_t ctx, update_pfs_storage(const server& srv, ADM_pfs_context_t ctx,
ADM_storage_t pfs_storage) { ADM_pfs_storage_t pfs_storage) {
(void) srv; (void) srv;
(void) ctx; (void) ctx;
(void) pfs_storage; (void) pfs_storage;
...@@ -364,7 +349,7 @@ update_pfs_storage(const server& srv, ADM_pfs_context_t ctx, ...@@ -364,7 +349,7 @@ update_pfs_storage(const server& srv, ADM_pfs_context_t ctx,
} }
ADM_return_t ADM_return_t
remove_pfs_storage(const server& srv, ADM_storage_t pfs_storage) { remove_pfs_storage(const server& srv, ADM_pfs_storage_t pfs_storage) {
(void) srv; (void) srv;
(void) pfs_storage; (void) pfs_storage;
...@@ -437,7 +422,7 @@ set_dataset_information(const server& srv, ADM_job_t job, ADM_dataset_t target, ...@@ -437,7 +422,7 @@ set_dataset_information(const server& srv, ADM_job_t job, ADM_dataset_t target,
} }
ADM_return_t ADM_return_t
set_io_resources(const server& srv, ADM_job_t job, ADM_storage_t tier, set_io_resources(const server& srv, ADM_job_t job, ADM_adhoc_storage_t tier,
ADM_adhoc_resources_t resources) { ADM_adhoc_resources_t resources) {
(void) srv; (void) srv;
(void) job; (void) job;
......
...@@ -113,8 +113,9 @@ ADM_remove_job(ADM_server_t server, ADM_job_t job); ...@@ -113,8 +113,9 @@ ADM_remove_job(ADM_server_t server, ADM_job_t job);
*/ */
ADM_return_t ADM_return_t
ADM_register_adhoc_storage(ADM_server_t server, const char* name, ADM_register_adhoc_storage(ADM_server_t server, const char* name,
ADM_storage_type_t type, ADM_adhoc_context_t ctx, ADM_adhoc_storage_type_t type,
ADM_storage_t* adhoc_storage); ADM_adhoc_context_t ctx,
ADM_adhoc_storage_t* adhoc_storage);
/** /**
* Update an already-registered adhoc storage system. * Update an already-registered adhoc storage system.
...@@ -127,7 +128,7 @@ ADM_register_adhoc_storage(ADM_server_t server, const char* name, ...@@ -127,7 +128,7 @@ ADM_register_adhoc_storage(ADM_server_t server, const char* name,
* successfully. * successfully.
*/ */
ADM_return_t ADM_return_t
ADM_update_adhoc_storage(ADM_server_t server, ADM_storage_t adhoc_storage, ADM_update_adhoc_storage(ADM_server_t server, ADM_adhoc_storage_t adhoc_storage,
ADM_adhoc_context_t ctx); ADM_adhoc_context_t ctx);
/** /**
...@@ -141,7 +142,8 @@ ADM_update_adhoc_storage(ADM_server_t server, ADM_storage_t adhoc_storage, ...@@ -141,7 +142,8 @@ ADM_update_adhoc_storage(ADM_server_t server, ADM_storage_t adhoc_storage,
* successfully. * successfully.
*/ */
ADM_return_t ADM_return_t
ADM_remove_adhoc_storage(ADM_server_t server, ADM_storage_t adhoc_storage); ADM_remove_adhoc_storage(ADM_server_t server,
ADM_adhoc_storage_t adhoc_storage);
/** /**
* Initiate the deployment of an adhoc storage system instance. * Initiate the deployment of an adhoc storage system instance.
...@@ -153,7 +155,8 @@ ADM_remove_adhoc_storage(ADM_server_t server, ADM_storage_t adhoc_storage); ...@@ -153,7 +155,8 @@ ADM_remove_adhoc_storage(ADM_server_t server, ADM_storage_t adhoc_storage);
* @return Returns ADM_SUCCESS if the remote procedure has completed * @return Returns ADM_SUCCESS if the remote procedure has completed
*/ */
ADM_return_t ADM_return_t
ADM_deploy_adhoc_storage(ADM_server_t server, ADM_storage_t adhoc_storage); ADM_deploy_adhoc_storage(ADM_server_t server,
ADM_adhoc_storage_t adhoc_storage);
/** /**
* Register a PFS storage tier. * Register a PFS storage tier.
...@@ -167,7 +170,7 @@ ADM_deploy_adhoc_storage(ADM_server_t server, ADM_storage_t adhoc_storage); ...@@ -167,7 +170,7 @@ ADM_deploy_adhoc_storage(ADM_server_t server, ADM_storage_t adhoc_storage);
*/ */
ADM_return_t ADM_return_t
ADM_register_pfs_storage(ADM_server_t server, ADM_pfs_context_t ctx, ADM_register_pfs_storage(ADM_server_t server, ADM_pfs_context_t ctx,
ADM_storage_t* pfs_storage); ADM_pfs_storage_t* pfs_storage);
/** /**
* Update an already-registered PFS storage tier. * Update an already-registered PFS storage tier.
...@@ -181,7 +184,7 @@ ADM_register_pfs_storage(ADM_server_t server, ADM_pfs_context_t ctx, ...@@ -181,7 +184,7 @@ ADM_register_pfs_storage(ADM_server_t server, ADM_pfs_context_t ctx,
*/ */
ADM_return_t ADM_return_t
ADM_update_pfs_storage(ADM_server_t server, ADM_pfs_context_t ctx, ADM_update_pfs_storage(ADM_server_t server, ADM_pfs_context_t ctx,
ADM_storage_t adhoc_storage); ADM_pfs_storage_t adhoc_storage);
/** /**
* Remove an already-registered PFS storage tier. * Remove an already-registered PFS storage tier.
...@@ -194,7 +197,7 @@ ADM_update_pfs_storage(ADM_server_t server, ADM_pfs_context_t ctx, ...@@ -194,7 +197,7 @@ ADM_update_pfs_storage(ADM_server_t server, ADM_pfs_context_t ctx,
* successfully. * successfully.
*/ */
ADM_return_t ADM_return_t
ADM_remove_pfs_storage(ADM_server_t server, ADM_storage_t adhoc_storage); ADM_remove_pfs_storage(ADM_server_t server, ADM_pfs_storage_t adhoc_storage);
/** /**
* Transfers the dataset identified by the source_name to the storage tier * Transfers the dataset identified by the source_name to the storage tier
...@@ -258,8 +261,8 @@ ADM_set_dataset_information(ADM_server_t server, ADM_job_t job, ...@@ -258,8 +261,8 @@ ADM_set_dataset_information(ADM_server_t server, ADM_job_t job,
* successfully. * successfully.
*/ */
ADM_return_t ADM_return_t
ADM_set_io_resources(ADM_server_t server, ADM_job_t job, ADM_storage_t tier, ADM_set_io_resources(ADM_server_t server, ADM_job_t job,
ADM_adhoc_resources_t resources); ADM_adhoc_storage_t tier, ADM_adhoc_resources_t resources);
/** /**
......
...@@ -69,19 +69,19 @@ update_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage, ...@@ -69,19 +69,19 @@ update_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage,
void void
remove_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage); remove_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage);
ADM_return_t void
deploy_adhoc_storage(const server& srv, ADM_storage_t adhoc_storage); deploy_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage);
ADM_return_t ADM_return_t
register_pfs_storage(const server& srv, ADM_pfs_context_t ctx, register_pfs_storage(const server& srv, ADM_pfs_context_t ctx,
ADM_storage_t* pfs_storage); ADM_pfs_storage_t* pfs_storage);
ADM_return_t ADM_return_t
update_pfs_storage(const server& srv, ADM_pfs_context_t ctx, update_pfs_storage(const server& srv, ADM_pfs_context_t ctx,
ADM_storage_t pfs_storage); ADM_pfs_storage_t pfs_storage);
ADM_return_t ADM_return_t
remove_pfs_storage(const server& srv, ADM_storage_t pfs_storage); remove_pfs_storage(const server& srv, ADM_pfs_storage_t pfs_storage);
admire::transfer admire::transfer
transfer_datasets(const server& srv, const job& job, transfer_datasets(const server& srv, const job& job,
...@@ -95,7 +95,7 @@ set_dataset_information(const server& srv, ADM_job_t job, ADM_dataset_t target, ...@@ -95,7 +95,7 @@ set_dataset_information(const server& srv, ADM_job_t job, ADM_dataset_t target,
ADM_dataset_info_t info); ADM_dataset_info_t info);
ADM_return_t ADM_return_t
set_io_resources(const server& srv, ADM_job_t job, ADM_storage_t tier, set_io_resources(const server& srv, ADM_job_t job, ADM_adhoc_storage_t tier,
ADM_adhoc_resources_t resources); ADM_adhoc_resources_t resources);
ADM_return_t ADM_return_t
......
...@@ -82,8 +82,9 @@ ADM_remove_job(ADM_server_t server, ADM_job_t job) { ...@@ -82,8 +82,9 @@ ADM_remove_job(ADM_server_t server, ADM_job_t job) {
ADM_return_t ADM_return_t
ADM_register_adhoc_storage(ADM_server_t server, const char* name, ADM_register_adhoc_storage(ADM_server_t server, const char* name,
ADM_storage_type_t type, ADM_adhoc_context_t ctx, ADM_adhoc_storage_type_t type,
ADM_storage_t* adhoc_storage) { ADM_adhoc_context_t ctx,
ADM_adhoc_storage_t* adhoc_storage) {
const admire::server srv{server}; const admire::server srv{server};
...@@ -101,7 +102,7 @@ ADM_register_adhoc_storage(ADM_server_t server, const char* name, ...@@ -101,7 +102,7 @@ ADM_register_adhoc_storage(ADM_server_t server, const char* name,
} }
ADM_return_t ADM_return_t
ADM_update_adhoc_storage(ADM_server_t server, ADM_storage_t adhoc_storage, ADM_update_adhoc_storage(ADM_server_t server, ADM_adhoc_storage_t adhoc_storage,
ADM_adhoc_context_t ctx) { ADM_adhoc_context_t ctx) {
const admire::server srv{server}; const admire::server srv{server};
...@@ -112,7 +113,8 @@ ADM_update_adhoc_storage(ADM_server_t server, ADM_storage_t adhoc_storage, ...@@ -112,7 +113,8 @@ ADM_update_adhoc_storage(ADM_server_t server, ADM_storage_t adhoc_storage,
} }
ADM_return_t ADM_return_t
ADM_remove_adhoc_storage(ADM_server_t server, ADM_storage_t adhoc_storage) { ADM_remove_adhoc_storage(ADM_server_t server,
ADM_adhoc_storage_t adhoc_storage) {
const admire::server srv{server}; const admire::server srv{server};
...@@ -121,16 +123,18 @@ ADM_remove_adhoc_storage(ADM_server_t server, ADM_storage_t adhoc_storage) { ...@@ -121,16 +123,18 @@ ADM_remove_adhoc_storage(ADM_server_t server, ADM_storage_t adhoc_storage) {
} }
ADM_return_t ADM_return_t
ADM_deploy_adhoc_storage(ADM_server_t server, ADM_storage_t adhoc_storage) { ADM_deploy_adhoc_storage(ADM_server_t server,
ADM_adhoc_storage_t adhoc_storage) {
const admire::server srv{server}; const admire::server srv{server};
return admire::deploy_adhoc_storage(srv, adhoc_storage); return admire::detail::deploy_adhoc_storage(
srv, admire::adhoc_storage{adhoc_storage});
} }
ADM_return_t ADM_return_t
ADM_register_pfs_storage(ADM_server_t server, ADM_pfs_context_t ctx, ADM_register_pfs_storage(ADM_server_t server, ADM_pfs_context_t ctx,
ADM_storage_t* pfs_storage) { ADM_pfs_storage_t* pfs_storage) {
const admire::server srv{server}; const admire::server srv{server};
...@@ -139,7 +143,7 @@ ADM_register_pfs_storage(ADM_server_t server, ADM_pfs_context_t ctx, ...@@ -139,7 +143,7 @@ ADM_register_pfs_storage(ADM_server_t server, ADM_pfs_context_t ctx,
ADM_return_t ADM_return_t
ADM_update_pfs_storage(ADM_server_t server, ADM_pfs_context_t ctx, ADM_update_pfs_storage(ADM_server_t server, ADM_pfs_context_t ctx,
ADM_storage_t pfs_storage) { ADM_pfs_storage_t pfs_storage) {
const admire::server srv{server}; const admire::server srv{server};
...@@ -147,7 +151,7 @@ ADM_update_pfs_storage(ADM_server_t server, ADM_pfs_context_t ctx, ...@@ -147,7 +151,7 @@ ADM_update_pfs_storage(ADM_server_t server, ADM_pfs_context_t ctx,
} }
ADM_return_t ADM_return_t
ADM_remove_pfs_storage(ADM_server_t server, ADM_storage_t pfs_storage) { ADM_remove_pfs_storage(ADM_server_t server, ADM_pfs_storage_t pfs_storage) {
const admire::server srv{server}; const admire::server srv{server};
...@@ -188,7 +192,8 @@ ADM_set_dataset_information(ADM_server_t server, ADM_job_t job, ...@@ -188,7 +192,8 @@ ADM_set_dataset_information(ADM_server_t server, ADM_job_t job,
} }
ADM_return_t ADM_return_t
ADM_set_io_resources(ADM_server_t server, ADM_job_t job, ADM_storage_t tier, ADM_set_io_resources(ADM_server_t server, ADM_job_t job,
ADM_adhoc_storage_t tier,
ADM_adhoc_resources_t resources) { ADM_adhoc_resources_t resources) {
const admire::server srv{server}; const admire::server srv{server};
......
...@@ -330,7 +330,7 @@ register_adhoc_storage(const server& srv, const std::string& name, ...@@ -330,7 +330,7 @@ register_adhoc_storage(const server& srv, const std::string& name,
std::quoted(rpc_client.self_address()), name, type, ctx); std::quoted(rpc_client.self_address()), name, type, ctx);
const auto rpc_name = name.c_str(); const auto rpc_name = name.c_str();
const auto rpc_type = static_cast<ADM_storage_type_t>(type); const auto rpc_type = static_cast<ADM_adhoc_storage_type_t>(type);
const auto rpc_ctx = api::convert(ctx); const auto rpc_ctx = api::convert(ctx);
ADM_register_adhoc_storage_in_t in{rpc_name, rpc_type, rpc_ctx.get()}; ADM_register_adhoc_storage_in_t in{rpc_name, rpc_type, rpc_ctx.get()};
...@@ -357,6 +357,41 @@ register_adhoc_storage(const server& srv, const std::string& name, ...@@ -357,6 +357,41 @@ register_adhoc_storage(const server& srv, const std::string& name,
return rpc_adhoc_storage; return rpc_adhoc_storage;
} }
admire::error_code
deploy_adhoc_storage(const server& srv,
const adhoc_storage& adhoc_storage) {
scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb};
const auto rpc_id = ::api::remote_procedure::new_id();
auto endp = rpc_client.lookup(srv.address());
LOGGER_INFO("rpc id: {} name: {} from: {} => "
"body: {{adhoc_storage: {}}}",
rpc_id, std::quoted("ADM_"s + __FUNCTION__),
std::quoted(rpc_client.self_address()), adhoc_storage);
ADM_deploy_adhoc_storage_in_t in{adhoc_storage.id()};
ADM_deploy_adhoc_storage_out_t out;
const auto rpc = endp.call("ADM_deploy_adhoc_storage", &in, &out);
if(const auto rv = admire::error_code{out.retval}; !rv) {
LOGGER_ERROR("rpc id: {} name: {} from: {} <= "
"body: {{retval: {}}} [op_id: {}]",
rpc_id, std::quoted("ADM_"s + __FUNCTION__),
std::quoted(rpc_client.self_address()), rv, out.op_id);
return rv;
}
LOGGER_INFO("rpc id: {} name: {} from: {} <= "
"body: {{retval: {}}}] [op_id: {}]",
rpc_id, std::quoted("ADM_"s + __FUNCTION__),
admire::error_code::success, out.op_id);
return admire::error_code::success;
}
tl::expected<transfer, error_code> tl::expected<transfer, error_code>
transfer_datasets(const server& srv, const job& job, transfer_datasets(const server& srv, const job& job,
const std::vector<dataset>& sources, const std::vector<dataset>& sources,
...@@ -417,11 +452,10 @@ update_adhoc_storage(const server& srv, ...@@ -417,11 +452,10 @@ update_adhoc_storage(const server& srv,
const auto rpc_id = ::api::remote_procedure::new_id(); const auto rpc_id = ::api::remote_procedure::new_id();
auto endp = rpc_client.lookup(srv.address()); auto endp = rpc_client.lookup(srv.address());
LOGGER_INFO("rpc id: name: {} from: {} => " LOGGER_INFO("rpc id: {} name: {} from: {} => "
"body: {{id: {}, adhoc_ctx{}}}", "body: {{adhoc_storage_id: {}}}",
rpc_id, std::quoted("ADM_"s + __FUNCTION__), rpc_id, std::quoted("ADM_"s + __FUNCTION__),
std::quoted(rpc_client.self_address()), adhoc_storage.id(), std::quoted(rpc_client.self_address()), adhoc_storage.id());
adhoc_storage_ctx);
const auto rpc_ctx = api::convert(adhoc_storage_ctx); const auto rpc_ctx = api::convert(adhoc_storage_ctx);
...@@ -433,15 +467,16 @@ update_adhoc_storage(const server& srv, ...@@ -433,15 +467,16 @@ update_adhoc_storage(const server& srv,
if(const auto rv = admire::error_code{out.retval}; !rv) { if(const auto rv = admire::error_code{out.retval}; !rv) {
LOGGER_ERROR("rpc id: {} name: {} from: {} <= " LOGGER_ERROR("rpc id: {} name: {} from: {} <= "
"body: {{retval: {}}} [op_id: {}]", "body: {{retval: {}}} [op_id: {}]",
rpc_id, std::quoted("ADM_"s + __FUNCTION__), rv, rpc_id, std::quoted("ADM_"s + __FUNCTION__),
out.op_id); std::quoted(rpc.origin()), rv, out.op_id);
return rv; return rv;
} }
LOGGER_INFO("rpc id: {} name: {} from: {} <= " LOGGER_INFO("rpc id: {} name: {} from: {} <= "
"body: {{retval: {}}} [op_id: {}]", "body: {{retval: {}}} [op_id: {}]",
rpc_id, std::quoted("ADM_"s + __FUNCTION__), rpc_id, std::quoted("ADM_"s + __FUNCTION__),
admire::error_code::success, out.op_id); std::quoted(rpc.origin()), admire::error_code::success,
out.op_id);
return admire::error_code::success; return admire::error_code::success;
} }
......
...@@ -65,6 +65,10 @@ update_adhoc_storage(const server& srv, ...@@ -65,6 +65,10 @@ update_adhoc_storage(const server& srv,
admire::error_code admire::error_code
remove_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage); remove_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage);
admire::error_code
deploy_adhoc_storage(const server& srv,
const adhoc_storage& adhoc_storage);
} // namespace admire::detail } // namespace admire::detail
#endif // SCORD_ADMIRE_IMPL_HPP #endif // SCORD_ADMIRE_IMPL_HPP
...@@ -31,6 +31,10 @@ ...@@ -31,6 +31,10 @@
#include "job_manager.hpp" #include "job_manager.hpp"
#include "adhoc_storage_manager.hpp" #include "adhoc_storage_manager.hpp"
// Process running
#include <unistd.h>
#include <sys/wait.h>
struct remote_procedure { struct remote_procedure {
static std::uint64_t static std::uint64_t
new_id() { new_id() {
...@@ -439,11 +443,96 @@ ADM_deploy_adhoc_storage(hg_handle_t h) { ...@@ -439,11 +443,96 @@ ADM_deploy_adhoc_storage(hg_handle_t h) {
ret = margo_get_input(h, &in); ret = margo_get_input(h, &in);
assert(ret == HG_SUCCESS); assert(ret == HG_SUCCESS);
out.ret = -1;
LOGGER_INFO("ADM_deploy_adhoc_storage()"); const auto rpc_id = remote_procedure::new_id();
LOGGER_INFO("rpc id: {} name: {} from: {} => "
"body: {{adhoc_id: {}}}",
rpc_id, std::quoted(__FUNCTION__), std::quoted(get_address(h)),
in.id);
out.ret = 0; auto ec = admire::error_code::success;
auto& adhoc_manager = scord::adhoc_storage_manager::instance();
if(const auto am_result = adhoc_manager.find(in.id);
am_result.has_value()) {
const auto& storage_info = am_result.value();
const auto adhoc_storage = storage_info->adhoc_storage();
if(adhoc_storage.type() == admire::adhoc_storage::type::gekkofs) {
const auto adhoc_ctx = adhoc_storage.context();
/* Number of nodes */
const std::string nodes =
std::to_string(adhoc_ctx.resources().nodes().size());
/* Walltime */
const std::string walltime = std::to_string(adhoc_ctx.walltime());
/* Launch script */
switch(const auto pid = fork()) {
case 0: {
std::vector<const char*> args;
args.push_back("gkfs");
// args.push_back("-c");
// args.push_back("gkfs.conf");
args.push_back("-n");
args.push_back(nodes.c_str());
// args.push_back("-w");
// args.push_back(walltime.c_str());
args.push_back("--srun");
args.push_back("start");
args.push_back(NULL);
std::vector<const char*> env;
env.push_back(NULL);
execvpe("gkfs", const_cast<char* const*>(args.data()),
const_cast<char* const*>(env.data()));
LOGGER_INFO(
"ADM_deploy_adhoc_storage() script didn't execute");
exit(EXIT_FAILURE);
break;
}
case -1: {
ec = admire::error_code::other;
LOGGER_ERROR("rpc id: {} name: {} to: {} <= "
"body: {{retval: {}}}",
rpc_id, std::quoted(__FUNCTION__),
std::quoted(get_address(h)), ec);
break;
}
default: {
int wstatus = 0;
pid_t retwait = waitpid(pid, &wstatus, 0);
if(retwait == -1) {
LOGGER_ERROR(
"rpc id: {} error_msg: \"Error waitpid code: {}\"",
rpc_id, retwait);
ec = admire::error_code::other;
} else {
if(WEXITSTATUS(wstatus) != 0) {
ec = admire::error_code::other;
} else {
ec = admire::error_code::success;
}
}
break;
}
}
}
} else {
ec = am_result.error();
LOGGER_ERROR("rpc id: {} name: {} to: {} <= "
"body: {{retval: {}}}",
rpc_id, std::quoted(__FUNCTION__),
std::quoted(get_address(h)), ec);
}
out.retval = ec;
LOGGER_INFO("rpc id: {} name: {} to: {} <= "
"body: {{retval: {}}}",
rpc_id, std::quoted(__FUNCTION__), std::quoted(get_address(h)),
ec);
ret = margo_respond(h, &out); ret = margo_respond(h, &out);
assert(ret == HG_SUCCESS); assert(ret == HG_SUCCESS);
......