Commits on Source (13)
......@@ -30,7 +30,7 @@ cmake_minimum_required(VERSION 3.19)
project(
scord
VERSION 0.3.3
VERSION 0.3.4
LANGUAGES C CXX
)
......
......@@ -34,12 +34,14 @@ struct query_config {
std::string controller_address;
std::string stager_address;
std::uint32_t slurm_id{};
std::uint32_t job_id{};
std::uint32_t adhocid{};
std::string nodes;
std::string adhocfs;
std::string inputs;
std::string outputs;
std::string function;
std::uint32_t qos{};
};
......@@ -78,6 +80,7 @@ parse_command_line(int argc, char* argv[]) {
->option_text("CARGOADDRESS")
->required();
app.add_option("-j,--slurm_id", cfg.slurm_id, "Slurm ID")->required();
app.add_option("--job_id", cfg.job_id, "Job ID (for subsequent ops)");
app.add_option("-n,--nodes", cfg.nodes, "Nodes");
......@@ -90,6 +93,8 @@ parse_command_line(int argc, char* argv[]) {
app.add_option("-o,--outputs", cfg.outputs, "Output dataset");
app.add_option("-q,--qos", cfg.qos, "QoS MB/s");
app.add_option("-f,--function", cfg.function,
"Function {create, stage-in, stage-out, destroy}")
->required();
......@@ -113,6 +118,21 @@ parse_address(const std::string& address) {
return std::make_pair(protocol, address);
}
auto
create_adhoc_type_from_name(const std::string& name) {
if(name == "gekkofs") {
return scord::adhoc_storage::type::gekkofs;
} else if(name == "hercules") {
return scord::adhoc_storage::type::hercules;
} else if(name == "expand") {
return scord::adhoc_storage::type::expand;
} else if(name == "dataclay") {
return scord::adhoc_storage::type::dataclay;
} else {
throw std::runtime_error(
fmt::format("Invalid adhoc fs type: {}", name));
}
}
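The helper centralizes the name-to-type resolution that was previously inlined in `main()` (see the removed block further down). A hypothetical sketch exercising the mapping in isolation; the driver function is illustrative only:
#include <cassert>
// Hypothetical check of the helper above; the accepted names match the
// strings expected in cfg.adhocfs.
void
check_adhoc_names() {
    assert(create_adhoc_type_from_name("gekkofs") ==
           scord::adhoc_storage::type::gekkofs);
    assert(create_adhoc_type_from_name("dataclay") ==
           scord::adhoc_storage::type::dataclay);
    // any other name throws std::runtime_error
}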
int
main(int argc, char* argv[]) {
......@@ -141,24 +161,12 @@ main(int argc, char* argv[]) {
scord::job::resources job_resources(nodes);
// Step 1b : Define adhoc_storage
/* type needs to process the string in cfg.adhocfs */
auto type = scord::adhoc_storage::type::gekkofs;
if(cfg.adhocfs == "gekkofs") {
type = scord::adhoc_storage::type::gekkofs;
} else if(cfg.adhocfs == "hercules") {
type = scord::adhoc_storage::type::hercules;
} else if(cfg.adhocfs == "expand") {
type = scord::adhoc_storage::type::expand;
} else {
throw std::runtime_error(
fmt::format("Invalid adhoc fs type: {}", cfg.adhocfs));
}
auto type = create_adhoc_type_from_name(cfg.adhocfs);
std::string adhoc_name = cfg.adhocfs + std::to_string(cfg.slurm_id);
scord::adhoc_storage::resources resources{nodes};
scord::adhoc_storage::ctx ctx{
cfg.controller_address,
cfg.stager_address,
......@@ -167,33 +175,11 @@ main(int argc, char* argv[]) {
100,
false};
scord::adhoc_storage adhoc_storage = register_adhoc_storage(
srv, adhoc_name, type, ctx, resources);
fmt::print("AdhocStorage ID: {}\n", adhoc_storage.id());
auto path = deploy_adhoc_storage(srv, adhoc_storage);
// Step 1c : Define job_requirements
/*
job::requirements::requirements(
std::vector<scord::dataset_route> inputs,
std::vector<scord::dataset_route> outputs,
std::vector<scord::dataset_route> expected_outputs)
: m_inputs(std::move(inputs)), m_outputs(std::move(outputs)),
m_expected_outputs(std::move(expected_outputs)) {}
job::requirements::requirements(
std::vector<scord::dataset_route> inputs,
std::vector<scord::dataset_route> outputs,
std::vector<scord::dataset_route> expected_outputs,
scord::adhoc_storage adhoc_storage)
*/
fmt::print("{},{}\n", path, adhoc_storage.id());
/* Split the inputs string into a vector of dataset routes */
......@@ -226,20 +212,95 @@ main(int argc, char* argv[]) {
auto job = scord::register_job(srv, job_resources, job_requirements,
cfg.slurm_id);
return job.id();
}
// Now Transfer Datasets
if(cfg.function == "stage-in") {
// As the job is registered, we can now transfer datasets. Get
// inputs from requirements (TODO)
// Step 2 : If function is stage-in, transfer datasets
// split each input route "src,dst" into its components
std::vector<scord::dataset> inputs_src, inputs_dst;
for(auto& route : inputs) {
inputs_src.push_back(route.source());
inputs_dst.push_back(route.destination());
auto v_routes_in = split(cfg.inputs, ';');
for(auto& src_dst : v_routes_in) {
auto route = split(src_dst, ',');
inputs_src.push_back(scord::dataset{route[0]});
inputs_dst.push_back(scord::dataset{route[1]});
}
scord::job job(cfg.job_id, cfg.slurm_id);
std::vector<scord::qos::limit> v_qos;
if(cfg.qos) {
scord::qos::limit limit{scord::qos::subclass::bandwidth,
cfg.qos};
v_qos.push_back(limit);
}
scord::transfer_datasets(srv, job, inputs_src, inputs_dst,
std::vector<scord::qos::limit>{},
scord::transfer::mapping::n_to_n);
auto transfer = scord::transfer_datasets(
srv, job, inputs_src, inputs_dst, v_qos,
scord::transfer::mapping::n_to_n);
return transfer.id();
}
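Both the stage-in block above and the stage-out block below tokenize the `src0,dst0;src1,dst1` route strings with a `split` helper that is not shown in this diff. A minimal sketch of its assumed shape, following the usual std::getline idiom:
#include <sstream>
#include <string>
#include <vector>
// Assumed shape of the split() helper used above: tokenize `text` on
// `delim`. Consecutive delimiters yield empty tokens and a trailing
// delimiter yields none, so callers should check that each route really
// carries both a source and a destination.
std::vector<std::string>
split(const std::string& text, char delim) {
    std::vector<std::string> tokens;
    std::stringstream ss{text};
    std::string token;
    while(std::getline(ss, token, delim)) {
        tokens.push_back(token);
    }
    return tokens;
}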
if(cfg.function == "wait") {
// Wait for transfer operation to finish
const scord::transfer transfer{cfg.slurm_id};
scord::job job(cfg.job_id, cfg.slurm_id);
auto response = scord::query_transfer(srv, job, transfer);
while(response.status() != scord::transfer_state::type::finished) {
std::this_thread::sleep_for(1s);
response = scord::query_transfer(srv, job, transfer);
if(scord::transfer_state::type::failed == response.status()) {
fmt::print("Transfer failed\n");
return EXIT_FAILURE;
}
}
}
if(cfg.function == "stage-out") {
// Step 3 : If function is stage-out, transfer datasets
// split each output route "src,dst" into its components
std::vector<scord::dataset> outputs_src, outputs_dst;
auto v_routes_out = split(cfg.outputs, ';');
for(auto& src_dst : v_routes_out) {
auto route = split(src_dst, ',');
outputs_src.push_back(scord::dataset{route[0]});
outputs_dst.push_back(scord::dataset{route[1]});
}
scord::job job(cfg.job_id, cfg.slurm_id);
std::vector<scord::qos::limit> v_qos;
if(cfg.qos) {
scord::qos::limit limit{scord::qos::subclass::bandwidth,
cfg.qos};
v_qos.push_back(limit);
}
auto transfer = scord::transfer_datasets(
srv, job, outputs_src, outputs_dst, v_qos,
scord::transfer::mapping::n_to_n);
return transfer.id();
}
if(cfg.function == "destroy") {
// Step 4 : If function is destroy, terminate adhoc fs server
// Build a scord::adhoc_storage object with the adhocid
auto type = create_adhoc_type_from_name(cfg.adhocfs);
scord::adhoc_storage::resources resources;
scord::adhoc_storage::ctx ctx;
scord::adhoc_storage adhoc_storage(type, "", cfg.adhocid, ctx,
resources);
terminate_adhoc_storage(srv, adhoc_storage);
}
} catch(const std::exception& ex) {
......
#!/usr/bin/env bash
echo "GEKKOFS Script Called"
#!/usr/bin/bash
echo "GEKKOFS Script Called" $HOSTNAME $SLURM_JOBID
if [ "$1" == "start" ]; then
echo "Starting GEKKOFS"
nodes=$3
num_nodes=$(echo $nodes | awk -F, '{print NF}')
# If num_nodes is greater than 40, we are in the testing environment
if [ $num_nodes -gt 40 ]; then
exit 0
fi
workdir=$5
datadir=$7
mountdir=$9
unset SLURM_CPU_BIND SLURM_CPU_BIND_LIST SLURM_CPU_BIND_TYPE SLURM_CPU_BIND_VERBOSE
srun -N $num_nodes -n $num_nodes --oversubscribe --cpus-per-task=1 --mem-per-cpu=1 --export=ALL bash -c "mkdir -p $mountdir; mkdir -p $datadir"
srun -N $num_nodes -n $num_nodes --oversubscribe --cpus-per-task=4 --mem-per-cpu=1 --export=ALL bash -c "gkfs_daemon --rootdir $datadir --mountdir $mountdir" &
sleep 4
elif [ "$1" == "stop" ]; then
echo "Stopping GEKKOFS"
nodes=$3
num_nodes=$(echo $nodes | awk -F, '{print NF}')
# If num_nodes is greater than 40, we are in the testing environment
if [ $num_nodes -gt 40 ]; then
exit 0
fi
unset SLURM_CPU_BIND SLURM_CPU_BIND_LIST SLURM_CPU_BIND_TYPE SLURM_CPU_BIND_VERBOSE
srun -N $num_nodes -n $num_nodes --oversubscribe --cpus-per-task=1 --mem-per-cpu=1 pkill -9 gkfs_daemon
elif [ "$1" == "expand" ]; then
echo "Expand command"
elif [ "$1" == "shrink" ]; then
echo "shrink command"
fi
exit 0
......@@ -35,7 +35,7 @@ class Scord(CMakePackage):
homepage = "https://storage.bsc.es/gitlab/eu/admire/io-scheduler"
url = ("https://storage.bsc.es/gitlab/eu/admire/io-scheduler/-/archive/"
"v0.3.3/io-scheduler-v0.3.3.tar.gz")
"v0.3.4/io-scheduler-v0.3.4.tar.gz")
git = "https://storage.bsc.es/gitlab/eu/admire/io-scheduler.git"
maintainers("alberto-miranda")
......@@ -56,7 +56,7 @@ class Scord(CMakePackage):
sha256="74c51915315e01d8479701d340331641f42c5f5cfae0c08bdea6c2f0b01da665")
version("0.3.3",
sha256="a8b5a8d05858bee91b9675ca6c929f4c16b5b2562f4e6a8dba3ce0aacb721f48")
version("0.3.4", sha256="e5e6a46d174db266e1caa2689cd17d88a7dc0623429c5efba20a374383f54a12")
# build variants
variant('build_type',
default='Release',
......@@ -92,6 +92,7 @@ class Scord(CMakePackage):
depends_on("boost@1.71 +program_options", when='@0.2.0:')
depends_on("redis-plus-plus@1.3.3:", when='@0.2.0:')
depends_on("cargo@0.3.3:", when='@0.3.1:')
depends_on("cargo@0.3.4:", when='@0.3.4:')
depends_on("slurm", when='@0.3.1:')
......
......@@ -531,4 +531,41 @@ transfer_datasets(const server& srv, const job& job,
}
tl::expected<transfer_state, error_code>
query_transfer(const server& srv, const job& job, const transfer& transfer) {
using response_type = network::response_with_value<transfer_state>;
network::client rpc_client{srv.protocol()};
const auto rpc = network::rpc_info::create(RPC_NAME(), srv.address());
if(const auto lookup_rv = rpc_client.lookup(srv.address());
lookup_rv.has_value()) {
const auto& endp = lookup_rv.value();
LOGGER_INFO("rpc {:<} body: {{job_id: {}, tx_id: {}}}", rpc, job.id(), transfer.id());
if(const auto call_rv = endp.call(rpc.name(), job.id(), transfer.id());
call_rv.has_value()) {
const response_type resp{call_rv.value()};
LOGGER_EVAL(
resp.error_code(), INFO, ERROR,
"rpc {:>} body: {{retval: {}, tx_state: {}}} [op_id: {}]",
rpc, resp.error_code(), resp.value(), resp.op_id());
if(!resp.error_code()) {
return tl::make_unexpected(resp.error_code());
}
return resp.value();
}
}
LOGGER_ERROR("rpc call failed");
return tl::make_unexpected(scord::error_code::other);
}
} // namespace scord::detail
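Since the `detail::` layer reports failures through `tl::expected` rather than exceptions, callers can branch on the result directly. A hypothetical caller, assuming `srv`, `job`, and `transfer` are already constructed:
// Hypothetical caller of the detail-layer RPC wrapper above.
if(const auto rv = scord::detail::query_transfer(srv, job, transfer); rv) {
    // relies on the fmt formatter for transfer_state added in this change set
    fmt::print("transfer state: {}\n", rv.value());
} else {
    fmt::print(stderr, "query_transfer() failed: {}\n", rv.error());
}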
......@@ -86,6 +86,10 @@ transfer_datasets(const server& srv, const job& job,
const std::vector<qos::limit>& limits,
transfer::mapping mapping);
tl::expected<transfer_state, error_code>
query_transfer(const server& srv, const job& job, const transfer& transfer);
} // namespace scord::detail
......
......@@ -377,6 +377,18 @@ transfer_datasets(const server& srv, const job& job,
return rv.value();
}
scord::transfer_state
query_transfer(const server& srv, const job& job, const transfer& transfer) {
const auto rv = detail::query_transfer(srv, job, transfer);
if(!rv) {
throw std::runtime_error(fmt::format(
"ADM_query_transfer() error: {}", ADM_strerror(rv.error())));
}
return rv.value();
}
ADM_return_t
set_dataset_information(const server& srv, ADM_job_t job, ADM_dataset_t target,
......
......@@ -98,6 +98,9 @@ transfer_datasets(const server& srv, const job& job,
const std::vector<qos::limit>& limits,
transfer::mapping mapping);
scord::transfer_state
query_transfer(const server& srv, const job& job, const transfer& transfer);
void
transfer_update(const server& srv, uint64_t transfer_id, float obtained_bw);
......
......@@ -561,6 +561,43 @@ private:
std::unique_ptr<impl> m_pimpl;
};
struct transfer_state {
enum class type : std::underlying_type<ADM_transfer_state_t>::type {
queued = ADM_TRANSFER_QUEUED,
running = ADM_TRANSFER_RUNNING,
finished = ADM_TRANSFER_FINISHED,
failed = ADM_TRANSFER_FAILED,
cancelled = ADM_TRANSFER_CANCELLED
};
transfer_state();
explicit transfer_state(transfer_state::type type);
explicit transfer_state(ADM_transfer_status_t adm_transfer_state);
explicit operator ADM_transfer_status_t() const;
transfer_state(const transfer_state&) noexcept;
transfer_state(transfer_state&&) noexcept;
transfer_state&
operator=(const transfer_state&) noexcept;
transfer_state&
operator=(transfer_state&&) noexcept;
~transfer_state();
transfer_state::type
status() const;
// The implementation for this must be deferred until
// after the definition of the PIMPL class
template <class Archive>
void
serialize(Archive& ar);
private:
class impl;
std::unique_ptr<impl> m_pimpl;
};
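Because the enumerators alias the C-level `ADM_transfer_state_t` values one-to-one, converting between the C++ wrapper and the C handle is a matter of casts plus one allocation. A hypothetical round-trip, using the create/destroy helpers added later in this change set:
#include <cassert>
// Hypothetical round-trip between the C++ wrapper and the C handle. The
// explicit conversion operator allocates a fresh handle through
// ADM_transfer_status_create(), so it must be paired with a destroy.
void
round_trip_example() {
    scord::transfer_state st{scord::transfer_state::type::running};
    auto handle = static_cast<ADM_transfer_status_t>(st);
    scord::transfer_state back{handle};
    assert(back.status() == scord::transfer_state::type::running);
    ADM_transfer_status_destroy(handle);
}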
namespace qos {
enum class subclass : std::underlying_type<ADM_qos_class_t>::type {
......@@ -1240,4 +1277,36 @@ struct fmt::formatter<std::vector<scord::qos::limit>>
}
};
template <>
struct fmt::formatter<scord::transfer_state> : fmt::formatter<std::string_view> {
// parse is inherited from formatter<string_view>.
template <typename FormatContext>
auto
format(const scord::transfer_state& t, FormatContext& ctx) const -> format_context::iterator {
std::string_view name = "unknown";
switch(t.status()) {
case scord::transfer_state::type::queued:
name = "queued";
break;
case scord::transfer_state::type::running:
name = "running";
break;
case scord::transfer_state::type::finished:
name = "finished";
break;
case scord::transfer_state::type::failed:
name = "failed";
break;
case scord::transfer_state::type::cancelled:
name = "cancelled";
break;
}
return formatter<std::string_view>::format(name, ctx);
}
};
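With this specialization in place, a `transfer_state` can be handed to fmt directly, which is what the `tx_state: {}` log line in `detail::query_transfer()` relies on. A one-line usage sketch:
scord::transfer_state st{scord::transfer_state::type::finished};
fmt::print("state: {}\n", st); // prints "state: finished"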
#endif // SCORD_TYPES_HPP
......@@ -1144,6 +1144,54 @@ ADM_transfer_destroy(ADM_transfer_t tx) {
return ret;
}
/**
* Initialize a transfer state
*
* @remark This function is not actually part of the public API, but it is
* useful to have for internal purposes
*
* @param[in] type A transfer state
* @return A valid ADM_transfer_status_t or NULL in case of failure.
*/
ADM_transfer_status_t
ADM_transfer_status_create(ADM_transfer_state_t type) {
struct adm_transfer_status* adm_transfer_state =
(struct adm_transfer_status*) malloc(sizeof(struct adm_transfer_status));
if(!adm_transfer_state) {
LOGGER_ERROR("Could not allocate ADM_transfer_t");
return NULL;
}
adm_transfer_state->s_state = type;
return adm_transfer_state;
}
/**
* Destroy a ADM_transfer_status_t created by ADM_transfer_status_create().
*
* @remark This function is not actually part of the public API, but it is
* useful to have for internal purposes
*
* @param[in] tx The ADM_transfer_status_t to destroy.
* @return ADM_SUCCESS or corresponding error code.
*/
ADM_return_t
ADM_transfer_status_destroy(ADM_transfer_status_t tx) {
ADM_return_t ret = ADM_SUCCESS;
if(!tx) {
LOGGER_ERROR("Invalid ADM_transfer_status_t");
return ADM_EBADARGS;
}
free(tx);
return ret;
}
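The two helpers form a matched create/destroy pair. A hypothetical usage sketch from the C side of the API:
/* Hypothetical usage: every successfully created status must eventually
 * be released with the matching destroy call. */
ADM_transfer_status_t st = ADM_transfer_status_create(ADM_TRANSFER_QUEUED);
if(st != NULL) {
    /* ... hand the status to other API calls ... */
    ADM_transfer_status_destroy(st);
}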
ADM_qos_limit_list_t
ADM_qos_limit_list_create(ADM_qos_limit_t limits[], size_t length) {
......
......@@ -462,6 +462,100 @@ template void
transfer::serialize<network::serialization::input_archive>(
network::serialization::input_archive&);
class transfer_state::impl {
public:
impl() = default;
explicit impl(transfer_state::type type) : m_type(type) {}
impl(const impl& rhs) = default;
impl(impl&& rhs) = default;
impl&
operator=(const impl& other) noexcept = default;
impl&
operator=(impl&&) noexcept = default;
transfer_state::type
status() const {
return m_type;
}
template <class Archive>
void
load(Archive& ar) {
ar(SCORD_SERIALIZATION_NVP(m_type));
}
template <class Archive>
void
save(Archive& ar) const {
ar(SCORD_SERIALIZATION_NVP(m_type));
}
private:
transfer_state::type m_type;
};
transfer_state::transfer_state() = default;
transfer_state::transfer_state(transfer_state::type type)
: m_pimpl(std::make_unique<transfer_state::impl>(type)) {}
transfer_state::transfer_state(ADM_transfer_status_t adm_transfer_state)
: transfer_state::transfer_state(static_cast<transfer_state::type>(adm_transfer_state->s_state)) {}
transfer_state::operator ADM_transfer_status_t() const {
return ADM_transfer_status_create((ADM_transfer_state_t)m_pimpl->status());
}
transfer_state::transfer_state(transfer_state&&) noexcept = default;
transfer_state&
transfer_state::operator=(transfer_state&&) noexcept = default;
transfer_state::transfer_state(const transfer_state& other) noexcept
: m_pimpl(std::make_unique<impl>(*other.m_pimpl)) {}
transfer_state&
transfer_state::operator=(const transfer_state& other) noexcept {
this->m_pimpl = std::make_unique<impl>(*other.m_pimpl);
return *this;
}
transfer_state::~transfer_state() = default;
transfer_state::type
transfer_state::status() const {
return m_pimpl->status();
}
// since the PIMPL class is fully defined at this point, we can now
// define the serialization function
template <class Archive>
inline void
transfer_state::serialize(Archive& ar) {
ar(SCORD_SERIALIZATION_NVP(m_pimpl));
}
// we must also explicitly instantiate our template functions for
// serialization in the desired archives
template void
transfer_state::impl::save<network::serialization::output_archive>(
network::serialization::output_archive&) const;
template void
transfer_state::impl::load<network::serialization::input_archive>(
network::serialization::input_archive&);
template void
transfer_state::serialize<network::serialization::output_archive>(
network::serialization::output_archive&);
template void
transfer_state::serialize<network::serialization::input_archive>(
network::serialization::input_archive&);
class dataset::impl {
public:
impl() = default;
......@@ -1214,6 +1308,7 @@ entity::data<scord::transfer>() const {
return m_pimpl->data<scord::transfer>();
}
// since the PIMPL class is fully defined at this point, we can now
// define the serialization function
template <class Archive>
......
......@@ -183,6 +183,9 @@ ADM_job_create(uint64_t id, uint64_t slurm_id);
ADM_transfer_t
ADM_transfer_create(uint64_t id);
ADM_transfer_status_t
ADM_transfer_status_create(ADM_transfer_state_t type);
#ifdef __cplusplus
} // extern "C"
#endif
......
......@@ -195,6 +195,38 @@ command::as_vector() const {
return tmp;
}
// Function to join two sets of environment variables
char**
joinEnvironments(char** env1, const char** env2) {
// Count the number of variables in each environment
int count1 = 0;
while(env1[count1] != nullptr) {
++count1;
}
int count2 = 0;
while(env2[count2] != nullptr) {
++count2;
}
// Allocate memory for the combined environment
char** combinedEnv = new char*[count1 + count2 + 1];
// Copy the variables from the first environment
for(int i = 0; i < count1; ++i) {
combinedEnv[i] = strdup(env1[i]);
}
// Copy the variables from the second environment
for(int i = 0; i < count2; ++i) {
combinedEnv[count1 + i] = strdup(env2[i]);
}
// Null-terminate the combined environment
combinedEnv[count1 + count2] = nullptr;
return combinedEnv;
}
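Neither the `strdup()` copies nor the array itself are freed here; that is deliberate, since the merged environment is only consumed on the `exec` path below, where the process image is replaced wholesale (and the child exits immediately if `execvpe()` fails). A hypothetical standalone use, with an illustrative extra variable:
#include <unistd.h>
extern char** environ; // POSIX global holding the parent environment
// Hypothetical standalone use of joinEnvironments(); the extra variable
// and the exec'd program are illustrative only.
void
exec_env_example() {
    const char* extra[] = {"MY_EXTRA_VAR=1", nullptr};
    char** merged = joinEnvironments(environ, extra);
    char* const args[] = {const_cast<char*>("env"), nullptr};
    ::execvpe("env", args, merged); // replaces the process image on success
}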
void
command::exec() const {
......@@ -207,8 +239,11 @@ command::exec() const {
switch(const auto pid = ::fork()) {
case 0: {
// Join the environments
char** combinedEnv = joinEnvironments(environ, envp.get());
::execvpe(argv[0], const_cast<char* const*>(argv.get()),
const_cast<char* const*>(envp.get()));
const_cast<char* const*>(combinedEnv));
// We cannot use the default logger in the child process because it
// is not fork-safe, and even though we received a copy of the
// global logger, it is not valid because the child process does
......
......@@ -28,6 +28,7 @@
#include <net/utilities.hpp>
#include "rpc_server.hpp"
extern char** environ;
using namespace std::literals;
......@@ -142,7 +143,7 @@ rpc_server::deploy_adhoc_storage(
const auto& adhoc_cfg = it->second;
LOGGER_DEBUG("deploy \"{:e}\" (ID: {})", adhoc_type, adhoc_uuid);
// 1. Create a working directory for the adhoc storage instance
adhoc_dir = adhoc_cfg.working_directory() / adhoc_uuid;
......@@ -172,6 +173,8 @@ rpc_server::deploy_adhoc_storage(
const auto cmd = adhoc_cfg.startup_command().eval(
adhoc_uuid, *adhoc_dir, hostnames);
// Fill environment
// 4. Execute the startup command
try {
......
......@@ -97,6 +97,7 @@ rpc_server::rpc_server(std::string name, std::string address, bool daemonize,
provider::define(EXPAND(update_pfs_storage));
provider::define(EXPAND(remove_pfs_storage));
provider::define(EXPAND(transfer_datasets));
provider::define(EXPAND(query_transfer));
#undef EXPAND
m_network_engine.push_prefinalize_callback([this]() {
......@@ -875,6 +876,75 @@ rpc_server::transfer_datasets(const network::request& req, scord::job_id job_id,
}
void
rpc_server::query_transfer(const network::request& req, scord::job_id job_id, scord::transfer_id tx_id) {
using network::get_address;
using network::response_with_value;
using network::rpc_info;
using response_with_status = response_with_value<scord::transfer_state>;
const auto rpc = rpc_info::create(RPC_NAME(), get_address(req));
LOGGER_INFO("rpc {:>} body: {{job_id: {}, tx_id{}}}",
rpc, job_id, tx_id);
const auto jm_result = m_job_manager.find(job_id);
if(!jm_result) {
LOGGER_ERROR("rpc id: {} error_msg: \"Error finding job: {}\"",
rpc.id(), job_id);
const auto resp = response_with_status{rpc.id(), jm_result.error()};
LOGGER_ERROR("rpc {:<} body: {{retval: {}}}", rpc, resp.error_code());
req.respond(resp);
return;
}
const auto& job_metadata_ptr = jm_result.value();
if(!job_metadata_ptr->adhoc_storage_metadata()) {
LOGGER_ERROR("rpc id: {} error_msg: \"Job has no adhoc storage\"",
rpc.id(), job_id);
const auto resp = response_with_status{rpc.id(), error_code::no_resources};
LOGGER_ERROR("rpc {:<} body: {{retval: {}}}", rpc, resp.error_code());
req.respond(resp);
return;
}
const auto data_stager_address =
job_metadata_ptr->adhoc_storage_metadata()->data_stager_address();
// Contact the Cargo service associated with the job's adhoc storage
// instance to query the status of the transfer.
cargo::server srv{data_stager_address};
// Look up the transfer in the `transfer_manager`. The
// `cargo::transfer` object embedded into scord's `transfer_metadata`
// at registration time is what lets us query the Cargo service for
// the transfer's status.
const auto rv =
m_transfer_manager.find(tx_id)
.or_else([&](auto&& ec) {
LOGGER_ERROR("rpc id: {} error_msg: \"Error finding "
"transfer: {}\"",
rpc.id(), ec);
})
.and_then([&](auto&& transfer_metadata_ptr)
-> tl::expected<scord::transfer_state, error_code> {
return scord::transfer_state(static_cast<scord::transfer_state::type>(transfer_metadata_ptr->transfer().status().state()));
});
const auto resp =
rv ? response_with_status{rpc.id(), error_code::success, rv.value()}
: response_with_status{rpc.id(), rv.error()};
LOGGER_EVAL(resp.error_code(), INFO, ERROR,
"rpc {:<} body: {{retval: {}, status: {}}}", rpc,
resp.error_code(), resp.value_or_none());
req.respond(resp);
}
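The lookup above leans on `tl::expected`'s monadic extensions: `or_else()` runs only on the error path (and, with a void-returning callable, forwards the original error unchanged), while `and_then()` maps the success value into a new `tl::expected`. A minimal self-contained sketch of the same pattern, with hypothetical names:
#include <fmt/core.h>
#include <tl/expected.hpp>
// Hypothetical lookup mirroring the m_transfer_manager.find() call above.
tl::expected<int, std::string>
find_record(int id) {
    if(id == 42)
        return 123;
    return tl::make_unexpected(std::string{"no such record"});
}
auto
query(int id) {
    return find_record(id)
            .or_else([](const std::string& err) {
                // error path only; the error is forwarded unchanged
                fmt::print(stderr, "lookup failed: {}\n", err);
            })
            .and_then([](int value) -> tl::expected<double, std::string> {
                // success path only; transform into the final result type
                return value * 1.5;
            });
}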
/* Scheduling is done every 0.5 s */
void
rpc_server::scheduler_update() {
......
......@@ -109,6 +109,10 @@ private:
const std::vector<scord::dataset>& targets,
const std::vector<scord::qos::limit>& limits,
enum scord::transfer::mapping mapping);
void
query_transfer(const network::request& req, scord::job_id job_id,
scord::transfer_id transfer_id);
job_manager m_job_manager;
adhoc_storage_manager m_adhoc_manager;
......