Program Listing for File srv_data.cpp

Return to documentation for file (src/proxy/rpc/srv_data.cpp)

/*
  Copyright 2018-2025, Barcelona Supercomputing Center (BSC), Spain
  Copyright 2015-2025, Johannes Gutenberg Universitaet Mainz, Germany

  This software was partially supported by the
  EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu).

  This software was partially supported by the
  ADA-FS project under the SPPEXA project funded by the DFG.

  This software was partially supported by the
  the European Union’s Horizon 2020 JTI-EuroHPC research and
  innovation programme, by the project ADMIRE (Project ID: 956748,
  admire-eurohpc.eu)

  This project was partially promoted by the Ministry for Digital Transformation
  and the Civil Service, within the framework of the Recovery,
  Transformation and Resilience Plan - Funded by the European Union
  -NextGenerationEU.

  SPDX-License-Identifier: MIT
*/

#include <proxy/proxy.hpp>
#include <proxy/rpc/rpc_defs.hpp>
#include <proxy/rpc/forward_data.hpp>
#include <proxy/rpc/rpc_util.hpp>

#include <common/rpc/rpc_types.hpp>

using namespace std;

static hg_return_t
proxy_rpc_srv_write(hg_handle_t handle) {

    rpc_client_proxy_write_in_t client_in{};
    rpc_data_out_t client_out{};
    client_out.err = EIO;
    client_out.io_size = 0;
    hg_bulk_t bulk_handle = nullptr;
    auto ret = margo_get_input(handle, &client_in);
    if(ret != HG_SUCCESS) {
        PROXY_DATA->log()->error("{}() Failed to retrieve input from handle",
                                 __func__);
        client_out.err = EBUSY;
        return gkfs::rpc::cleanup_respond(&handle, &client_in, &client_out);
    }

    auto hgi = margo_get_info(handle);
    auto mid = margo_hg_handle_get_instance(handle);
    auto bulk_size = margo_bulk_get_size(client_in.bulk_handle);
    assert(bulk_size == client_in.write_size);
    PROXY_DATA->log()->debug(
            "{}() Got RPC with path '{}' bulk_size '{}' == write_size '{}'",
            __func__, client_in.path, bulk_size, client_in.write_size);
    /*
     * Set up buffer and pull from client
     */
    void* bulk_buf; // buffer for bulk transfer
    // create bulk handle and allocated memory for buffer with buf_sizes
    // information
    ret = margo_bulk_create(mid, 1, nullptr, &bulk_size, HG_BULK_READWRITE,
                            &bulk_handle);
    if(ret != HG_SUCCESS) {
        PROXY_DATA->log()->error("{}() Failed to create bulk handle", __func__);
        return gkfs::rpc::cleanup_respond(&handle, &client_in, &client_out);
    }
    // access the internally allocated memory buffer and put it into buf_ptrs
    uint32_t actual_count; // number of segments. we use one here because we
                           // pull the whole buffer at once
    ret = margo_bulk_access(bulk_handle, 0, bulk_size, HG_BULK_READWRITE, 1,
                            &bulk_buf, &bulk_size, &actual_count);
    if(ret != HG_SUCCESS || actual_count != 1) {
        PROXY_DATA->log()->error(
                "{}() Failed to access allocated buffer from bulk handle",
                __func__);
        return gkfs::rpc::cleanup_respond(&handle, &client_in, &client_out,
                                          &bulk_handle);
    }
    // pull data from client here
    ret = margo_bulk_transfer(mid, HG_BULK_PULL, hgi->addr,
                              client_in.bulk_handle, 0, bulk_handle, 0,
                              bulk_size);
    if(ret != HG_SUCCESS) {
        PROXY_DATA->log()->error(
                "{}() Failed to pull data from client for path '{}' with size '{}'",
                __func__, client_in.path, bulk_size);
        client_out.err = EBUSY;
        return gkfs::rpc::cleanup_respond(&handle, &client_in, &client_out,
                                          &bulk_handle);
    }

    // Forward request to daemon, using bulk_buf, containing the pulled data
    // (which is pulled again by the daemon)
    auto daemon_out = gkfs::rpc::forward_write(client_in.path, bulk_buf,
                                               client_in.offset, bulk_size);
    client_out.err = daemon_out.first;
    client_out.io_size = daemon_out.second;
    PROXY_DATA->log()->debug("{}() Sending output err '{}' io_size '{}'",
                             __func__, client_out.err, client_out.io_size);

    return gkfs::rpc::cleanup_respond(&handle, &client_in, &client_out,
                                      &bulk_handle);
}

DEFINE_MARGO_RPC_HANDLER(proxy_rpc_srv_write)

static hg_return_t
proxy_rpc_srv_read(hg_handle_t handle) {
    rpc_client_proxy_read_in_t client_in{};
    rpc_data_out_t client_out{};
    client_out.err = EIO;
    client_out.io_size = 0;
    hg_bulk_t bulk_handle = nullptr;
    auto ret = margo_get_input(handle, &client_in);
    if(ret != HG_SUCCESS) {
        PROXY_DATA->log()->error("{}() Failed to retrieve input from handle",
                                 __func__);
        client_out.err = EBUSY;
        return gkfs::rpc::cleanup_respond(&handle, &client_in, &client_out);
    }

    auto hgi = margo_get_info(handle);
    auto mid = margo_hg_handle_get_instance(handle);
    auto bulk_size = margo_bulk_get_size(client_in.bulk_handle);
    assert(bulk_size == client_in.read_size);
    PROXY_DATA->log()->debug(
            "{}() Got RPC with path '{}' bulk_size '{}' == read_size '{}'",
            __func__, client_in.path, bulk_size, client_in.read_size);
    /*
     * Set up buffer for push from daemon
     */
    void* bulk_buf; // buffer for bulk transfer
    // create bulk handle and allocated memory for buffer with buf_sizes
    // information
    ret = margo_bulk_create(mid, 1, nullptr, &bulk_size, HG_BULK_READWRITE,
                            &bulk_handle);
    if(ret != HG_SUCCESS) {
        PROXY_DATA->log()->error("{}() Failed to create bulk handle", __func__);
        return gkfs::rpc::cleanup_respond(&handle, &client_in, &client_out);
    }
    // access the internally allocated memory buffer
    uint32_t actual_count; // number of segments. we use one here because we
                           // pull the whole buffer at once
    ret = margo_bulk_access(bulk_handle, 0, bulk_size, HG_BULK_READWRITE, 1,
                            &bulk_buf, &bulk_size, &actual_count);
    if(ret != HG_SUCCESS || actual_count != 1) {
        PROXY_DATA->log()->error(
                "{}() Failed to access allocated buffer from bulk handle",
                __func__);
        return gkfs::rpc::cleanup_respond(&handle, &client_in, &client_out,
                                          &bulk_handle);
    }
    // Forward request to daemon, using bulk_buf, containing the allocated
    // buffer (which is pushed the data by the daemon)
    auto daemon_out = gkfs::rpc::forward_read(client_in.path, bulk_buf,
                                              client_in.offset, bulk_size);
    if(daemon_out.first != 0) {
        PROXY_DATA->log()->error(
                "{}() Failure when forwarding to daemon with err '{}' and iosize '{}'",
                __func__, daemon_out.first, daemon_out.second);
        client_out.err = daemon_out.first;
        return gkfs::rpc::cleanup_respond(&handle, &client_in, &client_out,
                                          &bulk_handle);
    }
    // Push data to client here if no error was reported by the daemon
    ret = margo_bulk_transfer(mid, HG_BULK_PUSH, hgi->addr,
                              client_in.bulk_handle, 0, bulk_handle, 0,
                              bulk_size);
    if(ret != HG_SUCCESS) {
        PROXY_DATA->log()->error(
                "{}() Failed to push data from client for path '{}' with size '{}'",
                __func__, client_in.path, bulk_size);
        client_out.err = EBUSY;
        return gkfs::rpc::cleanup_respond(&handle, &client_in, &client_out,
                                          &bulk_handle);
    }

    client_out.err = daemon_out.first;
    client_out.io_size = daemon_out.second;
    PROXY_DATA->log()->debug("{}() Sending output err '{}' io_size '{}'",
                             __func__, client_out.err, client_out.io_size);
    return gkfs::rpc::cleanup_respond(&handle, &client_in, &client_out,
                                      &bulk_handle);
}

DEFINE_MARGO_RPC_HANDLER(proxy_rpc_srv_read)


static hg_return_t
proxy_rpc_srv_truncate(hg_handle_t handle) {
    rpc_client_proxy_trunc_in_t client_in{};
    rpc_err_out_t client_out{};

    auto ret = margo_get_input(handle, &client_in);
    if(ret != HG_SUCCESS) {
        PROXY_DATA->log()->error("{}() Failed to retrieve input from handle",
                                 __func__);
        return gkfs::rpc::cleanup_respond(&handle, &client_in, &client_out);
    }
    PROXY_DATA->log()->debug(
            "{}() Got RPC with path '{}' current_size '{}' length '{}'",
            __func__, client_in.path, client_in.current_size, client_in.length);

    client_out.err = gkfs::rpc::forward_truncate(
            client_in.path, client_in.current_size, client_in.length);

    PROXY_DATA->log()->debug("{}() Sending output err '{}'", __func__,
                             client_out.err);
    return gkfs::rpc::cleanup_respond(&handle, &client_in, &client_out);
}

DEFINE_MARGO_RPC_HANDLER(proxy_rpc_srv_truncate)

static hg_return_t
proxy_rpc_srv_chunk_stat(hg_handle_t handle) {
    rpc_chunk_stat_in_t client_in{};
    rpc_chunk_stat_out_t client_out{};
    auto ret = margo_get_input(handle, &client_in);
    if(ret != HG_SUCCESS) {
        PROXY_DATA->log()->error("{}() Failed to retrieve input from handle",
                                 __func__);
        return gkfs::rpc::cleanup_respond(&handle, &client_in, &client_out);
    }
    PROXY_DATA->log()->debug("{}() Got chunk stat RPC ", __func__);

    auto daemon_out = gkfs::rpc::forward_get_chunk_stat();
    client_out.err = daemon_out.first;
    client_out.chunk_free = daemon_out.second.chunk_free;
    client_out.chunk_total = daemon_out.second.chunk_total;
    client_out.chunk_size = daemon_out.second.chunk_size;

    PROXY_DATA->log()->debug("{}() Sending output err '{}'", __func__,
                             client_out.err);
    return gkfs::rpc::cleanup_respond(&handle, &client_in, &client_out);
}

DEFINE_MARGO_RPC_HANDLER(proxy_rpc_srv_chunk_stat)