Skip to content

Commit d57b91a

Browse files
committed
refactor(Runner): just use coroutines directly as tasks, don't indirect
1 parent d24fc6a commit d57b91a

File tree

1 file changed

+31
-40
lines changed

1 file changed

+31
-40
lines changed

silverback/runner.py

Lines changed: 31 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -124,15 +124,15 @@ async def _checkpoint(
124124
await self.datastore.save(snapshot)
125125

126126
@abstractmethod
127-
async def _block_task(self, task_data: TaskData) -> Coroutine | None:
127+
async def _block_task(self, task_data: TaskData) -> None:
128128
"""
129-
Handle a block_handler task
129+
Set up a task block_handler task
130130
"""
131131

132132
@abstractmethod
133-
async def _event_task(self, task_data: TaskData) -> Coroutine | None:
133+
async def _event_task(self, task_data: TaskData) -> None:
134134
"""
135-
Handle an event handler task for the given contract event
135+
Set up a task for the given contract event
136136
"""
137137

138138
async def startup(self) -> list[Coroutine]:
@@ -221,12 +221,9 @@ async def startup(self) -> list[Coroutine]:
221221
raise NoTasksAvailableError()
222222

223223
return [
224-
task
225-
for task in await quattro.gather(
226-
*map(self._block_task, new_block_tasks_taskdata),
227-
*map(self._event_task, event_log_tasks_taskdata),
228-
)
229-
if task is not None
224+
self._cron_tasks(cron_tasks_taskdata),
225+
*map(self._block_task, new_block_tasks_taskdata),
226+
*map(self._event_task, event_log_tasks_taskdata),
230227
]
231228

232229
def _cleanup_tasks(self) -> list[Coroutine]:
@@ -347,7 +344,7 @@ def __init__(self, bot: SilverbackBot, *args, **kwargs):
347344

348345
self.ws_uri = ws_uri
349346

350-
async def _block_task(self, task_data: TaskData) -> None:
347+
async def _block_task(self, task_data: TaskData):
351348
new_block_task_kicker = self.get_task(task_data.name)
352349

353350
async def block_handler(ctx: NewHeadsSubscriptionContext):
@@ -361,7 +358,7 @@ async def block_handler(ctx: NewHeadsSubscriptionContext):
361358
)
362359
logger.debug(f"Handling blocks via {sub_id}")
363360

364-
async def _event_task(self, task_data: TaskData) -> None:
361+
async def _event_task(self, task_data: TaskData):
365362
if not (contract_address := task_data.labels.get("contract_address")):
366363
raise StartupFailure("Contract instance required.")
367364

@@ -420,7 +417,7 @@ def __init__(self, bot: SilverbackBot, *args, **kwargs):
420417
"Do not use in production over long time periods unless you know what you're doing."
421418
)
422419

423-
async def _block_task(self, task_data: TaskData) -> Coroutine:
420+
async def _block_task(self, task_data: TaskData):
424421
new_block_task_kicker = self.get_task(task_data.name)
425422

426423
if block_settings := self.bot.poll_settings.get("_blocks_"):
@@ -432,20 +429,17 @@ async def _block_task(self, task_data: TaskData) -> Coroutine:
432429
new_block_timeout if new_block_timeout is not None else self.bot.new_block_timeout
433430
)
434431

435-
async def block_handler() -> None:
436-
async for block in async_wrap_iter(
437-
chain.blocks.poll_blocks(
438-
# NOTE: No start block because we should begin polling from head
439-
new_block_timeout=new_block_timeout,
440-
)
441-
):
442-
await self._checkpoint(last_block_seen=block.number)
443-
await self._handle_task(await new_block_task_kicker.kiq(block))
444-
await self._checkpoint(last_block_processed=block.number)
445-
446-
return block_handler()
432+
async for block in async_wrap_iter(
433+
chain.blocks.poll_blocks(
434+
# NOTE: No start block because we should begin polling from head
435+
new_block_timeout=new_block_timeout,
436+
)
437+
):
438+
await self._checkpoint(last_block_seen=block.number)
439+
await self._handle_task(await new_block_task_kicker.kiq(block))
440+
await self._checkpoint(last_block_processed=block.number)
447441

448-
async def _event_task(self, task_data: TaskData) -> Coroutine:
442+
async def _event_task(self, task_data: TaskData):
449443
if not (contract_address := task_data.labels.get("contract_address")):
450444
raise StartupFailure("Contract instance required.")
451445

@@ -465,17 +459,14 @@ async def _event_task(self, task_data: TaskData) -> Coroutine:
465459
new_block_timeout if new_block_timeout is not None else self.bot.new_block_timeout
466460
)
467461

468-
async def event_handler() -> None:
469-
async for event in async_wrap_iter(
470-
self.provider.poll_logs(
471-
# NOTE: No start block because we should begin polling from head
472-
address=contract_address,
473-
new_block_timeout=new_block_timeout,
474-
events=[event_abi],
475-
)
476-
):
477-
await self._checkpoint(last_block_seen=event.block_number)
478-
await self._handle_task(await event_log_task_kicker.kiq(event))
479-
await self._checkpoint(last_block_processed=event.block_number)
480-
481-
return event_handler()
462+
async for event in async_wrap_iter(
463+
self.provider.poll_logs(
464+
# NOTE: No start block because we should begin polling from head
465+
address=contract_address,
466+
new_block_timeout=new_block_timeout,
467+
events=[event_abi],
468+
)
469+
):
470+
await self._checkpoint(last_block_seen=event.block_number)
471+
await self._handle_task(await event_log_task_kicker.kiq(event))
472+
await self._checkpoint(last_block_processed=event.block_number)

0 commit comments

Comments
 (0)