ld_rpc_metadentry.cpp 11.7 KiB
Newer Older

#include <preload/rpc/ld_rpc_metadentry.hpp>
using ns = chrono::nanoseconds;
using get_time = chrono::steady_clock;

inline hg_return_t margo_forward_timed_wrap_timer(hg_handle_t& handle, void* in_struct, const char* func) {
    auto start_t = get_time::now();
    auto ret = margo_forward_timed_wrap(handle, in_struct);
    auto diff_count = chrono::duration_cast<ns>(get_time::now() - start_t).count();
    if (((diff_count) / 1000000.) > MARGO_FORWARD_TIMER_THRESHOLD)
        ld_logger->info("{}() rpc_time: {} ms", func, ((diff_count) / 1000000.));
    return ret;
}

inline hg_return_t margo_forward_timed_wrap(hg_handle_t& handle, void* in_struct) {
    hg_return_t ret = HG_OTHER_ERROR;

    for (int i = 0; i < RPC_TRIES; ++i) {
        ret = margo_forward_timed(handle, in_struct, RPC_TIMEOUT);
        if (ret == HG_SUCCESS) {
            break;
        }
    }
    return ret;
}
int rpc_send_mk_node(const std::string& path, const mode_t mode) {
Marc Vef's avatar
Marc Vef committed
    hg_handle_t handle;
    rpc_mk_node_in_t in{};
Marc Vef's avatar
Marc Vef committed
    rpc_err_out_t out{};
    int err = EUNKNOWN;
    // fill in
    in.path = path.c_str();
    in.mode = mode;
Marc Vef's avatar
Marc Vef committed
    ld_logger->debug("{}() Creating Mercury handle ...", __func__);
    auto ret = margo_create_wrap(ipc_mk_node_id, rpc_mk_node_id, path, handle, false);
    if (ret != HG_SUCCESS) {
        errno = EBUSY;
        return -1;
    // Send rpc
    ld_logger->debug("{}() About to send RPC ...", __func__);
#if defined(MARGO_FORWARD_TIMER)
    ret = margo_forward_timed_wrap_timer(handle, &in, __func__);
#else
    ret = margo_forward_timed_wrap(handle, &in);
#endif
    // Get response
Marc Vef's avatar
Marc Vef committed
    if (ret == HG_SUCCESS) {
        ld_logger->trace("{}() Waiting for response", __func__);
        ret = margo_get_output(handle, &out);
        if (ret == HG_SUCCESS) {
            ld_logger->debug("{}() Got response success: {}", __func__, out.err);
            err = out.err;
        } else {
            // something is wrong
            errno = EBUSY;
            ld_logger->error("{}() while getting rpc output", __func__);
        }
        /* clean up resources consumed by this rpc */
        margo_free_output(handle, &out);
        ld_logger->warn("{}() timed out");
    margo_destroy(handle);
Marc Vef's avatar
Marc Vef committed
    return err;
int rpc_send_access(const std::string& path, const int mask) {
    hg_handle_t handle;
    rpc_access_in_t in{};
    rpc_err_out_t out{};
    int err = EUNKNOWN;
    // fill in
    in.path = path.c_str();
    in.mask = mask;
    ld_logger->debug("{}() Creating Mercury handle ...", __func__);
    auto ret = margo_create_wrap(ipc_access_id, rpc_access_id, path, handle, false);
    if (ret != HG_SUCCESS) {
        errno = EBUSY;
        return -1;
    // Send rpc
    ld_logger->debug("{}() About to send RPC ...", __func__);
#if defined(MARGO_FORWARD_TIMER)
    ret = margo_forward_timed_wrap_timer(handle, &in, __func__);
#else
    ret = margo_forward_timed_wrap(handle, &in);
#endif
    // Get response
    if (ret == HG_SUCCESS) {
        ld_logger->trace("{}() Waiting for response", __func__);
        ret = margo_get_output(handle, &out);
        if (ret == HG_SUCCESS) {
            ld_logger->debug("{}() Got response success: {}", __func__, out.err);
            err = out.err;
        } else {
            // something is wrong
            errno = EBUSY;
            ld_logger->error("{}() while getting rpc output", __func__);
        }
        /* clean up resources consumed by this rpc */
        margo_free_output(handle, &out);
    } else {
        ld_logger->warn("{}() timed out");
    }

    margo_destroy(handle);
    return err;
}

int rpc_send_stat(const std::string& path, string& attr) {
    hg_handle_t handle;
Marc Vef's avatar
Marc Vef committed
    rpc_stat_out_t out{};
    int err = EUNKNOWN;
    // fill in
    in.path = path.c_str();
Marc Vef's avatar
Marc Vef committed
    ld_logger->debug("{}() Creating Mercury handle ...", __func__);
    auto ret = margo_create_wrap(ipc_stat_id, rpc_stat_id, path, handle, false);
    if (ret != HG_SUCCESS) {
        errno = EBUSY;
        return -1;
    // Send rpc
#if defined(MARGO_FORWARD_TIMER)
    ret = margo_forward_timed_wrap_timer(handle, &in, __func__);
#else
    ret = margo_forward_timed_wrap(handle, &in);
#endif
    // Get response
Marc Vef's avatar
Marc Vef committed
    if (ret == HG_SUCCESS) {
        ld_logger->trace("{}() Waiting for response", __func__);
        ret = margo_get_output(handle, &out);
        if (ret == HG_SUCCESS) {
            ld_logger->debug("{}() Got response success: {}", __func__, out.err);
            if (out.err == 0) {
                err = 0;
                attr = out.db_val;
            } else {
                err = -1;
                errno = out.err;
            }
        } else {
            // something is wrong
            errno = EBUSY;
            ld_logger->error("{}() while getting rpc output", __func__);
        }
        /* clean up resources consumed by this rpc */
        margo_free_output(handle, &out);
    } else {
        ld_logger->warn("{}() timed out");
        errno = EBUSY;
    margo_destroy(handle);
    return err;
int rpc_send_rm_node(const std::string& path) {
    rpc_rm_node_in_t in{};
    rpc_err_out_t out{};
    hg_handle_t handle;
    int err = EUNKNOWN;
    // fill in
    in.path = path.c_str();

    ld_logger->debug("{}() Creating Mercury handle ...", __func__);
    auto ret = margo_create_wrap(ipc_rm_node_id, rpc_rm_node_id, path, handle, false);
    if (ret != HG_SUCCESS) {
        errno = EBUSY;
        return -1;
    // Send rpc
#if defined(MARGO_FORWARD_TIMER)
    ret = margo_forward_timed_wrap_timer(handle, &in, __func__);
#else
    ret = margo_forward_timed_wrap(handle, &in);
#endif
    // Get response
    if (ret == HG_SUCCESS) {
        ld_logger->trace("{}() Waiting for response", __func__);
        ret = margo_get_output(handle, &out);
        if (ret == HG_SUCCESS) {
            ld_logger->debug("{}() Got response success: {}", __func__, out.err);
            err = out.err;
        } else {
            // something is wrong
            errno = EBUSY;
            ld_logger->error("{}() while getting rpc output", __func__);
        }
        /* clean up resources consumed by this rpc */
        margo_free_output(handle, &out);
        ld_logger->warn("{}() timed out");
        errno = EBUSY;
    margo_destroy(handle);
int rpc_send_update_metadentry(const string& path, const Metadentry& md, const MetadentryUpdateFlags& md_flags) {
    rpc_update_metadentry_in_t in{};
    rpc_err_out_t out{};
    int err = EUNKNOWN;
    // fill in
    // add data
    in.path = path.c_str();
    in.size = md_flags.size ? md.size : 0;
    in.nlink = md_flags.link_count ? md.link_count : 0;
    in.gid = md_flags.gid ? md.gid : 0;
    in.uid = md_flags.uid ? md.uid : 0;
    in.blocks = md_flags.blocks ? md.blocks : 0;
    in.inode_no = md_flags.inode_no ? md.inode_no : 0;
    in.atime = md_flags.atime ? md.atime : 0;
    in.mtime = md_flags.mtime ? md.mtime : 0;
    in.ctime = md_flags.ctime ? md.ctime : 0;
    // add data flags
    in.size_flag = bool_to_merc_bool(md_flags.size);
    in.nlink_flag = bool_to_merc_bool(md_flags.link_count);
    in.gid_flag = bool_to_merc_bool(md_flags.gid);
    in.uid_flag = bool_to_merc_bool(md_flags.uid);
    in.block_flag = bool_to_merc_bool(md_flags.blocks);
    in.inode_no_flag = bool_to_merc_bool(md_flags.inode_no);
    in.atime_flag = bool_to_merc_bool(md_flags.atime);
    in.mtime_flag = bool_to_merc_bool(md_flags.mtime);
    in.ctime_flag = bool_to_merc_bool(md_flags.ctime);

    ld_logger->debug("{}() Creating Mercury handle ...", __func__);
    auto ret = margo_create_wrap(ipc_update_metadentry_id, rpc_update_metadentry_id, path, handle, false);
    if (ret != HG_SUCCESS) {
        errno = EBUSY;
        return -1;
    // Send rpc
#if defined(MARGO_FORWARD_TIMER)
    ret = margo_forward_timed_wrap_timer(handle, &in, __func__);
#else
    ret = margo_forward_timed_wrap(handle, &in);
#endif
    // Get response
    if (ret == HG_SUCCESS) {
        ld_logger->trace("{}() Waiting for response", __func__);
        ret = margo_get_output(handle, &out);
        if (ret == HG_SUCCESS) {
            ld_logger->debug("{}() Got response success: {}", __func__, out.err);
            err = out.err;
        } else {
            // something is wrong
            errno = EBUSY;
            ld_logger->error("{}() while getting rpc output", __func__);
        }
        /* clean up resources consumed by this rpc */
        margo_free_output(handle, &out);
        ld_logger->warn("{}() timed out");
        errno = EBUSY;
    margo_destroy(handle);
int rpc_send_update_metadentry_size(const string& path, const size_t size, const off64_t offset, const bool append_flag,
                                    off64_t& ret_size) {
    rpc_update_metadentry_size_in_t in{};
    rpc_update_metadentry_size_out_t out{};
    // add data
    in.path = path.c_str();
    in.size = size;
Marc Vef's avatar
Marc Vef committed
    if (append_flag)
        in.append = HG_TRUE;
    else
        in.append = HG_FALSE;

    ld_logger->debug("{}() Creating Mercury handle ...", __func__);
    auto ret = margo_create_wrap(ipc_update_metadentry_size_id, rpc_update_metadentry_size_id, path, handle, false);
    if (ret != HG_SUCCESS) {
        errno = EBUSY;
        return -1;
    }
    // Send rpc
#if defined(MARGO_FORWARD_TIMER)
    ret = margo_forward_timed_wrap_timer(handle, &in, __func__);
#else
    ret = margo_forward_timed_wrap(handle, &in);
#endif
    // Get response
    if (ret == HG_SUCCESS) {
        ld_logger->trace("{}() Waiting for response", __func__);
        ret = margo_get_output(handle, &out);
        if (ret == HG_SUCCESS) {
            ld_logger->debug("{}() Got response success: {}", __func__, out.err);
            err = out.err;
            ret_size = out.ret_size;
        } else {
            // something is wrong
            errno = EBUSY;
            ret_size = 0;
            ld_logger->error("{}() while getting rpc output", __func__);
        }
        /* clean up resources consumed by this rpc */
        margo_free_output(handle, &out);
    } else {
        ld_logger->warn("{}() timed out");
        errno = EBUSY;
    }
    margo_destroy(handle);
    return err;
}

int rpc_send_get_metadentry_size(const std::string& path, off64_t& ret_size) {
    hg_handle_t handle;
    rpc_path_only_in_t in{};
    rpc_get_metadentry_size_out_t out{};
    // add data
    in.path = path.c_str();
    int err = EUNKNOWN;

    ld_logger->debug("{}() Creating Mercury handle ...", __func__);
    auto ret = margo_create_wrap(ipc_get_metadentry_size_id, rpc_get_metadentry_size_id, path, handle, false);
    if (ret != HG_SUCCESS) {
        errno = EBUSY;
        return -1;
    // Send rpc
#if defined(MARGO_FORWARD_TIMER)
    ret = margo_forward_timed_wrap_timer(handle, &in, __func__);
#else
    ret = margo_forward_timed_wrap(handle, &in);
#endif
    // Get response
    if (ret == HG_SUCCESS) {
        ld_logger->trace("{}() Waiting for response", __func__);
        ret = margo_get_output(handle, &out);
        if (ret == HG_SUCCESS) {
            ld_logger->debug("{}() Got response success: {}", __func__, out.err);
            err = out.err;
            ret_size = out.ret_size;
        } else {
            // something is wrong
            errno = EBUSY;
            ret_size = 0;
            ld_logger->error("{}() while getting rpc output", __func__);
        /* clean up resources consumed by this rpc */
        margo_free_output(handle, &out);
        ld_logger->warn("{}() timed out");
        errno = EBUSY;
    margo_destroy(handle);
Marc Vef's avatar
Marc Vef committed
    return err;