Loading include/daemon/relocation/transmitter.hpp +1 −1 Original line number Diff line number Diff line Loading @@ -18,7 +18,7 @@ namespace gkfs::relocation { void int transmit_metadata_and_data(gkfs::rpc::host_t localhost); } // namespace gkfs::relocation Loading src/daemon/handler/srv_relocation.cpp +8 −8 Original line number Diff line number Diff line Loading @@ -53,8 +53,8 @@ rpc_srv_relocate_metadata(hg_handle_t handle) { return gkfs::rpc::cleanup_respond(&handle, &in, &out); } cout << fmt::format("TODO(dauer) Received metadata entry {}: {}\n", in.key, in.value); GKFS_DATA->spdlogger()->debug("{}() Received metadata entry {}: {}", __func__, in.key, in.value); GKFS_DATA->mdb()->put(in.key, in.value); Loading Loading @@ -84,8 +84,8 @@ rpc_srv_relocate_chunk(hg_handle_t handle) { auto mid = margo_hg_info_get_instance(hgi); auto bulk_size = margo_bulk_get_size(in.bulk_handle); cout << fmt::format("TODO(dauer) Received chunk {}: chunk {} size {}\n", in.path, in.chunk_id, bulk_size); GKFS_DATA->spdlogger()->trace("{}() Received chunk {}: {} size: {}", __func__, in.path, in.chunk_id, bulk_size); unique_ptr<char[]> buf(new char[bulk_size]()); char* buf_ptr = buf.get(); Loading @@ -112,7 +112,7 @@ rpc_srv_relocate_chunk(hg_handle_t handle) { */ hg_return_t rpc_srv_relocation_start(hg_handle_t handle) { cout << "TODO(dauer) relocation start received\n"; GKFS_DATA->spdlogger()->info("{}() Relocation start received", __func__); rpc_relocation_start_in_t in{}; rpc_err_out_t out{}; out.err = EIO; Loading @@ -125,9 +125,9 @@ rpc_srv_relocation_start(hg_handle_t handle) { return gkfs::rpc::cleanup_respond(&handle, &in, &out); } gkfs::relocation::transmit_metadata_and_data(in.host_id); out.err = 0; // TODO(dauer) out.err = gkfs::relocation::transmit_metadata_and_data(in.host_id); GKFS_DATA->spdlogger()->info("{}() Work is done, returning to management", __func__); return gkfs::rpc::cleanup_respond(&handle, &in, &out); } Loading src/daemon/relocation/config_manager.cpp +15 −8 Original line number Diff line number Diff line Loading @@ -97,8 +97,10 @@ calculate_random_slicing() { } } GKFS_DATA->spdlogger()->info("Hosts to be added: {}", new_uris_str); GKFS_DATA->spdlogger()->info("Hosts to be removed: {}", gone_uris_str); GKFS_DATA->spdlogger()->info("{}() Hosts to be added: {}", __func__, new_uris_str); GKFS_DATA->spdlogger()->info("{}() Hosts to be removed: {}", __func__, gone_uris_str); // remove gone hosts if(gone_ids.size() > 0) { Loading @@ -118,14 +120,14 @@ calculate_random_slicing() { trigger_hosts = rs_hosts; } else { GKFS_DATA->spdlogger()->info( "No existing random slicing configuration found, " "{}() No existing random slicing configuration found, " "creating from scratch with {} hosts.", hostsfile_hosts.size()); __func__, hostsfile_hosts.size()); for(const auto& [id, host] : hostsfile_hosts) { const auto& [hostname, uri] = host; disks.emplace_back(new VDRIVE::Disk(id, 1, hostname, uri)); cout << uri << "\n"; GKFS_DATA->spdlogger()->trace("{}() Adding uri: {}", __func__, uri); } dist->setConfiguration(&disks, 1, 1); trigger_hosts = hostsfile_hosts; Loading Loading @@ -165,7 +167,9 @@ read_hosts_file() { */ void do_relocation(hostmap_t hosts) { cout << "number of hosts: " << hosts.size() << "\n"; GKFS_DATA->spdlogger()->info("{}() Starting relocation on {} hosts", __func__, hosts.size()); auto mid = margo_init(GKFS_DATA->bind_addr().c_str(), MARGO_CLIENT_MODE, 0, 0); if(mid == MARGO_INSTANCE_NULL) { Loading @@ -179,8 +183,9 @@ do_relocation(hostmap_t hosts) { for(const auto& [host_id, host] : hosts) { const auto& [hostname, uri] = host; cout << fmt::format("Hostname: {} Address: {} ID: {}\n", hostname, uri, host_id); GKFS_DATA->spdlogger()->info( "{}() Sending trigger to host: {} (URI: {} ID: {})\n", __func__, hostname, uri, host_id); hg_addr_t host_addr; rpc_relocation_start_in_t in{}; rpc_err_out_t out{}; Loading @@ -206,6 +211,8 @@ do_relocation(hostmap_t hosts) { cout << "Error during margo cleanup.\n"; } } GKFS_DATA->spdlogger()->info("{}() Relocation done", __func__, hosts.size()); margo_finalize(mid); } Loading src/daemon/relocation/transmitter.cpp +33 −20 Original line number Diff line number Diff line Loading @@ -38,9 +38,9 @@ using namespace std; namespace gkfs::relocation { void int transmit_metadata_and_data(gkfs::rpc::host_t localhost) { // TODO(dauer) return error code on error { // TODO(dauer) move this to somewhere appropriate auto dist_impl = make_shared<VDRIVE::DistRandSlice>(); Loading @@ -58,8 +58,8 @@ transmit_metadata_and_data(gkfs::rpc::host_t localhost) { auto hosts = dynamic_cast<gkfs::rpc::RandomSlicingDistributor*>(&distributor) ->get_hosts_map(); cout << fmt::format("Got host_id = {} and parsed {} hosts.\n", localhost, hosts.size()); GKFS_DATA->spdlogger()->info("{}() Got host id = {} and parsed {} hosts", __func__, localhost, hosts.size()); // Relocate metadata for(const auto& [metakey, metavalue] : GKFS_DATA->mdb()->get_all()) { Loading @@ -67,11 +67,11 @@ transmit_metadata_and_data(gkfs::rpc::host_t localhost) { continue; } auto destination = distributor.locate_file_metadata(metakey); cout << "Metadentry " << metakey << " : " << metavalue << (destination == localhost ? " Stay" : " -> Goto "s + std::to_string(destination)) << '\n'; GKFS_DATA->spdlogger()->trace( "{}() Metadentry {} : {} {} {}", __func__, metakey, metavalue, (destination == localhost ? " Stay on " : " -> Goto "), destination); if(destination == localhost) { continue; Loading Loading @@ -99,34 +99,46 @@ transmit_metadata_and_data(gkfs::rpc::host_t localhost) { ret = margo_get_output(handle, &out); assert(ret == HG_SUCCESS); // TODO(dauer) process output // TODO(dauer) catch DB exceptions GKFS_DATA->mdb()->remove(in.key); if(HG_SUCCESS != gkfs::rpc::margo_client_cleanup(&handle, &out, &mid, &host_addr)) { cout << "Error during margo cleanup.\n"; GKFS_DATA->spdlogger()->error("{}() Error during margo cleanup", __func__); } } // Relocate data (chunks) auto relocate_chunk_id = auto relocate_chunk_rpc_id = gkfs::rpc::get_rpc_id(mid, gkfs::rpc::tag::relocate_chunk); for(auto& chunks_dir : GKFS_DATA->storage()->chunks_directory_iterator()) { assert(chunks_dir.is_directory()); // TODO(dauer) just log if(!chunks_dir.is_directory()) { GKFS_DATA->spdlogger()->warn( "{}() Expected directory but got something else: {}", __func__, chunks_dir.path().string()); continue; } string file_path = GKFS_DATA->storage()->get_file_path( chunks_dir.path().filename().string()); for(auto& chunk_file : fs::directory_iterator(chunks_dir)) { assert(chunk_file.is_regular_file()); // TODO(dauer) just log if(!chunk_file.is_regular_file()) { GKFS_DATA->spdlogger()->warn( "{}() Expected regular file but got something else: {}", __func__, chunk_file.path().string()); continue; } gkfs::rpc::chnk_id_t chunk_id = std::stoul(chunk_file.path().filename().string()); auto destination = distributor.locate_data(file_path, chunk_id); size_t size = chunk_file.file_size(); cout << file_path << " chunk: " << chunk_id << " size: " << size << (destination == localhost ? " Stay" : " -> Goto "s + std::to_string(destination)) << '\n'; GKFS_DATA->spdlogger()->trace( "{}() Checking {} chunk: {} size: {} {} {}", __func__, file_path, chunk_id, size, (destination == localhost ? " Stay on" : " -> Goto "), destination); if(destination == localhost) { continue; Loading Loading @@ -158,7 +170,7 @@ transmit_metadata_and_data(gkfs::rpc::host_t localhost) { // let's do this sequential first hg_handle_t handle; ret = margo_create(mid, host_addr, relocate_chunk_id, &handle); ret = margo_create(mid, host_addr, relocate_chunk_rpc_id, &handle); assert(ret == HG_SUCCESS); ret = margo_forward(handle, &in); // blocking Loading @@ -176,6 +188,7 @@ transmit_metadata_and_data(gkfs::rpc::host_t localhost) { } } } return 0; } Loading Loading
include/daemon/relocation/transmitter.hpp +1 −1 Original line number Diff line number Diff line Loading @@ -18,7 +18,7 @@ namespace gkfs::relocation { void int transmit_metadata_and_data(gkfs::rpc::host_t localhost); } // namespace gkfs::relocation Loading
src/daemon/handler/srv_relocation.cpp +8 −8 Original line number Diff line number Diff line Loading @@ -53,8 +53,8 @@ rpc_srv_relocate_metadata(hg_handle_t handle) { return gkfs::rpc::cleanup_respond(&handle, &in, &out); } cout << fmt::format("TODO(dauer) Received metadata entry {}: {}\n", in.key, in.value); GKFS_DATA->spdlogger()->debug("{}() Received metadata entry {}: {}", __func__, in.key, in.value); GKFS_DATA->mdb()->put(in.key, in.value); Loading Loading @@ -84,8 +84,8 @@ rpc_srv_relocate_chunk(hg_handle_t handle) { auto mid = margo_hg_info_get_instance(hgi); auto bulk_size = margo_bulk_get_size(in.bulk_handle); cout << fmt::format("TODO(dauer) Received chunk {}: chunk {} size {}\n", in.path, in.chunk_id, bulk_size); GKFS_DATA->spdlogger()->trace("{}() Received chunk {}: {} size: {}", __func__, in.path, in.chunk_id, bulk_size); unique_ptr<char[]> buf(new char[bulk_size]()); char* buf_ptr = buf.get(); Loading @@ -112,7 +112,7 @@ rpc_srv_relocate_chunk(hg_handle_t handle) { */ hg_return_t rpc_srv_relocation_start(hg_handle_t handle) { cout << "TODO(dauer) relocation start received\n"; GKFS_DATA->spdlogger()->info("{}() Relocation start received", __func__); rpc_relocation_start_in_t in{}; rpc_err_out_t out{}; out.err = EIO; Loading @@ -125,9 +125,9 @@ rpc_srv_relocation_start(hg_handle_t handle) { return gkfs::rpc::cleanup_respond(&handle, &in, &out); } gkfs::relocation::transmit_metadata_and_data(in.host_id); out.err = 0; // TODO(dauer) out.err = gkfs::relocation::transmit_metadata_and_data(in.host_id); GKFS_DATA->spdlogger()->info("{}() Work is done, returning to management", __func__); return gkfs::rpc::cleanup_respond(&handle, &in, &out); } Loading
src/daemon/relocation/config_manager.cpp +15 −8 Original line number Diff line number Diff line Loading @@ -97,8 +97,10 @@ calculate_random_slicing() { } } GKFS_DATA->spdlogger()->info("Hosts to be added: {}", new_uris_str); GKFS_DATA->spdlogger()->info("Hosts to be removed: {}", gone_uris_str); GKFS_DATA->spdlogger()->info("{}() Hosts to be added: {}", __func__, new_uris_str); GKFS_DATA->spdlogger()->info("{}() Hosts to be removed: {}", __func__, gone_uris_str); // remove gone hosts if(gone_ids.size() > 0) { Loading @@ -118,14 +120,14 @@ calculate_random_slicing() { trigger_hosts = rs_hosts; } else { GKFS_DATA->spdlogger()->info( "No existing random slicing configuration found, " "{}() No existing random slicing configuration found, " "creating from scratch with {} hosts.", hostsfile_hosts.size()); __func__, hostsfile_hosts.size()); for(const auto& [id, host] : hostsfile_hosts) { const auto& [hostname, uri] = host; disks.emplace_back(new VDRIVE::Disk(id, 1, hostname, uri)); cout << uri << "\n"; GKFS_DATA->spdlogger()->trace("{}() Adding uri: {}", __func__, uri); } dist->setConfiguration(&disks, 1, 1); trigger_hosts = hostsfile_hosts; Loading Loading @@ -165,7 +167,9 @@ read_hosts_file() { */ void do_relocation(hostmap_t hosts) { cout << "number of hosts: " << hosts.size() << "\n"; GKFS_DATA->spdlogger()->info("{}() Starting relocation on {} hosts", __func__, hosts.size()); auto mid = margo_init(GKFS_DATA->bind_addr().c_str(), MARGO_CLIENT_MODE, 0, 0); if(mid == MARGO_INSTANCE_NULL) { Loading @@ -179,8 +183,9 @@ do_relocation(hostmap_t hosts) { for(const auto& [host_id, host] : hosts) { const auto& [hostname, uri] = host; cout << fmt::format("Hostname: {} Address: {} ID: {}\n", hostname, uri, host_id); GKFS_DATA->spdlogger()->info( "{}() Sending trigger to host: {} (URI: {} ID: {})\n", __func__, hostname, uri, host_id); hg_addr_t host_addr; rpc_relocation_start_in_t in{}; rpc_err_out_t out{}; Loading @@ -206,6 +211,8 @@ do_relocation(hostmap_t hosts) { cout << "Error during margo cleanup.\n"; } } GKFS_DATA->spdlogger()->info("{}() Relocation done", __func__, hosts.size()); margo_finalize(mid); } Loading
src/daemon/relocation/transmitter.cpp +33 −20 Original line number Diff line number Diff line Loading @@ -38,9 +38,9 @@ using namespace std; namespace gkfs::relocation { void int transmit_metadata_and_data(gkfs::rpc::host_t localhost) { // TODO(dauer) return error code on error { // TODO(dauer) move this to somewhere appropriate auto dist_impl = make_shared<VDRIVE::DistRandSlice>(); Loading @@ -58,8 +58,8 @@ transmit_metadata_and_data(gkfs::rpc::host_t localhost) { auto hosts = dynamic_cast<gkfs::rpc::RandomSlicingDistributor*>(&distributor) ->get_hosts_map(); cout << fmt::format("Got host_id = {} and parsed {} hosts.\n", localhost, hosts.size()); GKFS_DATA->spdlogger()->info("{}() Got host id = {} and parsed {} hosts", __func__, localhost, hosts.size()); // Relocate metadata for(const auto& [metakey, metavalue] : GKFS_DATA->mdb()->get_all()) { Loading @@ -67,11 +67,11 @@ transmit_metadata_and_data(gkfs::rpc::host_t localhost) { continue; } auto destination = distributor.locate_file_metadata(metakey); cout << "Metadentry " << metakey << " : " << metavalue << (destination == localhost ? " Stay" : " -> Goto "s + std::to_string(destination)) << '\n'; GKFS_DATA->spdlogger()->trace( "{}() Metadentry {} : {} {} {}", __func__, metakey, metavalue, (destination == localhost ? " Stay on " : " -> Goto "), destination); if(destination == localhost) { continue; Loading Loading @@ -99,34 +99,46 @@ transmit_metadata_and_data(gkfs::rpc::host_t localhost) { ret = margo_get_output(handle, &out); assert(ret == HG_SUCCESS); // TODO(dauer) process output // TODO(dauer) catch DB exceptions GKFS_DATA->mdb()->remove(in.key); if(HG_SUCCESS != gkfs::rpc::margo_client_cleanup(&handle, &out, &mid, &host_addr)) { cout << "Error during margo cleanup.\n"; GKFS_DATA->spdlogger()->error("{}() Error during margo cleanup", __func__); } } // Relocate data (chunks) auto relocate_chunk_id = auto relocate_chunk_rpc_id = gkfs::rpc::get_rpc_id(mid, gkfs::rpc::tag::relocate_chunk); for(auto& chunks_dir : GKFS_DATA->storage()->chunks_directory_iterator()) { assert(chunks_dir.is_directory()); // TODO(dauer) just log if(!chunks_dir.is_directory()) { GKFS_DATA->spdlogger()->warn( "{}() Expected directory but got something else: {}", __func__, chunks_dir.path().string()); continue; } string file_path = GKFS_DATA->storage()->get_file_path( chunks_dir.path().filename().string()); for(auto& chunk_file : fs::directory_iterator(chunks_dir)) { assert(chunk_file.is_regular_file()); // TODO(dauer) just log if(!chunk_file.is_regular_file()) { GKFS_DATA->spdlogger()->warn( "{}() Expected regular file but got something else: {}", __func__, chunk_file.path().string()); continue; } gkfs::rpc::chnk_id_t chunk_id = std::stoul(chunk_file.path().filename().string()); auto destination = distributor.locate_data(file_path, chunk_id); size_t size = chunk_file.file_size(); cout << file_path << " chunk: " << chunk_id << " size: " << size << (destination == localhost ? " Stay" : " -> Goto "s + std::to_string(destination)) << '\n'; GKFS_DATA->spdlogger()->trace( "{}() Checking {} chunk: {} size: {} {} {}", __func__, file_path, chunk_id, size, (destination == localhost ? " Stay on" : " -> Goto "), destination); if(destination == localhost) { continue; Loading Loading @@ -158,7 +170,7 @@ transmit_metadata_and_data(gkfs::rpc::host_t localhost) { // let's do this sequential first hg_handle_t handle; ret = margo_create(mid, host_addr, relocate_chunk_id, &handle); ret = margo_create(mid, host_addr, relocate_chunk_rpc_id, &handle); assert(ret == HG_SUCCESS); ret = margo_forward(handle, &in); // blocking Loading @@ -176,6 +188,7 @@ transmit_metadata_and_data(gkfs::rpc::host_t localhost) { } } } return 0; } Loading