// preload_util.cpp (web-view residue kept as a comment: 16.1 KiB, last commit by Marc Vef)
#include <preload/preload_util.hpp>
#include <dirent.h>
#include <fstream>
#include <iterator>
#include <sstream>
// Delimiter that separates the individual fields of a serialized dentry db value.
static const std::string dentry_val_delim = ",";
/*
 * TODO: Starting our own file descriptor numbering at a fixed value is dangerous because it can clash with the kernel.
 * If a call is passed through (not intercepted) and the kernel hands out an fd whose value we later hand out
 * ourselves, we would intercept calls that were meant for the kernel. The same holds in the opposite direction.
 * As a mitigation the numbering starts at a high value. This only "hopes" for no clash; it is not a permanent solution.
 * Note: It will likely work for many cases because the kernel reuses its fd values, whereas ours are not reused.
 * A clash is only expected if one process has more than 100000 files open at the same time.
 */
static int fd_idx{100000};
// Set to true by generate_fd_idx() once fd_idx has reached the maximum int value.
std::atomic<bool> fd_validation_needed{false};

/**
 * Generate new file descriptor index to be used as an fd within one process in ADA-FS.
 *
 * The counter is protected by fd_idx_mutex. When the counter has reached the maximum int value it is
 * reset to its start value (100000) instead of being incremented further, which would be a signed
 * overflow. Because values may then be handed out a second time, fd_validation_needed is raised so
 * that consumers can check whether a returned fd is really free.
 * @return the fd index to use; the internal counter is advanced by one afterwards
 */
int generate_fd_idx() {
    // We need a mutex here for thread safety
    std::lock_guard<std::mutex> inode_lock(fd_idx_mutex);
    if (fd_idx == std::numeric_limits<int>::max()) {
        ld_logger->info("{}() File descriptor index exceeded ints max value. Setting it back to 100000", __func__);
        /*
         * Restarting the numbering can have the effect that an fd is given out twice for different paths.
         * This must not happen silently, so a flag is set which tells the OpenFileMap that it should check
         * if an fd is really safe to use.
         */
        fd_idx = 100000;
        fd_validation_needed = true;
    }
    return fd_idx++;
}

int get_fd_idx() {
    std::lock_guard<std::mutex> inode_lock(fd_idx_mutex);
    return fd_idx;
}

/**
 * Checks whether a path starts with the configured mount directory (fs_config->mountdir).
 *
 * This is a plain prefix comparison; only the first mountdir.size() characters of path are inspected.
 * NOTE(review): a sibling directory sharing the prefix (e.g. "<mountdir>2/x") also matches, exactly as
 * with the previous strstr-based check - confirm whether callers rely on that.
 * @param path zero-terminated path to test
 * @return true if path begins with the mount directory
 */
bool is_fs_path(const char* path) {
    const auto& mountdir = fs_config->mountdir;
    return strncmp(path, mountdir.c_str(), mountdir.size()) == 0;
}
namespace {

/**
 * Returns the text in front of the next delimiter of a serialized dentry value and removes it,
 * together with the delimiter, from db_val.
 * If db_val holds no delimiter, the whole string is returned and db_val is left untouched.
 * @param db_val remaining serialized value, consumed in place
 * @return the next field as text
 */
std::string take_dentry_field(std::string& db_val) {
    const auto pos = db_val.find(dentry_val_delim);
    auto field = db_val.substr(0, pos);
    if (pos != std::string::npos) {
        db_val.erase(0, pos + dentry_val_delim.size());
    }
    return field;
}

/**
 * Shared implementation behind db_val_to_stat() and db_val_to_stat64().
 * @tparam StatT struct stat or struct stat64
 * @param path path of the entry, only used to derive an inode number when no metadata is stored
 * @param db_val serialized dentry value, consumed while parsing
 * @param attr stat structure that receives the parsed fields
 * @return always 0; std::stoul / std::stol throw on non-numeric fields
 */
template<typename StatT>
int fill_stat_from_db_val(const std::string& path, std::string& db_val, StatT& attr) {
    if (db_val.find(dentry_val_delim) == std::string::npos) {
        // no delimiter found => no metadata enabled. fill with dummy values
        attr.st_ino = std::hash<std::string>{}(path);
        attr.st_mode = static_cast<unsigned int>(std::stoul(db_val));
        attr.st_nlink = 1;
        attr.st_uid = fs_config->uid;
        attr.st_gid = fs_config->gid;
        attr.st_size = 0;
        attr.st_blksize = BLOCKSIZE;
        attr.st_blocks = 0;
        attr.st_atim.tv_sec = 0;
        attr.st_mtim.tv_sec = 0;
        attr.st_ctim.tv_sec = 0;
        return 0;
    }
    // some metadata is enabled: mode is always there
    attr.st_mode = static_cast<unsigned int>(std::stoul(take_dentry_field(db_val)));
    // size is also there; if no delimiter follows it, it is the last field
    const bool more_fields = db_val.find(dentry_val_delim) != std::string::npos;
    attr.st_size = std::stol(take_dentry_field(db_val));
    if (!more_fields) {
        return 0;
    }
    // The order is important. don't change.
    if (fs_config->atime_state) {
        attr.st_atim.tv_sec = static_cast<time_t>(std::stol(take_dentry_field(db_val)));
    }
    if (fs_config->mtime_state) {
        attr.st_mtim.tv_sec = static_cast<time_t>(std::stol(take_dentry_field(db_val)));
    }
    if (fs_config->ctime_state) {
        attr.st_ctim.tv_sec = static_cast<time_t>(std::stol(take_dentry_field(db_val)));
    }
    if (fs_config->uid_state) {
        attr.st_uid = static_cast<uid_t>(std::stoul(take_dentry_field(db_val)));
    }
    if (fs_config->gid_state) {
        attr.st_gid = static_cast<gid_t>(std::stoul(take_dentry_field(db_val)));
    }
    if (fs_config->inode_no_state) {
        attr.st_ino = static_cast<ino_t>(std::stoul(take_dentry_field(db_val)));
    }
    if (fs_config->link_cnt_state) {
        attr.st_nlink = static_cast<nlink_t>(std::stoul(take_dentry_field(db_val)));
    }
    if (fs_config->blocks_state) { // last one will not encounter a delimiter anymore
        attr.st_blocks = static_cast<blkcnt_t>(std::stoul(db_val));
    }
    return 0;
}

} // namespace

/**
 * Converts the dentry db value into a stat struct, which is needed by Linux.
 *
 * A value without delimiter holds only the mode; the remaining fields are then filled with dummy values.
 * Otherwise mode and size are parsed first, followed by every field whose *_state switch is enabled in
 * fs_config, in the fixed order atime, mtime, ctime, uid, gid, inode number, link count, blocks.
 * Fields whose switch is disabled are not written.
 * @param path path of the entry (hashed into st_ino when no metadata is stored)
 * @param db_val serialized dentry value
 * @param attr stat struct to fill
 * @return always 0
 */
int db_val_to_stat(const std::string path, std::string db_val, struct stat& attr) {
    return fill_stat_from_db_val(path, db_val, attr);
}

/**
 * Converts the dentry db value into a stat64 struct, which is needed by Linux.
 * Parsing rules are identical to db_val_to_stat().
 * @param path path of the entry (hashed into st_ino when no metadata is stored)
 * @param db_val serialized dentry value
 * @param attr stat64 struct to fill
 * @return always 0
 */
int db_val_to_stat64(const std::string path, std::string db_val, struct stat64& attr) {
    return fill_stat_from_db_val(path, db_val, attr);
}
/**
 * Returns the process id for a process name.
 *
 * Walks the numeric entries of /proc, reads the first line of each /proc/<pid>/cmdline, keeps the
 * text up to the first NUL byte, strips everything up to the last '/', and compares the remainder
 * with procName. The scan stops at the first match.
 * @param procName program name to look for (without directory part)
 * @return pid of the first matching process, or -1 if /proc cannot be opened or nothing matches
 */
int getProcIdByName(std::string procName) {
    // Open the /proc directory; without it there is nothing to scan (and nothing to close)
    DIR* dp = opendir("/proc");
    if (dp == nullptr) {
        return -1;
    }

    int pid = -1;
    // Enumerate all entries in directory until process found
    struct dirent* dirp = nullptr;
    while (pid < 0 && (dirp = readdir(dp)) != nullptr) {
        // Skip non-numeric entries
        const int id = atoi(dirp->d_name);
        if (id <= 0) {
            continue;
        }
        // Read contents of virtual /proc/{pid}/cmdline file
        const std::string cmdPath = std::string("/proc/") + dirp->d_name + "/cmdline";
        std::ifstream cmdFile(cmdPath.c_str());
        std::string cmdLine;
        std::getline(cmdFile, cmdLine);
        if (cmdLine.empty()) {
            continue;
        }
        // Keep first cmdline item which contains the program path
        auto pos = cmdLine.find('\0');
        if (pos != std::string::npos) {
            cmdLine = cmdLine.substr(0, pos);
        }
        // Keep program name only, removing the path
        pos = cmdLine.rfind('/');
        if (pos != std::string::npos) {
            cmdLine = cmdLine.substr(pos + 1);
        }
        // Compare against requested process name
        if (procName == cmdLine) {
            pid = id;
        }
    }

    closedir(dp);

    return pid;
}
/**
 * Builds the path of the file in which the daemon process with the given pid writes information
 * for the running clients: DAEMON_AUX_PATH + "/daemon_" + pid + ".run".
 * @param pid process id of the daemon
 * @return string
 */
string daemon_register_path(int pid) {
    string run_file = DAEMON_AUX_PATH + "/daemon_"s;
    run_file += to_string(pid);
    run_file += ".run"s;
    return run_file;
}

bool get_daemon_auxiliaries() {
    auto ret = false;
    auto adafs_daemon_pid = getProcIdByName("adafs_daemon"s);
    if (adafs_daemon_pid == -1) {
        ld_logger->error("{}() ADA-FS daemon not started. Exiting ...", __func__);
        perror("ADA-FS daemon not started. Exiting ...");
        return false;
    }
    ifstream ifs(daemon_register_path(adafs_daemon_pid).c_str(), ::ifstream::in);
    string mountdir;
    if (ifs) {
        getline(ifs, mountdir);
        if (!mountdir.empty()) {
            ret = true;
            fs_config->mountdir = mountdir;
        } else {
            ld_logger->error("{}() Error reading daemon auxiliary file", __func__);
        }
    }
    if (ifs.bad()) {
        perror("Error opening file to register daemon process");
        ld_logger->error("{}() Error opening file to daemon auxiliary file", __func__);
        return false;
    }
    ifs.close();

    return ret;
}

/**
 * Parses /etc/hosts and stores the hostname -> ip association in fs_config->sys_hostfile.
 * We work with hostnames, but some network layers (such as Omnipath) do not consult /etc/hosts,
 * so the mapping is kept by ourselves.
 * Empty lines and lines starting with '#' are skipped. The first whitespace-separated token of a
 * line is taken as the ip; every following token that contains HOSTNAME_SUFFIX is mapped to it.
 * A name that is already mapped keeps its first ip.
 * @return success (false only if /etc/hosts cannot be opened)
 */
bool read_system_hostfile() {
    std::ifstream hosts_stream("/etc/hosts");
    if (!hosts_stream.is_open()) {
        return false;
    }
    std::map<std::string, std::string> name_to_ip;
    for (std::string entry; std::getline(hosts_stream, entry);) {
        const bool skip_entry = entry.empty() || entry == "\n" || entry.at(0) == '#';
        if (skip_entry) {
            continue;
        }
        std::istringstream fields(entry);
        std::string ip;
        if (!(fields >> ip)) {
            continue; // whitespace-only line: no tokens at all
        }
        for (std::string alias; fields >> alias;) {
            if (alias.find(HOSTNAME_SUFFIX) == std::string::npos) {
                continue;
            }
            name_to_ip.insert(std::make_pair(alias, ip));
        }
    }
    fs_config->sys_hostfile = name_to_ip;
    ld_logger->info("{}() /etc/hosts successfully mapped into ADA-FS", __func__);
    return true;
}

/**
 * Creates an abstract rpc address for a given hostid and puts it into an address cache map
 * @param hostid
 * @param svr_addr
 * @return
 */
bool get_addr_by_hostid(const uint64_t hostid, hg_addr_t& svr_addr) {
    if (rpc_address_cache.tryGet(hostid, svr_addr)) {
        ld_logger->trace("tryGet successful and put in svr_addr");
        //found
        return true;
    } else {
        // not found, manual lookup and add address mapping to LRU cache
        ld_logger->trace("not found in lrucache");
        string remote_addr;
        // Try to get the ip of remote addr. If it cannot be found, use hostname
        // first get the hostname with the hostid
        auto hostname = fs_config->hosts.at(hostid) + HOSTNAME_SUFFIX;
        // then get the ip address from /etc/hosts which is mapped to the sys_hostfile map
        if (fs_config->sys_hostfile.count(hostname) == 1) {
            auto remote_ip = fs_config->sys_hostfile.at(hostname);
            remote_addr = RPC_PROTOCOL + "://"s + remote_ip + ":"s + fs_config->rpc_port;
        if (remote_addr.empty()) {
            remote_addr = RPC_PROTOCOL + "://"s + hostname + ":"s +
                          fs_config->rpc_port; // convert hostid to remote_addr and port
        }
        ld_logger->trace("generated remote_addr {} for hostname {} with rpc_port {}",
                         remote_addr, hostname, fs_config->rpc_port);
        // try to look up 3 times before erroring out
        hg_return_t ret;
        // TODO If this is solution is somewhat helpful, write a more versatile solution
        for (unsigned int i = 0; i < 3; i++) {
            ret = margo_addr_lookup(ld_margo_rpc_id, remote_addr.c_str(), &svr_addr);
                // still not working after 5 tries.
                if (i == 4) {
                    ld_logger->error("{}() Unable to lookup address {} from host {}", __func__,
                                     remote_addr, fs_config->hosts.at(fs_config->host_id));
        if (svr_addr == HG_ADDR_NULL) {
            ld_logger->error("{}() looked up address is NULL for address {} from host {}", __func__,
                             remote_addr, fs_config->hosts.at(fs_config->host_id));
        rpc_address_cache.insert(hostid, svr_addr);
        return true;
    }
}

/**
 * Determines the node id for a given path: the std::hash of the string modulo fs_config->host_size.
 * @param to_hash string (path) to map onto a node
 * @return node id
 */
size_t get_rpc_node(const std::string& to_hash) {
    const auto digest = std::hash<std::string>{}(to_hash);
    return digest % fs_config->host_size;
}

/**
 * Determines if the recipient id in an RPC is referring to the local or a remote node
 * by comparing it with fs_config->host_id.
 * @param recipient node id the operation is addressed to
 * @return true if recipient is this host
 */
bool is_local_op(const size_t recipient) {
    const bool targets_this_host = (fs_config->host_id == recipient);
    return targets_this_host;
}

// (web-view residue kept as a comment: commit attribution, Marc Vef)
/**
 * Creates a Mercury handle that targets either the local daemon or a remote one.
 *
 * If recipient is the local host and force_rpc is not set, the handle is created with
 * ld_margo_ipc_id / daemon_svr_addr / ipc_id. Otherwise the recipient's address is resolved with
 * get_addr_by_hostid() and the handle is created with ld_margo_rpc_id / rpc_id.
 * @param ipc_id id of the call when sent to the local daemon
 * @param rpc_id id of the call when sent to a remote daemon
 * @param recipient host id the call is addressed to
 * @param handle receives the created handle
 * @param svr_addr receives the resolved remote address (only written on the remote path)
 * @param force_rpc use the remote path even if recipient is the local host
 * @return HG_SUCCESS on success; HG_OTHER_ERROR if the address cannot be resolved or margo_create() fails
 */
inline hg_return
margo_create_wrap_helper(const hg_id_t ipc_id, const hg_id_t rpc_id, const size_t recipient, hg_handle_t& handle,
                         hg_addr_t& svr_addr, bool force_rpc) {
    hg_return_t ret;
    if (is_local_op(recipient) && !force_rpc) { // local
        ret = margo_create(ld_margo_ipc_id, daemon_svr_addr, ipc_id, &handle);
        ld_logger->debug("{}() to local daemon (IPC)", __func__);
    } else { // remote
        // TODO HG_ADDR_T is never freed atm. Need to change LRUCache
        if (!get_addr_by_hostid(recipient, svr_addr)) {
            ld_logger->error("{}() server address not resolvable for host id {}", __func__, recipient);
            return HG_OTHER_ERROR;
        }
        ret = margo_create(ld_margo_rpc_id, svr_addr, rpc_id, &handle);
        ld_logger->debug("{}() to remote daemon (RPC)", __func__);
    }
    if (ret != HG_SUCCESS) {
        // every margo_create() failure is reported to the caller as HG_OTHER_ERROR
        ld_logger->error("{}() creating handle FAILED", __func__);
        return HG_OTHER_ERROR;
    }
    return ret;
}

/**
 * Wraps certain margo functions to create a Mercury handle.
 * The recipient is derived from the path via get_rpc_node(); the actual work is done by
 * margo_create_wrap_helper().
 * @param ipc_id
 * @param rpc_id
 * @param path path whose responsible node receives the call
 * @param handle
 * @param svr_addr
 * @param force_rpc passed through to margo_create_wrap_helper()
 * @return result of margo_create_wrap_helper()
 */
template<>
hg_return margo_create_wrap(const hg_id_t ipc_id, const hg_id_t rpc_id, const std::string& path, hg_handle_t& handle,
                            hg_addr_t& svr_addr, bool force_rpc) {
    const auto recipient = get_rpc_node(path);
    return margo_create_wrap_helper(ipc_id, rpc_id, recipient, handle, svr_addr, force_rpc);
}

/**
 * Wraps certain margo functions to create a Mercury handle
 * @param ipc_id
 * @param rpc_id
 * @param recipient
 * @param handle
 * @param svr_addr
 * @return
 */
template<>
hg_return margo_create_wrap(const hg_id_t ipc_id, const hg_id_t rpc_id, const size_t& recipient, hg_handle_t& handle,
                            hg_addr_t& svr_addr, bool force_rpc) {
    return margo_create_wrap_helper(ipc_id, rpc_id, recipient, handle, svr_addr, force_rpc);