Commit db8883fd authored by Jean Bez's avatar Jean Bez
Browse files

use new custom callback functions from AGIOS to speed up the flow

parent 184595ce
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -8,6 +8,7 @@ void agios_shutdown();

void *agios_callback(int64_t request_id);
void *agios_callback_aggregated(int64_t *requests, int32_t total);
void *agios_eventual_callback(int64_t request_id, void *info);

unsigned long long int generate_unique_id();

+1 −1
Original line number Diff line number Diff line
@@ -341,7 +341,7 @@ fi

# get AGIOS
if check_dependency "agios" "${DEP_CONFIG[@]}"; then
clonedeps "agios" "https://github.com/francielizanon/agios.git" "-b development" &
clonedeps "agios" "https://github.com/jeanbez/agios.git" "" "-b gekkofs" &
fi

# get date
+30 −12
Original line number Diff line number Diff line
@@ -38,8 +38,12 @@ struct write_chunk_args {
    ABT_eventual eventual;
};

/*
#ifdef GKFS_ENABLE_AGIOS
unordered_map<unsigned long long int, ABT_eventual> eventual_requests;
pthread_mutex_t eventual_requests_lock;
#endif
*/

/**
 * Used by an argobots threads. Argument args has the following fields:
@@ -156,17 +160,17 @@ static hg_return_t rpc_srv_write_data(hg_handle_t handle) {
    ABT_eventual eventual = ABT_EVENTUAL_NULL;

    /* creating eventual */
    ABT_eventual_create(sizeof(unsigned long long int), &eventual);
    ABT_eventual_create(sizeof(int64_t), &eventual);

    unsigned long long int request_id = generate_unique_id();
    char *agios_path = (char*) in.path;

    pthread_mutex_lock(&eventual_requests_lock);
    eventual_requests[request_id] = eventual;
    pthread_mutex_unlock(&eventual_requests_lock);
    // pthread_mutex_lock(&eventual_requests_lock);
    // eventual_requests[request_id] = eventual;
    // pthread_mutex_unlock(&eventual_requests_lock);

    // we should call AGIOS before chunking (as that is an internal way to handle the requests)
    if (!agios_add_request(agios_path, AGIOS_WRITE, in.offset, in.total_chunk_size, request_id, AGIOS_SERVER_ID_IGNORE, NULL)) {
    if (!agios_add_request(agios_path, AGIOS_WRITE, in.offset, in.total_chunk_size, request_id, AGIOS_SERVER_ID_IGNORE, agios_eventual_callback, eventual)) {
        ADAFS_DATA->spdlogger()->error("{}() Failed to send request to AGIOS", __func__);
    } else {
        ADAFS_DATA->spdlogger()->debug("{}() request {} was sent to AGIOS", __func__, request_id);
@@ -398,17 +402,17 @@ static hg_return_t rpc_srv_read_data(hg_handle_t handle) {
    ABT_eventual eventual = ABT_EVENTUAL_NULL;

    /* creating eventual */
    ABT_eventual_create(sizeof(unsigned long long int), &eventual);
    ABT_eventual_create(sizeof(int64_t), &eventual);

    unsigned long long int request_id = generate_unique_id();
    char *agios_path = (char*) in.path;

    pthread_mutex_lock(&eventual_requests_lock);
    eventual_requests[request_id] = eventual;
    pthread_mutex_unlock(&eventual_requests_lock);
    // pthread_mutex_lock(&eventual_requests_lock);
    // eventual_requests[request_id] = eventual;
    // pthread_mutex_unlock(&eventual_requests_lock);

    // we should call AGIOS before chunking (as that is an internal way to handle the requests)
    if (!agios_add_request(agios_path, AGIOS_READ, in.offset, in.total_chunk_size, request_id, AGIOS_SERVER_ID_IGNORE, NULL)) {
    if (!agios_add_request(agios_path, AGIOS_READ, in.offset, in.total_chunk_size, request_id, AGIOS_SERVER_ID_IGNORE, agios_eventual_callback, eventual)) {
        ADAFS_DATA->spdlogger()->error("{}() Failed to send request to AGIOS", __func__);
    } else {
        ADAFS_DATA->spdlogger()->debug("{}() request {} was sent to AGIOS", __func__, request_id);
@@ -654,19 +658,22 @@ static hg_return_t rpc_srv_chunk_stat(hg_handle_t handle) {

DEFINE_MARGO_RPC_HANDLER(rpc_srv_chunk_stat)

/*
#ifdef GKFS_ENABLE_AGIOS
void *agios_callback(int64_t request_id) {
    ADAFS_DATA->spdlogger()->debug("{}() request {} is ready", __func__, request_id);

    pthread_mutex_lock(&eventual_requests_lock);
    ABT_eventual_set(eventual_requests[request_id], &request_id, sizeof(unsigned long long int));
    ABT_eventual_set(eventual_requests[request_id], &request_id, sizeof(int64_t));
    eventual_requests.erase(request_id);
    pthread_mutex_unlock(&eventual_requests_lock);

    return 0;
}
#endif
*/

/*
#ifdef GKFS_ENABLE_AGIOS
void *agios_callback_aggregated(int64_t *requests, int32_t total) {
    for (int i=0; i<total; i++) {
@@ -675,7 +682,7 @@ void *agios_callback_aggregated(int64_t *requests, int32_t total) {
        ADAFS_DATA->spdlogger()->debug("{}() request [{}/{}] {} is ready", __func__, i+1, total, request_id);

        pthread_mutex_lock(&eventual_requests_lock);
        ABT_eventual_set(eventual_requests[request_id], &request_id, sizeof(unsigned long long int));
        ABT_eventual_set(eventual_requests[request_id], &request_id, sizeof(int64_t));
        eventual_requests.erase(request_id);
        pthread_mutex_unlock(&eventual_requests_lock);
    }
@@ -683,3 +690,14 @@ void *agios_callback_aggregated(int64_t *requests, int32_t total) {
    return 0;
}
#endif
*/

#ifdef GKFS_ENABLE_AGIOS
void *agios_eventual_callback(int64_t request_id, void *info) {
    ADAFS_DATA->spdlogger()->debug("{}() custom callback request {} is ready", __func__, request_id);

    ABT_eventual_set((ABT_eventual) info, &request_id, sizeof(int64_t));
    
    return 0;
}
#endif
 No newline at end of file
+1 −1
Original line number Diff line number Diff line
@@ -160,7 +160,7 @@ void destroy_enviroment() {
void agios_initialize() {
    char configuration[] = "/tmp/agios.conf";
    
    if (!agios_init(agios_callback, agios_callback_aggregated, configuration, 0)) {
    if (!agios_init(NULL, NULL, configuration, 0)) {
        ADAFS_DATA->spdlogger()->error("{}() Failed to initialize AGIOS scheduler: '{}'", __func__, configuration);   

        agios_exit();