Commit dcefd2a7 authored by Marc Vef
Browse files

Merge branch '34-remove-all-chunks-when-a-file-is-removed' into 'master'

Resolve "Remove all chunks when a file is removed"

Closes #34

See merge request zdvresearch_bsc/adafs!22
parents f165612d 67f85829
Loading
Loading
Loading
Loading
+1 −28
Original line number Diff line number Diff line
@@ -17,45 +17,18 @@ std::string path_to_fspath(const std::string& path) {
    return fs_path;
}

/**
 * Creates the directory in the chunk dir for a file to hold data
 * @param inode
 * @return
 */
// XXX this might be just a temp function as long as we don't use chunks
// XXX this function creates not only the chunk folder but also a single file which holds the data of the 'real' file
int init_chunk_space(const std::string& path) {
    auto fs_path = path_to_fspath(path);

    auto chnk_path = bfs::path(ADAFS_DATA->chunk_path());
    chnk_path /= fs_path;

    // create chunk dir
    bfs::create_directories(chnk_path);

    // XXX create temp big file. remember also to modify the return value
    chnk_path /= "data"s;
    bfs::ofstream ofs{chnk_path};

//    return static_cast<int>(bfs::exists(chnk_path));
    return 0;
}
/**
 * Remove the directory in the chunk dir of a file.
 * @param inode
 * @return
 */
// XXX this might be just a temp function as long as we don't use chunks
int destroy_chunk_space(const std::string& path) {
    auto fs_path = path_to_fspath(path);

    auto chnk_path = bfs::path(ADAFS_DATA->chunk_path());
    chnk_path /= fs_path;

    // create chunk dir
    // remove chunk dir with all contents if path exists
    bfs::remove_all(chnk_path);

//    return static_cast<int>(!bfs::exists(chnk_path));
    return 0;
}

+17 −24
Original line number Diff line number Diff line
@@ -5,31 +5,12 @@

using namespace std;

// File-local constant holding a single comma. Presumably the separator between the fields of a
// serialized dentry value — TODO confirm against the code that uses it.
static const std::string dentry_val_delim = ","s; // XXX this needs to be global.

ino_t generate_inode_no() {
    std::lock_guard<std::mutex> inode_lock(ADAFS_DATA->inode_mutex);
    // TODO check that our inode counter is within boundaries of inode numbers in the given node
    return ADAFS_DATA->raise_inode_count(1);
}

/**
 * Creates a file system node of any type (such as file or directory): first the metadentry,
 * then the chunk space for the path.
 * @param path path of the node to create
 * @param uid not used by this function
 * @param gid not used by this function
 * @param mode passed on to create_metadentry()
 * @return the result of create_metadentry(); the result of init_chunk_space() is not reported
 */
int create_node(const std::string& path, const uid_t uid, const gid_t gid, mode_t mode) {
    const auto metadentry_err = create_metadentry(path, mode); // XXX errorhandling

    // XXX Only do that for files and not for directories
    init_chunk_space(path);

    return metadentry_err;
}

/**
 * Creates metadata (if required) and dentry at the same time
 * @param path
@@ -78,16 +59,28 @@ int get_metadentry(const std::string& path, Metadata& md) {
    return 0;
}

/**
 * Thin wrapper that deletes the KV store entry stored under the given path.
 * @param path key of the entry to delete
 * @return 0 if db_delete_metadentry() reports success, -1 otherwise
 */
int remove_metadentry(const string& path) {
    if (db_delete_metadentry(path)) {
        return 0;
    }
    return -1;
}

/**
 * Remove metadentry if exists and try to remove all chunks for path
 *
 * NOTE(review): `err` is declared twice in this body (once via `auto`, once as `int`), so the
 * text appears to show two revisions of the function together. Presumably everything from
 * `int err = 0;` onward is the current revision — confirm against the repository.
 * @param path key of the metadentry and path whose chunk space is destroyed
 * @return the value of `err`: 0 unless remove_metadentry() was called and reported otherwise
 */
int remove_node(const string& path) {
    auto err = remove_metadentry(path);
    // XXX Only do that with a file. Directory needs to be handled differently
    if (err == 0)
        destroy_chunk_space(
                path); // XXX This removes only the data on that node. Leaving everything in inconsistent state
    int err = 0; // assume we succeed
    Metadata md{};
    // If metadentry exists, try to remove it
    if (get_metadentry(path, md) == 0) {
        err = remove_metadentry(path);
    }
    // Called regardless of whether a metadentry was found for the path
    destroy_chunk_space(path); // destroys all chunks for the path on this node
    return err;
}

+1 −1
Original line number Diff line number Diff line
@@ -131,7 +131,7 @@ static hg_return_t rpc_srv_rm_node(hg_handle_t handle) {
    assert(ret == HG_SUCCESS);
    ADAFS_DATA->spdlogger()->debug("Got remove node RPC with path {}", in.path);

    // do remove
    // Remove metadentry if exists on the node but also remove all chunks for that path
    out.err = remove_node(in.path);

    ADAFS_DATA->spdlogger()->debug("Sending output {}", out.err);
+5 −0
Original line number Diff line number Diff line
@@ -44,6 +44,11 @@ int adafs_mk_node(const std::string& path, const mode_t mode) {
    return rpc_send_mk_node(path, mode);
}

/**
 * Internally sends a broadcast (i.e. n RPCs) so that all nodes clean their chunk folders for that path
 * @param path
 * @return
 */
int adafs_rm_node(const std::string& path) {
    init_ld_env_if_needed();
    return rpc_send_rm_node(path);
+45 −27
Original line number Diff line number Diff line
@@ -164,44 +164,62 @@ int rpc_send_stat(const std::string& path, string& attr) {
}

/**
 * Sends the remove-node RPC for a path and evaluates the response(s).
 *
 * NOTE(review): this body declares `err` twice and mixes a single-handle code path (`handle`,
 * `in`) with a per-host loop (`rpc_handles`, `rpc_waiters`, `rpc_in`); its braces do not
 * balance either. It appears to show two revisions of the function interleaved — confirm
 * against the repository before relying on any single line.
 * @param path path written into the `path` field of the RPC input
 * @return the value of `err`; the visible code sets it to -1 on failures
 */
int rpc_send_rm_node(const std::string& path) {
    rpc_rm_node_in_t in{};
    rpc_err_out_t out{};
    hg_handle_t handle;
    int err = EUNKNOWN;
    hg_return_t ret;
    int err = 0; // assume we succeed

    ld_logger->debug("{}() Creating Mercury handles for all nodes ...", __func__);
    // One handle, one request and one input struct per host (sized by fs_config->host_size)
    vector<hg_handle_t> rpc_handles(fs_config->host_size);
    vector<margo_request> rpc_waiters(fs_config->host_size);
    vector<rpc_rm_node_in_t> rpc_in(fs_config->host_size);
    // Send rpc to all nodes as all of them can have chunks for this path
    for (size_t i = 0; i < fs_config->host_size; i++) {
        // fill in
    in.path = path.c_str();
        rpc_in[i].path = path.c_str();
        // create handle
        ret = margo_create_wrap(ipc_rm_node_id, rpc_rm_node_id, i, rpc_handles[i], false);
        if (ret != HG_SUCCESS) {
            ld_logger->warn("{}() Unable to create Mercury handle", __func__);
            // We use continue here to remove at least some data
            // XXX In the future we can discuss RPC retrying. This should be a function to be used in general
            // NOTE(review): no `continue` statement is visible here, so margo_iforward() below is
            // still called for this index after a failed handle creation — confirm intent.
            errno = EBUSY;
            err = -1;
        }
        // send async rpc
        ret = margo_iforward(rpc_handles[i], &rpc_in[i], &rpc_waiters[i]);
        if (ret != HG_SUCCESS) {
            // NOTE(review): the message below is the same as for the handle-creation failure above,
            // although this branch follows margo_iforward().
            ld_logger->warn("{}() Unable to create Mercury handle", __func__);
            errno = EBUSY;
            err = -1;
        }
    }

    ld_logger->debug("{}() Creating Mercury handle ...", __func__);
    auto ret = margo_create_wrap(ipc_rm_node_id, rpc_rm_node_id, path, handle, false);
    // Wait for RPC responses and then get response
    for (size_t i = 0; i < fs_config->host_size; i++) {
        // XXX We might need a timeout here to not wait forever for an output that never comes?
        ret = margo_wait(rpc_waiters[i]);
        if (ret != HG_SUCCESS) {
            ld_logger->warn("{}() Unable to wait for margo_request handle for path {} recipient {}", __func__, path, i);
            errno = EBUSY;
        return -1;
            err = -1;
        }
    // Send rpc
#if defined(MARGO_FORWARD_TIMER)
    ret = margo_forward_timed_wrap_timer(handle, &in, __func__);
#else
    ret = margo_forward_timed_wrap(handle, &in);
#endif
    // Get response
    if (ret == HG_SUCCESS) {
        ld_logger->trace("{}() Waiting for response", __func__);
        ret = margo_get_output(handle, &out);
        rpc_err_out_t out{};
        ret = margo_get_output(rpc_handles[i], &out);
        if (ret == HG_SUCCESS) {
            ld_logger->debug("{}() Got response success: {}", __func__, out.err);
            err = out.err;
            if (err != 0) {
                errno = out.err;
                err = -1;
            }
        } else {
            // something is wrong
            errno = EBUSY;
            err = -1;
            ld_logger->error("{}() while getting rpc output", __func__);
        }
        /* clean up resources consumed by this rpc */
        margo_free_output(handle, &out);
    } else {
        // NOTE(review): the format string below contains a `{}` placeholder but no argument is passed.
        ld_logger->warn("{}() timed out");
        errno = EBUSY;
        margo_free_output(rpc_handles[i], &out);
        margo_destroy(rpc_handles[i]);
    }
    margo_destroy(handle);
    return err;
}