Verified Commit d0645f6b authored by Alberto Miranda's avatar Alberto Miranda ♨️
Browse files

Add job::requirements class

parent 956958b6
Loading
Loading
Loading
Loading
+227 −193
Original line number Diff line number Diff line
@@ -168,6 +168,200 @@ private:
};

struct job_requirements;
struct dataset;

struct adhoc_storage {

    enum class type : std::underlying_type<ADM_adhoc_storage_type_t>::type {
        gekkofs = ADM_ADHOC_STORAGE_GEKKOFS,
        dataclay = ADM_ADHOC_STORAGE_DATACLAY,
        expand = ADM_ADHOC_STORAGE_EXPAND,
        hercules = ADM_ADHOC_STORAGE_HERCULES,
    };

    enum class execution_mode : std::underlying_type<ADM_adhoc_mode_t>::type {
        in_job_shared = ADM_ADHOC_MODE_IN_JOB_SHARED,
        in_job_dedicated = ADM_ADHOC_MODE_IN_JOB_DEDICATED,
        separate_new = ADM_ADHOC_MODE_SEPARATE_NEW,
        separate_existing = ADM_ADHOC_MODE_SEPARATE_EXISTING
    };

    enum class access_type : std::underlying_type<ADM_adhoc_mode_t>::type {
        read_only = ADM_ADHOC_ACCESS_RDONLY,
        write_only = ADM_ADHOC_ACCESS_WRONLY,
        read_write = ADM_ADHOC_ACCESS_RDWR,
    };

    struct resources {
        resources() = default;
        explicit resources(std::vector<scord::node> nodes);
        explicit resources(ADM_adhoc_resources_t res);
        explicit operator ADM_adhoc_resources_t() const;

        std::vector<scord::node>
        nodes() const;

        template <typename Archive>
        void
        serialize(Archive&& ar) {
            ar& m_nodes;
        }

    private:
        std::vector<scord::node> m_nodes;
    };

    struct ctx {

        ctx() = default;

        ctx(execution_mode exec_mode, access_type access_type,
            adhoc_storage::resources resources, std::uint32_t walltime,
            bool should_flush);

        explicit ctx(ADM_adhoc_context_t ctx);
        explicit operator ADM_adhoc_context_t() const;

        execution_mode
        exec_mode() const;
        enum access_type
        access_type() const;
        adhoc_storage::resources
        resources() const;
        std::uint32_t
        walltime() const;
        bool
        should_flush() const;

        template <class Archive>
        void
        serialize(Archive&& ar) {
            ar& m_exec_mode;
            ar& m_access_type;
            ar& m_resources;
            ar& m_walltime;
            ar& m_should_flush;
        }

    private:
        execution_mode m_exec_mode;
        enum access_type m_access_type;
        adhoc_storage::resources m_resources;
        std::uint32_t m_walltime;
        bool m_should_flush;
    };

    adhoc_storage();
    adhoc_storage(enum adhoc_storage::type type, std::string name,
                  std::uint64_t id, execution_mode exec_mode,
                  access_type access_type, adhoc_storage::resources res,
                  std::uint32_t walltime, bool should_flush);
    explicit adhoc_storage(ADM_adhoc_storage_t storage);
    explicit operator ADM_adhoc_storage_t() const;
    adhoc_storage(enum adhoc_storage::type type, std::string name,
                  std::uint64_t id, const scord::adhoc_storage::ctx& ctx);

    adhoc_storage(const adhoc_storage& other) noexcept;
    adhoc_storage(adhoc_storage&&) noexcept;
    adhoc_storage&
    operator=(const adhoc_storage&) noexcept;
    adhoc_storage&
    operator=(adhoc_storage&&) noexcept;
    ~adhoc_storage();

    std::string
    name() const;
    type
    type() const;
    std::uint64_t
    id() const;
    adhoc_storage::ctx
    context() const;

    void
    update(scord::adhoc_storage::ctx new_ctx);

    // The implementation for this must be deferred until
    // after the declaration of the PIMPL class
    template <class Archive>
    void
    serialize(Archive& ar);

private:
    class impl;
    std::unique_ptr<impl> m_pimpl;
};

struct pfs_storage {

    enum class type : std::underlying_type<ADM_pfs_storage_type_t>::type {
        lustre = ADM_PFS_STORAGE_LUSTRE,
        gpfs = ADM_PFS_STORAGE_GPFS
    };

    struct ctx {

        ctx() = default;

        explicit ctx(std::filesystem::path mount_point);

        explicit ctx(ADM_pfs_context_t ctx);
        explicit operator ADM_pfs_context_t() const;

        std::filesystem::path
        mount_point() const;

        template <class Archive>
        void
        serialize(Archive&& ar) {
            ar& m_mount_point;
        }

    private:
        std::filesystem::path m_mount_point;
    };

    pfs_storage();

    pfs_storage(enum pfs_storage::type type, std::string name, std::uint64_t id,
                std::filesystem::path mount_point);

    pfs_storage(enum pfs_storage::type type, std::string name, std::uint64_t id,
                const pfs_storage::ctx& pfs_ctx);

    explicit pfs_storage(ADM_pfs_storage_t storage);
    explicit operator ADM_pfs_storage_t() const;

    pfs_storage(const pfs_storage& other) noexcept;
    pfs_storage(pfs_storage&&) noexcept;
    pfs_storage&
    operator=(const pfs_storage& other) noexcept;
    pfs_storage&
    operator=(pfs_storage&&) noexcept;
    ~pfs_storage();

    std::string
    name() const;
    type
    type() const;
    std::uint64_t
    id() const;
    pfs_storage::ctx
    context() const;

    void
    update(scord::pfs_storage::ctx new_ctx);

    // The implementation for this must be deferred until
    // after the declaration of the PIMPL class
    template <class Archive>
    void
    serialize(Archive& ar);

private:
    class impl;
    std::unique_ptr<impl> m_pimpl;
};

struct job {

@@ -189,6 +383,39 @@ struct job {
        std::vector<scord::node> m_nodes;
    };

    struct requirements {

        requirements();
        requirements(std::vector<scord::dataset> inputs,
                     std::vector<scord::dataset> outputs);
        requirements(std::vector<scord::dataset> inputs,
                     std::vector<scord::dataset> outputs,
                     scord::adhoc_storage adhoc_storage);
        explicit requirements(ADM_job_requirements_t reqs);

        std::vector<scord::dataset>
        inputs() const;
        std::vector<scord::dataset>
        outputs() const;
        std::optional<scord::adhoc_storage>
        adhoc_storage() const;

        // The implementation for this must be deferred until
        // after the declaration of the PIMPL class
        template <class Archive>
        void
        serialize(Archive& ar) {
            ar& m_inputs;
            ar& m_outputs;
            ar& m_adhoc_storage;
        }

    private:
        std::vector<scord::dataset> m_inputs;
        std::vector<scord::dataset> m_outputs;
        std::optional<scord::adhoc_storage> m_adhoc_storage;
    };

    job();
    job(job_id id, slurm_job_id slurm_id);
    explicit job(ADM_job_t job);
@@ -367,199 +594,6 @@ private:
    std::unique_ptr<impl> m_pimpl;
};

struct adhoc_storage {

    enum class type : std::underlying_type<ADM_adhoc_storage_type_t>::type {
        gekkofs = ADM_ADHOC_STORAGE_GEKKOFS,
        dataclay = ADM_ADHOC_STORAGE_DATACLAY,
        expand = ADM_ADHOC_STORAGE_EXPAND,
        hercules = ADM_ADHOC_STORAGE_HERCULES,
    };

    enum class execution_mode : std::underlying_type<ADM_adhoc_mode_t>::type {
        in_job_shared = ADM_ADHOC_MODE_IN_JOB_SHARED,
        in_job_dedicated = ADM_ADHOC_MODE_IN_JOB_DEDICATED,
        separate_new = ADM_ADHOC_MODE_SEPARATE_NEW,
        separate_existing = ADM_ADHOC_MODE_SEPARATE_EXISTING
    };

    enum class access_type : std::underlying_type<ADM_adhoc_mode_t>::type {
        read_only = ADM_ADHOC_ACCESS_RDONLY,
        write_only = ADM_ADHOC_ACCESS_WRONLY,
        read_write = ADM_ADHOC_ACCESS_RDWR,
    };

    struct resources {
        resources() = default;
        explicit resources(std::vector<scord::node> nodes);
        explicit resources(ADM_adhoc_resources_t res);
        explicit operator ADM_adhoc_resources_t() const;

        std::vector<scord::node>
        nodes() const;

        template <typename Archive>
        void
        serialize(Archive&& ar) {
            ar& m_nodes;
        }

    private:
        std::vector<scord::node> m_nodes;
    };

    struct ctx {

        ctx() = default;

        ctx(execution_mode exec_mode, access_type access_type,
            adhoc_storage::resources resources, std::uint32_t walltime,
            bool should_flush);

        explicit ctx(ADM_adhoc_context_t ctx);
        explicit operator ADM_adhoc_context_t() const;

        execution_mode
        exec_mode() const;
        enum access_type
        access_type() const;
        adhoc_storage::resources
        resources() const;
        std::uint32_t
        walltime() const;
        bool
        should_flush() const;

        template <class Archive>
        void
        serialize(Archive&& ar) {
            ar& m_exec_mode;
            ar& m_access_type;
            ar& m_resources;
            ar& m_walltime;
            ar& m_should_flush;
        }

    private:
        execution_mode m_exec_mode;
        enum access_type m_access_type;
        adhoc_storage::resources m_resources;
        std::uint32_t m_walltime;
        bool m_should_flush;
    };

    adhoc_storage();
    adhoc_storage(enum adhoc_storage::type type, std::string name,
                  std::uint64_t id, execution_mode exec_mode,
                  access_type access_type, adhoc_storage::resources res,
                  std::uint32_t walltime, bool should_flush);
    explicit adhoc_storage(ADM_adhoc_storage_t storage);
    explicit operator ADM_adhoc_storage_t() const;
    adhoc_storage(enum adhoc_storage::type type, std::string name,
                  std::uint64_t id, const scord::adhoc_storage::ctx& ctx);

    adhoc_storage(const adhoc_storage& other) noexcept;
    adhoc_storage(adhoc_storage&&) noexcept;
    adhoc_storage&
    operator=(const adhoc_storage&) noexcept;
    adhoc_storage&
    operator=(adhoc_storage&&) noexcept;
    ~adhoc_storage();

    std::string
    name() const;
    type
    type() const;
    std::uint64_t
    id() const;
    adhoc_storage::ctx
    context() const;

    void
    update(scord::adhoc_storage::ctx new_ctx);

    // The implementation for this must be deferred until
    // after the declaration of the PIMPL class
    template <class Archive>
    void
    serialize(Archive& ar);

private:
    class impl;
    std::unique_ptr<impl> m_pimpl;
};

struct pfs_storage {

    enum class type : std::underlying_type<ADM_pfs_storage_type_t>::type {
        lustre = ADM_PFS_STORAGE_LUSTRE,
        gpfs = ADM_PFS_STORAGE_GPFS
    };

    struct ctx {

        ctx() = default;

        explicit ctx(std::filesystem::path mount_point);

        explicit ctx(ADM_pfs_context_t ctx);
        explicit operator ADM_pfs_context_t() const;

        std::filesystem::path
        mount_point() const;

        template <class Archive>
        void
        serialize(Archive&& ar) {
            ar& m_mount_point;
        }

    private:
        std::filesystem::path m_mount_point;
    };

    pfs_storage();

    pfs_storage(enum pfs_storage::type type, std::string name, std::uint64_t id,
                std::filesystem::path mount_point);

    pfs_storage(enum pfs_storage::type type, std::string name, std::uint64_t id,
                const pfs_storage::ctx& pfs_ctx);

    explicit pfs_storage(ADM_pfs_storage_t storage);
    explicit operator ADM_pfs_storage_t() const;

    pfs_storage(const pfs_storage& other) noexcept;
    pfs_storage(pfs_storage&&) noexcept;
    pfs_storage&
    operator=(const pfs_storage& other) noexcept;
    pfs_storage&
    operator=(pfs_storage&&) noexcept;
    ~pfs_storage();

    std::string
    name() const;
    type
    type() const;
    std::uint64_t
    id() const;
    pfs_storage::ctx
    context() const;

    void
    update(scord::pfs_storage::ctx new_ctx);

    // The implementation for this must be deferred until
    // after the declaration of the PIMPL class
    template <class Archive>
    void
    serialize(Archive& ar);

private:
    class impl;
    std::unique_ptr<impl> m_pimpl;
};

struct job_requirements {

    job_requirements();
+47 −0
Original line number Diff line number Diff line
@@ -215,6 +215,53 @@ private:
    slurm_job_id m_slurm_job_id;
};

job::requirements::requirements() = default;

job::requirements::requirements(std::vector<scord::dataset> inputs,
                                std::vector<scord::dataset> outputs)
    : m_inputs(std::move(inputs)), m_outputs(std::move(outputs)) {}

job::requirements::requirements(std::vector<scord::dataset> inputs,
                                std::vector<scord::dataset> outputs,
                                scord::adhoc_storage adhoc_storage)
    : m_inputs(std::move(inputs)), m_outputs(std::move(outputs)),
      m_adhoc_storage(std::move(adhoc_storage)) {}

job::requirements::requirements(ADM_job_requirements_t reqs) {

    m_inputs.reserve(reqs->r_inputs->l_length);

    for(size_t i = 0; i < reqs->r_inputs->l_length; ++i) {
        m_inputs.emplace_back(reqs->r_inputs->l_datasets[i].d_id);
    }

    m_outputs.reserve(reqs->r_outputs->l_length);

    for(size_t i = 0; i < reqs->r_outputs->l_length; ++i) {
        m_outputs.emplace_back(reqs->r_outputs->l_datasets[i].d_id);
    }

    if(reqs->r_adhoc_storage) {
        m_adhoc_storage = scord::adhoc_storage(reqs->r_adhoc_storage);
    }
}

std::vector<scord::dataset>
job::requirements::inputs() const {
    return m_inputs;
}

std::vector<scord::dataset>
job::requirements::outputs() const {
    return m_outputs;
}

std::optional<scord::adhoc_storage>
job::requirements::adhoc_storage() const {
    return m_adhoc_storage;
}


job::resources::resources() = default;

job::resources::resources(std::vector<scord::node> nodes)