Description
Race Condition Analysis
1. Websocket Closure Race (socket.py:41-42)
```python
running.stop()
await ws.close()
```
The listener task calls running.stop() and closes the websocket as soon as it receives any message that is not text or binary (such as a close frame). This creates a race in which:
- The worker task may still be processing queued messages
- The Publisher may have messages in its internal queue (publisher.py:19)
- Messages still buffered in the Publisher's asyncio.Queue are lost when the websocket closes
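One way to remove this race, sketched under assumptions: when a non-data frame arrives, the listener stops reading and enqueues an end-of-input marker instead of clearing a shared flag. The worker then exits only after forwarding everything queued ahead of the marker. `END_OF_INPUT`, `listener`, `worker`, and `run_session` are hypothetical names. A plain list of frames stands in for the websocket, and `asyncio.sleep(0)` stands in for the publisher call.

```python
import asyncio
from typing import List

# Hypothetical end-of-input marker; not part of the project's code.
END_OF_INPUT = object()


async def listener(frames: List[object], inbox: asyncio.Queue) -> None:
    """Queue text/binary frames; on any other frame, signal end-of-input.

    `frames` stands in for the websocket: str/bytes model text/binary
    messages, and anything else models a close (or other non-data) frame.
    """
    for frame in frames:
        if not isinstance(frame, (str, bytes)):
            break  # instead of running.stop() + ws.close()
        await inbox.put(frame)
    await inbox.put(END_OF_INPUT)


async def worker(inbox: asyncio.Queue, forwarded: List[object]) -> None:
    """Forward everything queued before the marker, then exit."""
    while True:
        item = await inbox.get()
        if item is END_OF_INPUT:
            return
        await asyncio.sleep(0)  # placeholder for publisher.send(None, item)
        forwarded.append(item)


async def run_session(frames: List[object]) -> List[object]:
    inbox: asyncio.Queue = asyncio.Queue()
    forwarded: List[object] = []
    worker_task = asyncio.create_task(worker(inbox, forwarded))
    await listener(frames, inbox)
    await worker_task  # the websocket would only be closed after this point
    return forwarded
```

`asyncio.run(run_session(["a", b"b", None, "late"]))` returns `["a", b"b"]`. Both data frames received before the close frame (`None`) are forwarded, and nothing after it is read. This covers only the listener/worker hand-off. The Publisher's own buffer still needs draining.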
2. Publisher Queue Loss (publisher.py:77-78, triples_import.py:50)
```python
await self.publisher.send(None, elt)  # Goes to asyncio.Queue
```
The Publisher uses an asyncio.Queue (max_size=10 by default) that's not persistent. When destroy() is called:
- publisher.stop() sets running=False
- Any messages still in the queue are lost
- The publisher task may not drain remaining messages before stopping
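To illustrate the loss, here is a toy model built only from what this section describes. It has an asyncio.Queue with a default size of 10, a background task that drains the queue while a flag is set, and a stop() that just clears the flag. `ToyPublisher` and `demo_loss` are hypothetical, and the real Publisher's internals may differ.

```python
import asyncio
from typing import List, Optional, Tuple


class ToyPublisher:
    """Hypothetical model of the pattern above: a bounded in-memory queue
    drained by a background task while `running` is set. Not the real class."""

    def __init__(self, max_size: int = 10) -> None:
        self.queue: asyncio.Queue = asyncio.Queue(maxsize=max_size)
        self.running = True
        self.delivered: List[object] = []
        self.task: Optional[asyncio.Task] = None

    def start(self) -> None:
        self.task = asyncio.create_task(self.run())

    async def send(self, key: Optional[str], msg: object) -> None:
        await self.queue.put((key, msg))  # buffered only; nothing sent yet

    async def run(self) -> None:
        while self.running:
            try:
                item = await asyncio.wait_for(self.queue.get(), timeout=0.1)
            except asyncio.TimeoutError:
                continue  # idle: re-check the flag
            await asyncio.sleep(0)  # stands in for the Pulsar producer send
            self.delivered.append(item)

    async def stop(self) -> None:
        # Only clears the flag: items still in self.queue are abandoned.
        self.running = False
        if self.task is not None:
            await self.task


async def demo_loss(n: int) -> Tuple[int, int, int]:
    # Keep n <= max_size so send() never blocks in this demo.
    pub = ToyPublisher()
    pub.start()
    for i in range(n):
        await pub.send(None, f"triple-{i}")
    await pub.stop()
    return n, len(pub.delivered), pub.queue.qsize()
```

`asyncio.run(demo_loss(5))` returns `(5, 0, 5)`. Five messages were accepted by send() and none reached the simulated producer. All five were still queued when the task exited, because stop() ran before the background task was ever scheduled. Under real load the split will vary, but the same pattern leaves behind whatever is still buffered.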
3. Ungraceful Shutdown Order (triples_import.py:28-34)
```python
async def destroy(self):
    self.running.stop()            # Stops worker loop
    if self.ws:
        await self.ws.close()      # Closes websocket
    await self.publisher.stop()    # Stops publisher
```
With this order, the websocket is closed before anything ensures that all queued messages have been sent to Pulsar.
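Here is a minimal sketch of a reordered destroy(). It assumes the worker can be awaited as a task and that publisher.stop() drains its buffer, which the current Publisher does not guarantee. `FakePublisher`, `FakeWebSocket`, `ImportSession`, and `demo_shutdown` are hypothetical stand-ins used only to check the ordering. The fake websocket records how many messages were still buffered when it was closed.

```python
import asyncio
from typing import List, Optional, Tuple


class FakePublisher:
    """Hypothetical publisher whose stop() flushes its buffer before returning."""

    def __init__(self) -> None:
        self.queue: asyncio.Queue = asyncio.Queue()
        self.delivered: List[object] = []

    async def send(self, key: Optional[str], msg: object) -> None:
        await self.queue.put(msg)

    async def stop(self) -> None:
        while not self.queue.empty():
            self.delivered.append(self.queue.get_nowait())
            await asyncio.sleep(0)  # stands in for a round trip to Pulsar


class FakeWebSocket:
    """Records how many messages were still buffered when close() ran."""

    def __init__(self, publisher: FakePublisher) -> None:
        self.publisher = publisher
        self.pending_at_close: Optional[int] = None

    async def close(self) -> None:
        self.pending_at_close = self.publisher.queue.qsize()


class ImportSession:
    """Hypothetical session with destroy() reordered: drain first, close last."""

    def __init__(self, incoming: List[object]) -> None:
        self.incoming = incoming  # stands in for messages read from the socket
        self.publisher = FakePublisher()
        self.ws: Optional[FakeWebSocket] = FakeWebSocket(self.publisher)
        self.worker_task: Optional[asyncio.Task] = None

    def start(self) -> None:
        self.worker_task = asyncio.create_task(self.worker())

    async def worker(self) -> None:
        for msg in self.incoming:
            await self.publisher.send(None, msg)
            await asyncio.sleep(0)

    async def destroy(self) -> None:
        # 1. Let the worker hand off everything it has already received.
        #    Whatever stops new reads (e.g. running.stop()) would go first,
        #    provided it does not make the worker skip queued items.
        if self.worker_task is not None:
            await self.worker_task
        # 2. Flush the publisher's buffer to the broker.
        await self.publisher.stop()
        # 3. Only now close the websocket.
        if self.ws:
            await self.ws.close()


async def demo_shutdown(msgs: List[object]) -> Tuple[List[object], Optional[int]]:
    session = ImportSession(msgs)
    session.start()
    await session.destroy()
    pending = session.ws.pending_at_close if session.ws else None
    return session.publisher.delivered, pending
```

`asyncio.run(demo_shutdown(["t1", "t2", "t3"]))` returns `(["t1", "t2", "t3"], 0)`. All three messages were flushed, and nothing was pending when the websocket closed.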
Recommended Fixes
1. Drain Publisher Queue: Ensure Publisher.stop() waits for the queue to empty before stopping
2. Graceful Shutdown: In destroy(), wait for the publisher queue to drain before closing the websocket
3. Publisher Persistence: Add flush/drain methods to guarantee message delivery before shutdown
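The sketch below combines fixes 1 and 3, assuming an asyncio-based Publisher like the one described in this issue. `flush()` waits on `asyncio.Queue.join()`, which returns only once `task_done()` has been called for every queued item. `stop()` enqueues a hypothetical `_STOP` marker, so the background task exits only after everything ahead of it has been handled. `DrainingPublisher`, `_produce`, `_STOP`, and `demo_drain` are illustrative names. `_produce` is a placeholder for the real Pulsar send, and actual delivery still depends on that call succeeding.

```python
import asyncio
from typing import Any, List, Optional, Tuple

# Hypothetical marker that tells the background task to exit.
_STOP = object()


class DrainingPublisher:
    """Hypothetical publisher sketch with flush() and a draining stop()."""

    def __init__(self, max_size: int = 10) -> None:
        self.queue: asyncio.Queue = asyncio.Queue(maxsize=max_size)
        self.delivered: List[Tuple[Optional[str], Any]] = []
        self.failed: List[Tuple[Any, Exception]] = []
        self._task: Optional[asyncio.Task] = None

    def start(self) -> None:
        self._task = asyncio.create_task(self._run())

    async def send(self, key: Optional[str], msg: Any) -> None:
        await self.queue.put((key, msg))  # waits while the buffer is full

    async def _produce(self, item: Tuple[Optional[str], Any]) -> None:
        await asyncio.sleep(0)  # placeholder for the broker send
        self.delivered.append(item)

    async def _run(self) -> None:
        while True:
            item = await self.queue.get()
            try:
                if item is _STOP:
                    return
                await self._produce(item)
            except Exception as exc:  # a real version might retry or log
                self.failed.append((item, exc))
            finally:
                self.queue.task_done()  # keeps queue.join() accurate

    async def flush(self) -> None:
        """Wait until every message queued so far has been handled."""
        await self.queue.join()

    async def stop(self) -> None:
        """Drain the buffer, then let the background task exit."""
        if self._task is None:
            return  # never started; nothing is consuming the queue
        # FIFO order: every message queued before _STOP is handled first.
        await self.queue.put(_STOP)
        await self._task
        self._task = None


async def demo_drain(n: int) -> Tuple[List[Any], int]:
    pub = DrainingPublisher(max_size=10)
    pub.start()
    for i in range(n):
        await pub.send(None, f"triple-{i}")
    await pub.flush()  # mid-session guarantee: everything so far is handled
    await pub.stop()
    return [msg for _, msg in pub.delivered], pub.queue.qsize()
```

With `max_size=10` and 25 messages, `send()` applies backpressure instead of dropping anything. `demo_drain(25)` returns all 25 messages in order and an empty queue. This only protects the in-process buffer during an orderly shutdown. It does not make the buffer persistent across a crash.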
The core issue is that the Publisher's asyncio.Queue acts as a buffer, but there's no guarantee these buffered messages reach Pulsar when the connection closes abruptly.