Feat/MCOL-5996-cmapi-integration-testing #3568

Draft: wants to merge 2 commits into base: stable-23.10
22 changes: 22 additions & 0 deletions cmapi/cmapi_server/pyproject.toml
@@ -0,0 +1,22 @@
[tool.ruff]
line-length = 80
target-version = "py39"
# Enable common rule sets
select = [
"E", # pycodestyle errors
"F", # pyflakes: undefined names, unused imports, etc.
"I", # isort: import sorting
"B", # flake8-bugbear: common bugs and anti-patterns
"UP", # pyupgrade: use modern Python syntax
"N", # pep8-naming: naming conventions
]

ignore = []

# Exclude cache and temporary directories
exclude = [
"__pycache__",
]

[tool.ruff.format]
quote-style = "single"
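As a usage note, the configuration above can be exercised locally along these lines (a minimal sketch; the working directory and the presence of a `ruff` executable on PATH are assumptions, not part of this change):

```python
import subprocess

# Run the linter and the formatter check against the cmapi_server sources,
# picking up the [tool.ruff] settings from the pyproject.toml above.
subprocess.check_call(["ruff", "check", "."], cwd="cmapi/cmapi_server")
subprocess.check_call(["ruff", "format", "--check", "."], cwd="cmapi/cmapi_server")
```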
Empty file.
41 changes: 41 additions & 0 deletions cmapi/integration_tests/cluster_mgmt.py
@@ -0,0 +1,41 @@
import json
import subprocess
from pathlib import Path

from integration_tests.utils import change_directory


def cluster_exists(terraform_dir: Path) -> bool:
"""
Check if the cluster exists by running terraform output command.
"""
with change_directory(terraform_dir):
try:
outputs = json.loads(subprocess.check_output(
["terraform", "output", "-json"],
text=True,
))
return bool(outputs)
except subprocess.CalledProcessError as e:
print(f"Error checking cluster existence: {e}")
return False


def create_cluster(terraform_dir: Path) -> None:
"""
Create the cluster by running terraform and ansible
"""
with change_directory(terraform_dir):
subprocess.check_call(["terraform", "init"])
subprocess.check_call(["terraform", "plan"])
subprocess.check_call(["terraform", "apply", "-auto-approve"])
subprocess.check_call(["ansible-playbook", "provision.yml"])

# Run our own provisioning playbook to prepare the cluster for the tests
our_playbook = Path(__file__).parent / "integration_tests.yml"
subprocess.check_call(["ansible-playbook", str(our_playbook.absolute())])


def destroy_cluster(terraform_dir: Path) -> None:
assert cluster_exists(terraform_dir), "Cluster does not exist"
with change_directory(terraform_dir):
subprocess.check_call(["terraform", "destroy", "-auto-approve"])
175 changes: 175 additions & 0 deletions cmapi/integration_tests/conftest.py
@@ -0,0 +1,175 @@
import json
import subprocess
from pathlib import Path

import pytest

from integration_tests import cluster_mgmt
from integration_tests.ssh import (
    ClusterConfig, HostConfig, run_on_all_hosts_parallel,
)
from integration_tests.state import activate_single_node_config
from integration_tests.utils import change_directory


def pytest_addoption(parser):
"""Add terraform directory option to pytest command line arguments."""
parser.addoption(
"--terraform-dir",
action="store",
default=None,
required=True,
help="Directory where terraform was run to set up the test cluster",
)
parser.addoption(
"--create-cluster",
action="store_true",
default=False,
help="Create the cluster before running tests",
)
parser.addoption(
"--destroy-cluster",
action="store_true",
default=False,
help="Destroy the cluster after running tests",
)


@pytest.fixture(scope="session")
def terraform_dir(request) -> Path:
"""
Fixture to provide the terraform directory path.
Fails explicitly if --terraform-dir option is not provided.
"""
terraform_directory = request.config.getoption("--terraform-dir")
if not terraform_directory:
pytest.fail("--terraform-dir option is required but not provided")
terraform_directory = Path(terraform_directory).resolve()
if not terraform_directory.is_dir():
pytest.fail(f"Specified terraform directory '{terraform_directory}' does not exist")
return terraform_directory


@pytest.fixture(scope="session")
def prepared_cluster(request, terraform_dir):
"""
Provides existence of the cluster
If it doesn't exist and --create-cluster is passed, it creates it.
If it exists and --create-cluster is passed, it re-creates it.
If --destroy-cluster is passed, it destroys the cluster when the tests finish.
"""
create_cluster = request.config.getoption("--create-cluster")
destroy_cluster = request.config.getoption("--destroy-cluster")

cluster_already_exists = cluster_mgmt.cluster_exists(terraform_dir)

if create_cluster:
if cluster_already_exists:
print("Cluster already exists, destroying it first...")
cluster_mgmt.destroy_cluster(terraform_dir)
print("Creating the cluster...")
cluster_mgmt.create_cluster(terraform_dir)


yield


if destroy_cluster:
print("Destroying the cluster...")
cluster_mgmt.destroy_cluster(terraform_dir)


@pytest.fixture(scope="session")
def cluster_config(terraform_dir, prepared_cluster) -> ClusterConfig:
"""
Fixture to provide the cluster configuration.
"""
with change_directory(terraform_dir):
# Read the cluster configuration from the terraform output
outputs = json.loads(subprocess.check_output(
["terraform", "output", "-json"],
text=True,
))

        if not outputs:
            pytest.fail(
                "Terraform output is empty; it looks like the cluster has not been "
                "created yet. Pass the --create-cluster option to create it."
            )

key_file_path = Path(outputs["ssh_key_file"]["value"])

mcs_nodes = []
for host_descr in outputs["columnstore_nodes"]["value"]:
mcs_nodes.append(
HostConfig(
name=host_descr["name"],
private_ip=host_descr["private_ip"],
public_fqdn=host_descr["public_dns"],
key_file_path=key_file_path,
)
)

maxscale_nodes = []
for host_descr in outputs["maxscale_nodes"]["value"]:
maxscale_nodes.append(
HostConfig(
name=host_descr["name"],
private_ip=host_descr["private_ip"],
public_fqdn=host_descr["public_dns"],
key_file_path=key_file_path,
)
)

all_hosts = mcs_nodes + maxscale_nodes
# Check if all hosts are reachable
for host in all_hosts:
if not host.is_reachable():
pytest.fail(f"Host {host.name} is not reachable via SSH")

cluster_config = ClusterConfig(mcs_hosts=mcs_nodes, maxscale_hosts=maxscale_nodes)
return cluster_config


@pytest.fixture
def disconnected_cluster(cluster_config):
"""
Fixture to activate single node configuration on all hosts in the cluster,
none of the nodes are in the cluster.
"""
print("Activating single node configuration on all hosts...")
run_on_all_hosts_parallel(cluster_config.mcs_hosts, activate_single_node_config)


@pytest.fixture
def only_primary_in_cluster(disconnected_cluster, cluster_config):
"""
Fixture that provides a cluster with only the primary node added
"""
print("Adding only primary node to the cluster...")
cmd = f"cluster node add --node {cluster_config.primary.private_ip}"
cluster_config.primary.exec_mcs(cmd)

status_out = cluster_config.primary.exec_mcs("cluster status")
assert status_out["num_nodes"] == 1
assert str(cluster_config.primary.private_ip) in status_out


@pytest.fixture
def only_primary_and_one_replica_in_cluster(only_primary_in_cluster, cluster_config):
"""
Fixture that provides a cluster with only the primary and one replica node added
"""
print("Adding one replica...")
cmd = f"cluster node add --node {cluster_config.replicas[0].private_ip}"
cluster_config.primary.exec_mcs(cmd)

status_out = cluster_config.primary.exec_mcs("cluster status")
assert status_out["num_nodes"] == 2


@pytest.fixture
def complete_cluster(cluster_config, disconnected_cluster, only_primary_in_cluster):
"""
Fixture to activate multinode configuration on all hosts in the cluster.
"""
print("Adding replicas to the cluster...")
cmd = "cluster node add "
for host in cluster_config.replicas:
cmd += f"--node {host.private_ip} "
cluster_config.primary.exec_mcs(cmd)

status_out = cluster_config.primary.exec_mcs("cluster status")
assert status_out["num_nodes"] == len(cluster_config.mcs_hosts)
23 changes: 23 additions & 0 deletions cmapi/integration_tests/integration_tests.yml
@@ -0,0 +1,23 @@
# Ansible provisioning for integration tests

- hosts: "primary,replicas"
become: yes
become_user: root
gather_facts: no

tasks:
# Install cs_package_manager
- name: "Download cs_package_manager"
get_url:
url: "https://raw.githubusercontent.com/mariadb-corporation/mariadb-columnstore-engine/refs/heads/cs_package_manager_v3.5/cmapi/scripts/cs_package_manager.sh"
dest: "/tmp/cs_package_manager.sh"
mode: '0755'

- name: "Install cs_package_manager"
command: "install -o root -g root -m 0755 /tmp/cs_package_manager.sh /usr/local/bin/cs_package_manager"

# Install iptables (will use nftables behind the scenes)
- name: "Install iptables"
package:
name:
- iptables
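As a follow-up, something along these lines could verify the provisioning from the test side (a sketch only; the `HostConfig` instance is assumed to come from the `cluster_config` fixture):

```python
from integration_tests.ssh import HostConfig

def check_provisioning(host: HostConfig) -> None:
    """Raise if the playbook's artifacts are missing on the host."""
    with host.ssh_connection() as conn:
        # cs_package_manager should be on PATH after the install task.
        conn.run("command -v cs_package_manager", hide=True)
        # iptables should be available for the port-blocking helpers below.
        conn.sudo("iptables -V", hide=True)
```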
121 changes: 121 additions & 0 deletions cmapi/integration_tests/ssh.py
@@ -0,0 +1,121 @@
import concurrent.futures
import json
from contextlib import contextmanager
from pathlib import Path
from typing import Any, Callable

from fabric import Connection

SSH_USER = "rocky"


class HostConfig:
"""Keeps configuration for a host in the cluster"""
def __init__(self, name: str, private_ip: str, public_fqdn: str, key_file_path: Path) -> None:
self.name = name
self.private_ip = private_ip
self.public_fqdn = public_fqdn
self.key_file_path = key_file_path

def __repr__(self) -> str:
return f"HostConfig(name={self.name}, private_ip={self.private_ip}, public_fqdn={self.public_fqdn}"

@contextmanager
def ssh_connection(self):
"""Connect to the host using SSH (via Fabric)"""
conn = Connection(
host=self.public_fqdn,
user=SSH_USER,
connect_kwargs={
"key_filename": str(self.key_file_path),
}
)

        try:
            yield conn
        finally:
            # Close the connection even if the caller raised an exception
            conn.close()

def is_reachable(self) -> bool:
"""Check if the host is reachable via SSH"""
try:
with self.ssh_connection() as conn:
conn.run("echo 'SSH connection successful'", hide=True)
return True
except Exception as e:
print(f"SSH connection failed for {self.name}: {e}")
return False

def exec_mcs(self, params: str, drop_timestamp: bool = True) -> dict:
"""Execute an mcs subcommand and return its parsed output"""
with self.ssh_connection() as conn:
print(f"{self.name}: Executing 'mcs {params}'")
result = conn.sudo(f"mcs {params}", hide=True)
if result.return_code != 0:
raise RuntimeError(f"Failed to execute mcs command on {self.name}: {result.stderr}")
output = json.loads(result.stdout)
return output


class ClusterConfig:
"""Keeps configuration of the cluster"""
def __init__(self, mcs_hosts: list[HostConfig], maxscale_hosts: list[HostConfig]) -> None:
self.mcs_hosts = mcs_hosts
self.maxscale_hosts = maxscale_hosts

def __repr__(self) -> str:
return f"ClusterConfig(mcs_hosts={self.mcs_hosts}"

@property
def primary(self) -> HostConfig:
return self.mcs_hosts[0]

@property
def replicas(self) -> list[HostConfig]:
return self.mcs_hosts[1:]


def run_on_all_hosts_parallel(hosts: list[HostConfig], func: Callable[[HostConfig], Any], max_workers=None):
"""
Run a function on all hosts in parallel.

Args:
hosts: List of HostConfig objects
func: Function that takes a HostConfig as its only argument
max_workers: Maximum number of worker threads (None = default based on system)

Returns:
        Dictionary mapping host names to function results
"""
results = {}

with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
# Start the function execution on each host
future_to_host = {executor.submit(func, host): host for host in hosts}

# Process results as they complete
for future in concurrent.futures.as_completed(future_to_host):
host = future_to_host[future]
try:
result = future.result()
results[host.name] = result
except Exception as exc:
print(f"Error on host {host.name}: {exc}")
results[host.name] = exc
raise

return results


def block_port(host: HostConfig, port: int) -> None:
"""Block a port on the host using iptables"""
with host.ssh_connection() as conn:
print(f"{host.name}: Blocking port {port}")
conn.sudo(f"iptables -A INPUT -p tcp --dport {port} -j DROP")
print(f"{host.name}: Port {port} blocked")


def unblock_port(host: HostConfig, port: int) -> None:
"""Unblock a port on the host using iptables"""
with host.ssh_connection() as conn:
print(f"{host.name}: Unblocking port {port}")
conn.sudo(f"iptables -D INPUT -p tcp --dport {port} -j DROP")
print(f"{host.name}: Port {port} unblocked")