Commit 27839240 authored by Marc Vef's avatar Marc Vef Committed by Alberto Miranda
Browse files

Fixing index loop bug during I/O on the daemon

parent 07f62710
Loading
Loading
Loading
Loading
+25 −9
Original line number Diff line number Diff line
@@ -48,8 +48,9 @@ static hg_return_t rpc_srv_write(hg_handle_t handle) {
    auto hgi = margo_get_info(handle);
    auto mid = margo_hg_info_get_instance(hgi);
    auto bulk_size = margo_bulk_get_size(in.bulk_handle);
    GKFS_DATA->spdlogger()->debug("{}() path: {}, size: {}, offset: {}", __func__,
                                  in.path, bulk_size, in.offset);
    GKFS_DATA->spdlogger()->debug(
            "{}() path: '{}' chunk_start '{}' chunk_end '{}' chunk_n '{}' total_chunk_size '{}' bulk_size: '{}' offset: '{}'",
            __func__, in.path, in.chunk_start, in.chunk_end, in.chunk_n, in.total_chunk_size, bulk_size, in.offset);
    /*
     * 2. Set up buffers for pull bulk transfers
     */
@@ -98,14 +99,20 @@ static hg_return_t rpc_srv_write(hg_handle_t handle) {
    uint64_t local_offset;
    // object for asynchronous disk IO
    gkfs::data::ChunkWriteOperation chunk_op{in.path, in.chunk_n};

    /*
     * 3. Calculate chunk sizes that correspond to this host, transfer data, and start tasks to write to disk
     */
    // Start to look for a chunk that hashes to this host with the first chunk in the buffer
    for (auto chnk_id_file = in.chunk_start; chnk_id_file < in.chunk_end || chnk_id_curr < in.chunk_n; chnk_id_file++) {
    for (auto chnk_id_file = in.chunk_start;
         chnk_id_file <= in.chunk_end && chnk_id_curr < in.chunk_n; chnk_id_file++) {
        // Continue if chunk does not hash to this host
        if (distributor.locate_data(in.path, chnk_id_file) != host_id)
        if (distributor.locate_data(in.path, chnk_id_file) != host_id) {
            GKFS_DATA->spdlogger()->trace(
                    "{}() chunkid '{}' ignored as it does not match to this host with id '{}'. chnk_id_curr '{}'",
                    __func__, chnk_id_file, host_id, chnk_id_curr);
            continue;
        }
        chnk_ids_host[chnk_id_curr] = chnk_id_file; // save this id to host chunk list
        // offset case. Only relevant in the first iteration of the loop and if the chunk hashes to this host
        if (chnk_id_file == in.chunk_start && in.offset > 0) {
@@ -172,6 +179,7 @@ static hg_return_t rpc_srv_write(hg_handle_t handle) {

    }
    // Sanity check that all chunks where detected in previous loop
    // TODO don't proceed if that happens.
    if (chnk_size_left_host != 0)
        GKFS_DATA->spdlogger()->warn("{}() Not all chunks were detected!!! Size left {}", __func__,
                                     chnk_size_left_host);
@@ -221,8 +229,9 @@ static hg_return_t rpc_srv_read(hg_handle_t handle) {
    auto hgi = margo_get_info(handle);
    auto mid = margo_hg_info_get_instance(hgi);
    auto bulk_size = margo_bulk_get_size(in.bulk_handle);
    GKFS_DATA->spdlogger()->debug("{}() path: {}, size: {}, offset: {}", __func__,
                                  in.path, bulk_size, in.offset);
    GKFS_DATA->spdlogger()->debug(
            "{}() path: '{}' chunk_start '{}' chunk_end '{}' chunk_n '{}' total_chunk_size '{}' bulk_size: '{}' offset: '{}'",
            __func__, in.path, in.chunk_start, in.chunk_end, in.chunk_n, in.total_chunk_size, bulk_size, in.offset);

    /*
     * 2. Set up buffers for pull bulk transfers
@@ -268,10 +277,15 @@ static hg_return_t rpc_srv_read(hg_handle_t handle) {
     * 3. Calculate chunk sizes that correspond to this host and start tasks to read from disk
     */
    // Start to look for a chunk that hashes to this host with the first chunk in the buffer
    for (auto chnk_id_file = in.chunk_start; chnk_id_file < in.chunk_end || chnk_id_curr < in.chunk_n; chnk_id_file++) {
    for (auto chnk_id_file = in.chunk_start;
         chnk_id_file <= in.chunk_end && chnk_id_curr < in.chunk_n; chnk_id_file++) {
        // Continue if chunk does not hash to this host
        if (distributor.locate_data(in.path, chnk_id_file) != host_id)
        if (distributor.locate_data(in.path, chnk_id_file) != host_id) {
            GKFS_DATA->spdlogger()->trace(
                    "{}() chunkid '{}' ignored as it does not match to this host with id '{}'. chnk_id_curr '{}'",
                    __func__, chnk_id_file, host_id, chnk_id_curr);
            continue;
        }
        chnk_ids_host[chnk_id_curr] = chnk_id_file; // save this id to host chunk list
        // Only relevant in the first iteration of the loop and if the chunk hashes to this host
        if (chnk_id_file == in.chunk_start && in.offset > 0) {
@@ -309,7 +323,8 @@ static hg_return_t rpc_srv_read(hg_handle_t handle) {
        }
        try {
            // start tasklet for read operation
            chunk_read_op.read_async(chnk_id_curr, chnk_ids_host[chnk_id_curr], bulk_buf_ptrs[chnk_id_curr], chnk_sizes[chnk_id_curr], (chnk_id_file == in.chunk_start) ? in.offset : 0);
            chunk_read_op.read_async(chnk_id_curr, chnk_ids_host[chnk_id_curr], bulk_buf_ptrs[chnk_id_curr],
                                     chnk_sizes[chnk_id_curr], (chnk_id_file == in.chunk_start) ? in.offset : 0);
        } catch (const gkfs::data::ChunkReadOpException& e) {
            // This exception is caused by setup of Argobots variables. If this fails, something is really wrong
            GKFS_DATA->spdlogger()->error("{}() while read_async err '{}'", __func__, e.what());
@@ -318,6 +333,7 @@ static hg_return_t rpc_srv_read(hg_handle_t handle) {
        chnk_id_curr++;
    }
    // Sanity check that all chunks where detected in previous loop
    // TODO error out. If we continue this will crash the server when sending results back that don't exist.
    if (chnk_size_left_host != 0)
        GKFS_DATA->spdlogger()->warn("{}() Not all chunks were detected!!! Size left {}", __func__,
                                     chnk_size_left_host);