Loading ifs/include/adafs_ops/data.hpp +4 −1 Original line number Diff line number Diff line Loading @@ -11,7 +11,7 @@ int init_chunk_space(const std::string& path); int destroy_chunk_space(const std::string& path); int read_file(char* buf, size_t& read_size, const std::string& path, size_t size, off_t off); int read_file(const std::string& path, rpc_chnk_id_t chnk_id, size_t size, off_t off, char* buf, size_t& read_size); int write_file(const std::string& path, const char* buf, rpc_chnk_id_t chnk_id, size_t size, off_t off, bool append, off_t updated_size, size_t& write_size); Loading @@ -19,4 +19,7 @@ int write_file(const std::string& path, const char* buf, rpc_chnk_id_t chnk_id, int write_chunks(const std::string& path, const std::vector<void*>& buf_ptrs, const std::vector<hg_size_t>& buf_sizes, size_t& write_size); int read_chunks(const std::string& path, const std::vector<void*>& buf_ptrs, const std::vector<hg_size_t>& buf_sizes, size_t& read_size); #endif //IFS_DATA_HPP ifs/include/preload/preload_util.hpp +1 −1 Original line number Diff line number Diff line Loading @@ -8,7 +8,7 @@ #include <extern/lrucache/LRUCache11.hpp> #include <string> // TODO singleton this stuff away struct FsConfig { // configurable metadata bool atime_state; Loading ifs/include/preload/rpc/ld_rpc_data.hpp +11 −0 Original line number Diff line number Diff line Loading @@ -25,9 +25,20 @@ struct write_args { std::vector<unsigned long>& chnk_ids; ABT_eventual* eventual; }; struct read_args { std::string& path; size_t in_size; off_t in_offset; void* buf; std::vector<unsigned long>& chnk_ids; ABT_eventual* eventual; }; void rpc_send_write_abt(void* _arg); void rpc_send_read_abt(void* _arg); template<typename T> int rpc_send_read(const std::string& path, const size_t in_size, const off_t in_offset, T* tar_buf, size_t& read_size) { hg_handle_t handle; Loading ifs/include/rpc/rpc_utils.hpp +38 −0 Original line number Diff line number Diff line Loading @@ -9,6 +9,44 @@ extern "C" { } #include <string> template<typename I, typename O> inline hg_return_t rpc_cleanup(hg_handle_t* handle, I* input, O* output, hg_bulk_t* bulk_handle) { auto ret = HG_SUCCESS; if (bulk_handle) { ret = margo_bulk_free(*bulk_handle); if (ret != HG_SUCCESS) return ret; } if (input && handle) { ret = margo_free_input(*handle, input); if (ret != HG_SUCCESS) return ret; } if (output && handle) { ret = margo_free_output(*handle, output); if (ret != HG_SUCCESS) return ret; } if (handle) { ret = margo_destroy(*handle); if (ret != HG_SUCCESS) return ret; } return ret; } template<typename I, typename O> inline hg_return_t rpc_cleanup_respond(hg_handle_t* handle, I* input, O* output, hg_bulk_t* bulk_handle) { auto ret = HG_SUCCESS; if (output && handle) { ret = margo_respond(*handle, output); if (ret != HG_SUCCESS) return ret; } return rpc_cleanup(handle, input, static_cast<O*>(nullptr), bulk_handle); } hg_bool_t bool_to_merc_bool(bool state); bool is_handle_sm(margo_instance_id mid, const hg_addr_t& addr); Loading ifs/src/adafs_ops/data.cpp +56 −17 Original line number Diff line number Diff line Loading @@ -60,28 +60,17 @@ int destroy_chunk_space(const std::string& path) { } /** * pread wrapper * @param buf * @param read_size * * @param path * @param buf * @param chnk_id * @param size * @param off * @param append * @param updated_size * @param [out] write_size * @return */ int read_file(char* buf, size_t& read_size, const string& path, const size_t size, const off_t off) { auto fs_path = path_to_fspath(path); auto chnk_path = bfs::path(ADAFS_DATA->chunk_path()); chnk_path /= fs_path; chnk_path /= "data"s; int fd = open(chnk_path.c_str(), R_OK); if (fd < 0) return EIO; read_size = static_cast<size_t>(pread(fd, buf, size, off)); close(fd); return 0; } int write_file(const string& path, const char* buf, const rpc_chnk_id_t chnk_id, const size_t size, const off_t off, const bool append, const off_t updated_size, size_t& write_size) { auto fs_path = path_to_fspath(path); Loading Loading @@ -117,6 +106,7 @@ int write_chunks(const string& path, const vector<void*>& buf_ptrs, const vector write_size = 0; // buf sizes also hold chnk ids. we only want to keep calculate the actual chunks auto chnk_n = buf_sizes.size() / 2; // TODO this can be parallized for (size_t i = 0; i < chnk_n; i++) { auto chnk_id = *(static_cast<size_t*>(buf_ptrs[i * 2])); auto chnk_ptr = static_cast<char*>(buf_ptrs[(i * 2) + 1]); Loading @@ -134,3 +124,52 @@ int write_chunks(const string& path, const vector<void*>& buf_ptrs, const vector } return 0; } /** * * @param path * @param chnk_id * @param size * @param off * @param [out] buf * @param [out] read_size * @return */ int read_file(const string& path, const rpc_chnk_id_t chnk_id, const size_t size, const off_t off, char* buf, size_t& read_size) { auto fs_path = path_to_fspath(path); auto chnk_path = bfs::path(ADAFS_DATA->chunk_path()); chnk_path /= fs_path; chnk_path /= fmt::FormatInt(chnk_id).c_str();; int fd = open(chnk_path.c_str(), R_OK); if (fd < 0) return EIO; read_size = static_cast<size_t>(pread(fd, buf, size, off)); close(fd); return 0; } int read_chunks(const string& path, const vector<void*>& buf_ptrs, const vector<hg_size_t>& buf_sizes, size_t& read_size) { read_size = 0; // buf sizes also hold chnk ids. we only want to keep calculate the actual chunks auto chnk_n = buf_sizes.size() / 2; // TODO this can be parallized for (size_t i = 0; i < chnk_n; i++) { auto chnk_id = *(static_cast<size_t*>(buf_ptrs[i])); auto chnk_ptr = static_cast<char*>(buf_ptrs[i + chnk_n]); auto chnk_size = buf_sizes[i + chnk_n]; size_t read_chnk_size; // TODO append if (read_file(path, chnk_id, chnk_size, 0, chnk_ptr, read_chnk_size) != 0) { // TODO How do we handle errors? ADAFS_DATA->spdlogger()->error("{}() read chunk failed with path {} and id {}. Aborting ...", __func__, path, chnk_id); read_size = 0; return -1; } read_size += read_chnk_size; } return 0; } No newline at end of file Loading
ifs/include/adafs_ops/data.hpp +4 −1 Original line number Diff line number Diff line Loading @@ -11,7 +11,7 @@ int init_chunk_space(const std::string& path); int destroy_chunk_space(const std::string& path); int read_file(char* buf, size_t& read_size, const std::string& path, size_t size, off_t off); int read_file(const std::string& path, rpc_chnk_id_t chnk_id, size_t size, off_t off, char* buf, size_t& read_size); int write_file(const std::string& path, const char* buf, rpc_chnk_id_t chnk_id, size_t size, off_t off, bool append, off_t updated_size, size_t& write_size); Loading @@ -19,4 +19,7 @@ int write_file(const std::string& path, const char* buf, rpc_chnk_id_t chnk_id, int write_chunks(const std::string& path, const std::vector<void*>& buf_ptrs, const std::vector<hg_size_t>& buf_sizes, size_t& write_size); int read_chunks(const std::string& path, const std::vector<void*>& buf_ptrs, const std::vector<hg_size_t>& buf_sizes, size_t& read_size); #endif //IFS_DATA_HPP
ifs/include/preload/preload_util.hpp +1 −1 Original line number Diff line number Diff line Loading @@ -8,7 +8,7 @@ #include <extern/lrucache/LRUCache11.hpp> #include <string> // TODO singleton this stuff away struct FsConfig { // configurable metadata bool atime_state; Loading
ifs/include/preload/rpc/ld_rpc_data.hpp +11 −0 Original line number Diff line number Diff line Loading @@ -25,9 +25,20 @@ struct write_args { std::vector<unsigned long>& chnk_ids; ABT_eventual* eventual; }; struct read_args { std::string& path; size_t in_size; off_t in_offset; void* buf; std::vector<unsigned long>& chnk_ids; ABT_eventual* eventual; }; void rpc_send_write_abt(void* _arg); void rpc_send_read_abt(void* _arg); template<typename T> int rpc_send_read(const std::string& path, const size_t in_size, const off_t in_offset, T* tar_buf, size_t& read_size) { hg_handle_t handle; Loading
ifs/include/rpc/rpc_utils.hpp +38 −0 Original line number Diff line number Diff line Loading @@ -9,6 +9,44 @@ extern "C" { } #include <string> template<typename I, typename O> inline hg_return_t rpc_cleanup(hg_handle_t* handle, I* input, O* output, hg_bulk_t* bulk_handle) { auto ret = HG_SUCCESS; if (bulk_handle) { ret = margo_bulk_free(*bulk_handle); if (ret != HG_SUCCESS) return ret; } if (input && handle) { ret = margo_free_input(*handle, input); if (ret != HG_SUCCESS) return ret; } if (output && handle) { ret = margo_free_output(*handle, output); if (ret != HG_SUCCESS) return ret; } if (handle) { ret = margo_destroy(*handle); if (ret != HG_SUCCESS) return ret; } return ret; } template<typename I, typename O> inline hg_return_t rpc_cleanup_respond(hg_handle_t* handle, I* input, O* output, hg_bulk_t* bulk_handle) { auto ret = HG_SUCCESS; if (output && handle) { ret = margo_respond(*handle, output); if (ret != HG_SUCCESS) return ret; } return rpc_cleanup(handle, input, static_cast<O*>(nullptr), bulk_handle); } hg_bool_t bool_to_merc_bool(bool state); bool is_handle_sm(margo_instance_id mid, const hg_addr_t& addr); Loading
ifs/src/adafs_ops/data.cpp +56 −17 Original line number Diff line number Diff line Loading @@ -60,28 +60,17 @@ int destroy_chunk_space(const std::string& path) { } /** * pread wrapper * @param buf * @param read_size * * @param path * @param buf * @param chnk_id * @param size * @param off * @param append * @param updated_size * @param [out] write_size * @return */ int read_file(char* buf, size_t& read_size, const string& path, const size_t size, const off_t off) { auto fs_path = path_to_fspath(path); auto chnk_path = bfs::path(ADAFS_DATA->chunk_path()); chnk_path /= fs_path; chnk_path /= "data"s; int fd = open(chnk_path.c_str(), R_OK); if (fd < 0) return EIO; read_size = static_cast<size_t>(pread(fd, buf, size, off)); close(fd); return 0; } int write_file(const string& path, const char* buf, const rpc_chnk_id_t chnk_id, const size_t size, const off_t off, const bool append, const off_t updated_size, size_t& write_size) { auto fs_path = path_to_fspath(path); Loading Loading @@ -117,6 +106,7 @@ int write_chunks(const string& path, const vector<void*>& buf_ptrs, const vector write_size = 0; // buf sizes also hold chnk ids. we only want to keep calculate the actual chunks auto chnk_n = buf_sizes.size() / 2; // TODO this can be parallized for (size_t i = 0; i < chnk_n; i++) { auto chnk_id = *(static_cast<size_t*>(buf_ptrs[i * 2])); auto chnk_ptr = static_cast<char*>(buf_ptrs[(i * 2) + 1]); Loading @@ -134,3 +124,52 @@ int write_chunks(const string& path, const vector<void*>& buf_ptrs, const vector } return 0; } /** * * @param path * @param chnk_id * @param size * @param off * @param [out] buf * @param [out] read_size * @return */ int read_file(const string& path, const rpc_chnk_id_t chnk_id, const size_t size, const off_t off, char* buf, size_t& read_size) { auto fs_path = path_to_fspath(path); auto chnk_path = bfs::path(ADAFS_DATA->chunk_path()); chnk_path /= fs_path; chnk_path /= fmt::FormatInt(chnk_id).c_str();; int fd = open(chnk_path.c_str(), R_OK); if (fd < 0) return EIO; read_size = static_cast<size_t>(pread(fd, buf, size, off)); close(fd); return 0; } int read_chunks(const string& path, const vector<void*>& buf_ptrs, const vector<hg_size_t>& buf_sizes, size_t& read_size) { read_size = 0; // buf sizes also hold chnk ids. we only want to keep calculate the actual chunks auto chnk_n = buf_sizes.size() / 2; // TODO this can be parallized for (size_t i = 0; i < chnk_n; i++) { auto chnk_id = *(static_cast<size_t*>(buf_ptrs[i])); auto chnk_ptr = static_cast<char*>(buf_ptrs[i + chnk_n]); auto chnk_size = buf_sizes[i + chnk_n]; size_t read_chnk_size; // TODO append if (read_file(path, chnk_id, chnk_size, 0, chnk_ptr, read_chnk_size) != 0) { // TODO How do we handle errors? ADAFS_DATA->spdlogger()->error("{}() read chunk failed with path {} and id {}. Aborting ...", __func__, path, chnk_id); read_size = 0; return -1; } read_size += read_chnk_size; } return 0; } No newline at end of file