Commit 25e84b1e authored by Marc Vef's avatar Marc Vef
Browse files

Finished up chunking algorithm + clean up

parent 8f4a2424
Loading
Loading
Loading
Loading
+3 −3
Original line number Diff line number Diff line
@@ -14,12 +14,12 @@ int destroy_chunk_space(const std::string& path);
int read_file(const std::string& path, rpc_chnk_id_t chnk_id, size_t size, off_t off, char* buf, size_t& read_size);

int write_file(const std::string& path, const char* buf, rpc_chnk_id_t chnk_id, size_t size, off_t off,
               bool append, off_t updated_size, size_t& write_size);
               size_t& write_size);

int write_chunks(const std::string& path, const std::vector<void*>& buf_ptrs, const std::vector<hg_size_t>& buf_sizes,
                 const off_t offset, size_t& write_size);
                 off_t offset, size_t& write_size);

int read_chunks(const std::string& path, const off_t offset, const std::vector<void*>& buf_ptrs,
int read_chunks(const std::string& path, off_t offset, const std::vector<void*>& buf_ptrs,
                const std::vector<hg_size_t>& buf_sizes, size_t& read_size);

#endif //IFS_DATA_HPP
+1 −1
Original line number Diff line number Diff line
@@ -15,13 +15,13 @@ extern "C" {

#include <iostream>

// XXX these two structs can be merged. How to deal with const void* then?
struct write_args {
    std::shared_ptr<std::string> path;
    size_t in_size;
    off_t in_offset;
    const void* buf;
    size_t chnk_start;
    off_t updated_size;
    std::vector<unsigned long>* chnk_ids;
    size_t recipient;
    ABT_eventual* eventual;
+1 −3
Original line number Diff line number Diff line
@@ -73,9 +73,7 @@ MERCURY_GEN_PROC(rpc_data_out_t,
MERCURY_GEN_PROC(rpc_write_data_in_t,
                 ((hg_const_string_t) (path))\
((int64_t) (offset))\
((hg_bool_t) (append))\
((hg_bulk_t) (bulk_handle))\
((hg_int64_t) (updated_size)))
((hg_bulk_t) (bulk_handle)))



+18 −22
Original line number Diff line number Diff line
@@ -72,7 +72,7 @@ int destroy_chunk_space(const std::string& path) {
 * @return
 */
int write_file(const string& path, const char* buf, const rpc_chnk_id_t chnk_id, const size_t size, const off_t off,
               const bool append, const off_t updated_size, size_t& write_size) {
               size_t& write_size) {
    auto fs_path = path_to_fspath(path);
    auto chnk_path = bfs::path(ADAFS_DATA->chunk_path());
    chnk_path /= fs_path;
@@ -82,23 +82,14 @@ int write_file(const string& path, const char* buf, const rpc_chnk_id_t chnk_id,
    int fd = open(chnk_path.c_str(), O_WRONLY | O_CREAT, 0777);
    if (fd < 0)
        return EIO;
    if (append) // write at updated_size - size as this is the offset that the EOF corresponds to.
        write_size = static_cast<size_t>(pwrite(fd, buf, size, (updated_size - size)));
    else
        write_size = static_cast<size_t>(pwrite(fd, buf, size, off));
    auto err = static_cast<size_t>(pwrite(fd, buf, size, off));
    if (err < 0) {
        ADAFS_DATA->spdlogger()->error("{}() Error {} while pwriting file {} chunk_id {} size {} off {}", __func__,
                                       strerror(errno), chnk_path.c_str(), chnk_id, size, off);
        write_size = 0;
    } else
        write_size = static_cast<size_t>(err); // This is cast safe
    close(fd);
    // XXX DO WE NEED THE BELOW CODE?
    // Depending on if the file was appended or not metadata sizes need to be modified accordingly

    if (append) {
        // Metadata was already updated by the client before the write operation
        // truncating file
        truncate(chnk_path.c_str(), updated_size);
    }
//    else {
//        truncate(chnk_path.c_str(), size); // file is rewritten, thus, only written size is kept
//    }

    return 0;
}

@@ -106,7 +97,7 @@ int
write_chunks(const string& path, const vector<void*>& buf_ptrs, const vector<hg_size_t>& buf_sizes, const off_t offset,
             size_t& write_size) {
    write_size = 0;
    auto err = static_cast<int>(0);
    int err;
    // buf sizes also hold chnk ids. we only want to keep calculate the actual chunks
    auto chnk_n = buf_sizes.size() / 2;
    // TODO this can be parallized
@@ -115,11 +106,10 @@ write_chunks(const string& path, const vector<void*>& buf_ptrs, const vector<hg_
        auto chnk_ptr = static_cast<char*>(buf_ptrs[i + chnk_n]);
        auto chnk_size = buf_sizes[i + chnk_n];
        size_t written_chnk_size;
        // TODO 5,6,7 params (append and offset stuff)
        if (i == 0) // only the first chunk gets the offset. the chunks are sorted on the client site
            err = write_file(path, chnk_ptr, chnk_id, chnk_size, offset, false, 0, written_chnk_size);
            err = write_file(path, chnk_ptr, chnk_id, chnk_size, offset, written_chnk_size);
        else
            err = write_file(path, chnk_ptr, chnk_id, chnk_size, 0, false, 0, written_chnk_size);
            err = write_file(path, chnk_ptr, chnk_id, chnk_size, 0, written_chnk_size);
        if (err != 0) {
            // TODO How do we handle already written chunks? Ideally, we would need to remove them after failure.
            ADAFS_DATA->spdlogger()->error("{}() Writing chunk failed with path {} and id {}. Aborting ...", __func__,
@@ -152,7 +142,13 @@ int read_file(const string& path, const rpc_chnk_id_t chnk_id, const size_t size
    int fd = open(chnk_path.c_str(), R_OK);
    if (fd < 0)
        return EIO;
    read_size = static_cast<size_t>(pread(fd, buf, size, off));
    auto err = pread(fd, buf, size, off);
    if (err < 0) {
        ADAFS_DATA->spdlogger()->error("{}() Error {} while preading file {} chunk_id {} size {} off {}", __func__,
                                       strerror(errno), chnk_path.c_str(), chnk_id, size, off);
        read_size = 0;
    } else
        read_size = static_cast<size_t>(err); // This is cast safe
    close(fd);
    return 0;
}
+0 −1
Original line number Diff line number Diff line
@@ -182,7 +182,6 @@ ssize_t adafs_pwrite_ws(int fd, const void* buf, size_t count, off_t offset) {
        args->in_offset = offset % CHUNKSIZE;// first offset in dest_idx is the chunk with a potential offset
        args->buf = buf;// pointer to write buffer
        args->chnk_start = chnk_start;// append flag when file was opened
        args->updated_size = updated_size;// for append truncate TODO needed?
        args->chnk_ids = &dest_ids[dest_idx[i]];// pointer to list of chunk ids that all go to the same destination
        args->recipient = dest_idx[i];// recipient
        args->eventual = &eventuals[i];// pointer to an eventual which has allocated memory for storing the written size
Loading