Plugin for taskiq that adds a new result backend, broker and schedule source based on YDB.
This project can be installed using pip/poetry/uv (choose your preferred package manager):
pip install taskiq-ydb- Define your broker with asyncpg:
# broker_example.py
import asyncio
from ydb.aio.driver import DriverConfig
from taskiq_ydb import YdbBroker, YdbResultBackend
driver_config = DriverConfig(
endpoint='grpc://localhost:2136',
database='/local',
)
broker = YdbBroker(
driver_config=driver_config,
).with_result_backend(
YdbResultBackend(driver_config=driver_config),
)
@broker.task('solve_all_problems')
async def best_task_ever() -> None:
"""Solve all problems in the world."""
await asyncio.sleep(2)
print('All problems are solved!')
async def main() -> None:
await broker.startup()
task = await best_task_ever.kiq()
print(await task.wait_result())
await broker.shutdown()
if __name__ == '__main__':
asyncio.run(main())- Start a worker to process tasks (by default taskiq runs two instances of worker):
taskiq worker broker_example:broker- Run
broker_example.pyfile to send a task to the worker:
python broker_example.pyYour experience with other drivers will be pretty similar. Just change the import statement and that's it.
- Define your broker and schedule source:
# scheduler_example.py
import asyncio
from taskiq import TaskiqScheduler
from ydb.aio.driver import DriverConfig
from taskiq_ydb import YdbBroker, YdbScheduleSource
driver_config = DriverConfig(
endpoint='grpc://localhost:2136',
database='/local',
)
broker = YdbBroker(driver_config=driver_config)
scheduler = TaskiqScheduler(
broker=broker,
sources=[
YdbScheduleSource(
driver_config=driver_config,
broker=broker,
),
],
)
@broker.task(
task_name='solve_all_problems',
schedule=[
{
'cron': '*/1 * * * *', # type: str, either cron or time should be specified.
'cron_offset': None, # type: str | timedelta | None, can be omitted.
'time': None, # type: datetime | None, either cron or time should be specified.
'args': [], # type list[Any] | None, can be omitted.
'kwargs': {}, # type: dict[str, Any] | None, can be omitted.
'labels': {}, # type: dict[str, Any] | None, can be omitted.
},
],
)
async def best_task_ever() -> None:
"""Solve all problems in the world."""
await asyncio.sleep(2)
print('All problems are solved!')- Start worker processes:
taskiq worker scheduler_example:broker- Run scheduler process:
taskiq scheduler scheduler_example:scheduler