Skip to content

Commit fa7458a

Browse files
committed
feat: support cronjob tasks
1 parent d57b91a commit fa7458a

File tree

4 files changed

+63
-2
lines changed

4 files changed

+63
-2
lines changed

setup.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
"eth-pydantic-types", # Use same version as eth-ape
7070
"exceptiongroup; python_version < '3.11'", # Used with TaskGroup
7171
"packaging", # Use same version as eth-ape
72+
"pycron>=3.1,<4", # Checking/triggering cron tasks
7273
"pydantic_settings", # Use same version as eth-ape
7374
"quattro>=25.2,<26", # Manage task groups and background tasks
7475
"taskiq[metrics]>=0.11.16,<0.12",

silverback/main.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
import atexit
22
import inspect
33
from collections import defaultdict
4-
from datetime import timedelta
4+
from datetime import datetime, timedelta
55
from functools import wraps
66
from types import MethodType
77
from typing import Any, Awaitable, Callable
88

9+
import pycron # type: ignore[import-untyped]
910
from ape.api.networks import LOCAL_NETWORK_NAME
1011
from ape.contracts import ContractEvent, ContractEventWrapper, ContractInstance
1112
from ape.logging import logger
@@ -300,6 +301,7 @@ def broker_task_decorator(
300301
self,
301302
task_type: TaskType,
302303
container: BlockContainer | ContractEvent | ContractEventWrapper | None = None,
304+
cron_schedule: str | None = None,
303305
) -> Callable[[Callable], AsyncTaskiqDecoratedTask]:
304306
"""
305307
Dynamically create a new broker task that handles tasks of ``task_type``.
@@ -365,6 +367,17 @@ def add_taskiq_task(
365367
labels["contract_address"] = contract_address
366368
labels["event_signature"] = container.abi.signature
367369

370+
elif task_type is TaskType.CRON_JOB:
371+
# NOTE: If cron schedule has never been true over a year timeframe, it's bad
372+
if not cron_schedule or not pycron.has_been(
373+
cron_schedule, datetime.now() - timedelta(days=366)
374+
):
375+
raise InvalidContainerTypeError(
376+
f"'{cron_schedule}' is not a valid cron schedule"
377+
)
378+
379+
labels["cron"] = cron_schedule
380+
368381
self.tasks[task_type].append(TaskData(name=handler.__name__, labels=labels))
369382

370383
if self.use_fork:
@@ -498,3 +511,12 @@ def on_(
498511
# TODO: Support account transaction polling
499512
# TODO: Support mempool polling?
500513
raise InvalidContainerTypeError(container)
514+
515+
def cron(self, cron_schedule: str) -> Callable:
516+
"""
517+
Create task to run on a schedule.
518+
519+
Args:
520+
cron_schedule (str): A cron-like schedule string.
521+
"""
522+
return self.broker_task_decorator(TaskType.CRON_JOB, cron_schedule=cron_schedule)

silverback/runner.py

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@
22
import signal
33
import sys
44
from abc import ABC, abstractmethod
5+
from datetime import timedelta
56
from typing import Any, Coroutine, Type
67

8+
import pycron # type: ignore[import-untyped]
79
import quattro
810
from ape import chain
911
from ape.logging import logger
@@ -33,7 +35,7 @@
3335
from .main import SilverbackBot, TaskData
3436
from .recorder import BaseRecorder, TaskResult
3537
from .state import Datastore, StateSnapshot
36-
from .types import TaskType
38+
from .types import TaskType, utc_now
3739
from .utils import async_wrap_iter
3840

3941
if sys.version_info < (3, 11):
@@ -123,6 +125,31 @@ async def _checkpoint(
123125
):
124126
await self.datastore.save(snapshot)
125127

128+
async def _cron_tasks(self, cron_tasks: list[TaskData]):
129+
"""
130+
Handle all cron tasks
131+
"""
132+
133+
while True:
134+
# NOTE: Sleep until next exact time boundary (every minute)
135+
current_time = utc_now()
136+
wait_time = timedelta(
137+
seconds=60 - 1 - current_time.second,
138+
microseconds=int(1e6) - current_time.microsecond,
139+
)
140+
await asyncio.sleep(wait_time.total_seconds())
141+
current_time += wait_time
142+
143+
for task_data in cron_tasks:
144+
if not (cron := task_data.labels.get("cron")):
145+
logger.warning(f"Cron task missing `cron` label: '{task_data.name}'")
146+
continue
147+
148+
if pycron.is_now(cron, dt=current_time):
149+
self._runtime_task_group.create_task(self.run_task(task_data, current_time))
150+
151+
# NOTE: TaskGroup waits for all tasks to complete before continuing
152+
126153
@abstractmethod
127154
async def _block_task(self, task_data: TaskData) -> None:
128155
"""
@@ -209,6 +236,13 @@ async def startup(self) -> list[Coroutine]:
209236
# NOTE: No need to handle results otherwise
210237

211238
# Create our long-running event listeners
239+
cron_tasks_taskdata = (
240+
await self.run_system_task(TaskType.SYSTEM_USER_TASKDATA, TaskType.CRON_JOB)
241+
if Version(config.sdk_version) >= Version("0.7.15")
242+
# NOTE: Not supported in prior versions
243+
else []
244+
)
245+
212246
new_block_tasks_taskdata = await self.run_system_task(
213247
TaskType.SYSTEM_USER_TASKDATA, TaskType.NEW_BLOCK
214248
)
@@ -309,6 +343,9 @@ async def wait_for_graceful_shutdown():
309343

310344
try:
311345
async with quattro.TaskGroup() as tg:
346+
# NOTE: Our runtime tasks can use this to spawn more tasks
347+
self._runtime_task_group = tg
348+
312349
# NOTE: User tasks that should run forever
313350
for coro in user_tasks:
314351
tg.create_task(coro)

silverback/types.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ class TaskType(str, Enum):
2121

2222
# User-accessible Tasks
2323
STARTUP = "user:startup"
24+
CRON_JOB = "user:cron-job"
2425
NEW_BLOCK = "user:new-block"
2526
EVENT_LOG = "user:event-log"
2627
SHUTDOWN = "user:shutdown"

0 commit comments

Comments
 (0)