Commit 3f1b02cf authored by Marc Vef's avatar Marc Vef
Browse files

Adding client skeleton for expansion

parent 1c07cec8
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -72,6 +72,7 @@ target_sources(
         rpc/forward_management.hpp
         rpc/forward_metadata.hpp
         rpc/forward_data.hpp
    rpc/forward_malleability.hpp
         syscalls/args.hpp
         syscalls/decoder.hpp
         syscalls/errno.hpp
+45 −0
Original line number Diff line number Diff line
/*
  Copyright 2018-2024, Barcelona Supercomputing Center (BSC), Spain
  Copyright 2015-2024, Johannes Gutenberg Universitaet Mainz, Germany

  This software was partially supported by the
  EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu).

  This software was partially supported by the
  ADA-FS project under the SPPEXA project funded by the DFG.

  This file is part of GekkoFS' POSIX interface.

  GekkoFS' POSIX interface is free software: you can redistribute it and/or
  modify it under the terms of the GNU Lesser General Public License as
  published by the Free Software Foundation, either version 3 of the License,
  or (at your option) any later version.

  GekkoFS' POSIX interface is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  GNU Lesser General Public License for more details.

  You should have received a copy of the GNU Lesser General Public License
  along with GekkoFS' POSIX interface.  If not, see
  <https://www.gnu.org/licenses/>.

  SPDX-License-Identifier: LGPL-3.0-or-later
*/

#ifndef GEKKOFS_CLIENT_FORWARD_MALLEABILITY_HPP
#define GEKKOFS_CLIENT_FORWARD_MALLEABILITY_HPP

namespace gkfs::malleable::rpc {

int
forward_expand_start(int old_server_conf, int new_server_conf);

int
forward_expand_status();

int
forward_expand_finalize();
} // namespace gkfs::malleable::rpc

#endif // GEKKOFS_CLIENT_FORWARD_MALLEABILITY_HPP
+361 −18
Original line number Diff line number Diff line
@@ -63,7 +63,9 @@ hg_proc_void_t(hg_proc_t proc, void* data) {

} // namespace hermes::detail

namespace gkfs::rpc {
namespace gkfs {

namespace rpc {

//==============================================================================
// definitions for fs_config
@@ -125,7 +127,8 @@ struct fs_config {

        explicit input(const hermes::detail::hg_void_t& other) {}

        explicit operator hermes::detail::hg_void_t() {
        explicit
        operator hermes::detail::hg_void_t() {
            return {};
        }
    };
@@ -311,7 +314,8 @@ struct create {
        explicit input(const rpc_mk_node_in_t& other)
            : m_path(other.path), m_mode(other.mode) {}

        explicit operator rpc_mk_node_in_t() {
        explicit
        operator rpc_mk_node_in_t() {
            return {m_path.c_str(), m_mode};
        }

@@ -420,7 +424,8 @@ struct stat {

        explicit input(const rpc_path_only_in_t& other) : m_path(other.path) {}

        explicit operator rpc_path_only_in_t() {
        explicit
        operator rpc_path_only_in_t() {
            return {m_path.c_str()};
        }

@@ -546,7 +551,8 @@ struct remove_metadata {
        explicit input(const rpc_rm_node_in_t& other)
            : m_path(other.path), m_rm_dir(other.rm_dir) {}

        explicit operator rpc_rm_node_in_t() {
        explicit
        operator rpc_rm_node_in_t() {
            return {m_path.c_str(), m_rm_dir};
        }

@@ -678,7 +684,8 @@ struct decr_size {
        explicit input(const rpc_trunc_in_t& other)
            : m_path(other.path), m_length(other.length) {}

        explicit operator rpc_trunc_in_t() {
        explicit
        operator rpc_trunc_in_t() {
            return {m_path.c_str(), m_length};
        }

@@ -885,7 +892,8 @@ struct update_metadentry {
              m_atime_flag(other.atime_flag), m_mtime_flag(other.mtime_flag),
              m_ctime_flag(other.ctime_flag) {}

        explicit operator rpc_update_metadentry_in_t() {
        explicit
        operator rpc_update_metadentry_in_t() {
            return {m_path.c_str(), m_nlink,      m_mode,       m_uid,
                    m_gid,          m_size,       m_blocks,     m_atime,
                    m_mtime,        m_ctime,      m_nlink_flag, m_mode_flag,
@@ -1013,7 +1021,8 @@ struct get_metadentry_size {

        explicit input(const rpc_path_only_in_t& other) : m_path(other.path) {}

        explicit operator rpc_path_only_in_t() {
        explicit
        operator rpc_path_only_in_t() {
            return {m_path.c_str()};
        }

@@ -1148,7 +1157,8 @@ struct update_metadentry_size {
            : m_path(other.path), m_size(other.size), m_offset(other.offset),
              m_append(other.append) {}

        explicit operator rpc_update_metadentry_size_in_t() {
        explicit
        operator rpc_update_metadentry_size_in_t() {
            return {m_path.c_str(), m_size, m_offset, m_append};
        }

@@ -1276,7 +1286,8 @@ struct mk_symlink {
        explicit input(const rpc_mk_symlink_in_t& other)
            : m_path(other.path), m_target_path(other.target_path) {}

        explicit operator rpc_mk_symlink_in_t() {
        explicit
        operator rpc_mk_symlink_in_t() {
            return {m_path.c_str(), m_target_path.c_str()};
        }

@@ -1387,7 +1398,8 @@ struct remove_data {

        explicit input(const rpc_rm_node_in_t& other) : m_path(other.path) {}

        explicit operator rpc_rm_node_in_t() {
        explicit
        operator rpc_rm_node_in_t() {
            return {m_path.c_str()};
        }

@@ -1553,7 +1565,8 @@ struct write_data {
              m_total_chunk_size(other.total_chunk_size),
              m_buffers(other.bulk_handle) {}

        explicit operator rpc_write_data_in_t() {
        explicit
        operator rpc_write_data_in_t() {
            return {m_path.c_str(),      m_offset,          m_host_id,
                    m_host_size,         m_wbitset.c_str(), m_chunk_n,
                    m_chunk_start,       m_chunk_end,       m_total_chunk_size,
@@ -1738,7 +1751,8 @@ struct read_data {
              m_total_chunk_size(other.total_chunk_size),
              m_buffers(other.bulk_handle) {}

        explicit operator rpc_read_data_in_t() {
        explicit
        operator rpc_read_data_in_t() {
            return {m_path.c_str(),      m_offset,          m_host_id,
                    m_host_size,         m_wbitset.c_str(), m_chunk_n,
                    m_chunk_start,       m_chunk_end,       m_total_chunk_size,
@@ -1872,7 +1886,8 @@ struct trunc_data {
        explicit input(const rpc_trunc_in_t& other)
            : m_path(other.path), m_length(other.length) {}

        explicit operator rpc_trunc_in_t() {
        explicit
        operator rpc_trunc_in_t() {
            return {
                    m_path.c_str(),
                    m_length,
@@ -1991,7 +2006,8 @@ struct get_dirents {
        explicit input(const rpc_get_dirents_in_t& other)
            : m_path(other.path), m_buffers(other.bulk_handle) {}

        explicit operator rpc_get_dirents_in_t() {
        explicit
        operator rpc_get_dirents_in_t() {
            return {m_path.c_str(), hg_bulk_t(m_buffers)};
        }

@@ -2116,7 +2132,8 @@ struct get_dirents_extended {
        explicit input(const rpc_get_dirents_in_t& other)
            : m_path(other.path), m_buffers(other.bulk_handle) {}

        explicit operator rpc_get_dirents_in_t() {
        explicit
        operator rpc_get_dirents_in_t() {
            return {m_path.c_str(), hg_bulk_t(m_buffers)};
        }

@@ -2235,7 +2252,8 @@ struct chunk_stat {
        explicit input(const rpc_chunk_stat_in_t& other)
            : m_dummy(other.dummy) {}

        explicit operator rpc_chunk_stat_in_t() {
        explicit
        operator rpc_chunk_stat_in_t() {
            return {m_dummy};
        }

@@ -3693,8 +3711,333 @@ struct get_dirents_extended_proxy {
        size_t m_dirents_size;
    };
};
} // namespace rpc
namespace malleable::rpc {

//==============================================================================
// definitions for expand_start
struct expand_start {

    // forward declarations of public input/output types for this RPC
    class input;

    class output;

    // traits used so that the engine knows what to do with the RPC
    using self_type = expand_start;
    using handle_type = hermes::rpc_handle<self_type>;
    using input_type = input;
    using output_type = output;
    using mercury_input_type = rpc_expand_start_in_t;
    using mercury_output_type = rpc_err_out_t;

    // RPC public identifier
    // (N.B: we reuse the same IDs assigned by Margo so that the daemon
    // understands Hermes RPCs)
    constexpr static const uint64_t public_id = 50;

    // RPC internal Mercury identifier
    constexpr static const hg_id_t mercury_id = 0;

    // RPC name
    constexpr static const auto name = gkfs::rpc::malleable::tag::expand_start;

    // requires response?
    constexpr static const auto requires_response = true;

    // Mercury callback to serialize input arguments
    constexpr static const auto mercury_in_proc_cb =
            HG_GEN_PROC_NAME(rpc_expand_start_in_t);

    // Mercury callback to serialize output arguments
    constexpr static const auto mercury_out_proc_cb =
            HG_GEN_PROC_NAME(rpc_err_out_t);

    class input {

        template <typename ExecutionContext>
        friend hg_return_t
        hermes::detail::post_to_mercury(ExecutionContext*);

    public:
        input(const uint32_t old_server_conf, uint32_t new_server_conf)
            : m_old_server_conf(old_server_conf),
              m_new_server_conf(new_server_conf) {}

        input(input&& rhs) = default;

        input(const input& other) = default;

        input&
        operator=(input&& rhs) = default;

        input&
        operator=(const input& other) = default;

        uint32_t
        old_server_conf() const {
            return m_old_server_conf;
        }

        uint32_t
        new_server_conf() const {
            return m_new_server_conf;
        }

        explicit input(const rpc_expand_start_in_t& other)
            : m_old_server_conf(other.old_server_conf),
              m_new_server_conf(other.new_server_conf) {}

        explicit
        operator rpc_expand_start_in_t() {
            return {m_old_server_conf, m_new_server_conf};
        }

    private:
        uint32_t m_old_server_conf;
        uint32_t m_new_server_conf;
    };

    class output {

        template <typename ExecutionContext>
        friend hg_return_t
        hermes::detail::post_to_mercury(ExecutionContext*);

    public:
        output() : m_err() {}

        output(int32_t err) : m_err(err) {}

        output(output&& rhs) = default;

        output(const output& other) = default;

        output&
        operator=(output&& rhs) = default;

        output&
        operator=(const output& other) = default;

        explicit output(const rpc_err_out_t& out) {
            m_err = out.err;
        }

        int32_t
        err() const {
            return m_err;
        }

    private:
        int32_t m_err;
    };
};

//==============================================================================
// definitions for expand_status
struct expand_status {

    // forward declarations of public input/output types for this RPC
    class input;

    class output;

    // traits used so that the engine knows what to do with the RPC
    using self_type = expand_status;
    using handle_type = hermes::rpc_handle<self_type>;
    using input_type = input;
    using output_type = output;
    using mercury_input_type = hermes::detail::hg_void_t;
    using mercury_output_type = rpc_err_out_t;

    // RPC public identifier
    // (N.B: we reuse the same IDs assigned by Margo so that the daemon
    // understands Hermes RPCs)
    constexpr static const uint64_t public_id = 51;

    // RPC internal Mercury identifier
    constexpr static const hg_id_t mercury_id = 0;

    // RPC name
    constexpr static const auto name = gkfs::rpc::malleable::tag::expand_status;

    // requires response?
    constexpr static const auto requires_response = true;

    // Mercury callback to serialize input arguments
    constexpr static const auto mercury_in_proc_cb =
            hermes::detail::hg_proc_void_t;

    // Mercury callback to serialize output arguments
    constexpr static const auto mercury_out_proc_cb =
            HG_GEN_PROC_NAME(rpc_err_out_t);

    class input {

        template <typename ExecutionContext>
        friend hg_return_t
        hermes::detail::post_to_mercury(ExecutionContext*);

    public:
        input() {}

        input(input&& rhs) = default;

        input(const input& other) = default;

        input&
        operator=(input&& rhs) = default;

        input&
        operator=(const input& other) = default;

        explicit input(const hermes::detail::hg_void_t& other) {}

        explicit
        operator hermes::detail::hg_void_t() {
            return {};
        }
    };

    class output {

        template <typename ExecutionContext>
        friend hg_return_t
        hermes::detail::post_to_mercury(ExecutionContext*);

    public:
        output() : m_err() {}

        output(int32_t err) : m_err(err) {}

        output(output&& rhs) = default;

        output(const output& other) = default;

        output&
        operator=(output&& rhs) = default;

        output&
        operator=(const output& other) = default;

        explicit output(const rpc_err_out_t& out) {
            m_err = out.err;
        }

        int32_t
        err() const {
            return m_err;
        }

    private:
        int32_t m_err;
    };
};

//==============================================================================
// definitions for expand_finalize
struct expand_finalize {

    // forward declarations of public input/output types for this RPC
    class input;

    class output;

    // traits used so that the engine knows what to do with the RPC
    using self_type = expand_finalize;
    using handle_type = hermes::rpc_handle<self_type>;
    using input_type = input;
    using output_type = output;
    using mercury_input_type = hermes::detail::hg_void_t;
    using mercury_output_type = rpc_err_out_t;

    // RPC public identifier
    // (N.B: we reuse the same IDs assigned by Margo so that the daemon
    // understands Hermes RPCs)
    constexpr static const uint64_t public_id = 52;

    // RPC internal Mercury identifier
    constexpr static const hg_id_t mercury_id = 0;

    // RPC name
    constexpr static const auto name =
            gkfs::rpc::malleable::tag::expand_finalize;

    // requires response?
    constexpr static const auto requires_response = true;

    // Mercury callback to serialize input arguments
    constexpr static const auto mercury_in_proc_cb =
            hermes::detail::hg_proc_void_t;

    // Mercury callback to serialize output arguments
    constexpr static const auto mercury_out_proc_cb =
            HG_GEN_PROC_NAME(rpc_err_out_t);

    class input {

        template <typename ExecutionContext>
        friend hg_return_t
        hermes::detail::post_to_mercury(ExecutionContext*);

    public:
        input() {}

        input(input&& rhs) = default;

        input(const input& other) = default;

        input&
        operator=(input&& rhs) = default;

        input&
        operator=(const input& other) = default;

        explicit input(const hermes::detail::hg_void_t& other) {}

        explicit
        operator hermes::detail::hg_void_t() {
            return {};
        }
    };

    class output {

        template <typename ExecutionContext>
        friend hg_return_t
        hermes::detail::post_to_mercury(ExecutionContext*);

    public:
        output() : m_err() {}

        output(int32_t err) : m_err(err) {}

        output(output&& rhs) = default;

        output(const output& other) = default;

        output&
        operator=(output&& rhs) = default;

        output&
        operator=(const output& other) = default;

        explicit output(const rpc_err_out_t& out) {
            m_err = out.err;
        }

        int32_t
        err() const {
            return m_err;
        }

    private:
        int32_t m_err;
    };
};

} // namespace gkfs::rpc
} // namespace malleable::rpc
} // namespace gkfs


#endif // GKFS_RPCS_TYPES_HPP
+26 −2
Original line number Diff line number Diff line
@@ -40,7 +40,8 @@ extern "C" {

struct linux_dirent64;

namespace gkfs::syscall {
namespace gkfs {
namespace syscall {

int
gkfs_open(const std::string& path, mode_t mode, int flags);
@@ -77,7 +78,30 @@ gkfs_remove(const std::string& path);

std::vector<std::string>
gkfs_get_file_list(const std::string& path);
} // namespace gkfs::syscall
} // namespace syscall
namespace malleable {

/**
 * @brief Start an expansion of the file system
 * @param old_server_conf old number of nodes
 * @param new_server_conf new number of nodes
 * @return error code
 */
int
expand_start(int old_server_conf, int new_server_conf);

/**
 * @brief Check for the current status of the expansion process
 * @return 0 when finished, positive numbers indicate how many daemons
 * are still redistributing data
 */
int
expand_status();

int
expand_finalize();
} // namespace malleable
} // namespace gkfs


extern "C" int
+7 −0
Original line number Diff line number Diff line
@@ -78,8 +78,15 @@ constexpr auto client_proxy_get_dirents_extended =
// Specific RPCs between daemon and proxy
constexpr auto proxy_daemon_write = "proxy_daemon_rpc_srv_write_data";
constexpr auto proxy_daemon_read = "proxy_daemon_rpc_srv_read_data";

} // namespace tag

namespace malleable::tag {
constexpr auto expand_start = "rpc_srv_expand_start";
constexpr auto expand_status = "rpc_srv_expand_status";
constexpr auto expand_finalize = "rpc_srv_expand_finalize";
} // namespace malleable::tag

namespace protocol {
constexpr auto na_sm = "na+sm";
constexpr auto ofi_sockets = "ofi+sockets";
Loading