Commit 11045d0b authored by Marc Vef's avatar Marc Vef
Browse files

Adding get_chunk_stat proxy to proxy

parent 3279efdf
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -32,6 +32,7 @@ struct margo_client_ids {
    hg_id_t rpc_update_metadentry_size_id;
    hg_id_t rpc_write_id;
    hg_id_t rpc_read_id;
    hg_id_t rpc_chunk_stat_id;
};

class ProxyData {
+3 −0
Original line number Diff line number Diff line
@@ -27,6 +27,9 @@ std::pair<int, ssize_t>
forward_read(const std::string& path, void* buf, int64_t offset,
             size_t read_size);

std::pair<int, ChunkStat>
forward_get_chunk_stat();

} // namespace rpc
} // namespace gkfs

+2 −0
Original line number Diff line number Diff line
@@ -33,4 +33,6 @@ DECLARE_MARGO_RPC_HANDLER(proxy_rpc_srv_read)

DECLARE_MARGO_RPC_HANDLER(proxy_rpc_srv_write)

DECLARE_MARGO_RPC_HANDLER(proxy_rpc_srv_chunk_stat)

#endif // GKFS_PROXY_RPC_DEFS_HPP
+5 −0
Original line number Diff line number Diff line
@@ -43,6 +43,8 @@ register_server_ipcs(margo_instance_id mid) {
                   rpc_data_out_t, proxy_rpc_srv_write)
    MARGO_REGISTER(mid, gkfs::rpc::tag::proxy_read, rpc_proxy_read_data_in_t,
                   rpc_data_out_t, proxy_rpc_srv_read)
    MARGO_REGISTER(mid, gkfs::rpc::tag::proxy_chunk_stat, rpc_chunk_stat_in_t,
                   rpc_chunk_stat_out_t, proxy_rpc_srv_chunk_stat)
    MARGO_REGISTER(mid, gkfs::rpc::tag::proxy_create, rpc_mk_node_in_t,
                   rpc_err_out_t, proxy_rpc_srv_create)
    MARGO_REGISTER(mid, gkfs::rpc::tag::proxy_stat, rpc_path_only_in_t,
@@ -121,6 +123,9 @@ register_client_rpcs(margo_instance_id mid) {
    PROXY_DATA->rpc_client_ids().rpc_read_id =
            MARGO_REGISTER(mid, gkfs::rpc::tag::read, rpc_read_data_in_t,
                           rpc_data_out_t, NULL);
    PROXY_DATA->rpc_client_ids().rpc_chunk_stat_id =
            MARGO_REGISTER(mid, gkfs::rpc::tag::get_chunk_stat,
                           rpc_chunk_stat_in_t, rpc_chunk_stat_out_t, NULL);
    PROXY_DATA->rpc_client_ids().rpc_create_id = MARGO_REGISTER(
            mid, gkfs::rpc::tag::create, rpc_mk_node_in_t, rpc_err_out_t, NULL);
    PROXY_DATA->rpc_client_ids().rpc_stat_id =
+83 −0
Original line number Diff line number Diff line
@@ -343,4 +343,87 @@ forward_read(const std::string& path, void* buf, const int64_t offset,
    return ::make_pair(err, out_size);
}

pair<int, ChunkStat>
forward_get_chunk_stat() {
    int err = 0;
    hg_return ret{};
    // Create handle
    PROXY_DATA->log()->debug("{}() Creating Margo handle ...", __func__);

    // some helper variables for async RPC
    auto target_n = PROXY_DATA->hosts_size();
    vector<hg_handle_t> rpc_handles(target_n);
    vector<margo_request> rpc_waiters(target_n);
    vector<rpc_chunk_stat_in_t> rpc_in(target_n);
    for(uint64_t i = 0; i < target_n; i++) {
        ret = margo_create(PROXY_DATA->client_rpc_mid(),
                           PROXY_DATA->rpc_endpoints().at(i),
                           PROXY_DATA->rpc_client_ids().rpc_chunk_stat_id,
                           &rpc_handles[i]);
        // XXX Don't think this is useful here cause responds go into nothing
        if(ret != HG_SUCCESS) {
            for(uint64_t j = 0; j < i + 1; j++) {
                margo_destroy(rpc_handles[j]);
            }
            return ::make_pair(EBUSY, ChunkStat{});
        }
        // Send RPC
        rpc_in[i].dummy = 0;
        ret = margo_iforward(rpc_handles[i], &rpc_in[i], &rpc_waiters[i]);
        if(ret != HG_SUCCESS) {
            PROXY_DATA->log()->error(
                    "{}() Unable to send non-blocking rpc for recipient {}",
                    __func__, i);
            for(uint64_t j = 0; j < i + 1; j++) {
                margo_destroy(rpc_handles[j]);
            }
            return ::make_pair(EBUSY, ChunkStat{});
        }
    }
    PROXY_DATA->log()->debug("{}() '{}' RPCs sent, waiting for reply ...",
                             __func__, target_n);
    // Wait for RPC responses and then get response and add it to out_size which
    // is the written size All potential outputs are served to free resources
    // regardless of errors, although an errorcode is set.
    unsigned long chunk_size = gkfs::config::rpc::chunksize;
    unsigned long chunk_total = 0;
    unsigned long chunk_free = 0;
    for(uint64_t i = 0; i < target_n; i++) {
        // XXX We might need a timeout here to not wait forever for an output
        // that never comes?
        ret = margo_wait(rpc_waiters[i]);
        if(ret != HG_SUCCESS) {
            PROXY_DATA->log()->error(
                    "{}() Unable to wait for margo_request handle for recipient {}",
                    __func__, i);
            err = EBUSY;
        }
        // decode response
        rpc_chunk_stat_out_t daemon_out{};
        ret = margo_get_output(rpc_handles[i], &daemon_out);
        if(ret != HG_SUCCESS) {
            PROXY_DATA->log()->error(
                    "{}() Failed to get rpc output for recipient {}", __func__,
                    i);
            err = EBUSY;
        }
        PROXY_DATA->log()->debug(
                "{}() Got response from target '{}': err '{}' with chunk_total '{}' chunk_free '{}'",
                __func__, i, daemon_out.err, daemon_out.chunk_total,
                daemon_out.chunk_free);
        if(daemon_out.err != 0)
            err = daemon_out.err;
        else {
            chunk_total += daemon_out.chunk_total;
            chunk_free += daemon_out.chunk_free;
        }
        margo_free_output(rpc_handles[i], &daemon_out);
        margo_destroy(rpc_handles[i]);
    }
    if(err)
        return make_pair(err, ChunkStat{});
    else
        return make_pair(0, ChunkStat{chunk_size, chunk_total, chunk_free});
}

} // namespace gkfs::rpc
 No newline at end of file
Loading