From ac083338d1780dab1c7bde79d52a6a7c901b8bbb Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Tue, 18 Mar 2025 09:33:57 +0000 Subject: [PATCH 01/11] simplify wait_readable loop --- src/zmq_anyio/_socket.py | 38 ++++++++++++++++---------------------- 1 file changed, 16 insertions(+), 22 deletions(-) diff --git a/src/zmq_anyio/_socket.py b/src/zmq_anyio/_socket.py index 164fb99..9178749 100644 --- a/src/zmq_anyio/_socket.py +++ b/src/zmq_anyio/_socket.py @@ -19,9 +19,10 @@ get_cancelled_exc_class, sleep, wait_readable, + ClosedResourceError, ) from anyio.abc import TaskGroup, TaskStatus -from anyioutils import FIRST_COMPLETED, Future, create_task, wait +from anyioutils import FIRST_COMPLETED, Future, create_task import zmq from zmq import EVENTS, POLLIN, POLLOUT @@ -890,31 +891,24 @@ async def _start(self, *, task_status: TaskStatus[None] = TASK_STATUS_IGNORED): task_status.started() self.started.set() self._thread = get_ident() + + async def wait_or_cancel() -> None: + assert self.stopped is not None + await self.stopped.wait() + tg.cancel_scope.cancel() + try: while True: - wait_stopped_task = create_task( - self.stopped.wait(), - self._task_group, - exception_handler=ignore_exceptions, - ) - tasks = [ - create_task( - wait_readable(self._shadow_sock), # type: ignore[arg-type] - self._task_group, - exception_handler=ignore_exceptions, - ), - wait_stopped_task, - ] - done, pending = await wait( - tasks, self._task_group, return_when=FIRST_COMPLETED - ) - for task in pending: - task.cancel() - if wait_stopped_task in done: + async with create_task_group() as tg: + tg.start_soon(wait_or_cancel) + await wait_readable(self._shadow_sock) + if self.stopped.is_set(): break await self._handle_events() - except BaseException: - pass + except get_cancelled_exc_class(): + raise + except ClosedResourceError: + self._task_group.cancel_scope.cancel() finally: self._exited.set() From 9236831b90812a2aa92c7e4989b4f82128875785 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Tue, 18 Mar 2025 10:15:38 +0000 Subject: [PATCH 02/11] handle more close possibilities --- src/zmq_anyio/_socket.py | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/src/zmq_anyio/_socket.py b/src/zmq_anyio/_socket.py index 9178749..bb47165 100644 --- a/src/zmq_anyio/_socket.py +++ b/src/zmq_anyio/_socket.py @@ -22,7 +22,7 @@ ClosedResourceError, ) from anyio.abc import TaskGroup, TaskStatus -from anyioutils import FIRST_COMPLETED, Future, create_task +from anyioutils import Future, create_task import zmq from zmq import EVENTS, POLLIN, POLLOUT @@ -898,22 +898,19 @@ async def wait_or_cancel() -> None: tg.cancel_scope.cancel() try: - while True: + while not self.closed: async with create_task_group() as tg: tg.start_soon(wait_or_cancel) await wait_readable(self._shadow_sock) + tg.cancel_scope.cancel() if self.stopped.is_set(): break await self._handle_events() - except get_cancelled_exc_class(): - raise except ClosedResourceError: self._task_group.cancel_scope.cancel() finally: self._exited.set() - - assert self.stopped is not None - self.stopped.set() + self.stopped.set() async def stop(self): assert self._exited is not None From cddb5726c35bc0b23587b4af956154e0d35bb4c2 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Tue, 18 Mar 2025 10:17:58 +0000 Subject: [PATCH 03/11] install graingert's notify_closing --- .github/workflows/test.yml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index f6d8873..e4d1247 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -32,7 +32,9 @@ jobs: with: python-version: ${{ matrix.python-version }} - name: Install dependencies - run: pip install -e ".[test]" + run: | + pip install -e ".[test]" + pip install git+https://github.com/agronholm/anyio.git@notify-closing#egg=anyio --ignore-installed - name: Check with mypy and ruff if: ${{ (matrix.python-version == '3.13') && (matrix.os == 'ubuntu-latest') }} run: | From ce2455dc943caa1ef531c4048089322d2e021643 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Tue, 18 Mar 2025 10:20:13 +0000 Subject: [PATCH 04/11] issue close notifications --- src/zmq_anyio/_socket.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/zmq_anyio/_socket.py b/src/zmq_anyio/_socket.py index bb47165..7f7771a 100644 --- a/src/zmq_anyio/_socket.py +++ b/src/zmq_anyio/_socket.py @@ -20,6 +20,7 @@ sleep, wait_readable, ClosedResourceError, + notify_closing, ) from anyio.abc import TaskGroup, TaskStatus from anyioutils import Future, create_task @@ -924,11 +925,13 @@ async def stop(self): self.close() def close(self, linger: int | None = None) -> None: - try: - if not self.closed and self._fd is not None: + fd = self._fd + if not self.closed and fd is not None: + notify_closing(fd) + try: super().close(linger=linger) - except BaseException: - pass + except BaseException: + pass assert self.stopped is not None self.stopped.set() From 532a63f4a74c26561028a870516fc5b9148daea7 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Tue, 18 Mar 2025 10:34:53 +0000 Subject: [PATCH 05/11] side step ExceptionGroup stuff --- src/zmq_anyio/_socket.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/zmq_anyio/_socket.py b/src/zmq_anyio/_socket.py index 7f7771a..4bc0f18 100644 --- a/src/zmq_anyio/_socket.py +++ b/src/zmq_anyio/_socket.py @@ -902,13 +902,14 @@ async def wait_or_cancel() -> None: while not self.closed: async with create_task_group() as tg: tg.start_soon(wait_or_cancel) - await wait_readable(self._shadow_sock) + try: + await wait_readable(self._shadow_sock) + except ClosedResourceError: + break tg.cancel_scope.cancel() if self.stopped.is_set(): break await self._handle_events() - except ClosedResourceError: - self._task_group.cancel_scope.cancel() finally: self._exited.set() self.stopped.set() From 8ec402b2b59c4fb9f81a27258bdbba67e5e2fbd1 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Tue, 18 Mar 2025 10:35:07 +0000 Subject: [PATCH 06/11] close sockets inside the event loop --- tests/conftest.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/conftest.py b/tests/conftest.py index 8f68ac6..99510ab 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -66,7 +66,7 @@ def context(contexts): @pytest.fixture -def sockets(contexts): +async def sockets(contexts): sockets = [] yield sockets # ensure any tracked sockets get their contexts cleaned up From 670f08e619e5d94c71d3285078a29caa9c7eac2a Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Tue, 18 Mar 2025 11:14:53 +0000 Subject: [PATCH 07/11] catch exception from .fileno() --- src/zmq_anyio/_socket.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/zmq_anyio/_socket.py b/src/zmq_anyio/_socket.py index 4bc0f18..961f697 100644 --- a/src/zmq_anyio/_socket.py +++ b/src/zmq_anyio/_socket.py @@ -898,12 +898,18 @@ async def wait_or_cancel() -> None: await self.stopped.wait() tg.cancel_scope.cancel() + def fileno() -> int: + try: + return self._shadow_sock.fileno() + except zmq.ZMQError: + return -1 + try: - while not self.closed: + while (fd := fileno()) > 0: async with create_task_group() as tg: tg.start_soon(wait_or_cancel) try: - await wait_readable(self._shadow_sock) + await wait_readable(fd) except ClosedResourceError: break tg.cancel_scope.cancel() From e4dd540876d9dc9eccbbbbf09c8130e1bd04844e Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Tue, 18 Mar 2025 11:19:22 +0000 Subject: [PATCH 08/11] don't check closed-ness via zmq if possible --- src/zmq_anyio/_socket.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/zmq_anyio/_socket.py b/src/zmq_anyio/_socket.py index 961f697..6c25c89 100644 --- a/src/zmq_anyio/_socket.py +++ b/src/zmq_anyio/_socket.py @@ -899,6 +899,8 @@ async def wait_or_cancel() -> None: tg.cancel_scope.cancel() def fileno() -> int: + if self.closed: + return -1 try: return self._shadow_sock.fileno() except zmq.ZMQError: From 4c50edb4993459ce23dac59162d53c4adca91c76 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Mon, 14 Apr 2025 12:47:23 +0100 Subject: [PATCH 09/11] Update .github/workflows/test.yml --- .github/workflows/test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index e4d1247..97be353 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -34,7 +34,7 @@ jobs: - name: Install dependencies run: | pip install -e ".[test]" - pip install git+https://github.com/agronholm/anyio.git@notify-closing#egg=anyio --ignore-installed + pip install git+https://github.com/agronholm/anyio.git#egg=anyio --ignore-installed - name: Check with mypy and ruff if: ${{ (matrix.python-version == '3.13') && (matrix.os == 'ubuntu-latest') }} run: | From 538246896499dd893b53926523245e3537591147 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Mon, 14 Apr 2025 12:50:37 +0100 Subject: [PATCH 10/11] Update src/zmq_anyio/_socket.py --- src/zmq_anyio/_socket.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/zmq_anyio/_socket.py b/src/zmq_anyio/_socket.py index 6c25c89..0284ea2 100644 --- a/src/zmq_anyio/_socket.py +++ b/src/zmq_anyio/_socket.py @@ -913,6 +913,7 @@ def fileno() -> int: try: await wait_readable(fd) except ClosedResourceError: + tg.cancel_scope.cancel() break tg.cancel_scope.cancel() if self.stopped.is_set(): From acef5cc7f62bafc2d6b89a03aa0e8da215119ff0 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Mon, 14 Apr 2025 12:51:16 +0100 Subject: [PATCH 11/11] Update src/zmq_anyio/_socket.py --- src/zmq_anyio/_socket.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/zmq_anyio/_socket.py b/src/zmq_anyio/_socket.py index 0284ea2..e56c15c 100644 --- a/src/zmq_anyio/_socket.py +++ b/src/zmq_anyio/_socket.py @@ -913,9 +913,9 @@ def fileno() -> int: try: await wait_readable(fd) except ClosedResourceError: - tg.cancel_scope.cancel() break - tg.cancel_scope.cancel() + finally: + tg.cancel_scope.cancel() if self.stopped.is_set(): break await self._handle_events()