Commit edabf169 authored by Ramon Nou's avatar Ramon Nou
Browse files

Empty directories correctly succeed

parent 73a30d23
Loading
Loading
Loading
Loading
Loading
+27 −24
Original line number Diff line number Diff line
@@ -498,13 +498,17 @@ master_server::plan_transfer_datasets(const network::request& req,
    using proto::plan_response;

    const auto rpc = rpc_info::create(RPC_NAME(), get_address(req));
    LOGGER_INFO("rpc {:>} body: {{sources: {}, targets: {}}}", rpc, sources, targets);
    LOGGER_INFO("rpc {:>} body: {{sources: {}, targets: {}}}", rpc, sources,
                targets);

    auto expanded = expand_transfer_requests(sources, targets);
    std::size_t file_count = expanded.sources.size();
    std::size_t total_size = std::accumulate(expanded.sizes.begin(), expanded.sizes.end(), 0ULL);
    std::size_t total_size =
            std::accumulate(expanded.sizes.begin(), expanded.sizes.end(), 0ULL);

    req.respond(plan_response<error_code>{rpc.id(), error_code::success, std::make_tuple(file_count, total_size)});
    req.respond(
            plan_response<error_code>{rpc.id(), error_code::success,
                                      std::make_tuple(file_count, total_size)});
}

void
@@ -530,25 +534,24 @@ master_server::transfer_datasets(const network::request& req,
                                             r.tid()});

                // Asynchronously expand and dispatch
                m_network_engine.get_handler_pool().make_thread(
                        [this, r, s = sources, t = targets]() {
                m_network_engine.get_handler_pool().make_thread([this, r,
                                                                 s = sources,
                                                                 t = targets]() {
                    auto expanded = expand_transfer_requests(s, t);
                            if(!expanded.sources.empty()) {
                    auto ec = m_request_manager.update(
                                        r.tid(), expanded.sources.size(),
                                        expanded.sizes);
                                if(ec == error_code::success) {
                                    _dispatch_transfer_to_workers(r, expanded);
                            r.tid(), expanded.sources.size(), expanded.sizes);
                    if(ec != error_code::success) {
                        LOGGER_ERROR("Failed to update request {}: {}", r.tid(),
                                     ec);
                    } else {
                                    LOGGER_ERROR(
                                            "Failed to update request {}: {}",
                                            r.tid(), ec);
                                }
                        if(!expanded.sources.empty()) {
                            _dispatch_transfer_to_workers(r, expanded);
                        } else {
                            LOGGER_INFO(
                                        "No files to transfer for request {}",
                                    "No files to transfer for request {}. It is now complete.",
                                    r.tid());
                        }
                    }
                });
            });
}
+92 −48
Original line number Diff line number Diff line
@@ -49,7 +49,8 @@ request_manager::create(std::size_t nworkers) {
}

error_code
request_manager::update(std::uint64_t tid, std::size_t nfiles, const std::vector<std::size_t>& file_sizes) {
request_manager::update(std::uint64_t tid, std::size_t nfiles,
                        const std::vector<std::size_t>& file_sizes) {
    abt::unique_lock lock(m_mutex);
    auto it = m_requests.find(tid);
    if(it == m_requests.end()) {
@@ -57,15 +58,17 @@ request_manager::update(std::uint64_t tid, std::size_t nfiles, const std::vector
    }
    auto& meta = it->second;
    meta.p_req = parallel_request{tid, nfiles, meta.p_req.nworkers()};
    meta.statuses.resize(nfiles, std::vector<part_status>(meta.p_req.nworkers()));
    meta.statuses.resize(nfiles,
                         std::vector<part_status>(meta.p_req.nworkers()));
    meta.file_sizes = file_sizes;
    meta.expanded = true;
    return error_code::success;
}

error_code
request_manager::update(std::uint64_t tid, std::uint32_t seqno, std::size_t wid,
                        std::string name, transfer_state s, float bw, std::size_t bytes,
                        std::optional<error_code> ec) {
                        std::string name, transfer_state s, float bw,
                        std::size_t bytes, std::optional<error_code> ec) {
    abt::unique_lock lock(m_mutex);

    auto it = m_requests.find(tid);
@@ -76,7 +79,8 @@ request_manager::update(std::uint64_t tid, std::uint32_t seqno, std::size_t wid,

    auto& statuses = it->second.statuses;
    if(seqno >= statuses.size() || wid >= statuses[seqno].size()) {
        LOGGER_ERROR("{}: Invalid sequence number {} or worker ID {}", __FUNCTION__, seqno, wid);
        LOGGER_ERROR("{}: Invalid sequence number {} or worker ID {}",
                     __FUNCTION__, seqno, wid);
        return error_code::snafu;
    }

@@ -96,26 +100,48 @@ request_manager::lookup(std::uint64_t tid) {

    const auto& meta = it->second;
    const auto& all_file_statuses = meta.statuses;
    auto elapsed_time = std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::steady_clock::now() - meta.start_time);
    auto elapsed_time = std::chrono::duration_cast<std::chrono::nanoseconds>(
            std::chrono::steady_clock::now() - meta.start_time);
    if(!meta.expanded) {
        return request_status{
                "",          transfer_state::pending, 0.0f, 0, 0, elapsed_time,
                std::nullopt};
    }

    if (all_file_statuses.empty() && meta.p_req.nfiles() == 0) { // Not yet updated with files
        return request_status{"", transfer_state::pending, 0.0f, 0, 0, elapsed_time, std::nullopt};
    if(meta.p_req.nfiles() == 0) {
        return request_status{
                "",           transfer_state::completed, 0.0f, 0, 0,
                elapsed_time, error_code::success};
    }

    size_t total_bytes_transferred = 0;
    size_t total_bytes = std::accumulate(meta.file_sizes.begin(), meta.file_sizes.end(), 0ULL);
    size_t total_bytes = std::accumulate(meta.file_sizes.begin(),
                                         meta.file_sizes.end(), 0ULL);
    float total_bw = 0.0f;
    int active_workers = 0;
    bool any_running = false;
    bool all_completed = true;

    for(const auto& file_status_vec : all_file_statuses) {
        for(const auto& part : file_status_vec) {
            if (part.state() == transfer_state::failed) {
                return request_status{part.name(), transfer_state::failed, part.bw(), total_bytes_transferred, total_bytes, elapsed_time, part.error()};
            }
            if (part.state() == transfer_state::running) {
            switch(part.state()) {
                case transfer_state::failed:
                    return request_status{part.name(), transfer_state::failed,
                                          part.bw(),   total_bytes_transferred,
                                          total_bytes, elapsed_time,
                                          part.error()};
                case transfer_state::running:
                    any_running = true;
                    all_completed = false;
                    break;
                case transfer_state::pending:
                    all_completed = false;
                    break;
                case transfer_state::completed:
                    break;
            }


            if(part.bw() > 0) {
                total_bw += part.bw();
                active_workers++;
@@ -126,15 +152,27 @@ request_manager::lookup(std::uint64_t tid) {

    float avg_bw = (active_workers > 0) ? total_bw : 0.0f;

    if (total_bytes > 0 && total_bytes_transferred >= total_bytes) {
        return request_status{"", transfer_state::completed, 0.0f, total_bytes_transferred, total_bytes, elapsed_time, error_code::success};
    if(all_completed) {
        return request_status{"",
                              transfer_state::completed,
                              0.0f,
                              total_bytes_transferred,
                              total_bytes,
                              elapsed_time,
                              error_code::success};
    }

    if(any_running || (total_bytes > 0 && total_bytes_transferred < total_bytes)) {
        return request_status{"", transfer_state::running, avg_bw, total_bytes_transferred, total_bytes, elapsed_time, std::nullopt};
    if(any_running || (total_bytes_transferred > 0)) {
        return request_status{"",          transfer_state::running,
                              avg_bw,      total_bytes_transferred,
                              total_bytes, elapsed_time,
                              std::nullopt};
    }

    return request_status{"", transfer_state::pending, 0.0f, total_bytes_transferred, total_bytes, elapsed_time, std::nullopt};
    return request_status{"",          transfer_state::pending,
                          0.0f,        total_bytes_transferred,
                          total_bytes, elapsed_time,
                          std::nullopt};
}

tl::expected<std::vector<request_status>, error_code>
@@ -149,7 +187,8 @@ request_manager::lookup_all(std::uint64_t tid) {

    const auto& meta = it->second;
    const auto& all_file_statuses = meta.statuses;
    auto elapsed_time = std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::steady_clock::now() - meta.start_time);
    auto elapsed_time = std::chrono::duration_cast<std::chrono::nanoseconds>(
            std::chrono::steady_clock::now() - meta.start_time);
    std::vector<request_status> result;

    if(all_file_statuses.empty()) {
@@ -168,7 +207,8 @@ request_manager::lookup_all(std::uint64_t tid) {
        std::optional<error_code> ec;

        for(const auto& part : file_status_vec) {
            if(name.empty()) name = part.name();
            if(name.empty())
                name = part.name();
            if(part.state() == transfer_state::failed) {
                failed = true;
                ec = part.error();
@@ -186,11 +226,15 @@ request_manager::lookup_all(std::uint64_t tid) {

        float avg_bw = (active_workers > 0) ? total_bw : 0.0f;
        transfer_state s = transfer_state::completed;
        if(failed) s = transfer_state::failed;
        else if(running) s = transfer_state::running;
        else if (bytes_transferred < meta.file_sizes[i]) s = transfer_state::pending;

        result.emplace_back(name, s, avg_bw, bytes_transferred, meta.file_sizes[i], elapsed_time, ec);
        if(failed)
            s = transfer_state::failed;
        else if(running)
            s = transfer_state::running;
        else if(bytes_transferred < meta.file_sizes[i])
            s = transfer_state::pending;

        result.emplace_back(name, s, avg_bw, bytes_transferred,
                            meta.file_sizes[i], elapsed_time, ec);
    }
    return result;
}
+1 −0
Original line number Diff line number Diff line
@@ -44,6 +44,7 @@ class request_manager {
    
    struct request_metadata {
        parallel_request p_req;
        bool expanded = false;
        std::chrono::steady_clock::time_point start_time;
        std::vector<std::size_t> file_sizes;
        std::vector<file_status> statuses;