Loading include/norns.h +17 −11 Original line number Diff line number Diff line Loading @@ -26,17 +26,23 @@ #include <features.h> #include <sys/types.h> #include <stdint.h> __BEGIN_DECLS /* I/O task descriptor */ struct norns_iotd { int ni_tid; /* task identifier */ int ni_ibid; /* source backend identifier */ const char* ni_ipath; /* path to data source */ int ni_obid; /* destination backend identifier */ const char* ni_opath; /* path to data destination */ int ni_type; /* operation to be performed */ uint32_t ni_tid; /* task identifier */ uint32_t ni_sbid; /* source backend identifier */ const char* ni_spath; /* path to data source */ uint32_t ni_dbid; /* destination backend identifier */ const char* ni_dpath; /* path to data destination */ uint32_t ni_type; /* operation to be performed */ /* Internal members. */ pid_t __pid; /* pid of the process that made the request */ uint32_t __jobid; /* job id of the process that made the request (XXX Slurm dependent)*/ }; /* Task types */ Loading Loading @@ -66,18 +72,18 @@ int norns_getconf() __THROW; /* Enqueue an asynchronous I/O task */ int norns_transfer(struct norns_iotd* iotdp) __THROW; /* wait for the completion of the task associated to iotdp */ int norns_wait(struct norns_iotd* iotdp) __THROW; /* Try to cancel an asynchronous I/O task associated with iotdp */ ssize_t norns_cancel(struct norns_iotd* iotdp) __THROW; /* Retrieve return status associated with iotdp */ ssize_t norns_return(struct norns_iotd* iotdp) __THROW; ssize_t norns_return(struct norns_iotd* iotdp, struct norns_iotst* statp) __THROW; /* Retrieve current status associated with iotdp */ /* Retrieve current status associated with iotdp (if iotdp is NULL, retrieve status for all running tasks) */ ssize_t norns_progress(struct norns_iotd* iotdp, struct norns_iotst* statp) __THROW; /* Retrieve status associated with all current tasks */ ssize_t norns_progress_all(struct norns_iotst** statp) __THROW; /* Retrieve error status associated with iotdp */ int norns_error(struct norns_iotd* iotdp) __THROW; Loading lib/norns.c +4 −25 Original line number Diff line number Diff line Loading @@ -47,29 +47,6 @@ __attribute__((destructor)) static void __norns_finit(void); void __norns_init(){ #if 0 struct sockaddr_un server; sock = socket(AF_UNIX, SOCK_STREAM, 0); if (sock < 0) { perror("opening stream socket"); exit(EXIT_FAILURE); } server.sun_family = AF_UNIX; strncpy(server.sun_path, SOCKET_FILE, sizeof(server.sun_path)); server.sun_path[sizeof(server.sun_path)-1] = '\0'; if (connect(sock, (struct sockaddr *) &server, sizeof(struct sockaddr_un)) < 0) { if (close(sock) < 0){ exit(EXIT_FAILURE); } perror("connecting stream socket"); exit(EXIT_FAILURE); } #endif } static int connect_to_daemon(void){ Loading Loading @@ -97,8 +74,6 @@ void __norns_finit(void){ /* * */ printf("Executing this when the library is unloaded\n"); close(sock); } int norns_transfer(struct norns_iotd* iotdp) { Loading @@ -112,6 +87,10 @@ int norns_transfer(struct norns_iotd* iotdp) { memcpy(iotd_copy, iotdp, sizeof(*iotd_copy)); // capture important process information iotd_copy->__pid = getpid(); iotd_copy->__jobid = 0; int sfd = connect_to_daemon(); if(sfd == -1){ Loading src/io-task.hpp 0 → 100644 +51 −0 Original line number Diff line number Diff line // // Copyright (C) 2017 Barcelona Supercomputing Center // Centro Nacional de Supercomputacion // // This file is part of the Data Scheduler, a daemon for tracking and managing // requests for asynchronous data transfer in a hierarchical storage environment. // // See AUTHORS file in the top level directory for information // regarding developers and contributors. // // The Data Scheduler is free software: you can redistribute it and/or modify // it under the terms of the GNU Lesser General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // // Data Scheduler is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU Lesser General Public License for more details. // // You should have received a copy of the GNU Lesser General Public License // along with Data Scheduler. If not, see <http://www.gnu.org/licenses/>. // #ifndef __IO_TASK_HPP__ #define __IO_TASK_HPP__ namespace io { struct task { using backend_ptr = std::shared_ptr<storage::backend>; task(struct norns_iotd* /*iotdp*/) { } void operator()(int /* thread_id */) const { std::cout << "Hello from a task!\n"; } uint64_t m_id; pid_t m_pid; uint32_t m_jobid; backend_ptr m_source; backend_ptr m_destination; }; } // namespace io #endif // __IO_TASK_HPP__ src/urd.cpp +2 −1 Original line number Diff line number Diff line Loading @@ -51,6 +51,7 @@ #include "backends.hpp" #include "ctpl.h" #include "logger.hpp" #include "io-task.hpp" #include "urd.hpp" #if 0 Loading Loading @@ -230,7 +231,7 @@ void urd::new_request_handler(struct norns_iotd* iotdp){ //std::unique_ptr<urd::task> task(new urd::task(iotdp)); //iotdp->ni_tid = task->m_task_id; m_workers->push(urd::task(iotdp)); m_workers->push(std::move(io::task(iotdp))); /* the ipc_listener will automatically reply to the client when we exit the handler */ } Loading src/urd.hpp +0 −15 Original line number Diff line number Diff line Loading @@ -38,21 +38,6 @@ class urd { public: /* sample task (to be removed) */ struct task { task(struct norns_iotd* /*iotdp*/){ } void operator()(int /* id */) const { std::cout << "Hello from a task!\n"; } pid_t m_pid; uint64_t m_task_id; const char* m_path; }; public: void set_configuration(const config_settings& settings); void run(); Loading Loading
include/norns.h +17 −11 Original line number Diff line number Diff line Loading @@ -26,17 +26,23 @@ #include <features.h> #include <sys/types.h> #include <stdint.h> __BEGIN_DECLS /* I/O task descriptor */ struct norns_iotd { int ni_tid; /* task identifier */ int ni_ibid; /* source backend identifier */ const char* ni_ipath; /* path to data source */ int ni_obid; /* destination backend identifier */ const char* ni_opath; /* path to data destination */ int ni_type; /* operation to be performed */ uint32_t ni_tid; /* task identifier */ uint32_t ni_sbid; /* source backend identifier */ const char* ni_spath; /* path to data source */ uint32_t ni_dbid; /* destination backend identifier */ const char* ni_dpath; /* path to data destination */ uint32_t ni_type; /* operation to be performed */ /* Internal members. */ pid_t __pid; /* pid of the process that made the request */ uint32_t __jobid; /* job id of the process that made the request (XXX Slurm dependent)*/ }; /* Task types */ Loading Loading @@ -66,18 +72,18 @@ int norns_getconf() __THROW; /* Enqueue an asynchronous I/O task */ int norns_transfer(struct norns_iotd* iotdp) __THROW; /* wait for the completion of the task associated to iotdp */ int norns_wait(struct norns_iotd* iotdp) __THROW; /* Try to cancel an asynchronous I/O task associated with iotdp */ ssize_t norns_cancel(struct norns_iotd* iotdp) __THROW; /* Retrieve return status associated with iotdp */ ssize_t norns_return(struct norns_iotd* iotdp) __THROW; ssize_t norns_return(struct norns_iotd* iotdp, struct norns_iotst* statp) __THROW; /* Retrieve current status associated with iotdp */ /* Retrieve current status associated with iotdp (if iotdp is NULL, retrieve status for all running tasks) */ ssize_t norns_progress(struct norns_iotd* iotdp, struct norns_iotst* statp) __THROW; /* Retrieve status associated with all current tasks */ ssize_t norns_progress_all(struct norns_iotst** statp) __THROW; /* Retrieve error status associated with iotdp */ int norns_error(struct norns_iotd* iotdp) __THROW; Loading
lib/norns.c +4 −25 Original line number Diff line number Diff line Loading @@ -47,29 +47,6 @@ __attribute__((destructor)) static void __norns_finit(void); void __norns_init(){ #if 0 struct sockaddr_un server; sock = socket(AF_UNIX, SOCK_STREAM, 0); if (sock < 0) { perror("opening stream socket"); exit(EXIT_FAILURE); } server.sun_family = AF_UNIX; strncpy(server.sun_path, SOCKET_FILE, sizeof(server.sun_path)); server.sun_path[sizeof(server.sun_path)-1] = '\0'; if (connect(sock, (struct sockaddr *) &server, sizeof(struct sockaddr_un)) < 0) { if (close(sock) < 0){ exit(EXIT_FAILURE); } perror("connecting stream socket"); exit(EXIT_FAILURE); } #endif } static int connect_to_daemon(void){ Loading Loading @@ -97,8 +74,6 @@ void __norns_finit(void){ /* * */ printf("Executing this when the library is unloaded\n"); close(sock); } int norns_transfer(struct norns_iotd* iotdp) { Loading @@ -112,6 +87,10 @@ int norns_transfer(struct norns_iotd* iotdp) { memcpy(iotd_copy, iotdp, sizeof(*iotd_copy)); // capture important process information iotd_copy->__pid = getpid(); iotd_copy->__jobid = 0; int sfd = connect_to_daemon(); if(sfd == -1){ Loading
src/io-task.hpp 0 → 100644 +51 −0 Original line number Diff line number Diff line // // Copyright (C) 2017 Barcelona Supercomputing Center // Centro Nacional de Supercomputacion // // This file is part of the Data Scheduler, a daemon for tracking and managing // requests for asynchronous data transfer in a hierarchical storage environment. // // See AUTHORS file in the top level directory for information // regarding developers and contributors. // // The Data Scheduler is free software: you can redistribute it and/or modify // it under the terms of the GNU Lesser General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // // Data Scheduler is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU Lesser General Public License for more details. // // You should have received a copy of the GNU Lesser General Public License // along with Data Scheduler. If not, see <http://www.gnu.org/licenses/>. // #ifndef __IO_TASK_HPP__ #define __IO_TASK_HPP__ namespace io { struct task { using backend_ptr = std::shared_ptr<storage::backend>; task(struct norns_iotd* /*iotdp*/) { } void operator()(int /* thread_id */) const { std::cout << "Hello from a task!\n"; } uint64_t m_id; pid_t m_pid; uint32_t m_jobid; backend_ptr m_source; backend_ptr m_destination; }; } // namespace io #endif // __IO_TASK_HPP__
src/urd.cpp +2 −1 Original line number Diff line number Diff line Loading @@ -51,6 +51,7 @@ #include "backends.hpp" #include "ctpl.h" #include "logger.hpp" #include "io-task.hpp" #include "urd.hpp" #if 0 Loading Loading @@ -230,7 +231,7 @@ void urd::new_request_handler(struct norns_iotd* iotdp){ //std::unique_ptr<urd::task> task(new urd::task(iotdp)); //iotdp->ni_tid = task->m_task_id; m_workers->push(urd::task(iotdp)); m_workers->push(std::move(io::task(iotdp))); /* the ipc_listener will automatically reply to the client when we exit the handler */ } Loading
src/urd.hpp +0 −15 Original line number Diff line number Diff line Loading @@ -38,21 +38,6 @@ class urd { public: /* sample task (to be removed) */ struct task { task(struct norns_iotd* /*iotdp*/){ } void operator()(int /* id */) const { std::cout << "Hello from a task!\n"; } pid_t m_pid; uint64_t m_task_id; const char* m_path; }; public: void set_configuration(const config_settings& settings); void run(); Loading