Skip to content

Add the ability to cancel UDFs by client disconnect or timeout #64

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
173 changes: 173 additions & 0 deletions .github/workflows/code-check.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
name: Coverage tests

on:
push:
branches: [ main ]
pull_request:
branches: [ main ]
workflow_dispatch:

jobs:
test-coverage:
runs-on: ubuntu-latest
environment: Base

services:
singlestore:
image: ghcr.io/singlestore-labs/singlestoredb-dev:latest
ports:
- 3307:3306
- 8081:8080
- 9081:9081
env:
SINGLESTORE_LICENSE: ${{ secrets.SINGLESTORE_LICENSE }}
ROOT_PASSWORD: "root"

steps:
- name: Checkout code
uses: actions/checkout@v4
with:
fetch-depth: 0

- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: "3.10"
cache: "pip"

- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
pip install -r test-requirements.txt

- name: Install SingleStore package
run: |
pip install .

- name: Check for changes in monitored directories
id: check-changes
run: |
# Define directories to monitor (space-separated)
MONITORED_DIRS="singlestoredb/management singlestoredb/fusion"

# Determine the base commit to compare against.
# Exactly one branch below must assign BASE_COMMIT; the later
# `git diff --name-only $BASE_COMMIT HEAD` relies on it being set.
if [ "${{ github.event_name }}" == "pull_request" ]; then
  # For PRs, compare against the target branch (usually master/main)
  BASE_COMMIT="origin/${{ github.event.pull_request.base.ref }}"
  echo "Pull Request: Comparing against $BASE_COMMIT"
elif [ "${{ github.ref_name }}" == "main" ] || [ "${{ github.ref_name }}" == "master" ]; then
  # For pushes to main/master, compare against previous commit
  BASE_COMMIT="HEAD~1"
  echo "Push to main/master: Comparing against $BASE_COMMIT"
else
  # NOTE: the shell keyword is `else` with no trailing colon. `else:` is
  # parsed as an ordinary command name, which would run inside the `elif`
  # body above and abort the step with "command not found".
  #
  # For pushes to other branches, compare against master/main
  if git rev-parse --verify origin/main >/dev/null 2>&1; then
    BASE_COMMIT="origin/main"
    echo "Push to branch: Comparing against origin/main"
  elif git rev-parse --verify origin/master >/dev/null 2>&1; then
    BASE_COMMIT="origin/master"
    echo "Push to branch: Comparing against origin/master"
  else
    # Fallback to previous commit if master/main not found
    BASE_COMMIT="HEAD~1"
    echo "Fallback: Comparing against HEAD~1"
  fi
fi

echo "Checking for changes in: $MONITORED_DIRS"
echo "Comparing against: $BASE_COMMIT"

# Check for any changes in monitored directories
CHANGES_FOUND=false
CHANGED_DIRS=""

for DIR in $MONITORED_DIRS; do
if [ -d "$DIR" ]; then
CHANGED_FILES=$(git diff --name-only $BASE_COMMIT HEAD -- "$DIR" || true)
if [ -n "$CHANGED_FILES" ]; then
echo "✅ Changes detected in: $DIR"
echo "Files changed:"
echo "$CHANGED_FILES" | sed 's/^/ - /'
CHANGES_FOUND=true
if [ -z "$CHANGED_DIRS" ]; then
CHANGED_DIRS="$DIR"
else
CHANGED_DIRS="$CHANGED_DIRS,$DIR"
fi
else
echo "❌ No changes in: $DIR"
fi
else
echo "⚠️ Directory not found: $DIR"
fi
done

# Set outputs
if [ "$CHANGES_FOUND" = true ]; then
echo "changes-detected=true" >> $GITHUB_OUTPUT
echo "changed-directories=$CHANGED_DIRS" >> $GITHUB_OUTPUT
echo ""
echo "🎯 RESULT: Changes detected in monitored directories"
else
echo "changes-detected=false" >> $GITHUB_OUTPUT
echo "changed-directories=" >> $GITHUB_OUTPUT
echo ""
echo "🎯 RESULT: No changes in monitored directories"
fi

- name: Run MySQL protocol tests (with management API)
if: steps.check-changes.outputs.changes-detected == 'true'
run: |
pytest -v --cov=singlestoredb --pyargs singlestoredb.tests
env:
COVERAGE_FILE: "coverage-mysql.cov"
SINGLESTOREDB_URL: "root:root@127.0.0.1:3307"
SINGLESTOREDB_PURE_PYTHON: 0
SINGLESTORE_LICENSE: ${{ secrets.SINGLESTORE_LICENSE }}
SINGLESTOREDB_MANAGEMENT_TOKEN: ${{ secrets.CLUSTER_API_KEY }}
SINGLESTOREDB_FUSION_ENABLE_HIDDEN: "1"

- name: Run MySQL protocol tests (without management API)
if: steps.check-changes.outputs.changes-detected == 'false'
run: |
pytest -v -m 'not management' --cov=singlestoredb --pyargs singlestoredb.tests
env:
COVERAGE_FILE: "coverage-mysql.cov"
SINGLESTOREDB_URL: "root:root@127.0.0.1:3307"
SINGLESTOREDB_PURE_PYTHON: 0
SINGLESTORE_LICENSE: ${{ secrets.SINGLESTORE_LICENSE }}
SINGLESTOREDB_MANAGEMENT_TOKEN: ${{ secrets.CLUSTER_API_KEY }}
SINGLESTOREDB_FUSION_ENABLE_HIDDEN: "1"

- name: Run MySQL protocol tests (pure Python)
run: |
pytest -v -m 'not management' --cov=singlestoredb --pyargs singlestoredb.tests
env:
COVERAGE_FILE: "coverage-mysql-py.cov"
SINGLESTOREDB_URL: "root:root@127.0.0.1:3307"
SINGLESTOREDB_PURE_PYTHON: 1
SINGLESTORE_LICENSE: ${{ secrets.SINGLESTORE_LICENSE }}
SINGLESTOREDB_MANAGEMENT_TOKEN: ${{ secrets.CLUSTER_API_KEY }}
SINGLESTOREDB_FUSION_ENABLE_HIDDEN: "1"

- name: Run HTTP protocol tests
run: |
pytest -v -m 'not management' --cov=singlestoredb --pyargs singlestoredb.tests
env:
COVERAGE_FILE: "coverage-http.cov"
SINGLESTOREDB_URL: "http://root:root@127.0.0.1:9081"
SINGLESTORE_LICENSE: ${{ secrets.SINGLESTORE_LICENSE }}
SINGLESTOREDB_MANAGEMENT_TOKEN: ${{ secrets.CLUSTER_API_KEY }}
# Cannot change databases using the HTTP API. The URL below will be
# used to create the database and the generated database name will
# be applied to the above URL.
SINGLESTOREDB_INIT_DB_URL: "root:root@127.0.0.1:3307"
SINGLESTOREDB_FUSION_ENABLE_HIDDEN: "1"

- name: Generate report
run: |
coverage combine coverage-mysql.cov coverage-http.cov coverage-mysql-py.cov
coverage report
coverage xml
coverage html
10 changes: 5 additions & 5 deletions .github/workflows/coverage.yml
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
name: Coverage tests

on:
push:
branches: [ main ]
pull_request:
branches: [ main ]
schedule:
- cron: "0 1 * * *"
workflow_dispatch:

jobs:
Expand All @@ -24,7 +22,9 @@ jobs:
ROOT_PASSWORD: "root"

steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
with:
fetch-depth: 0

- name: Set up Python
uses: actions/setup-python@v4
Expand Down
5 changes: 5 additions & 0 deletions singlestoredb/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,11 @@
environ=['SINGLESTOREDB_EXT_FUNC_PORT'],
)

# Timeout, in seconds, for processing a batch of rows in an external
# function. The default of 24*60*60 is 24 hours. `environ` lists
# SINGLESTOREDB_EXT_FUNC_TIMEOUT as the environment variable associated
# with this option (presumably used to override the default -- confirm
# against register_option).
register_option(
'external_function.timeout', 'int', check_int, 24*60*60,
'Specifies the timeout in seconds for processing a batch of rows.',
environ=['SINGLESTOREDB_EXT_FUNC_TIMEOUT'],
)

#
# Debugging options
Expand Down
45 changes: 32 additions & 13 deletions singlestoredb/functions/decorator.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import functools
import inspect
from typing import Any
Expand All @@ -19,6 +20,7 @@
]

ReturnType = ParameterType
UDFType = Callable[..., Any]


def is_valid_type(obj: Any) -> bool:
Expand Down Expand Up @@ -100,38 +102,50 @@ def _func(
name: Optional[str] = None,
args: Optional[ParameterType] = None,
returns: Optional[ReturnType] = None,
) -> Callable[..., Any]:
timeout: Optional[int] = None,
) -> UDFType:
"""Generic wrapper for UDF and TVF decorators."""

_singlestoredb_attrs = { # type: ignore
k: v for k, v in dict(
name=name,
args=expand_types(args),
returns=expand_types(returns),
timeout=timeout,
).items() if v is not None
}

# No func was specified, this is an uncalled decorator that will get
# called later, so the wrapper must be created with the func passed
# in at that time.
if func is None:
def decorate(func: Callable[..., Any]) -> Callable[..., Any]:
def decorate(func: UDFType) -> UDFType:

def wrapper(*args: Any, **kwargs: Any) -> Callable[..., Any]:
return func(*args, **kwargs) # type: ignore
if asyncio.iscoroutinefunction(func):
async def async_wrapper(*args: Any, **kwargs: Any) -> UDFType:
return await func(*args, **kwargs) # type: ignore
async_wrapper._singlestoredb_attrs = _singlestoredb_attrs # type: ignore
return functools.wraps(func)(async_wrapper)

wrapper._singlestoredb_attrs = _singlestoredb_attrs # type: ignore

return functools.wraps(func)(wrapper)
else:
def wrapper(*args: Any, **kwargs: Any) -> UDFType:
return func(*args, **kwargs) # type: ignore
wrapper._singlestoredb_attrs = _singlestoredb_attrs # type: ignore
return functools.wraps(func)(wrapper)

return decorate

def wrapper(*args: Any, **kwargs: Any) -> Callable[..., Any]:
return func(*args, **kwargs) # type: ignore

wrapper._singlestoredb_attrs = _singlestoredb_attrs # type: ignore
if asyncio.iscoroutinefunction(func):
async def async_wrapper(*args: Any, **kwargs: Any) -> UDFType:
return await func(*args, **kwargs) # type: ignore
async_wrapper._singlestoredb_attrs = _singlestoredb_attrs # type: ignore
return functools.wraps(func)(async_wrapper)

return functools.wraps(func)(wrapper)
else:
def wrapper(*args: Any, **kwargs: Any) -> UDFType:
return func(*args, **kwargs) # type: ignore
wrapper._singlestoredb_attrs = _singlestoredb_attrs # type: ignore
return functools.wraps(func)(wrapper)


def udf(
Expand All @@ -140,7 +154,8 @@ def udf(
name: Optional[str] = None,
args: Optional[ParameterType] = None,
returns: Optional[ReturnType] = None,
) -> Callable[..., Any]:
timeout: Optional[int] = None,
) -> UDFType:
"""
Define a user-defined function (UDF).

Expand All @@ -167,6 +182,9 @@ def udf(
Specifies the return data type of the function. This parameter
works the same way as `args`. If the function is a table-valued
function, the return type should be a `Table` object.
timeout : int, optional
The timeout in seconds for the UDF execution. If not specified,
the global default timeout is used.

Returns
-------
Expand All @@ -178,4 +196,5 @@ def udf(
name=name,
args=args,
returns=returns,
timeout=timeout,
)
Loading