Loading examples/ADM_cancel_transfer.cpp +15 −23 Original line number Diff line number Diff line #include <fmt/format.h> #include <engine.hpp> #include <admire.hpp> int Loading @@ -12,35 +12,27 @@ main(int argc, char* argv[]) { exit(EXIT_FAILURE); } scord::network::rpc_client rpc_client{"tcp"}; rpc_client.register_rpcs(); admire::server server{"tcp", argv[1]}; auto endp = rpc_client.lookup(argv[1]); ADM_job_handle_t job{}; ADM_transfer_handle_t tx_handle{}; ADM_return_t ret = ADM_SUCCESS; fmt::print( stdout, "Calling ADM_cancel_transfer remote procedure on {} with transfer id {} ...\n", argv[1], argv[2]); ADM_cancel_transfer_in_t in; try { in.transfer_id = std::stoi(argv[2]); ret = admire::cancel_transfer(server, job, tx_handle); } catch(const std::exception& e) { fmt::print(stdout, "ERROR: Incorrect input type. Please try again.\n"); fmt::print(stderr, "FATAL: ADM_cancel_transfer() failed: {}\n", e.what()); exit(EXIT_FAILURE); } ADM_cancel_transfer_out_t out; endp.call("ADM_cancel_transfer", &in, &out); if(out.ret < 0) { fmt::print( stdout, "ADM_cancel_transfer remote procedure not completed successfully\n"); if(ret != ADM_SUCCESS) { fmt::print(stdout, "ADM_cancel_transfer() remote procedure not completed " "successfully\n"); exit(EXIT_FAILURE); } else { fmt::print( stdout, "ADM_cancel_transfer remote procedure completed successfully\n"); } fmt::print(stdout, "ADM_cancel_transfer() remote procedure completed " "successfully\n"); } src/api/admire.cpp +171 −25 Original line number Diff line number Diff line Loading @@ -24,142 +24,288 @@ #include <admire.hpp> #include <engine.hpp> #include <logger.hpp> namespace { void init_library() __attribute__((constructor)); void init_logger(); void init_library() { init_logger(); } /** Logging for the library */ void init_logger() { // for now, just create a simple console logger scord::logger::create_global_logger("libadm_iosched", "console color"); } } // namespace namespace admire { ADM_return_t register_job(ADM_server_t server, ADM_job_requirements_t reqs, register_job(const server& srv, ADM_job_requirements_t reqs, ADM_job_handle_t* job) { (void) srv; (void) reqs; (void) job; return ADM_OTHER_ERROR; } ADM_return_t update_job(ADM_server_t server, ADM_job_handle_t job, update_job(const server& srv, ADM_job_handle_t job, ADM_job_requirements_t reqs) { (void) srv; (void) job; (void) reqs; return ADM_OTHER_ERROR; } ADM_return_t remove_job(ADM_server_t server, ADM_job_handle_t job) { remove_job(const server& srv, ADM_job_handle_t job) { (void) srv; (void) job; return ADM_OTHER_ERROR; } ADM_return_t register_adhoc_storage(ADM_server_t server, ADM_job_handle_t job, register_adhoc_storage(const server& srv, ADM_job_handle_t job, ADM_adhoc_context_t ctx, ADM_adhoc_storage_handle_t* adhoc_handle) { (void) srv; (void) job; (void) ctx; (void) adhoc_handle; return ADM_OTHER_ERROR; } ADM_return_t update_adhoc_storage(ADM_server_t server, ADM_job_handle_t job, update_adhoc_storage(const server& srv, ADM_job_handle_t job, ADM_adhoc_context_t ctx, ADM_adhoc_storage_handle_t adhoc_handle) { (void) srv; (void) job; (void) ctx; (void) adhoc_handle; return ADM_OTHER_ERROR; } ADM_return_t remove_adhoc_storage(ADM_server_t server, ADM_job_handle_t job, remove_adhoc_storage(const server& srv, ADM_job_handle_t job, ADM_adhoc_storage_handle_t adhoc_handle) { (void) srv; (void) job; (void) adhoc_handle; return ADM_OTHER_ERROR; } ADM_return_t deploy_adhoc_storage(ADM_server_t server, ADM_job_handle_t job, deploy_adhoc_storage(const server& srv, ADM_job_handle_t job, ADM_adhoc_storage_handle_t adhoc_handle) { (void) srv; (void) job; (void) adhoc_handle; return ADM_OTHER_ERROR; } ADM_return_t transfer_dataset(ADM_server_t server, ADM_job_handle_t job, transfer_dataset(const server& srv, ADM_job_handle_t job, ADM_dataset_handle_t** sources, ADM_dataset_handle_t** targets, ADM_limit_t** limits, ADM_tx_mapping_t mapping, ADM_transfer_handle_t* tx) { (void) srv; (void) job; (void) sources; (void) targets; (void) limits; (void) mapping; (void) tx; return ADM_OTHER_ERROR; } ADM_return_t set_dataset_information(ADM_server_t server, ADM_job_handle_t job, set_dataset_information(const server& srv, ADM_job_handle_t job, ADM_dataset_handle_t target, ADM_dataset_info_t info) { (void) srv; (void) job; (void) target; (void) info; return ADM_OTHER_ERROR; } ADM_return_t set_io_resources(ADM_server_t server, ADM_job_handle_t job, set_io_resources(const server& srv, ADM_job_handle_t job, ADM_storage_handle_t tier, ADM_storage_resources_t resources) { (void) srv; (void) job; (void) tier; (void) resources; return ADM_OTHER_ERROR; } ADM_return_t get_transfer_priority(ADM_server_t server, ADM_job_handle_t job, get_transfer_priority(const server& srv, ADM_job_handle_t job, ADM_transfer_handle_t tx_handle, ADM_transfer_priority_t* priority) { (void) srv; (void) job; (void) tx_handle; (void) priority; return ADM_OTHER_ERROR; } ADM_return_t set_transfer_priority(ADM_server_t server, ADM_job_handle_t job, set_transfer_priority(const server& srv, ADM_job_handle_t job, ADM_transfer_handle_t tx_handle, int incr) { (void) srv; (void) job; (void) tx_handle; (void) incr; return ADM_OTHER_ERROR; } ADM_return_t cancel_transfer(ADM_server_t server, ADM_job_handle_t job, cancel_transfer(const server& srv, ADM_job_handle_t job, ADM_transfer_handle_t tx_handle) { return ADM_OTHER_ERROR; (void) job; (void) tx_handle; scord::network::rpc_client rpc_client{srv.m_protocol}; rpc_client.register_rpcs(); auto endp = rpc_client.lookup(srv.m_address); LOGGER_INFO("ADM_cancel_transfer(...)"); // FIXME: change RPC fields to ADM_transfer_handle_t ADM_cancel_transfer_in_t in{42}; ADM_cancel_transfer_out_t out; endp.call("ADM_cancel_transfer", &in, &out); if(out.ret < 0) { LOGGER_ERROR("ADM_cancel_transfer() = {}", out.ret); return static_cast<ADM_return_t>(out.ret); } LOGGER_INFO("ADM_cancel_transfer() = {}", ADM_SUCCESS); return ADM_SUCCESS; } ADM_return_t get_pending_transfers(ADM_server_t server, ADM_job_handle_t job, get_pending_transfers(const server& srv, ADM_job_handle_t job, ADM_transfer_handle_t** pending_transfers) { (void) srv; (void) job; (void) pending_transfers; return ADM_OTHER_ERROR; } ADM_return_t set_qos_constraints(ADM_server_t server, ADM_job_handle_t job, set_qos_constraints(const server& srv, ADM_job_handle_t job, ADM_limit_t limit) { (void) srv; (void) job; (void) limit; return ADM_OTHER_ERROR; } ADM_return_t get_qos_constraints(ADM_server_t server, ADM_job_handle_t job, get_qos_constraints(const server& srv, ADM_job_handle_t job, ADM_qos_scope_t scope, ADM_qos_entity_t entity, ADM_limit_t** limits) { (void) srv; (void) job; (void) scope; (void) entity; (void) limits; return ADM_OTHER_ERROR; } ADM_return_t define_data_operation(ADM_server_t server, ADM_job_handle_t job, const char* path, ADM_data_operation_handle_t* op, va_list args) { define_data_operation(const server& srv, ADM_job_handle_t job, const char* path, ADM_data_operation_handle_t* op, va_list args) { (void) srv; (void) job; (void) path; (void) op; (void) args; return ADM_OTHER_ERROR; } ADM_return_t connect_data_operation(ADM_server_t server, ADM_job_handle_t job, connect_data_operation(const server& srv, ADM_job_handle_t job, ADM_dataset_handle_t input, ADM_dataset_handle_t output, bool should_stream, va_list args) { (void) srv; (void) job; (void) input; (void) output; (void) should_stream; (void) args; return ADM_OTHER_ERROR; } ADM_return_t finalize_data_operation(ADM_server_t server, ADM_job_handle_t job, finalize_data_operation(const server& srv, ADM_job_handle_t job, ADM_data_operation_handle_t op, ADM_data_operation_status_t* status) { (void) srv; (void) job; (void) op; (void) status; return ADM_OTHER_ERROR; } ADM_return_t link_transfer_to_data_operation(ADM_server_t server, ADM_job_handle_t job, link_transfer_to_data_operation(const server& srv, ADM_job_handle_t job, ADM_data_operation_handle_t op, bool should_stream, ...) { bool should_stream, va_list args) { (void) srv; (void) job; (void) op; (void) should_stream; (void) args; return ADM_OTHER_ERROR; } ADM_return_t get_statistics(ADM_server_t server, ADM_job_handle_t job, get_statistics(const server& srv, ADM_job_handle_t job, ADM_job_stats_t** stats) { (void) srv; (void) job; (void) stats; return ADM_OTHER_ERROR; } Loading src/api/admire.h +1 −1 Original line number Diff line number Diff line Loading @@ -48,7 +48,7 @@ enum ADM_return_t { }; /* A server */ typedef const char* ADM_server_t; typedef struct adm_server ADM_server_t; /* A node */ typedef const char* ADM_node_t; Loading src/api/admire.hpp +29 −25 Original line number Diff line number Diff line Loading @@ -24,103 +24,107 @@ #include <admire.h> #include <cstdarg> #include <string> #ifndef SCORD_ADMIRE_HPP #define SCORD_ADMIRE_HPP namespace admire { struct server { std::string m_protocol; std::string m_address; }; ADM_return_t register_job(ADM_server_t server, ADM_job_requirements_t reqs, register_job(const server& srv, ADM_job_requirements_t reqs, ADM_job_handle_t* job); ADM_return_t update_job(ADM_server_t server, ADM_job_handle_t job, update_job(const server& srv, ADM_job_handle_t job, ADM_job_requirements_t reqs); ADM_return_t remove_job(ADM_server_t server, ADM_job_handle_t job); remove_job(const server& srv, ADM_job_handle_t job); ADM_return_t register_adhoc_storage(ADM_server_t server, ADM_job_handle_t job, register_adhoc_storage(const server& srv, ADM_job_handle_t job, ADM_adhoc_context_t ctx, ADM_adhoc_storage_handle_t* adhoc_handle); ADM_return_t update_adhoc_storage(ADM_server_t server, ADM_job_handle_t job, update_adhoc_storage(const server& srv, ADM_job_handle_t job, ADM_adhoc_context_t ctx, ADM_adhoc_storage_handle_t adhoc_handle); ADM_return_t remove_adhoc_storage(ADM_server_t server, ADM_job_handle_t job, remove_adhoc_storage(const server& srv, ADM_job_handle_t job, ADM_adhoc_storage_handle_t adhoc_handle); ADM_return_t deploy_adhoc_storage(ADM_server_t server, ADM_job_handle_t job, deploy_adhoc_storage(const server& srv, ADM_job_handle_t job, ADM_adhoc_storage_handle_t adhoc_handle); ADM_return_t transfer_dataset(ADM_server_t server, ADM_job_handle_t job, transfer_dataset(const server& srv, ADM_job_handle_t job, ADM_dataset_handle_t** sources, ADM_dataset_handle_t** targets, ADM_limit_t** limits, ADM_tx_mapping_t mapping, ADM_transfer_handle_t* tx_handle); ADM_return_t set_dataset_information(ADM_server_t server, ADM_job_handle_t job, set_dataset_information(const server& srv, ADM_job_handle_t job, ADM_dataset_handle_t target, ADM_dataset_info_t info); ADM_return_t set_io_resources(ADM_server_t server, ADM_job_handle_t job, set_io_resources(const server& srv, ADM_job_handle_t job, ADM_storage_handle_t tier, ADM_storage_resources_t resources); ADM_return_t get_transfer_priority(ADM_server_t server, ADM_job_handle_t job, get_transfer_priority(const server& srv, ADM_job_handle_t job, ADM_transfer_handle_t tx_handle, ADM_transfer_priority_t* priority); ADM_return_t set_transfer_priority(ADM_server_t server, ADM_job_handle_t job, set_transfer_priority(const server& srv, ADM_job_handle_t job, ADM_transfer_handle_t tx_handle, int incr); ADM_return_t cancel_transfer(ADM_server_t server, ADM_job_handle_t job, cancel_transfer(const server& srv, ADM_job_handle_t job, ADM_transfer_handle_t tx_handle); ADM_return_t get_pending_transfers(ADM_server_t server, ADM_job_handle_t job, get_pending_transfers(const server& srv, ADM_job_handle_t job, ADM_transfer_handle_t** pending_transfers); ADM_return_t set_qos_constraints(ADM_server_t server, ADM_job_handle_t job, ADM_limit_t limit); set_qos_constraints(const server& srv, ADM_job_handle_t job, ADM_limit_t limit); ADM_return_t get_qos_constraints(ADM_server_t server, ADM_job_handle_t job, get_qos_constraints(const server& srv, ADM_job_handle_t job, ADM_qos_scope_t scope, ADM_qos_entity_t entity, ADM_limit_t** limits); ADM_return_t define_data_operation(ADM_server_t server, ADM_job_handle_t job, const char* path, ADM_data_operation_handle_t* op, va_list args); define_data_operation(const server& srv, ADM_job_handle_t job, const char* path, ADM_data_operation_handle_t* op, va_list args); ADM_return_t connect_data_operation(ADM_server_t server, ADM_job_handle_t job, connect_data_operation(const server& srv, ADM_job_handle_t job, ADM_dataset_handle_t input, ADM_dataset_handle_t output, bool should_stream, va_list args); ADM_return_t finalize_data_operation(ADM_server_t server, ADM_job_handle_t job, finalize_data_operation(const server& srv, ADM_job_handle_t job, ADM_data_operation_handle_t op, ADM_data_operation_status_t* status); ADM_return_t link_transfer_to_data_operation(ADM_server_t server, ADM_job_handle_t job, link_transfer_to_data_operation(const server& srv, ADM_job_handle_t job, ADM_data_operation_handle_t op, bool should_stream, ...); bool should_stream, va_list args); ADM_return_t get_statistics(ADM_server_t server, ADM_job_handle_t job, get_statistics(const server& srv, ADM_job_handle_t job, ADM_job_stats_t** stats); } // namespace admire Loading src/api/c_wrapper.cpp +86 −22 Original line number Diff line number Diff line Loading @@ -25,47 +25,73 @@ #include <admire.h> #include <admire.hpp> struct adm_server { const char* s_protocol; const char* s_address; }; ADM_return_t ADM_register_job(ADM_server_t server, ADM_job_requirements_t reqs, ADM_job_handle_t* job) { return admire::register_job(server, reqs, job); const admire::server srv{server.s_protocol, server.s_address}; return admire::register_job(srv, reqs, job); } ADM_return_t ADM_update_job(ADM_server_t server, ADM_job_handle_t job, ADM_job_requirements_t reqs) { return admire::update_job(server, job, reqs); const admire::server srv{server.s_protocol, server.s_address}; return admire::update_job(srv, job, reqs); } ADM_return_t ADM_remove_job(ADM_server_t server, ADM_job_handle_t job) { return admire::remove_job(server, job); const admire::server srv{server.s_protocol, server.s_address}; return admire::remove_job(srv, job); } ADM_return_t ADM_register_adhoc_storage(ADM_server_t server, ADM_job_handle_t job, ADM_adhoc_context_t ctx, ADM_adhoc_storage_handle_t* adhoc_handle) { return admire::register_adhoc_storage(server, job, ctx, adhoc_handle); const admire::server srv{server.s_protocol, server.s_address}; return admire::register_adhoc_storage(srv, job, ctx, adhoc_handle); } ADM_return_t ADM_update_adhoc_storage(ADM_server_t server, ADM_job_handle_t job, ADM_adhoc_context_t ctx, ADM_adhoc_storage_handle_t adhoc_handle) { return admire::update_adhoc_storage(server, job, ctx, adhoc_handle); const admire::server srv{server.s_protocol, server.s_address}; return admire::update_adhoc_storage(srv, job, ctx, adhoc_handle); } ADM_return_t ADM_remove_adhoc_storage(ADM_server_t server, ADM_job_handle_t job, ADM_adhoc_storage_handle_t adhoc_handle) { return admire::remove_adhoc_storage(server, job, adhoc_handle); const admire::server srv{server.s_protocol, server.s_address}; return admire::remove_adhoc_storage(srv, job, adhoc_handle); } ADM_return_t ADM_deploy_adhoc_storage(ADM_server_t server, ADM_job_handle_t job, ADM_adhoc_storage_handle_t adhoc_handle) { return admire::deploy_adhoc_storage(server, job, adhoc_handle); const admire::server srv{server.s_protocol, server.s_address}; return admire::deploy_adhoc_storage(srv, job, adhoc_handle); } ADM_return_t Loading @@ -74,69 +100,99 @@ ADM_transfer_dataset(ADM_server_t server, ADM_job_handle_t job, ADM_dataset_handle_t** targets, ADM_limit_t** limits, ADM_tx_mapping_t mapping, ADM_transfer_handle_t* tx_handle) { return admire::transfer_dataset(server, job, sources, targets, limits, mapping, tx_handle); const admire::server srv{server.s_protocol, server.s_address}; return admire::transfer_dataset(srv, job, sources, targets, limits, mapping, tx_handle); } ADM_return_t ADM_set_dataset_information(ADM_server_t server, ADM_job_handle_t job, ADM_dataset_handle_t target, ADM_dataset_info_t info) { return admire::set_dataset_information(server, job, target, info); const admire::server srv{server.s_protocol, server.s_address}; return admire::set_dataset_information(srv, job, target, info); } ADM_return_t ADM_set_io_resources(ADM_server_t server, ADM_job_handle_t job, ADM_storage_handle_t tier, ADM_storage_resources_t resources) { return admire::set_io_resources(server, job, tier, resources); const admire::server srv{server.s_protocol, server.s_address}; return admire::set_io_resources(srv, job, tier, resources); } ADM_return_t ADM_get_transfer_priority(ADM_server_t server, ADM_job_handle_t job, ADM_transfer_handle_t tx_handle, ADM_transfer_priority_t* priority) { return admire::get_transfer_priority(server, job, tx_handle, priority); const admire::server srv{server.s_protocol, server.s_address}; return admire::get_transfer_priority(srv, job, tx_handle, priority); } ADM_return_t ADM_set_transfer_priority(ADM_server_t server, ADM_job_handle_t job, ADM_transfer_handle_t tx_handle, int incr) { return admire::set_transfer_priority(server, job, tx_handle, incr); const admire::server srv{server.s_protocol, server.s_address}; return admire::set_transfer_priority(srv, job, tx_handle, incr); } ADM_return_t ADM_cancel_transfer(ADM_server_t server, ADM_job_handle_t job, ADM_transfer_handle_t tx_handle) { return ADM_OTHER_ERROR; const admire::server srv{server.s_protocol, server.s_address}; return admire::cancel_transfer(srv, job, tx_handle); } ADM_return_t ADM_get_pending_transfers(ADM_server_t server, ADM_job_handle_t job, ADM_transfer_handle_t** pending_transfers) { return admire::get_pending_transfers(server, job, pending_transfers); const admire::server srv{server.s_protocol, server.s_address}; return admire::get_pending_transfers(srv, job, pending_transfers); } ADM_return_t ADM_set_qos_constraints(ADM_server_t server, ADM_job_handle_t job, ADM_limit_t limit) { return admire::set_qos_constraints(server, job, limit); const admire::server srv{server.s_protocol, server.s_address}; return admire::set_qos_constraints(srv, job, limit); } ADM_return_t ADM_get_qos_constraints(ADM_server_t server, ADM_job_handle_t job, ADM_qos_scope_t scope, ADM_qos_entity_t entity, ADM_limit_t** limits) { return admire::get_qos_constraints(server, job, scope, entity, limits); const admire::server srv{server.s_protocol, server.s_address}; return admire::get_qos_constraints(srv, job, scope, entity, limits); } ADM_return_t ADM_define_data_operation(ADM_server_t server, ADM_job_handle_t job, const char* path, ADM_data_operation_handle_t* op, ...) { const admire::server srv{server.s_protocol, server.s_address}; va_list args; va_start(args, op); auto ret = admire::define_data_operation(server, job, path, op, args); auto ret = admire::define_data_operation(srv, job, path, op, args); va_end(args); return ret; Loading @@ -148,9 +204,11 @@ ADM_connect_data_operation(ADM_server_t server, ADM_job_handle_t job, ADM_dataset_handle_t output, bool should_stream, ...) { const admire::server srv{server.s_protocol, server.s_address}; va_list args; va_start(args, should_stream); auto ret = admire::connect_data_operation(server, job, input, output, auto ret = admire::connect_data_operation(srv, job, input, output, should_stream, args); va_end(args); Loading @@ -161,7 +219,10 @@ ADM_return_t ADM_finalize_data_operation(ADM_server_t server, ADM_job_handle_t job, ADM_data_operation_handle_t op, ADM_data_operation_status_t* status) { return admire::finalize_data_operation(server, job, op, status); const admire::server srv{server.s_protocol, server.s_address}; return admire::finalize_data_operation(srv, job, op, status); } ADM_return_t Loading @@ -169,9 +230,11 @@ ADM_link_transfer_to_data_operation(ADM_server_t server, ADM_job_handle_t job, ADM_data_operation_handle_t op, bool should_stream, ...) { const admire::server srv{server.s_protocol, server.s_address}; va_list args; va_start(args, should_stream); auto ret = admire::link_transfer_to_data_operation(server, job, op, auto ret = admire::link_transfer_to_data_operation(srv, job, op, should_stream, args); va_end(args); Loading @@ -181,5 +244,6 @@ ADM_link_transfer_to_data_operation(ADM_server_t server, ADM_job_handle_t job, ADM_return_t ADM_get_statistics(ADM_server_t server, ADM_job_handle_t job, ADM_job_stats_t** stats) { return admire::get_statistics(server, job, stats); const admire::server srv{server.s_protocol, server.s_address}; return admire::get_statistics(srv, job, stats); } Loading
examples/ADM_cancel_transfer.cpp +15 −23 Original line number Diff line number Diff line #include <fmt/format.h> #include <engine.hpp> #include <admire.hpp> int Loading @@ -12,35 +12,27 @@ main(int argc, char* argv[]) { exit(EXIT_FAILURE); } scord::network::rpc_client rpc_client{"tcp"}; rpc_client.register_rpcs(); admire::server server{"tcp", argv[1]}; auto endp = rpc_client.lookup(argv[1]); ADM_job_handle_t job{}; ADM_transfer_handle_t tx_handle{}; ADM_return_t ret = ADM_SUCCESS; fmt::print( stdout, "Calling ADM_cancel_transfer remote procedure on {} with transfer id {} ...\n", argv[1], argv[2]); ADM_cancel_transfer_in_t in; try { in.transfer_id = std::stoi(argv[2]); ret = admire::cancel_transfer(server, job, tx_handle); } catch(const std::exception& e) { fmt::print(stdout, "ERROR: Incorrect input type. Please try again.\n"); fmt::print(stderr, "FATAL: ADM_cancel_transfer() failed: {}\n", e.what()); exit(EXIT_FAILURE); } ADM_cancel_transfer_out_t out; endp.call("ADM_cancel_transfer", &in, &out); if(out.ret < 0) { fmt::print( stdout, "ADM_cancel_transfer remote procedure not completed successfully\n"); if(ret != ADM_SUCCESS) { fmt::print(stdout, "ADM_cancel_transfer() remote procedure not completed " "successfully\n"); exit(EXIT_FAILURE); } else { fmt::print( stdout, "ADM_cancel_transfer remote procedure completed successfully\n"); } fmt::print(stdout, "ADM_cancel_transfer() remote procedure completed " "successfully\n"); }
src/api/admire.cpp +171 −25 Original line number Diff line number Diff line Loading @@ -24,142 +24,288 @@ #include <admire.hpp> #include <engine.hpp> #include <logger.hpp> namespace { void init_library() __attribute__((constructor)); void init_logger(); void init_library() { init_logger(); } /** Logging for the library */ void init_logger() { // for now, just create a simple console logger scord::logger::create_global_logger("libadm_iosched", "console color"); } } // namespace namespace admire { ADM_return_t register_job(ADM_server_t server, ADM_job_requirements_t reqs, register_job(const server& srv, ADM_job_requirements_t reqs, ADM_job_handle_t* job) { (void) srv; (void) reqs; (void) job; return ADM_OTHER_ERROR; } ADM_return_t update_job(ADM_server_t server, ADM_job_handle_t job, update_job(const server& srv, ADM_job_handle_t job, ADM_job_requirements_t reqs) { (void) srv; (void) job; (void) reqs; return ADM_OTHER_ERROR; } ADM_return_t remove_job(ADM_server_t server, ADM_job_handle_t job) { remove_job(const server& srv, ADM_job_handle_t job) { (void) srv; (void) job; return ADM_OTHER_ERROR; } ADM_return_t register_adhoc_storage(ADM_server_t server, ADM_job_handle_t job, register_adhoc_storage(const server& srv, ADM_job_handle_t job, ADM_adhoc_context_t ctx, ADM_adhoc_storage_handle_t* adhoc_handle) { (void) srv; (void) job; (void) ctx; (void) adhoc_handle; return ADM_OTHER_ERROR; } ADM_return_t update_adhoc_storage(ADM_server_t server, ADM_job_handle_t job, update_adhoc_storage(const server& srv, ADM_job_handle_t job, ADM_adhoc_context_t ctx, ADM_adhoc_storage_handle_t adhoc_handle) { (void) srv; (void) job; (void) ctx; (void) adhoc_handle; return ADM_OTHER_ERROR; } ADM_return_t remove_adhoc_storage(ADM_server_t server, ADM_job_handle_t job, remove_adhoc_storage(const server& srv, ADM_job_handle_t job, ADM_adhoc_storage_handle_t adhoc_handle) { (void) srv; (void) job; (void) adhoc_handle; return ADM_OTHER_ERROR; } ADM_return_t deploy_adhoc_storage(ADM_server_t server, ADM_job_handle_t job, deploy_adhoc_storage(const server& srv, ADM_job_handle_t job, ADM_adhoc_storage_handle_t adhoc_handle) { (void) srv; (void) job; (void) adhoc_handle; return ADM_OTHER_ERROR; } ADM_return_t transfer_dataset(ADM_server_t server, ADM_job_handle_t job, transfer_dataset(const server& srv, ADM_job_handle_t job, ADM_dataset_handle_t** sources, ADM_dataset_handle_t** targets, ADM_limit_t** limits, ADM_tx_mapping_t mapping, ADM_transfer_handle_t* tx) { (void) srv; (void) job; (void) sources; (void) targets; (void) limits; (void) mapping; (void) tx; return ADM_OTHER_ERROR; } ADM_return_t set_dataset_information(ADM_server_t server, ADM_job_handle_t job, set_dataset_information(const server& srv, ADM_job_handle_t job, ADM_dataset_handle_t target, ADM_dataset_info_t info) { (void) srv; (void) job; (void) target; (void) info; return ADM_OTHER_ERROR; } ADM_return_t set_io_resources(ADM_server_t server, ADM_job_handle_t job, set_io_resources(const server& srv, ADM_job_handle_t job, ADM_storage_handle_t tier, ADM_storage_resources_t resources) { (void) srv; (void) job; (void) tier; (void) resources; return ADM_OTHER_ERROR; } ADM_return_t get_transfer_priority(ADM_server_t server, ADM_job_handle_t job, get_transfer_priority(const server& srv, ADM_job_handle_t job, ADM_transfer_handle_t tx_handle, ADM_transfer_priority_t* priority) { (void) srv; (void) job; (void) tx_handle; (void) priority; return ADM_OTHER_ERROR; } ADM_return_t set_transfer_priority(ADM_server_t server, ADM_job_handle_t job, set_transfer_priority(const server& srv, ADM_job_handle_t job, ADM_transfer_handle_t tx_handle, int incr) { (void) srv; (void) job; (void) tx_handle; (void) incr; return ADM_OTHER_ERROR; } ADM_return_t cancel_transfer(ADM_server_t server, ADM_job_handle_t job, cancel_transfer(const server& srv, ADM_job_handle_t job, ADM_transfer_handle_t tx_handle) { return ADM_OTHER_ERROR; (void) job; (void) tx_handle; scord::network::rpc_client rpc_client{srv.m_protocol}; rpc_client.register_rpcs(); auto endp = rpc_client.lookup(srv.m_address); LOGGER_INFO("ADM_cancel_transfer(...)"); // FIXME: change RPC fields to ADM_transfer_handle_t ADM_cancel_transfer_in_t in{42}; ADM_cancel_transfer_out_t out; endp.call("ADM_cancel_transfer", &in, &out); if(out.ret < 0) { LOGGER_ERROR("ADM_cancel_transfer() = {}", out.ret); return static_cast<ADM_return_t>(out.ret); } LOGGER_INFO("ADM_cancel_transfer() = {}", ADM_SUCCESS); return ADM_SUCCESS; } ADM_return_t get_pending_transfers(ADM_server_t server, ADM_job_handle_t job, get_pending_transfers(const server& srv, ADM_job_handle_t job, ADM_transfer_handle_t** pending_transfers) { (void) srv; (void) job; (void) pending_transfers; return ADM_OTHER_ERROR; } ADM_return_t set_qos_constraints(ADM_server_t server, ADM_job_handle_t job, set_qos_constraints(const server& srv, ADM_job_handle_t job, ADM_limit_t limit) { (void) srv; (void) job; (void) limit; return ADM_OTHER_ERROR; } ADM_return_t get_qos_constraints(ADM_server_t server, ADM_job_handle_t job, get_qos_constraints(const server& srv, ADM_job_handle_t job, ADM_qos_scope_t scope, ADM_qos_entity_t entity, ADM_limit_t** limits) { (void) srv; (void) job; (void) scope; (void) entity; (void) limits; return ADM_OTHER_ERROR; } ADM_return_t define_data_operation(ADM_server_t server, ADM_job_handle_t job, const char* path, ADM_data_operation_handle_t* op, va_list args) { define_data_operation(const server& srv, ADM_job_handle_t job, const char* path, ADM_data_operation_handle_t* op, va_list args) { (void) srv; (void) job; (void) path; (void) op; (void) args; return ADM_OTHER_ERROR; } ADM_return_t connect_data_operation(ADM_server_t server, ADM_job_handle_t job, connect_data_operation(const server& srv, ADM_job_handle_t job, ADM_dataset_handle_t input, ADM_dataset_handle_t output, bool should_stream, va_list args) { (void) srv; (void) job; (void) input; (void) output; (void) should_stream; (void) args; return ADM_OTHER_ERROR; } ADM_return_t finalize_data_operation(ADM_server_t server, ADM_job_handle_t job, finalize_data_operation(const server& srv, ADM_job_handle_t job, ADM_data_operation_handle_t op, ADM_data_operation_status_t* status) { (void) srv; (void) job; (void) op; (void) status; return ADM_OTHER_ERROR; } ADM_return_t link_transfer_to_data_operation(ADM_server_t server, ADM_job_handle_t job, link_transfer_to_data_operation(const server& srv, ADM_job_handle_t job, ADM_data_operation_handle_t op, bool should_stream, ...) { bool should_stream, va_list args) { (void) srv; (void) job; (void) op; (void) should_stream; (void) args; return ADM_OTHER_ERROR; } ADM_return_t get_statistics(ADM_server_t server, ADM_job_handle_t job, get_statistics(const server& srv, ADM_job_handle_t job, ADM_job_stats_t** stats) { (void) srv; (void) job; (void) stats; return ADM_OTHER_ERROR; } Loading
src/api/admire.h +1 −1 Original line number Diff line number Diff line Loading @@ -48,7 +48,7 @@ enum ADM_return_t { }; /* A server */ typedef const char* ADM_server_t; typedef struct adm_server ADM_server_t; /* A node */ typedef const char* ADM_node_t; Loading
src/api/admire.hpp +29 −25 Original line number Diff line number Diff line Loading @@ -24,103 +24,107 @@ #include <admire.h> #include <cstdarg> #include <string> #ifndef SCORD_ADMIRE_HPP #define SCORD_ADMIRE_HPP namespace admire { struct server { std::string m_protocol; std::string m_address; }; ADM_return_t register_job(ADM_server_t server, ADM_job_requirements_t reqs, register_job(const server& srv, ADM_job_requirements_t reqs, ADM_job_handle_t* job); ADM_return_t update_job(ADM_server_t server, ADM_job_handle_t job, update_job(const server& srv, ADM_job_handle_t job, ADM_job_requirements_t reqs); ADM_return_t remove_job(ADM_server_t server, ADM_job_handle_t job); remove_job(const server& srv, ADM_job_handle_t job); ADM_return_t register_adhoc_storage(ADM_server_t server, ADM_job_handle_t job, register_adhoc_storage(const server& srv, ADM_job_handle_t job, ADM_adhoc_context_t ctx, ADM_adhoc_storage_handle_t* adhoc_handle); ADM_return_t update_adhoc_storage(ADM_server_t server, ADM_job_handle_t job, update_adhoc_storage(const server& srv, ADM_job_handle_t job, ADM_adhoc_context_t ctx, ADM_adhoc_storage_handle_t adhoc_handle); ADM_return_t remove_adhoc_storage(ADM_server_t server, ADM_job_handle_t job, remove_adhoc_storage(const server& srv, ADM_job_handle_t job, ADM_adhoc_storage_handle_t adhoc_handle); ADM_return_t deploy_adhoc_storage(ADM_server_t server, ADM_job_handle_t job, deploy_adhoc_storage(const server& srv, ADM_job_handle_t job, ADM_adhoc_storage_handle_t adhoc_handle); ADM_return_t transfer_dataset(ADM_server_t server, ADM_job_handle_t job, transfer_dataset(const server& srv, ADM_job_handle_t job, ADM_dataset_handle_t** sources, ADM_dataset_handle_t** targets, ADM_limit_t** limits, ADM_tx_mapping_t mapping, ADM_transfer_handle_t* tx_handle); ADM_return_t set_dataset_information(ADM_server_t server, ADM_job_handle_t job, set_dataset_information(const server& srv, ADM_job_handle_t job, ADM_dataset_handle_t target, ADM_dataset_info_t info); ADM_return_t set_io_resources(ADM_server_t server, ADM_job_handle_t job, set_io_resources(const server& srv, ADM_job_handle_t job, ADM_storage_handle_t tier, ADM_storage_resources_t resources); ADM_return_t get_transfer_priority(ADM_server_t server, ADM_job_handle_t job, get_transfer_priority(const server& srv, ADM_job_handle_t job, ADM_transfer_handle_t tx_handle, ADM_transfer_priority_t* priority); ADM_return_t set_transfer_priority(ADM_server_t server, ADM_job_handle_t job, set_transfer_priority(const server& srv, ADM_job_handle_t job, ADM_transfer_handle_t tx_handle, int incr); ADM_return_t cancel_transfer(ADM_server_t server, ADM_job_handle_t job, cancel_transfer(const server& srv, ADM_job_handle_t job, ADM_transfer_handle_t tx_handle); ADM_return_t get_pending_transfers(ADM_server_t server, ADM_job_handle_t job, get_pending_transfers(const server& srv, ADM_job_handle_t job, ADM_transfer_handle_t** pending_transfers); ADM_return_t set_qos_constraints(ADM_server_t server, ADM_job_handle_t job, ADM_limit_t limit); set_qos_constraints(const server& srv, ADM_job_handle_t job, ADM_limit_t limit); ADM_return_t get_qos_constraints(ADM_server_t server, ADM_job_handle_t job, get_qos_constraints(const server& srv, ADM_job_handle_t job, ADM_qos_scope_t scope, ADM_qos_entity_t entity, ADM_limit_t** limits); ADM_return_t define_data_operation(ADM_server_t server, ADM_job_handle_t job, const char* path, ADM_data_operation_handle_t* op, va_list args); define_data_operation(const server& srv, ADM_job_handle_t job, const char* path, ADM_data_operation_handle_t* op, va_list args); ADM_return_t connect_data_operation(ADM_server_t server, ADM_job_handle_t job, connect_data_operation(const server& srv, ADM_job_handle_t job, ADM_dataset_handle_t input, ADM_dataset_handle_t output, bool should_stream, va_list args); ADM_return_t finalize_data_operation(ADM_server_t server, ADM_job_handle_t job, finalize_data_operation(const server& srv, ADM_job_handle_t job, ADM_data_operation_handle_t op, ADM_data_operation_status_t* status); ADM_return_t link_transfer_to_data_operation(ADM_server_t server, ADM_job_handle_t job, link_transfer_to_data_operation(const server& srv, ADM_job_handle_t job, ADM_data_operation_handle_t op, bool should_stream, ...); bool should_stream, va_list args); ADM_return_t get_statistics(ADM_server_t server, ADM_job_handle_t job, get_statistics(const server& srv, ADM_job_handle_t job, ADM_job_stats_t** stats); } // namespace admire Loading
src/api/c_wrapper.cpp +86 −22 Original line number Diff line number Diff line Loading @@ -25,47 +25,73 @@ #include <admire.h> #include <admire.hpp> struct adm_server { const char* s_protocol; const char* s_address; }; ADM_return_t ADM_register_job(ADM_server_t server, ADM_job_requirements_t reqs, ADM_job_handle_t* job) { return admire::register_job(server, reqs, job); const admire::server srv{server.s_protocol, server.s_address}; return admire::register_job(srv, reqs, job); } ADM_return_t ADM_update_job(ADM_server_t server, ADM_job_handle_t job, ADM_job_requirements_t reqs) { return admire::update_job(server, job, reqs); const admire::server srv{server.s_protocol, server.s_address}; return admire::update_job(srv, job, reqs); } ADM_return_t ADM_remove_job(ADM_server_t server, ADM_job_handle_t job) { return admire::remove_job(server, job); const admire::server srv{server.s_protocol, server.s_address}; return admire::remove_job(srv, job); } ADM_return_t ADM_register_adhoc_storage(ADM_server_t server, ADM_job_handle_t job, ADM_adhoc_context_t ctx, ADM_adhoc_storage_handle_t* adhoc_handle) { return admire::register_adhoc_storage(server, job, ctx, adhoc_handle); const admire::server srv{server.s_protocol, server.s_address}; return admire::register_adhoc_storage(srv, job, ctx, adhoc_handle); } ADM_return_t ADM_update_adhoc_storage(ADM_server_t server, ADM_job_handle_t job, ADM_adhoc_context_t ctx, ADM_adhoc_storage_handle_t adhoc_handle) { return admire::update_adhoc_storage(server, job, ctx, adhoc_handle); const admire::server srv{server.s_protocol, server.s_address}; return admire::update_adhoc_storage(srv, job, ctx, adhoc_handle); } ADM_return_t ADM_remove_adhoc_storage(ADM_server_t server, ADM_job_handle_t job, ADM_adhoc_storage_handle_t adhoc_handle) { return admire::remove_adhoc_storage(server, job, adhoc_handle); const admire::server srv{server.s_protocol, server.s_address}; return admire::remove_adhoc_storage(srv, job, adhoc_handle); } ADM_return_t ADM_deploy_adhoc_storage(ADM_server_t server, ADM_job_handle_t job, ADM_adhoc_storage_handle_t adhoc_handle) { return admire::deploy_adhoc_storage(server, job, adhoc_handle); const admire::server srv{server.s_protocol, server.s_address}; return admire::deploy_adhoc_storage(srv, job, adhoc_handle); } ADM_return_t Loading @@ -74,69 +100,99 @@ ADM_transfer_dataset(ADM_server_t server, ADM_job_handle_t job, ADM_dataset_handle_t** targets, ADM_limit_t** limits, ADM_tx_mapping_t mapping, ADM_transfer_handle_t* tx_handle) { return admire::transfer_dataset(server, job, sources, targets, limits, mapping, tx_handle); const admire::server srv{server.s_protocol, server.s_address}; return admire::transfer_dataset(srv, job, sources, targets, limits, mapping, tx_handle); } ADM_return_t ADM_set_dataset_information(ADM_server_t server, ADM_job_handle_t job, ADM_dataset_handle_t target, ADM_dataset_info_t info) { return admire::set_dataset_information(server, job, target, info); const admire::server srv{server.s_protocol, server.s_address}; return admire::set_dataset_information(srv, job, target, info); } ADM_return_t ADM_set_io_resources(ADM_server_t server, ADM_job_handle_t job, ADM_storage_handle_t tier, ADM_storage_resources_t resources) { return admire::set_io_resources(server, job, tier, resources); const admire::server srv{server.s_protocol, server.s_address}; return admire::set_io_resources(srv, job, tier, resources); } ADM_return_t ADM_get_transfer_priority(ADM_server_t server, ADM_job_handle_t job, ADM_transfer_handle_t tx_handle, ADM_transfer_priority_t* priority) { return admire::get_transfer_priority(server, job, tx_handle, priority); const admire::server srv{server.s_protocol, server.s_address}; return admire::get_transfer_priority(srv, job, tx_handle, priority); } ADM_return_t ADM_set_transfer_priority(ADM_server_t server, ADM_job_handle_t job, ADM_transfer_handle_t tx_handle, int incr) { return admire::set_transfer_priority(server, job, tx_handle, incr); const admire::server srv{server.s_protocol, server.s_address}; return admire::set_transfer_priority(srv, job, tx_handle, incr); } ADM_return_t ADM_cancel_transfer(ADM_server_t server, ADM_job_handle_t job, ADM_transfer_handle_t tx_handle) { return ADM_OTHER_ERROR; const admire::server srv{server.s_protocol, server.s_address}; return admire::cancel_transfer(srv, job, tx_handle); } ADM_return_t ADM_get_pending_transfers(ADM_server_t server, ADM_job_handle_t job, ADM_transfer_handle_t** pending_transfers) { return admire::get_pending_transfers(server, job, pending_transfers); const admire::server srv{server.s_protocol, server.s_address}; return admire::get_pending_transfers(srv, job, pending_transfers); } ADM_return_t ADM_set_qos_constraints(ADM_server_t server, ADM_job_handle_t job, ADM_limit_t limit) { return admire::set_qos_constraints(server, job, limit); const admire::server srv{server.s_protocol, server.s_address}; return admire::set_qos_constraints(srv, job, limit); } ADM_return_t ADM_get_qos_constraints(ADM_server_t server, ADM_job_handle_t job, ADM_qos_scope_t scope, ADM_qos_entity_t entity, ADM_limit_t** limits) { return admire::get_qos_constraints(server, job, scope, entity, limits); const admire::server srv{server.s_protocol, server.s_address}; return admire::get_qos_constraints(srv, job, scope, entity, limits); } ADM_return_t ADM_define_data_operation(ADM_server_t server, ADM_job_handle_t job, const char* path, ADM_data_operation_handle_t* op, ...) { const admire::server srv{server.s_protocol, server.s_address}; va_list args; va_start(args, op); auto ret = admire::define_data_operation(server, job, path, op, args); auto ret = admire::define_data_operation(srv, job, path, op, args); va_end(args); return ret; Loading @@ -148,9 +204,11 @@ ADM_connect_data_operation(ADM_server_t server, ADM_job_handle_t job, ADM_dataset_handle_t output, bool should_stream, ...) { const admire::server srv{server.s_protocol, server.s_address}; va_list args; va_start(args, should_stream); auto ret = admire::connect_data_operation(server, job, input, output, auto ret = admire::connect_data_operation(srv, job, input, output, should_stream, args); va_end(args); Loading @@ -161,7 +219,10 @@ ADM_return_t ADM_finalize_data_operation(ADM_server_t server, ADM_job_handle_t job, ADM_data_operation_handle_t op, ADM_data_operation_status_t* status) { return admire::finalize_data_operation(server, job, op, status); const admire::server srv{server.s_protocol, server.s_address}; return admire::finalize_data_operation(srv, job, op, status); } ADM_return_t Loading @@ -169,9 +230,11 @@ ADM_link_transfer_to_data_operation(ADM_server_t server, ADM_job_handle_t job, ADM_data_operation_handle_t op, bool should_stream, ...) { const admire::server srv{server.s_protocol, server.s_address}; va_list args; va_start(args, should_stream); auto ret = admire::link_transfer_to_data_operation(server, job, op, auto ret = admire::link_transfer_to_data_operation(srv, job, op, should_stream, args); va_end(args); Loading @@ -181,5 +244,6 @@ ADM_link_transfer_to_data_operation(ADM_server_t server, ADM_job_handle_t job, ADM_return_t ADM_get_statistics(ADM_server_t server, ADM_job_handle_t job, ADM_job_stats_t** stats) { return admire::get_statistics(server, job, stats); const admire::server srv{server.s_protocol, server.s_address}; return admire::get_statistics(srv, job, stats); }