Unverified Commit d308e5d7 authored by Tommaso Tocci

bugfix: propagate IO error from daemons to client

Write errors happening on daemons are now reported back to clients.
The "No space left on device" error in particular is now properly propagated to clients.
parent 8d01c3f2
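
For illustration, the propagation pattern this commit introduces can be summarized with the minimal, self-contained sketch below (struct and function names are made up for the example and are not the actual GekkoFS sources): the daemon-side I/O task reports a failed chunk write as a negative errno value, the RPC handler converts it into a positive err field in the response, and the client maps that field back to errno and returns -1.

// Illustrative sketch of the propagation path: daemon task -> RPC response -> client.
#include <cerrno>
#include <cstdint>
#include <cstdio>
#include <sys/types.h>
#include <system_error>

// Hypothetical stand-in for the Mercury response struct (rpc_data_out_t).
struct data_out {
    int32_t err;     // 0 on success, otherwise an errno value (e.g. ENOSPC)
    size_t io_size;  // bytes actually written
};

// Daemon I/O task: mirrors write_file_abt() catching std::system_error from
// write_chunk() and reporting -errno instead of a bogus written size of 0.
ssize_t write_chunk_task(bool simulate_enospc) {
    try {
        if (simulate_enospc)
            throw std::system_error(ENOSPC, std::system_category());
        return 4096;  // pretend a full chunk was written
    } catch (const std::system_error& serr) {
        return -serr.code().value();  // negative errno travels back to the handler
    }
}

// Daemon RPC handler: turns the task result into the response fields.
data_out build_response(ssize_t task_written_size) {
    data_out out{0, 0};
    if (task_written_size < 0) {
        out.err = static_cast<int32_t>(-task_written_size);  // back to a positive errno
    } else {
        out.io_size = static_cast<size_t>(task_written_size);
    }
    return out;
}

// Client: a non-zero out.err becomes errno and the call returns -1, so
// "No space left on device" is finally visible to the application.
ssize_t client_write(const data_out& out) {
    if (out.err != 0) {
        errno = out.err;
        return -1;
    }
    return static_cast<ssize_t>(out.io_size);
}

int main() {
    if (client_write(build_response(write_chunk_task(true))) < 0)
        std::perror("write");  // prints: write: No space left on device
    return 0;
}
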
+1 −1
@@ -91,7 +91,7 @@ MERCURY_GEN_PROC(rpc_read_data_in_t,
((hg_bulk_t) (bulk_handle)))

MERCURY_GEN_PROC(rpc_data_out_t,
                 ((int32_t) (res))\
        ((int32_t) (err))\
((hg_size_t) (io_size)))

MERCURY_GEN_PROC(rpc_write_data_in_t,
+20 −14
@@ -118,33 +118,36 @@ ssize_t write(const string& path, const void* buf, const bool append_flag, const
    // Wait for RPC responses and then get response and add it to out_size which is the written size
    // All potential outputs are served to free resources regardless of errors, although an errorcode is set.
    ssize_t out_size = 0;
    ssize_t err = 0;
    bool error = false;
    for (unsigned int i = 0; i < target_n; i++) {
        // XXX We might need a timeout here to not wait forever for an output that never comes?
        ret = margo_wait(rpc_waiters[i]);
        if (ret != HG_SUCCESS) {
            CTX->log()->error("{}() Unable to wait for margo_request handle for path {} recipient {}", __func__, path,
                             targets[i]);
            error = true;
            errno = EBUSY;
            err = -1;
        }
        // decode response
        rpc_data_out_t out{};
        ret = margo_get_output(rpc_handles[i], &out);
        if (ret != HG_SUCCESS) {
            CTX->log()->error("{}() Failed to get rpc output for path {} recipient {}", __func__, path, targets[i]);
            err = -1;
            error = true;
            errno = EIO;
        }
        if (out.err != 0) {
            CTX->log()->error("{}() Daemon reported error: {}", __func__, out.err);
            error = true;
            errno = out.err;
        }
        CTX->log()->debug("{}() Got response {}", __func__, out.res);
        if (out.res != 0)
            errno = out.res;
        out_size += static_cast<size_t>(out.io_size);
        margo_free_output(rpc_handles[i], &out);
        margo_destroy(rpc_handles[i]);
    }
    // free bulk handles for buffer
    margo_bulk_free(rpc_bulk_handle);
    return (err < 0) ? err : out_size;
    return (error) ? -1 : out_size;
}

/**
@@ -228,33 +231,36 @@ ssize_t read(const string& path, void* buf, const off64_t offset, const size_t r
    // Wait for RPC responses and then get response and add it to out_size which is the read size
    // All potential outputs are served to free resources regardless of errors, although an errorcode is set.
    ssize_t out_size = 0;
    ssize_t err = 0;
    bool error = false;
    for (unsigned int i = 0; i < target_n; i++) {
        // XXX We might need a timeout here to not wait forever for an output that never comes?
        ret = margo_wait(rpc_waiters[i]);
        if (ret != HG_SUCCESS) {
            CTX->log()->error("{}() Unable to wait for margo_request handle for path {} recipient {}", __func__, path,
                             targets[i]);
            error = true;
            errno = EBUSY;
            err = -1;
        }
        // decode response
        rpc_data_out_t out{};
        ret = margo_get_output(rpc_handles[i], &out);
        if (ret != HG_SUCCESS) {
            CTX->log()->error("{}() Failed to get rpc output for path {} recipient {}", __func__, path, targets[i]);
            err = -1;
            error = true;
            errno = EIO;
        }
        if (out.err != 0) {
            CTX->log()->error("{}() Daemon reported error: {}", __func__, out.err);
            error = true;
            errno = out.err;
        }
        CTX->log()->debug("{}() Got response {}", __func__, out.res);
        if (out.res != 0)
            errno = out.res;
        out_size += static_cast<size_t>(out.io_size);
        margo_free_output(rpc_handles[i], &out);
        margo_destroy(rpc_handles[i]);
    }
    // free bulk handles for buffer
    margo_bulk_free(rpc_bulk_handle);
    return (err < 0) ? err : out_size;
    return (error) ? -1 : out_size;
}

int trunc_data(const std::string& path, size_t current_size, size_t new_size) {
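
With the client-side changes above, an application writing through the interception library should now see the daemon-side failure directly rather than a silently short write. A hypothetical example of what that looks like (the mount path is purely illustrative):

// Hypothetical application view after the fix: a write rejected by the daemon
// with ENOSPC now fails with errno set instead of appearing to succeed.
#include <cerrno>
#include <cstdio>
#include <cstring>
#include <fcntl.h>
#include <unistd.h>

int main() {
    int fd = open("/tmp/gkfs_mountdir/file", O_CREAT | O_WRONLY, 0644);  // illustrative path
    if (fd < 0) {
        std::perror("open");
        return 1;
    }
    char buf[4096] = {};
    ssize_t n = write(fd, buf, sizeof(buf));
    if (n < 0)
        std::fprintf(stderr, "write failed: %s\n", std::strerror(errno));  // e.g. No space left on device
    close(fd);
    return 0;
}
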
+1 −1
@@ -125,7 +125,7 @@ void ChunkStorage::write_chunk(const std::string& file_path, unsigned int chunk_
        throw std::system_error(errno, std::system_category(), "Failed to open chunk file for write");
    }

    auto wrote = pwrite64(fd, buff, size, offset);
    auto wrote = pwrite(fd, buff, size, offset);
    if (wrote < 0) {
        log->error("Failed to write chunk file. File: {}, size: {}, offset: {}, Error: {}",
                chunk_path, size, offset, std::strerror(errno));
+57 −39
@@ -42,7 +42,7 @@ struct write_chunk_args {
   ABT_eventual* eventual;
 * This function is driven by the IO pool. so there is a maximum allowed number of concurrent IO operations per daemon.
 * This function is called by tasklets, as this function cannot be allowed to block.
 * @return written_size<size_t> is put into eventual and returned that way
 * @return written_size<ssize_t> is put into eventual and returned that way
 */
void write_file_abt(void* _arg) {
    // Unpack args
@@ -52,10 +52,10 @@ void write_file_abt(void* _arg) {
    try {
        ADAFS_DATA->storage()->write_chunk(path, arg->chnk_id,
                arg->buf, arg->size, arg->off, arg->eventual);
    } catch (const std::exception& e){
    } catch (const std::system_error& serr){
        ADAFS_DATA->spdlogger()->error("{}() Error writing chunk {} of file {}", __func__, arg->chnk_id, path);
        auto wrote = 0;
        ABT_eventual_set(arg->eventual, &wrote, sizeof(size_t));
        ssize_t wrote = -(serr.code().value());
        ABT_eventual_set(arg->eventual, &wrote, sizeof(ssize_t));
    }

}
@@ -79,7 +79,7 @@ struct read_chunk_args {
   ABT_eventual* eventual;
 * This function is driven by the IO pool. so there is a maximum allowed number of concurrent IO operations per daemon.
 * This function is called by tasklets, as this function cannot be allowed to block.
 * @return read_size<size_t> is put into eventual and returned that way
 * @return read_size<ssize_t> is put into eventual and returned that way
 */
void read_file_abt(void* _arg) {
    //unpack args
@@ -89,10 +89,10 @@ void read_file_abt(void* _arg) {
    try {
        ADAFS_DATA->storage()->read_chunk(path, arg->chnk_id,
                arg->buf, arg->size, arg->off, arg->eventual);
    } catch (const std::exception& e){
    } catch (const std::system_error& serr){
        ADAFS_DATA->spdlogger()->error("{}() Error reading chunk {} of file {}", __func__, arg->chnk_id, path);
        size_t read = 0;
        ABT_eventual_set(arg->eventual, &read, sizeof(size_t));
        ssize_t read = -(serr.code().value());
        ABT_eventual_set(arg->eventual, &read, sizeof(ssize_t));
    }
}

@@ -128,7 +128,7 @@ static hg_return_t rpc_srv_write_data(hg_handle_t handle) {
    rpc_data_out_t out{};
    hg_bulk_t bulk_handle = nullptr;
    // default out for error
    out.res = EIO;
    out.err = EIO;
    out.io_size = 0;
    // Getting some information from margo
    auto ret = margo_get_input(handle, &in);
@@ -250,7 +250,7 @@ static hg_return_t rpc_srv_write_data(hg_handle_t handle) {
        }
        // Delegate chunk I/O operation to local FS to an I/O dedicated ABT pool
        // Starting tasklets for parallel I/O
        ABT_eventual_create(sizeof(size_t), &task_eventuals[chnk_id_curr]); // written file return value
        ABT_eventual_create(sizeof(ssize_t), &task_eventuals[chnk_id_curr]); // written file return value
        auto& task_arg = task_args[chnk_id_curr];
        task_arg.path = path.get();
        task_arg.buf = bulk_buf_ptrs[chnk_id_curr];
@@ -277,33 +277,41 @@ static hg_return_t rpc_srv_write_data(hg_handle_t handle) {
    /*
     * 4. Read task results and accumulate in out.io_size
     */
    out.err = 0;
    out.io_size = 0;
    for (chnk_id_curr = 0; chnk_id_curr < in.chunk_n; chnk_id_curr++) {
        size_t* task_written_size;
        ssize_t* task_written_size = nullptr;
        // wait causes the calling ult to go into BLOCKED state, implicitly yielding to the pool scheduler
        ABT_eventual_wait(task_eventuals[chnk_id_curr], (void**) &task_written_size);
        if (task_written_size == nullptr || *task_written_size == 0) {
            ADAFS_DATA->spdlogger()->error("{}() Writing file task for chunk {} failed and did return anything.",
        auto abt_ret = ABT_eventual_wait(task_eventuals[chnk_id_curr], (void**) &task_written_size);
        if (abt_ret != ABT_SUCCESS) {
            ADAFS_DATA->spdlogger()->error(
                    "{}() Failed to wait for write task for chunk {}",
                    __func__, chnk_id_curr);
            /*
             * XXX We have to talk about how chunk errors are handled? Should we try to write again?
             * In any case we just ignore this for now and return the out.io_size with as much has been written
             * After all, we can decide on the semantics.
             */
        } else {
            out.io_size += *task_written_size; // add task written size to output size
            out.err = EIO;
            break;
        }
        assert(task_written_size != nullptr);
        if (*task_written_size < 0) {
            ADAFS_DATA->spdlogger()->error("{}() Write task failed for chunk {}",
                                           __func__, chnk_id_curr);
            out.err = -(*task_written_size);
            break;
        }

        out.io_size += *task_written_size; // add task written size to output size
        ABT_eventual_free(&task_eventuals[chnk_id_curr]);
    }

    // Sanity check to see if all data has been written
    if (in.total_chunk_size != out.io_size)
    if (in.total_chunk_size != out.io_size) {
        ADAFS_DATA->spdlogger()->warn("{}() total chunk size {} and out.io_size {} mismatch!", __func__,
                                      in.total_chunk_size, out.io_size);
    }

    /*
     * 5. Respond and cleanup
     */
    out.res = 0; // Set errorcode to succcess
    ADAFS_DATA->spdlogger()->debug("{}() Sending output response {}", __func__, out.res);
    ADAFS_DATA->spdlogger()->debug("{}() Sending output response {}", __func__, out.err);
    ret = rpc_cleanup_respond(&handle, &in, &out, &bulk_handle);
    // free tasks after responding
    for (auto&& task : abt_tasks) {
@@ -323,7 +331,7 @@ static hg_return_t rpc_srv_read_data(hg_handle_t handle) {
    rpc_data_out_t out{};
    hg_bulk_t bulk_handle = nullptr;
    // Set default out for error
    out.res = EIO;
    out.err = EIO;
    out.io_size = 0;
    // Getting some information from margo
    auto ret = margo_get_input(handle, &in);
@@ -421,7 +429,7 @@ static hg_return_t rpc_srv_read_data(hg_handle_t handle) {
        }
        // Delegate chunk I/O operation to local FS to an I/O dedicated ABT pool
        // Starting tasklets for parallel I/O
        ABT_eventual_create(sizeof(size_t), &task_eventuals[chnk_id_curr]); // written file return value
        ABT_eventual_create(sizeof(ssize_t), &task_eventuals[chnk_id_curr]); // written file return value
        auto& task_arg = task_args[chnk_id_curr];
        task_arg.path = path.get();
        task_arg.buf = bulk_buf_ptrs[chnk_id_curr];
@@ -446,16 +454,29 @@ static hg_return_t rpc_srv_read_data(hg_handle_t handle) {
    /*
     * 4. Read task results and accumulate in out.io_size
     */
    out.err = 0;
    out.io_size = 0;
    for (chnk_id_curr = 0; chnk_id_curr < in.chunk_n; chnk_id_curr++) {
        size_t* task_read_size;
        ssize_t* task_read_size = nullptr;
        // wait causes the calling ult to go into BLOCKED state, implicitly yielding to the pool scheduler
        ABT_eventual_wait(task_eventuals[chnk_id_curr], (void**) &task_read_size);

        auto abt_ret = ABT_eventual_wait(task_eventuals[chnk_id_curr], (void**) &task_read_size);
        if (abt_ret != ABT_SUCCESS) {
            ADAFS_DATA->spdlogger()->error(
                    "{}() Failed to wait for read task for chunk {}",
                    __func__, chnk_id_curr);
            out.err = EIO;
            break;
        }
        assert(task_read_size != nullptr);
        if(*task_read_size < 0){
            ADAFS_DATA->spdlogger()->warn(
                    "{}() Read task failed for chunk {}",
                    __func__, chnk_id_curr);
            out.err = -(*task_read_size);
            break;
        }
        
        if(*task_read_size == 0) {
            ADAFS_DATA->spdlogger()->warn("{}() Read task for chunk {} returned 0 bytes", __func__, chnk_id_curr);
            continue;
        }

@@ -466,19 +487,16 @@ static hg_return_t rpc_srv_read_data(hg_handle_t handle) {
                    "{}() Failed push chnkid {} on path {} to client. origin offset {} local offset {} chunk size {}",
                    __func__, chnk_id_curr, in.path, origin_offsets[chnk_id_curr], local_offsets[chnk_id_curr],
                    chnk_sizes[chnk_id_curr]);
            cancel_abt_io(&abt_tasks, &task_eventuals, in.chunk_n);
            return rpc_cleanup_respond(&handle, &in, &out, &bulk_handle);
            out.err = EIO;
            break;
        }
        out.io_size += *task_read_size; // add task read size to output size
    }

    ADAFS_DATA->spdlogger()->trace("{}() total chunk size read {}/{}", __func__, out.io_size, in.total_chunk_size);

    /*
     * 5. Respond and cleanup
     */
    out.res = 0; // Set errorcode to succcess
    ADAFS_DATA->spdlogger()->debug("{}() Sending output response {}", __func__, out.res);
    ADAFS_DATA->spdlogger()->debug("{}() Sending output response, err: {}", __func__, out.err);
    ret = rpc_cleanup_respond(&handle, &in, &out, &bulk_handle);
    // free tasks after responding
    cancel_abt_io(&abt_tasks, &task_eventuals, in.chunk_n);