|
23 | 23 | from .exceptions import ContainerTypeMismatchError, InvalidContainerTypeError, NoSignerLoaded
|
24 | 24 | from .settings import Settings
|
25 | 25 | from .state import StateSnapshot
|
26 |
| -from .types import SilverbackID, TaskType |
| 26 | +from .types import ScalarType, SilverbackID, TaskType |
27 | 27 | from .utils import encode_topics_to_string, parse_hexbytes_dict
|
28 | 28 |
|
29 | 29 |
|
@@ -377,8 +377,10 @@ def broker_task_decorator(
|
377 | 377 | self,
|
378 | 378 | task_type: TaskType,
|
379 | 379 | container: BlockContainer | ContractEvent | ContractEventWrapper | None = None,
|
380 |
| - cron_schedule: str | None = None, |
381 | 380 | filter_args: dict[str, Any] | None = None,
|
| 381 | + cron_schedule: str | None = None, |
| 382 | + metric_name: str | None = None, |
| 383 | + value_threshold: dict[str, ScalarType] | None = None, |
382 | 384 | ) -> Callable[[Callable], AsyncTaskiqDecoratedTask]:
|
383 | 385 | """
|
384 | 386 | Dynamically create a new broker task that handles tasks of ``task_type``.
|
@@ -483,6 +485,16 @@ def add_taskiq_task(
|
483 | 485 |
|
484 | 486 | labels["cron"] = cron_schedule
|
485 | 487 |
|
| 488 | + elif task_type is TaskType.METRIC_VALUE: |
| 489 | + # NOTE: This shouldn't happen to users |
| 490 | + assert metric_name, "Must supply `metric_name=`." |
| 491 | + labels["metric"] = metric_name |
| 492 | + |
| 493 | + if value_threshold: |
| 494 | + labels.update( |
| 495 | + {f"value:{lbl}": str(val) for lbl, val in value_threshold.items()} |
| 496 | + ) |
| 497 | + |
486 | 498 | self.tasks[task_type].append(TaskData(name=handler.__name__, labels=labels))
|
487 | 499 |
|
488 | 500 | if self.use_fork:
|
@@ -633,3 +645,55 @@ def cron(self, cron_schedule: str) -> Callable:
|
633 | 645 | cron_schedule (str): A cron-like schedule string.
|
634 | 646 | """
|
635 | 647 | return self.broker_task_decorator(TaskType.CRON_JOB, cron_schedule=cron_schedule)
|
| 648 | + |
| 649 | + def on_metric( |
| 650 | + self, |
| 651 | + metric_name: str, |
| 652 | + ge: ScalarType | None = None, |
| 653 | + gt: ScalarType | None = None, |
| 654 | + le: ScalarType | None = None, |
| 655 | + lt: ScalarType | None = None, |
| 656 | + eq: ScalarType | None = None, |
| 657 | + ne: ScalarType | None = None, |
| 658 | + # TODO: Support `rate_[ge|gt|le|...]` too? |
| 659 | + ) -> Callable: |
| 660 | + """ |
| 661 | + Create a task that runs when the value of a specified metric has tripped a threshold. |
| 662 | +
|
| 663 | + ```{notice} |
| 664 | + If no keyword args provided to this decorator, it will trigger on every update of metric. |
| 665 | + ``` |
| 666 | +
|
| 667 | + ```{notice} |
| 668 | + Multiple keyword args provided to this decorator means logical AND of all of them. |
| 669 | + ``` |
| 670 | +
|
| 671 | + Args: |
| 672 | + metric_name (str): The name of the metric to monitor for threshold exceedence. |
| 673 | + ge (ScalarType | None): trigger when metric value is greater than or equal to value. |
| 674 | + gt (ScalarType | None): trigger when metric value is greater than value. |
| 675 | + le (ScalarType | None): trigger when metric value is less than or equal to value. |
| 676 | + lt (ScalarType | None): trigger when metric value is less than value. |
| 677 | + eq (ScalarType | None): trigger when metric value is equal to value. |
| 678 | + ne (ScalarType | None): trigger when metric value is not equal to value. |
| 679 | + """ |
| 680 | + value_threshold: dict[str, ScalarType] = {} |
| 681 | + if ge is not None: |
| 682 | + value_threshold["ge"] = ge |
| 683 | + if gt is not None: |
| 684 | + value_threshold["gt"] = gt |
| 685 | + if le is not None: |
| 686 | + value_threshold["le"] = le |
| 687 | + if lt is not None: |
| 688 | + value_threshold["lt"] = lt |
| 689 | + if eq is not None: |
| 690 | + value_threshold["eq"] = eq |
| 691 | + if ne is not None: |
| 692 | + value_threshold["ne"] = ne |
| 693 | + |
| 694 | + return self.broker_task_decorator( |
| 695 | + TaskType.METRIC_VALUE, |
| 696 | + metric_name=metric_name, |
| 697 | + # NOTE: When empty, allow all values |
| 698 | + value_threshold=value_threshold or None, |
| 699 | + ) |
0 commit comments