Skip to content

Commit 6436d4f

Browse files
committed
feat(Runner): add support for triggering metric value threshold tasks
1 parent 5cfc938 commit 6436d4f

File tree

1 file changed

+39
-3
lines changed

1 file changed

+39
-3
lines changed

silverback/runner.py

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
import asyncio
2+
import operator
23
import signal
34
import sys
45
from abc import ABC, abstractmethod
5-
from datetime import timedelta
6-
from typing import Any, Coroutine, Type
6+
from collections import defaultdict
7+
from datetime import datetime, timedelta
8+
from decimal import Decimal
9+
from typing import Any, Callable, Coroutine, Type
710

811
import pycron # type: ignore[import-untyped]
912
import quattro
@@ -34,7 +37,7 @@
3437
from .main import SilverbackBot, TaskData
3538
from .recorder import BaseRecorder, TaskResult
3639
from .state import Datastore, StateSnapshot
37-
from .types import TaskType, utc_now
40+
from .types import Datapoint, ScalarDatapoint, ScalarType, TaskType, utc_now
3841
from .utils import async_wrap_iter, clean_hexbytes_dict, decode_topics_from_string
3942

4043
if sys.version_info < (3, 11):
@@ -57,6 +60,9 @@ def __init__(
5760
# TODO: Allow configuring datastore class
5861
self.datastore = Datastore()
5962
self.recorder = recorder
63+
self.metric_handlers: dict[str, list[Callable[[Datapoint, datetime], Coroutine]]] = (
64+
defaultdict(list)
65+
)
6066

6167
self.max_exceptions = max_exceptions
6268
self.exceptions = 0
@@ -104,6 +110,11 @@ async def run_task(self, task_data: TaskData, *args, raise_on_error: bool = Fals
104110
): # Display metrics in logs to help debug
105111
logger.info(f"{task_data.name} - Metrics collected\n {metrics_str}")
106112

113+
# Trigger checks for metric values
114+
for metric_name, datapoint in result.metrics.items():
115+
for handler in self.metric_handlers[metric_name]:
116+
self._runtime_task_group.create_task(handler(datapoint, result.completed))
117+
107118
if self.recorder: # Recorder configured to record
108119
await self.recorder.add_result(result)
109120

@@ -156,6 +167,25 @@ async def _cron_tasks(self, cron_tasks: list[TaskData]):
156167
# NOTE: Run this every minute (just in case of an unhandled shutdown)
157168
self._runtime_task_group.create_task(self._checkpoint())
158169

170+
async def _metric_task(self, task_data: TaskData) -> None:
171+
metric_name = task_data.labels["metric"]
172+
value_thresholds = {
173+
op: Decimal(val) # NOTE: Decimal is most flexible at handling strings
174+
for lbl, val in task_data.labels.items()
175+
if lbl.startswith("value:") and (op := getattr(operator, lbl.lstrip("value:"), None))
176+
}
177+
178+
def exceeds_value_threshold(data: ScalarType) -> bool:
179+
return all(op(Decimal(data), value) for op, value in value_thresholds.items())
180+
181+
async def check_value(datapoint: Datapoint, updated: datetime):
182+
if isinstance(datapoint, ScalarDatapoint) and exceeds_value_threshold(datapoint.data):
183+
self._runtime_task_group.create_task(self.run_task(task_data, datapoint.data))
184+
185+
self.metric_handlers[metric_name].append(check_value)
186+
187+
# TODO: Support rate threshold checks?
188+
159189
@abstractmethod
160190
async def _block_task(self, task_data: TaskData) -> None:
161191
"""
@@ -260,10 +290,15 @@ async def startup(self) -> list[Coroutine]:
260290
TaskType.SYSTEM_USER_TASKDATA, TaskType.EVENT_LOG
261291
)
262292

293+
metric_value_tasks_taskdata = await self.run_system_task(
294+
TaskType.SYSTEM_USER_TASKDATA, TaskType.METRIC_VALUE
295+
)
296+
263297
if (
264298
len(cron_tasks_taskdata)
265299
== len(new_block_tasks_taskdata)
266300
== len(event_log_tasks_taskdata)
301+
# NOTE: Skip metric value tasks, because they require other tasks to function
267302
== 0
268303
):
269304
raise NoTasksAvailableError()
@@ -272,6 +307,7 @@ async def startup(self) -> list[Coroutine]:
272307
self._cron_tasks(cron_tasks_taskdata),
273308
*map(self._block_task, new_block_tasks_taskdata),
274309
*map(self._event_task, event_log_tasks_taskdata),
310+
*map(self._metric_task, metric_value_tasks_taskdata),
275311
]
276312

277313
def _cleanup_tasks(self) -> list[Coroutine]:

0 commit comments

Comments
 (0)