From a103c2cb041d6246472bbe5ca56f2f29f9e7450c Mon Sep 17 00:00:00 2001 From: dervoeti Date: Fri, 13 Jun 2025 13:22:14 +0200 Subject: [PATCH 1/3] feat: support custom product versions --- tests/templates/kuttl/smoke/check-s3.py | 8 ++++++-- tests/templates/kuttl/smoke_aws/check-s3.py | 8 ++++++-- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/tests/templates/kuttl/smoke/check-s3.py b/tests/templates/kuttl/smoke/check-s3.py index 310d0870..f3c44c34 100755 --- a/tests/templates/kuttl/smoke/check-s3.py +++ b/tests/templates/kuttl/smoke/check-s3.py @@ -2,6 +2,7 @@ import trino import argparse import sys +import re if not sys.warnoptions: import warnings @@ -48,8 +49,11 @@ def run_query(connection, query): trino_version = run_query(connection, "select node_version from system.runtime.nodes where coordinator = true and state = 'active'")[0][0] print(f"[INFO] Testing against Trino version \"{trino_version}\"") - assert len(trino_version) >= 3 - assert trino_version.isnumeric() + # Strip SDP release suffix from the version string + trino_product_version = re.split(r'-stackable', trino_version, maxsplit=1)[0] + + assert len(trino_product_version) >= 3 + assert trino_product_version.isnumeric() assert trino_version == run_query(connection, "select version()")[0][0] run_query(connection, "CREATE SCHEMA IF NOT EXISTS hive.minio WITH (location = 's3a://trino/')") diff --git a/tests/templates/kuttl/smoke_aws/check-s3.py b/tests/templates/kuttl/smoke_aws/check-s3.py index e73bbc1d..e624a9a5 100755 --- a/tests/templates/kuttl/smoke_aws/check-s3.py +++ b/tests/templates/kuttl/smoke_aws/check-s3.py @@ -2,6 +2,7 @@ import trino import argparse import sys +import re if not sys.warnoptions: import warnings @@ -57,8 +58,11 @@ def run_query(connection, query): )[0][0] print(f'[INFO] Testing against Trino version "{trino_version}"') - assert len(trino_version) >= 3 - assert trino_version.isnumeric() + # Strip SDP release suffix from the version string + trino_product_version = re.split(r'-stackable', trino_version, maxsplit=1)[0] + + assert len(trino_product_version) >= 3 + assert trino_product_version.isnumeric() assert trino_version == run_query(connection, "select version()")[0][0] # WARNING (@NickLarsenNZ): Hard-coded bucket From 78966daca715cc50d0351051459c1991e113c96d Mon Sep 17 00:00:00 2001 From: dervoeti Date: Fri, 13 Jun 2025 13:30:42 +0200 Subject: [PATCH 2/3] chore: changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4324b90f..0848c4d5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,7 @@ All notable changes to this project will be documented in this file. - The defaults from the docker images itself will now apply, which will be different from 1000/0 going forward - This is marked as breaking because tools and policies might exist, which require these fields to be set - Deprecate Trino 470 ([#755]). +- test: support custom versions ([#760]). ### Fixed @@ -52,6 +53,7 @@ All notable changes to this project will be documented in this file. [#748]: https://github.com/stackabletech/trino-operator/pull/748 [#752]: https://github.com/stackabletech/trino-operator/pull/752 [#755]: https://github.com/stackabletech/trino-operator/pull/755 +[#760]: https://github.com/stackabletech/trino-operator/pull/760 ## [25.3.0] - 2025-03-21 From 9a21a6f72c8e8cd0524e6a0aa7b75c3a89138d76 Mon Sep 17 00:00:00 2001 From: dervoeti Date: Fri, 13 Jun 2025 15:08:06 +0200 Subject: [PATCH 3/3] chore: python formatting --- tests/templates/kuttl/smoke/check-s3.py | 230 ++++++++++++++++---- tests/templates/kuttl/smoke_aws/check-s3.py | 2 +- 2 files changed, 187 insertions(+), 45 deletions(-) diff --git a/tests/templates/kuttl/smoke/check-s3.py b/tests/templates/kuttl/smoke/check-s3.py index f3c44c34..ff461c1c 100755 --- a/tests/templates/kuttl/smoke/check-s3.py +++ b/tests/templates/kuttl/smoke/check-s3.py @@ -10,7 +10,11 @@ def get_connection(username, password, namespace): - host = 'trino-coordinator-default-0.trino-coordinator-default.' + namespace + '.svc.cluster.local' + host = ( + "trino-coordinator-default-0.trino-coordinator-default." + + namespace + + ".svc.cluster.local" + ) # If you want to debug this locally use # kubectl -n kuttl-test-XXX port-forward svc/trino-coordinator-default 8443 # host = '127.0.0.1' @@ -19,7 +23,7 @@ def get_connection(username, password, namespace): host=host, port=8443, user=username, - http_scheme='https', + http_scheme="https", auth=trino.auth.BasicAuthentication(username, password), session_properties={"query_max_execution_time": "60s"}, ) @@ -34,11 +38,13 @@ def run_query(connection, query): return cursor.fetchall() -if __name__ == '__main__': +if __name__ == "__main__": # Construct an argument parser all_args = argparse.ArgumentParser() # Add arguments to the parser - all_args.add_argument("-n", "--namespace", required=True, help="Namespace the test is running in") + all_args.add_argument( + "-n", "--namespace", required=True, help="Namespace the test is running in" + ) args = vars(all_args.parse_args()) namespace = args["namespace"] @@ -46,17 +52,23 @@ def run_query(connection, query): print("Starting S3 tests...") connection = get_connection("admin", "admin", namespace) - trino_version = run_query(connection, "select node_version from system.runtime.nodes where coordinator = true and state = 'active'")[0][0] - print(f"[INFO] Testing against Trino version \"{trino_version}\"") + trino_version = run_query( + connection, + "select node_version from system.runtime.nodes where coordinator = true and state = 'active'", + )[0][0] + print(f'[INFO] Testing against Trino version "{trino_version}"') # Strip SDP release suffix from the version string - trino_product_version = re.split(r'-stackable', trino_version, maxsplit=1)[0] + trino_product_version = re.split(r"-stackable", trino_version, maxsplit=1)[0] assert len(trino_product_version) >= 3 assert trino_product_version.isnumeric() assert trino_version == run_query(connection, "select version()")[0][0] - run_query(connection, "CREATE SCHEMA IF NOT EXISTS hive.minio WITH (location = 's3a://trino/')") + run_query( + connection, + "CREATE SCHEMA IF NOT EXISTS hive.minio WITH (location = 's3a://trino/')", + ) run_query(connection, "DROP TABLE IF EXISTS hive.minio.taxi_data") run_query(connection, "DROP TABLE IF EXISTS hive.minio.taxi_data_copy") @@ -64,7 +76,9 @@ def run_query(connection, query): run_query(connection, "DROP TABLE IF EXISTS hive.hdfs.taxi_data_copy") run_query(connection, "DROP TABLE IF EXISTS iceberg.minio.taxi_data_copy_iceberg") - run_query(connection, """ + run_query( + connection, + """ CREATE TABLE IF NOT EXISTS hive.minio.taxi_data ( vendor_id VARCHAR, tpep_pickup_datetime VARCHAR, @@ -77,13 +91,24 @@ def run_query(connection, query): format = 'csv', skip_header_line_count = 1 ) - """) - assert run_query(connection, "SELECT COUNT(*) FROM hive.minio.taxi_data")[0][0] == 5000 - rows_written = run_query(connection, "CREATE TABLE IF NOT EXISTS hive.minio.taxi_data_copy AS SELECT * FROM hive.minio.taxi_data")[0][0] + """, + ) + assert ( + run_query(connection, "SELECT COUNT(*) FROM hive.minio.taxi_data")[0][0] == 5000 + ) + rows_written = run_query( + connection, + "CREATE TABLE IF NOT EXISTS hive.minio.taxi_data_copy AS SELECT * FROM hive.minio.taxi_data", + )[0][0] assert rows_written == 5000 or rows_written == 0 - assert run_query(connection, "SELECT COUNT(*) FROM hive.minio.taxi_data_copy")[0][0] == 5000 + assert ( + run_query(connection, "SELECT COUNT(*) FROM hive.minio.taxi_data_copy")[0][0] + == 5000 + ) - rows_written = run_query(connection, """ + rows_written = run_query( + connection, + """ CREATE TABLE IF NOT EXISTS hive.minio.taxi_data_transformed AS SELECT CAST(vendor_id as BIGINT) as vendor_id, @@ -93,61 +118,178 @@ def run_query(connection, query): CAST(trip_distance as DOUBLE) as trip_distance, CAST(ratecode_id as BIGINT) as ratecode_id FROM hive.minio.taxi_data -""")[0][0] +""", + )[0][0] assert rows_written == 5000 or rows_written == 0 - assert run_query(connection, "SELECT COUNT(*) FROM hive.minio.taxi_data_transformed")[0][0] == 5000 + assert ( + run_query(connection, "SELECT COUNT(*) FROM hive.minio.taxi_data_transformed")[ + 0 + ][0] + == 5000 + ) print("[INFO] Testing HDFS") - run_query(connection, "CREATE SCHEMA IF NOT EXISTS hive.hdfs WITH (location = 'hdfs://hdfs/trino/')") - rows_written = run_query(connection, "CREATE TABLE IF NOT EXISTS hive.hdfs.taxi_data_copy AS SELECT * FROM hive.minio.taxi_data")[0][0] + run_query( + connection, + "CREATE SCHEMA IF NOT EXISTS hive.hdfs WITH (location = 'hdfs://hdfs/trino/')", + ) + rows_written = run_query( + connection, + "CREATE TABLE IF NOT EXISTS hive.hdfs.taxi_data_copy AS SELECT * FROM hive.minio.taxi_data", + )[0][0] assert rows_written == 5000 or rows_written == 0 - assert run_query(connection, "SELECT COUNT(*) FROM hive.hdfs.taxi_data_copy")[0][0] == 5000 + assert ( + run_query(connection, "SELECT COUNT(*) FROM hive.hdfs.taxi_data_copy")[0][0] + == 5000 + ) print("[INFO] Testing Iceberg") - run_query(connection, "DROP TABLE IF EXISTS iceberg.minio.taxi_data_copy_iceberg") # Clean up table to don't fail an second run - assert run_query(connection, """ + run_query( + connection, "DROP TABLE IF EXISTS iceberg.minio.taxi_data_copy_iceberg" + ) # Clean up table to don't fail an second run + assert ( + run_query( + connection, + """ CREATE TABLE IF NOT EXISTS iceberg.minio.taxi_data_copy_iceberg WITH (partitioning = ARRAY['vendor_id', 'passenger_count'], format = 'parquet') AS SELECT * FROM hive.minio.taxi_data -""")[0][0] == 5000 +""", + )[0][0] + == 5000 + ) # Check current count - assert run_query(connection, "SELECT COUNT(*) FROM iceberg.minio.taxi_data_copy_iceberg")[0][0] == 5000 - assert run_query(connection, 'SELECT COUNT(*) FROM iceberg.minio."taxi_data_copy_iceberg$snapshots"')[0][0] == 1 - assert run_query(connection, 'SELECT COUNT(*) FROM iceberg.minio."taxi_data_copy_iceberg$partitions"')[0][0] == 12 - assert run_query(connection, 'SELECT COUNT(*) FROM iceberg.minio."taxi_data_copy_iceberg$files"')[0][0] == 12 + assert ( + run_query( + connection, "SELECT COUNT(*) FROM iceberg.minio.taxi_data_copy_iceberg" + )[0][0] + == 5000 + ) + assert ( + run_query( + connection, + 'SELECT COUNT(*) FROM iceberg.minio."taxi_data_copy_iceberg$snapshots"', + )[0][0] + == 1 + ) + assert ( + run_query( + connection, + 'SELECT COUNT(*) FROM iceberg.minio."taxi_data_copy_iceberg$partitions"', + )[0][0] + == 12 + ) + assert ( + run_query( + connection, + 'SELECT COUNT(*) FROM iceberg.minio."taxi_data_copy_iceberg$files"', + )[0][0] + == 12 + ) - assert run_query(connection, "INSERT INTO iceberg.minio.taxi_data_copy_iceberg SELECT * FROM hive.minio.taxi_data")[0][0] == 5000 + assert ( + run_query( + connection, + "INSERT INTO iceberg.minio.taxi_data_copy_iceberg SELECT * FROM hive.minio.taxi_data", + )[0][0] + == 5000 + ) # Check current count - assert run_query(connection, "SELECT COUNT(*) FROM iceberg.minio.taxi_data_copy_iceberg")[0][0] == 10000 - assert run_query(connection, 'SELECT COUNT(*) FROM iceberg.minio."taxi_data_copy_iceberg$snapshots"')[0][0] == 2 - assert run_query(connection, 'SELECT COUNT(*) FROM iceberg.minio."taxi_data_copy_iceberg$partitions"')[0][0] == 12 - assert run_query(connection, 'SELECT COUNT(*) FROM iceberg.minio."taxi_data_copy_iceberg$files"')[0][0] == 24 + assert ( + run_query( + connection, "SELECT COUNT(*) FROM iceberg.minio.taxi_data_copy_iceberg" + )[0][0] + == 10000 + ) + assert ( + run_query( + connection, + 'SELECT COUNT(*) FROM iceberg.minio."taxi_data_copy_iceberg$snapshots"', + )[0][0] + == 2 + ) + assert ( + run_query( + connection, + 'SELECT COUNT(*) FROM iceberg.minio."taxi_data_copy_iceberg$partitions"', + )[0][0] + == 12 + ) + assert ( + run_query( + connection, + 'SELECT COUNT(*) FROM iceberg.minio."taxi_data_copy_iceberg$files"', + )[0][0] + == 24 + ) - if trino_version == '377': + if trino_version == "377": # io.trino.spi.TrinoException: This connector [iceberg] does not support versioned tables - print("[INFO] Skipping the Iceberg tests reading versioned tables for trino version 377 as it does not support versioned tables") + print( + "[INFO] Skipping the Iceberg tests reading versioned tables for trino version 377 as it does not support versioned tables" + ) else: # Check count for first snapshot - first_snapshot = run_query(connection, 'select snapshot_id from iceberg.minio."taxi_data_copy_iceberg$snapshots" order by committed_at limit 1')[0][0] - assert run_query(connection, f"SELECT COUNT(*) FROM iceberg.minio.taxi_data_copy_iceberg FOR VERSION AS OF {first_snapshot}")[0][0] == 5000 + first_snapshot = run_query( + connection, + 'select snapshot_id from iceberg.minio."taxi_data_copy_iceberg$snapshots" order by committed_at limit 1', + )[0][0] + assert ( + run_query( + connection, + f"SELECT COUNT(*) FROM iceberg.minio.taxi_data_copy_iceberg FOR VERSION AS OF {first_snapshot}", + )[0][0] + == 5000 + ) # Compact files - run_query(connection, "ALTER TABLE iceberg.minio.taxi_data_copy_iceberg EXECUTE optimize") + run_query( + connection, "ALTER TABLE iceberg.minio.taxi_data_copy_iceberg EXECUTE optimize" + ) # Check current count - assert run_query(connection, "SELECT COUNT(*) FROM iceberg.minio.taxi_data_copy_iceberg")[0][0] == 10000 - assert run_query(connection, 'SELECT COUNT(*) FROM iceberg.minio."taxi_data_copy_iceberg$snapshots"')[0][0] == 3 - assert run_query(connection, 'SELECT COUNT(*) FROM iceberg.minio."taxi_data_copy_iceberg$partitions"')[0][0] == 12 - assert run_query(connection, 'SELECT COUNT(*) FROM iceberg.minio."taxi_data_copy_iceberg$files"')[0][0] == 12 # Compaction yeah :) + assert ( + run_query( + connection, "SELECT COUNT(*) FROM iceberg.minio.taxi_data_copy_iceberg" + )[0][0] + == 10000 + ) + assert ( + run_query( + connection, + 'SELECT COUNT(*) FROM iceberg.minio."taxi_data_copy_iceberg$snapshots"', + )[0][0] + == 3 + ) + assert ( + run_query( + connection, + 'SELECT COUNT(*) FROM iceberg.minio."taxi_data_copy_iceberg$partitions"', + )[0][0] + == 12 + ) + assert ( + run_query( + connection, + 'SELECT COUNT(*) FROM iceberg.minio."taxi_data_copy_iceberg$files"', + )[0][0] + == 12 + ) # Compaction yeah :) # Test could be improved by also testing update and deletes # Test postgres connection - run_query(connection, 'SHOW SCHEMAS IN postgresgeneric') - run_query(connection, 'CREATE SCHEMA IF NOT EXISTS postgresgeneric.tpch') - run_query(connection, 'CREATE TABLE IF NOT EXISTS postgresgeneric.tpch.nation AS SELECT * FROM tpch.tiny.nation') - assert run_query(connection, "SELECT COUNT(*) FROM postgresgeneric.tpch.nation")[0][0] == 25 + run_query(connection, "SHOW SCHEMAS IN postgresgeneric") + run_query(connection, "CREATE SCHEMA IF NOT EXISTS postgresgeneric.tpch") + run_query( + connection, + "CREATE TABLE IF NOT EXISTS postgresgeneric.tpch.nation AS SELECT * FROM tpch.tiny.nation", + ) + assert ( + run_query(connection, "SELECT COUNT(*) FROM postgresgeneric.tpch.nation")[0][0] + == 25 + ) print("[SUCCESS] All tests in check-s3.py succeeded!") diff --git a/tests/templates/kuttl/smoke_aws/check-s3.py b/tests/templates/kuttl/smoke_aws/check-s3.py index e624a9a5..296d99d4 100755 --- a/tests/templates/kuttl/smoke_aws/check-s3.py +++ b/tests/templates/kuttl/smoke_aws/check-s3.py @@ -59,7 +59,7 @@ def run_query(connection, query): print(f'[INFO] Testing against Trino version "{trino_version}"') # Strip SDP release suffix from the version string - trino_product_version = re.split(r'-stackable', trino_version, maxsplit=1)[0] + trino_product_version = re.split(r"-stackable", trino_version, maxsplit=1)[0] assert len(trino_product_version) >= 3 assert trino_product_version.isnumeric()