Verified Commit 9fac7a91 authored by Marc Vef's avatar Marc Vef
Browse files

Daemon: read() uses tasklets with dedicated IO pool

parent bbf31409
Loading
Loading
Loading
Loading
+9 −1
Original line number Diff line number Diff line
@@ -12,6 +12,14 @@ struct write_chunk_args {
    off64_t off;
    ABT_eventual* eventual;
};
// Argument bundle for read_file_abt(); handed to an Argobots tasklet as void*.
// All pointers are non-owning: the caller (read_chunks) keeps the pointees alive
// and waits on `eventual` before they go out of scope.
struct read_chunk_args {
    const std::string* path;       // path of the file whose chunk is read
    char* buf;                     // destination buffer the chunk is read into
    const rpc_chnk_id_t* chnk_id;  // id of the chunk to read
    size_t size;                   // number of bytes to read
    off64_t off;                   // offset within the chunk (non-zero only for the first chunk)
    ABT_eventual* eventual;        // set with the read size (size_t) when the tasklet completes
};

std::string path_to_fspath(const std::string& path);

@@ -19,7 +27,7 @@ int init_chunk_space(const std::string& path);

int destroy_chunk_space(const std::string& path);

int read_file(const std::string& path, rpc_chnk_id_t chnk_id, size_t size, off_t off, char* buf, size_t& read_size);
void read_file_abt(void* _arg);

void write_file_abt(void* _arg);

+3 −3
Original line number Diff line number Diff line
@@ -21,15 +21,17 @@ bool init_environment() {
        ADAFS_DATA->spdlogger()->error("{}() Unable to initialize RocksDB.", __func__);
        return false;
    }
    // init margo
    // Init margo for RPC
    if (!init_rpc_server()) {
        ADAFS_DATA->spdlogger()->error("{}() Unable to initialize Margo RPC server.", __func__);
        return false;
    }
    // Init margo for RPC
    if (!init_ipc_server()) {
        ADAFS_DATA->spdlogger()->error("{}() Unable to initialize Margo IPC server.", __func__);
        return false;
    }
    // Init Argobots ESs to drive IO
    if (!init_io_tasklet_pool()) {
        ADAFS_DATA->spdlogger()->error("{}() Unable to initialize Argobots pool for I/O.", __func__);
        return false;
@@ -92,7 +94,6 @@ void destroy_enviroment() {
bool init_io_tasklet_pool() {
    vector<ABT_xstream> io_streams_tmp(IO_THREADS);
    ABT_pool io_pools_tmp;
//    auto ret = ABT_snoozer_xstream_create(IO_THREADS, &RPC_DATA->io_pools_, RPC_DATA->io_streams_.data());
    auto ret = ABT_snoozer_xstream_create(IO_THREADS, &io_pools_tmp, io_streams_tmp.data());
    if (ret != ABT_SUCCESS) {
        ADAFS_DATA->spdlogger()->error(
@@ -101,7 +102,6 @@ bool init_io_tasklet_pool() {
    }
    RPC_DATA->io_streams(io_streams_tmp);
    RPC_DATA->io_pool(io_pools_tmp);

    return true;
}

+66 −37
Original line number Diff line number Diff line
@@ -88,7 +88,7 @@ void write_file_abt(void* _arg) {
        return;
    }
    // write file
    auto err = static_cast<size_t>(pwrite(fd, arg->buf, arg->size, arg->off));
    auto err = pwrite64(fd, arg->buf, arg->size, arg->off);
    if (err < 0) {
        ADAFS_DATA->spdlogger()->error("{}() Error {} while pwriting file {} chunk_id {} size {} off {}", __func__,
                                       strerror(errno), chnk_path.c_str(), *arg->chnk_id, arg->size, arg->off);
@@ -112,8 +112,7 @@ int write_chunks(const string& path, const vector<void*>& buf_ptrs, const vector
        auto chnk_id = static_cast<size_t*>(buf_ptrs[i]);
        auto chnk_ptr = static_cast<char*>(buf_ptrs[i + chnk_n]);
        auto chnk_size = buf_sizes[i + chnk_n];
        // Starting thread for parallel I/O
        // Note that the parallelism comes from abt-io not from the threads as they are ULTs in a sequential stream
        // Starting tasklets for parallel I/O
        ABT_eventual_create(sizeof(size_t), &eventuals[i]); // written file return value
        auto args = make_unique<write_chunk_args>();
        args->path = &path;
@@ -130,10 +129,10 @@ int write_chunks(const string& path, const vector<void*>& buf_ptrs, const vector
        }
    }
    for (unsigned int i = 0; i < chnk_n; i++) {
        size_t* thread_written_size;
        ABT_eventual_wait(eventuals[i], (void**) &thread_written_size);
        if (thread_written_size == nullptr || *thread_written_size == 0) {
            ADAFS_DATA->spdlogger()->error("{}() Writing file thread {} did return nothing. NO ACTION WAS DONE",
        size_t* task_written_size;
        ABT_eventual_wait(eventuals[i], (void**) &task_written_size);
        if (task_written_size == nullptr || *task_written_size == 0) {
            ADAFS_DATA->spdlogger()->error("{}() Writing file task {} did return nothing. NO ACTION WAS DONE",
                                           __func__, i);
//            // TODO How do we handle already written chunks? Ideally, we would need to remove them after failure.
//            ADAFS_DATA->spdlogger()->error("{}() Writing chunk failed with path {} and id {}. Aborting ...", __func__,
@@ -141,7 +140,7 @@ int write_chunks(const string& path, const vector<void*>& buf_ptrs, const vector
            write_size = 0;
            return -1;
        } else {
            write_size += *thread_written_size;
            write_size += *task_written_size;
        }
        ABT_eventual_free(&eventuals[i]);
    }
@@ -149,34 +148,41 @@ int write_chunks(const string& path, const vector<void*>& buf_ptrs, const vector
}

/**
 *
 * @param path
 * @param chnk_id
 * @param size
 * @param off
 * @param [out] buf
 * @param [out] read_size
 * @return
 * Used by an Argobots tasklet. The argument _arg has the following fields:
 * const std::string* path;
   char* buf;
   const rpc_chnk_id_t* chnk_id;
   size_t size;
   off64_t off;
   ABT_eventual* eventual;
 * Because abt-io uses tasklets, calling threads are suspended for the time of the I/O, allowing other tasks to do
 * work in the same ES. This can mean multiple read_file_abt() calls or other RPC calls
 * @return read_size<size_t> is put into eventual and returned that way
 */
int read_file(const string& path, const rpc_chnk_id_t chnk_id, const size_t size, const off64_t off, char* buf,
              size_t& read_size) {
    auto fs_path = path_to_fspath(path);
void read_file_abt(void* _arg) {
    size_t read_size = 0;
    //unpack args
    auto* arg = static_cast<struct read_chunk_args*>(_arg);
    auto fs_path = path_to_fspath(*arg->path);
    auto chnk_path = bfs::path(ADAFS_DATA->chunk_path());
    chnk_path /= fs_path;
    chnk_path /= fmt::FormatInt(chnk_id).c_str();;
    chnk_path /= fmt::FormatInt(*arg->chnk_id).c_str();;

    int fd = open(chnk_path.c_str(), R_OK);
    if (fd < 0)
        return EIO;
    auto err = pread64(fd, buf, size, off);
    if (fd < 0) {
        read_size = static_cast<size_t>(EIO);
        ABT_eventual_set(*(arg->eventual), &read_size, sizeof(size_t));
        return;
    }
    auto err = pread64(fd, arg->buf, arg->size, arg->off);
    if (err < 0) {
        ADAFS_DATA->spdlogger()->error("{}() Error {} while preading file {} chunk_id {} size {} off {}", __func__,
                                       strerror(errno), chnk_path.c_str(), chnk_id, size, off);
        read_size = 0;
    } else
                                       strerror(errno), chnk_path.c_str(), *arg->chnk_id, arg->size, arg->off);
    } else {
        read_size = static_cast<size_t>(err); // This is cast safe
    }
    close(fd);
    return 0;
    ABT_eventual_set(*(arg->eventual), &read_size, sizeof(size_t));
}

int read_chunks(const string& path, const off64_t offset, const vector<void*>& buf_ptrs,
@@ -184,22 +190,45 @@ int read_chunks(const string& path, const off64_t offset, const vector<void*>& b
                size_t& read_size) {
    read_size = 0;
    // buf sizes also hold chnk ids. we only want to calculate the actual number of chunks
    auto chnk_n = buf_sizes.size() / 2;
    // TODO this can be parallized
    auto chnk_n = static_cast<unsigned int>(buf_sizes.size() / 2); // Cast-safe: There never are so many chunks at once
    vector<ABT_eventual> eventuals(chnk_n);
    vector<unique_ptr<struct read_chunk_args>> thread_args(chnk_n);
    for (size_t i = 0; i < chnk_n; i++) {
        auto chnk_id = *(static_cast<size_t*>(buf_ptrs[i]));
        auto chnk_id = static_cast<size_t*>(buf_ptrs[i]);
        auto chnk_ptr = static_cast<char*>(buf_ptrs[i + chnk_n]);
        auto chnk_size = buf_sizes[i + chnk_n];
        size_t read_chnk_size;
        // read_file but only first chunk can have an offset
        if (read_file(path, chnk_id, chnk_size, (i == 0) ? offset : 0, chnk_ptr, read_chnk_size) != 0) {
            // TODO How do we handle errors?
            ADAFS_DATA->spdlogger()->error("{}() read chunk failed with path {} and id {}. Aborting ...", __func__,
                                           path, chnk_id);
        // Starting tasklets for parallel I/O
        ABT_eventual_create(sizeof(size_t), &eventuals[i]); // read file return value
        auto args = make_unique<read_chunk_args>();
        args->path = &path;
        args->buf = chnk_ptr;
        args->chnk_id = chnk_id;
        args->size = chnk_size;
        // only the first chunk gets the offset. the chunks are sorted on the client side
        args->off = (i == 0 ? offset : 0);
        args->eventual = &eventuals[i];
        thread_args[i] = std::move(args);
        auto ret = ABT_task_create(RPC_DATA->io_pool(), read_file_abt, &(*thread_args[i]), nullptr);
        if (ret != ABT_SUCCESS) {
            ADAFS_DATA->spdlogger()->error("{}() task create failed", __func__);
        }
    }

    for (unsigned int i = 0; i < chnk_n; i++) {
        size_t* task_read_size;
        ABT_eventual_wait(eventuals[i], (void**) &task_read_size);
        if (task_read_size == nullptr || *task_read_size == 0) {
            ADAFS_DATA->spdlogger()->error("{}() Reading file task {} did return nothing. NO ACTION WAS DONE",
                                           __func__, i);
//            // TODO How do we handle errors?
//            ADAFS_DATA->spdlogger()->error("{}() read chunk failed with path {} and id {}. Aborting ...", __func__,
//                                           path, chnk_id);
            read_size = 0;
            return -1;
        } else {
            read_size += *task_read_size;
        }
        read_size += read_chnk_size;
        ABT_eventual_free(&eventuals[i]);
    }
    return 0;
}
 No newline at end of file