Commit 44698778 authored by Marc Vef's avatar Marc Vef
Browse files

Daemon crash fix when handling I/O requests

parent 387245a3
Loading
Loading
Loading
Loading
+4 −0
Original line number Diff line number Diff line
@@ -304,6 +304,7 @@ int puts(const char* str) {
ssize_t write(int fd, const void* buf, size_t count) {
    init_passthrough_if_needed();
    if (is_env_initialized && file_map.exist(fd)) {
        ld_logger->trace("{}() called with fd {}", __func__, fd);
        // TODO if append flag has been given, set offset accordingly.
        // XXX handle lseek too
        return pwrite(fd, buf, count, 0);
@@ -314,6 +315,7 @@ ssize_t write(int fd, const void* buf, size_t count) {
ssize_t pwrite(int fd, const void* buf, size_t count, off_t offset) {
    init_passthrough_if_needed();
    if (is_env_initialized && file_map.exist(fd)) {
        ld_logger->trace("{}() called with fd {}", __func__, fd);
        auto adafs_fd = file_map.get(fd);
        auto path = adafs_fd->path();
        auto append_flag = adafs_fd->append_flag();
@@ -345,6 +347,7 @@ ssize_t pwrite(int fd, const void* buf, size_t count, off_t offset) {
ssize_t read(int fd, void* buf, size_t count) {
    init_passthrough_if_needed();
    if (is_env_initialized && file_map.exist(fd)) {
        ld_logger->trace("{}() called with fd {}", __func__, fd);
        return pread(fd, buf, count, 0);
    }
    return (reinterpret_cast<decltype(&read)>(libc_read))(fd, buf, count);
@@ -353,6 +356,7 @@ ssize_t read(int fd, void* buf, size_t count) {
ssize_t pread(int fd, void* buf, size_t count, off_t offset) {
    init_passthrough_if_needed();
    if (is_env_initialized && file_map.exist(fd)) {
        ld_logger->trace("{}() called with fd {}", __func__, fd);
        auto adafs_fd = file_map.get(fd);
        auto path = adafs_fd->path();
        size_t read_size = 0;
+4 −0
Original line number Diff line number Diff line
@@ -44,6 +44,10 @@ int rpc_send_open(const hg_id_t ipc_open_id, const hg_id_t rpc_open_id, const st
    in.mode = mode;

    // TODO handle all flags. currently only file create
    if (!(flags & O_CREAT)) {
        ld_logger->debug("{}() No create flag given, assuming file exists ...", __func__);
        return 0; // XXX This is a temporary quickfix for read. Look up if file exists. Do it on server end.
    }
    ld_logger->debug("{}() Creating Mercury handle ...", __func__);
    margo_create_wrap(ipc_open_id, rpc_open_id, path, handle, svr_addr);

+8 −14
Original line number Diff line number Diff line
@@ -9,8 +9,8 @@
using namespace std;

static hg_return_t rpc_srv_read_data(hg_handle_t handle) {
    rpc_read_data_in_t in;
    rpc_data_out_t out;
    rpc_read_data_in_t in{};
    rpc_data_out_t out{};
    void* b_buf;
    int err;
    hg_bulk_t bulk_handle;
@@ -39,7 +39,7 @@ static hg_return_t rpc_srv_read_data(hg_handle_t handle) {
        // set up buffer for bulk transfer
        b_buf = (void*) buf.get();

        ret = HG_Bulk_create(hgi->hg_class, 1, &b_buf, &in.size, HG_BULK_READ_ONLY, &bulk_handle);
        ret = margo_bulk_create(mid, 1, &b_buf, &in.size, HG_BULK_READ_ONLY, &bulk_handle);

        // push data to client
        if (ret == HG_SUCCESS)
@@ -55,14 +55,11 @@ static hg_return_t rpc_srv_read_data(hg_handle_t handle) {
        if (hret != HG_SUCCESS) {
            ADAFS_DATA->spdlogger()->error("Failed to respond to read request");
        }
        HG_Bulk_free(bulk_handle);
        margo_bulk_free(bulk_handle);
    }

    in.path = nullptr;

    // Destroy handle when finished
    margo_free_input(handle, &in);
    margo_free_output(handle, &out);
    margo_destroy(handle);
    return HG_SUCCESS;
}
@@ -70,8 +67,8 @@ static hg_return_t rpc_srv_read_data(hg_handle_t handle) {
DEFINE_MARGO_RPC_HANDLER(rpc_srv_read_data)

static hg_return_t rpc_srv_write_data(hg_handle_t handle) {
    rpc_write_data_in_t in;
    rpc_data_out_t out;
    rpc_write_data_in_t in{};
    rpc_data_out_t out{};
    void* b_buf;
    hg_bulk_t bulk_handle;

@@ -84,7 +81,7 @@ static hg_return_t rpc_srv_write_data(hg_handle_t handle) {
    // register local buffer to fill for bulk pull
    auto b_buf_wrap = make_unique<char[]>(in.size);
    b_buf = static_cast<void*>(b_buf_wrap.get());
    ret = HG_Bulk_create(hgi->hg_class, 1, &b_buf, &in.size, HG_BULK_WRITE_ONLY, &bulk_handle);
    ret = margo_bulk_create(mid, 1, &b_buf, &in.size, HG_BULK_WRITE_ONLY, &bulk_handle);
    // push data to client
    if (ret == HG_SUCCESS) {
        // pull data from client here
@@ -96,7 +93,7 @@ static hg_return_t rpc_srv_write_data(hg_handle_t handle) {
            ADAFS_DATA->spdlogger()->error("Failed to write data to local disk.");
            out.io_size = 0;
        }
        HG_Bulk_free(bulk_handle);
        margo_bulk_free(bulk_handle);
    } else {
        ADAFS_DATA->spdlogger()->error("Failed to pull data from client in write operation");
        out.res = EIO;
@@ -108,11 +105,8 @@ static hg_return_t rpc_srv_write_data(hg_handle_t handle) {
        ADAFS_DATA->spdlogger()->error("Failed to respond to write request");
    }

    in.path = nullptr;

    // Destroy handle when finished
    margo_free_input(handle, &in);
    margo_free_output(handle, &out);
    margo_destroy(handle);
    return HG_SUCCESS;
}