Skip to content

Commit ac08333

Browse files
committed
simplify wait_readable loop
1 parent e2216b8 commit ac08333

File tree

1 file changed

+16
-22
lines changed

1 file changed

+16
-22
lines changed

src/zmq_anyio/_socket.py

Lines changed: 16 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,10 @@
1919
get_cancelled_exc_class,
2020
sleep,
2121
wait_readable,
22+
ClosedResourceError,
2223
)
2324
from anyio.abc import TaskGroup, TaskStatus
24-
from anyioutils import FIRST_COMPLETED, Future, create_task, wait
25+
from anyioutils import FIRST_COMPLETED, Future, create_task
2526

2627
import zmq
2728
from zmq import EVENTS, POLLIN, POLLOUT
@@ -890,31 +891,24 @@ async def _start(self, *, task_status: TaskStatus[None] = TASK_STATUS_IGNORED):
890891
task_status.started()
891892
self.started.set()
892893
self._thread = get_ident()
894+
895+
async def wait_or_cancel() -> None:
896+
assert self.stopped is not None
897+
await self.stopped.wait()
898+
tg.cancel_scope.cancel()
899+
893900
try:
894901
while True:
895-
wait_stopped_task = create_task(
896-
self.stopped.wait(),
897-
self._task_group,
898-
exception_handler=ignore_exceptions,
899-
)
900-
tasks = [
901-
create_task(
902-
wait_readable(self._shadow_sock), # type: ignore[arg-type]
903-
self._task_group,
904-
exception_handler=ignore_exceptions,
905-
),
906-
wait_stopped_task,
907-
]
908-
done, pending = await wait(
909-
tasks, self._task_group, return_when=FIRST_COMPLETED
910-
)
911-
for task in pending:
912-
task.cancel()
913-
if wait_stopped_task in done:
902+
async with create_task_group() as tg:
903+
tg.start_soon(wait_or_cancel)
904+
await wait_readable(self._shadow_sock)
905+
if self.stopped.is_set():
914906
break
915907
await self._handle_events()
916-
except BaseException:
917-
pass
908+
except get_cancelled_exc_class():
909+
raise
910+
except ClosedResourceError:
911+
self._task_group.cancel_scope.cancel()
918912
finally:
919913
self._exited.set()
920914

0 commit comments

Comments
 (0)