|
11 | 11 | from ape.contracts import ContractEvent, ContractEventWrapper, ContractInstance
|
12 | 12 | from ape.logging import logger
|
13 | 13 | from ape.managers.chain import BlockContainer
|
14 |
| -from ape.types import ContractLog |
| 14 | +from ape.types import AddressType, ContractLog |
15 | 15 | from ape.utils import ManagerAccessMixin
|
| 16 | +from eth_typing import HexStr |
16 | 17 | from eth_utils import keccak, to_hex
|
| 18 | +from ethpm_types.abi import encode_topic_value |
17 | 19 | from packaging.version import Version
|
18 | 20 | from pydantic import BaseModel
|
19 | 21 | from taskiq import AsyncTaskiqDecoratedTask, TaskiqEvents
|
@@ -348,11 +350,33 @@ async def fork_handler(*args, **kwargs):
|
348 | 350 |
|
349 | 351 | return fork_handler
|
350 | 352 |
|
| 353 | + def _convert_arg_to_hexstr(self, arg_value: Any, arg_type: str) -> HexStr | list[HexStr] | None: |
| 354 | + python_type: Any |
| 355 | + if "int" in arg_type: |
| 356 | + python_type = int |
| 357 | + elif "bytes" in arg_type: |
| 358 | + python_type = bytes |
| 359 | + elif arg_type == "address": |
| 360 | + python_type = AddressType |
| 361 | + elif arg_type == "string": |
| 362 | + python_type = str |
| 363 | + else: |
| 364 | + raise ValueError(f"Unable to support ABI Type '{arg_type}'.") |
| 365 | + |
| 366 | + if isinstance(arg_value, list): |
| 367 | + arg_value = [self.conversion_manager.convert(v, python_type) for v in arg_value] |
| 368 | + |
| 369 | + else: |
| 370 | + arg_value = self.conversion_manager.convert(arg_value, python_type) |
| 371 | + |
| 372 | + return encode_topic_value(arg_type, arg_value) # type: ignore[return-value] |
| 373 | + |
351 | 374 | def broker_task_decorator(
|
352 | 375 | self,
|
353 | 376 | task_type: TaskType,
|
354 | 377 | container: BlockContainer | ContractEvent | ContractEventWrapper | None = None,
|
355 | 378 | cron_schedule: str | None = None,
|
| 379 | + filter_args: dict[str, Any] | None = None, |
356 | 380 | ) -> Callable[[Callable], AsyncTaskiqDecoratedTask]:
|
357 | 381 | """
|
358 | 382 | Dynamically create a new broker task that handles tasks of ``task_type``.
|
@@ -411,17 +435,36 @@ def add_taskiq_task(
|
411 | 435 | elif task_type is TaskType.EVENT_LOG:
|
412 | 436 | assert container is not None and isinstance(container, ContractEvent)
|
413 | 437 | # NOTE: allows broad capture filters (matching multiple addresses)
|
414 |
| - if contract_address := getattr(container.contract, "address", None): |
415 |
| - labels["address"] = contract_address |
| 438 | + if contract := getattr(container, "contract", None): |
| 439 | + labels["address"] = contract.address |
| 440 | + |
416 | 441 | labels["event"] = container.abi.signature
|
417 |
| - labels["topics"] = encode_topics_to_string( |
418 |
| - [ |
419 |
| - # Topic 0: event_id |
420 |
| - to_hex(keccak(text=container.abi.selector)), |
421 |
| - # Topic 1-4: event args ([..., ...] represent OR) |
422 |
| - # TODO: Add filter args |
423 |
| - ] |
424 |
| - ) |
| 442 | + |
| 443 | + topics: list[list[HexStr] | HexStr | None] = [ |
| 444 | + # Topic 0: event_id |
| 445 | + to_hex(keccak(text=container.abi.selector)) |
| 446 | + ] |
| 447 | + |
| 448 | + # Topic 1-3: event args ([..., ...] represent OR) |
| 449 | + if filter_args: |
| 450 | + for arg in container.abi.inputs: |
| 451 | + if not arg.indexed: |
| 452 | + break # Inputs should be ordered indexed first |
| 453 | + |
| 454 | + if arg_value := filter_args.pop(arg.name, None): |
| 455 | + topics.append(self._convert_arg_to_hexstr(arg_value, arg.type)) |
| 456 | + |
| 457 | + else: |
| 458 | + # Skip this indexed argument (`None` is wildcard match) |
| 459 | + topics.append(None) |
| 460 | + # NOTE: Will clean up extra Nones in `encode_topics_to_string` |
| 461 | + |
| 462 | + if unmatched_args := "', '".join(filter_args): |
| 463 | + raise InvalidContainerTypeError( |
| 464 | + f"Args are not available for filtering: '{unmatched_args}'." |
| 465 | + ) |
| 466 | + |
| 467 | + labels["topics"] = encode_topics_to_string(topics) |
425 | 468 |
|
426 | 469 | handler = self._ensure_log(container, handler)
|
427 | 470 |
|
@@ -518,6 +561,8 @@ def on_(
|
518 | 561 | # TODO: possibly remove these
|
519 | 562 | new_block_timeout: int | None = None,
|
520 | 563 | start_block: int | None = None,
|
| 564 | + filter_args: dict[str, Any] | None = None, |
| 565 | + **filter_kwargs: dict[str, Any], |
521 | 566 | ):
|
522 | 567 | """
|
523 | 568 | Create task to handle events created by the `container` trigger.
|
@@ -565,7 +610,14 @@ def on_(
|
565 | 610 | else:
|
566 | 611 | self.poll_settings[key] = {"start_block": start_block}
|
567 | 612 |
|
568 |
| - return self.broker_task_decorator(TaskType.EVENT_LOG, container=container) |
| 613 | + if filter_args: |
| 614 | + filter_kwargs.update(filter_args) |
| 615 | + |
| 616 | + return self.broker_task_decorator( |
| 617 | + TaskType.EVENT_LOG, |
| 618 | + container=container, |
| 619 | + filter_args=filter_kwargs, |
| 620 | + ) |
569 | 621 |
|
570 | 622 | # TODO: Support account transaction polling
|
571 | 623 | # TODO: Support mempool polling?
|
|
0 commit comments