Skip to content

Ability to wait for background tasks to complete before moving to next test? #952

@mzealey

Description

@mzealey

Previously, we have been running the following code to allow various backgrounded asyncio tasks to complete before shutting down the event loop:

@pytest.fixture(scope="session") 
def event_loop(request: Request) -> Generator[asyncio.AbstractEventLoop, None, None]: 
    """Create an instance of the default event loop for each test case.""" 
 
    def custom_exception_handler(loop: asyncio.AbstractEventLoop, context: dict) -> NoReturn: 
        loop.default_exception_handler(context) 
 
        exception = context.get("exception") 
        loop.stop() 
        raise exception 
 
    loop = asyncio.get_event_loop_policy().new_event_loop() 
    loop.set_exception_handler(custom_exception_handler) 
    yield loop 
    # Wait until all background tasks have completed 
    loop.run_until_complete(asyncio.gather(*asyncio.all_tasks(loop))) 
    loop.close() 

It would be nice to move this into the asyncio_default_fixture_loop_scope format, but it seems that the default event loop runner does not wait for background tasks to complete before the loop gets terminated.

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions