Commit b88ac598 authored by Marc Vef's avatar Marc Vef
Browse files

Multiple nodes I/O fix

parent 0fadd7c2
Loading
Loading
Loading
Loading
+3 −1
Original line number Diff line number Diff line
@@ -114,7 +114,9 @@ size_t get_rpc_node(const std::string& to_hash);

bool is_local_op(size_t recipient);

hg_return margo_create_wrap(hg_id_t ipc_id, hg_id_t rpc_id, const std::string& path, hg_handle_t& handle,
template<typename T>
hg_return margo_create_wrap(hg_id_t ipc_id, hg_id_t rpc_id, const T&, hg_handle_t& handle,
                            hg_addr_t& svr_addr, bool force_rpc);


#endif //IFS_PRELOAD_UTIL_HPP
+2 −0
Original line number Diff line number Diff line
@@ -23,6 +23,7 @@ struct write_args {
    size_t chnk_start;
    off_t updated_size;
    std::vector<unsigned long>& chnk_ids;
    size_t recipient;
    ABT_eventual* eventual;
};
struct read_args {
@@ -31,6 +32,7 @@ struct read_args {
    off_t in_offset;
    void* buf;
    std::vector<unsigned long>& chnk_ids;
    size_t recipient;
    ABT_eventual* eventual;
};

+2 −0
Original line number Diff line number Diff line
@@ -83,6 +83,7 @@ ssize_t adafs_pread_ws(int fd, void* buf, size_t count, off_t offset) {
                0, // reading offset only for the first chunk
                buf, // pointer to write buffer
                dest_ids[dest_idx[i]], // pointer to list of chunk ids that all go to the same destination
                dest_idx[i], // recipient
                &eventuals[i], // pointer to an eventual which has allocated memory for storing the written size
        };
        if (i == 0)
@@ -180,6 +181,7 @@ ssize_t adafs_pwrite_ws(int fd, const void* buf, size_t count, off_t offset) {
                chnk_start, // append flag when file was opened
                updated_size, // for append truncate TODO needed?
                dest_ids[dest_idx[i]], // pointer to list of chunk ids that all go to the same destination
                dest_idx[i], // recipient
                &eventuals[i], // pointer to an eventual which has allocated memory for storing the written size
        };
        if (i == 0) // first offset in dest_idx is the chunk with a potential offset
+33 −12
Original line number Diff line number Diff line
@@ -257,19 +257,10 @@ bool is_local_op(const size_t recipient) {
    return recipient == fs_config->host_id;
}

/**
 * Wraps certain margo functions to create a Mercury handle
 * @param ipc_id
 * @param rpc_id
 * @param path
 * @param handle
 * @param svr_addr
 * @return
 */
hg_return margo_create_wrap(const hg_id_t ipc_id, const hg_id_t rpc_id, const std::string& path, hg_handle_t& handle,
inline hg_return
margo_create_wrap_helper(const hg_id_t ipc_id, const hg_id_t rpc_id, const size_t recipient, hg_handle_t& handle,
                         hg_addr_t& svr_addr, bool force_rpc) {
    hg_return_t ret;
    auto recipient = get_rpc_node(path);
    if (is_local_op(recipient) && !force_rpc) { // local
        ret = margo_create(ld_margo_ipc_id, daemon_svr_addr, ipc_id, &handle);
        ld_logger->debug("{}() to local daemon (IPC)", __func__);
@@ -288,3 +279,33 @@ hg_return margo_create_wrap(const hg_id_t ipc_id, const hg_id_t rpc_id, const st
    }
    return ret;
}

/**
 * Wraps certain margo functions to create a Mercury handle
 * @param ipc_id
 * @param rpc_id
 * @param path
 * @param handle
 * @param svr_addr
 * @return
 */
template<>
hg_return margo_create_wrap(const hg_id_t ipc_id, const hg_id_t rpc_id, const std::string& path, hg_handle_t& handle,
                            hg_addr_t& svr_addr, bool force_rpc) {
    return margo_create_wrap_helper(ipc_id, rpc_id, get_rpc_node(path), handle, svr_addr, force_rpc);
}

/**
 * Wraps certain margo functions to create a Mercury handle
 * @param ipc_id
 * @param rpc_id
 * @param recipient
 * @param handle
 * @param svr_addr
 * @return
 */
template<>
hg_return margo_create_wrap(const hg_id_t ipc_id, const hg_id_t rpc_id, const size_t& recipient, hg_handle_t& handle,
                            hg_addr_t& svr_addr, bool force_rpc) {
    return margo_create_wrap_helper(ipc_id, rpc_id, recipient, handle, svr_addr, force_rpc);
}
 No newline at end of file
+3 −2
Original line number Diff line number Diff line
@@ -66,7 +66,8 @@ void rpc_send_write_abt(void* _arg) {
    in.append = HG_FALSE; // unused


    margo_create_wrap(ipc_write_data_id, rpc_write_data_id, arg->path, handle, svr_addr, false);
    margo_create_wrap(ipc_write_data_id, rpc_write_data_id, arg->recipient, handle, svr_addr, false);


    auto used_mid = margo_hg_handle_get_instance(handle);

@@ -163,7 +164,7 @@ void rpc_send_read_abt(void* _arg) {
    in.size = arg->in_size;
    in.offset = arg->in_offset;

    margo_create_wrap(ipc_read_data_id, rpc_read_data_id, in.path, handle, svr_addr, false);
    margo_create_wrap(ipc_read_data_id, rpc_read_data_id, arg->recipient, handle, svr_addr, false);

    auto used_mid = margo_hg_handle_get_instance(handle);
    /* register local target buffer for bulk access */
Loading