
Potential race condition in triple / embeddings import queue #474

@cybermaggedon

Description


Race Condition Analysis

  1. Websocket Closure Race (socket.py:41-42)
  ```python
  running.stop()
  await ws.close()
  ```
  The listener task calls running.stop() and closes the websocket as soon as it receives any message that is neither text nor binary. This creates a race in which:
  - The worker task may still be processing queued messages
  - The Publisher may still hold messages in its internal queue (publisher.py:19)
  - Messages left in the Publisher's asyncio.Queue are lost when the websocket closes
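The race in point 1 can be reproduced in miniature. The sketch below is a simplified model, not the code in socket.py: `Running`, `simulate_listener_race`, and the polling timeout are hypothetical stand-ins, and the listener is assumed to queue messages for the worker before the non-text/binary frame arrives. Because the listener clears the flag before the worker gets a turn, the worker exits without forwarding anything.

```python
from __future__ import annotations

import asyncio


class Running:
    """Hypothetical stand-in for the `running` flag object shared by listener and worker."""

    def __init__(self) -> None:
        self._running = True

    def get(self) -> bool:
        return self._running

    def stop(self) -> None:
        self._running = False


async def simulate_listener_race(n_messages: int = 5) -> tuple[list[int], int]:
    """Return (messages the worker forwarded, messages left behind in the queue)."""
    running = Running()
    inbox: asyncio.Queue = asyncio.Queue()
    forwarded: list[int] = []

    async def worker() -> None:
        # Loops only while the flag is set; anything still queued afterwards is abandoned.
        while running.get():
            try:
                msg = await asyncio.wait_for(inbox.get(), timeout=0.01)
            except asyncio.TimeoutError:
                continue
            forwarded.append(msg)

    async def listener() -> None:
        # Data frames arrive and are queued for the worker...
        for i in range(n_messages):
            await inbox.put(i)
        # ...then a frame that is neither text nor binary arrives, and the
        # listener clears the flag immediately without waiting for the worker.
        running.stop()

    worker_task = asyncio.create_task(worker())
    await listener()
    await worker_task
    return forwarded, inbox.qsize()
```

Here `asyncio.run(simulate_listener_race())` returns `([], 5)`: all five accepted messages are stranded. Letting the worker drain its queue before the flag is cleared would avoid this.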

  2. Publisher Queue Loss (publisher.py:77-78, triples_import.py:50)
  ```python
  await self.publisher.send(None, elt)  # Goes to asyncio.Queue
  ```
  The Publisher buffers messages in an in-memory asyncio.Queue (max_size=10 by default), which is not persistent. When destroy() is called:
  - publisher.stop() sets running=False
  - Any messages still in the queue are lost
  - The publisher task may stop before draining the remaining messages
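The loss in point 2 follows directly from a flag-only `stop()`. The class below is a hypothetical model of the behaviour described above, not a copy of publisher.py: `LossyPublisher`, its `sink` list (standing in for the Pulsar producer), the polling timeout, and `demo()` are illustrative assumptions.

```python
from __future__ import annotations

import asyncio
from typing import Any


class LossyPublisher:
    """Hypothetical model of a publisher whose stop() only clears a flag."""

    def __init__(self, sink: list, max_size: int = 10) -> None:
        self.sink = sink  # stands in for the Pulsar producer
        self.q: asyncio.Queue = asyncio.Queue(maxsize=max_size)
        self.running = True
        self.task: asyncio.Task | None = None

    async def start(self) -> None:
        self.task = asyncio.create_task(self.run())

    async def run(self) -> None:
        while self.running:
            try:
                item = await asyncio.wait_for(self.q.get(), timeout=0.05)
            except asyncio.TimeoutError:
                continue
            self.sink.append(item)

    async def send(self, msg_id: Any, item: Any) -> None:
        # Mirrors the call shape send(None, elt); msg_id is unused here.
        await self.q.put(item)

    async def stop(self) -> None:
        # Clears the flag and waits for the loop to exit, but never drains self.q.
        self.running = False
        if self.task is not None:
            await self.task


async def demo(n: int = 5) -> tuple[list, int]:
    """Send n items, stop immediately, return (delivered items, items still queued)."""
    sink: list = []
    pub = LossyPublisher(sink)
    await pub.start()
    for i in range(n):
        await pub.send(None, i)
    await pub.stop()
    return sink, pub.q.qsize()
```

`asyncio.run(demo())` returns `([], 5)`: every message accepted by `send()` is still sitting in the queue when `stop()` returns.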

  3. Ungraceful Shutdown Order (triples_import.py:28-34)
  ```python
  async def destroy(self):
      self.running.stop()         # Stops worker loop
      if self.ws:
          await self.ws.close()   # Closes websocket
      await self.publisher.stop() # Stops publisher
  ```

  With this order, the websocket is closed and the publisher is stopped without first confirming that all queued messages have been sent to Pulsar.
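A reordered `destroy()` along the lines of the recommended fixes might look like the sketch below. It assumes that `publisher.stop()` drains its queue before returning (the current Publisher does not) and that the worker exits once `running` is cleared; `ImportHandler`, `FakeWebSocket`, `FakePublisher`, and `worker_task` are hypothetical names used only to record the order of events.

```python
from __future__ import annotations

import asyncio


class Running:
    """Hypothetical stand-in for the `running` flag object."""

    def __init__(self) -> None:
        self._running = True

    def get(self) -> bool:
        return self._running

    def stop(self) -> None:
        self._running = False


class FakeWebSocket:
    """Records when the websocket is closed."""

    def __init__(self, log: list[str]) -> None:
        self.log = log

    async def close(self) -> None:
        self.log.append("ws.close")


class FakePublisher:
    """Pretends to flush `pending` queued messages before stop() returns."""

    def __init__(self, log: list[str], pending: int = 3) -> None:
        self.log = log
        self.pending = pending

    async def stop(self) -> None:
        while self.pending:
            await asyncio.sleep(0)  # yield to the loop, as a real send would
            self.pending -= 1
            self.log.append("publisher.sent")
        self.log.append("publisher.stopped")


class ImportHandler:
    def __init__(self, ws: FakeWebSocket | None, publisher: FakePublisher) -> None:
        self.ws = ws
        self.publisher = publisher
        self.running = Running()
        self.worker_task: asyncio.Task | None = None

    async def destroy(self) -> None:
        # 1. Stop accepting new work from the client.
        self.running.stop()
        # 2. Let the worker hand over what it holds (assumed to exit on the flag).
        if self.worker_task is not None:
            await self.worker_task
        # 3. Flush the publisher; assumes stop() drains its queue.
        await self.publisher.stop()
        # 4. Only now close the websocket.
        if self.ws:
            await self.ws.close()
```

The design choice is simply that `ws.close()` moves to the end, so nothing is torn down until the buffered messages have been flushed.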

  Recommended Fixes

  1. Drain the Publisher queue: make Publisher.stop() wait for the queue to empty before stopping the publisher task
  2. Graceful shutdown: in destroy(), wait for the publisher queue to drain before closing the websocket
  3. Flush/drain API: add flush/drain methods to the Publisher so delivery is guaranteed before shutdown
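Fixes 1 and 3 can be combined using `asyncio.Queue.join()` and `task_done()`, which the standard library provides for this kind of draining. The sketch below is hypothetical and is not the existing Publisher: `DrainingPublisher`, `producer_send` (a stand-in for the Pulsar producer call), `flush()`, and the `errors` list are assumptions. `stop()` refuses new messages, waits for the queue to drain, then cancels the idle consumer task.

```python
from __future__ import annotations

import asyncio
from typing import Any, Awaitable, Callable


class DrainingPublisher:
    """Hypothetical publisher whose stop() flushes the queue before shutting down."""

    def __init__(
        self,
        producer_send: Callable[[Any], Awaitable[None]],
        max_size: int = 10,
    ) -> None:
        self.producer_send = producer_send  # stand-in for the Pulsar producer call
        self.q: asyncio.Queue = asyncio.Queue(maxsize=max_size)
        self.task: asyncio.Task | None = None
        self.accepting = True
        self.errors: list[tuple[Any, Exception]] = []

    async def start(self) -> None:
        self.task = asyncio.create_task(self.run())

    async def run(self) -> None:
        while True:
            item = await self.q.get()
            try:
                await self.producer_send(item)
            except Exception as exc:
                # Record failures instead of letting one bad item kill the loop,
                # which would leave join() waiting forever.
                self.errors.append((item, exc))
            finally:
                self.q.task_done()

    async def send(self, msg_id: Any, item: Any) -> None:
        # Mirrors the call shape send(None, elt); msg_id is unused here.
        if not self.accepting:
            raise RuntimeError("publisher is shutting down")
        await self.q.put(item)

    async def flush(self) -> None:
        # Returns once task_done() has been called for every item put so far.
        await self.q.join()

    async def stop(self) -> None:
        self.accepting = False
        await self.flush()
        if self.task is not None:
            # The consumer is now idle in q.get(); cancel it and wait for it to exit.
            self.task.cancel()
            try:
                await self.task
            except asyncio.CancelledError:
                pass
```

Because `stop()` awaits `flush()` before cancelling the consumer, `destroy()` can simply await `publisher.stop()` before closing the websocket. Note that `join()` only confirms that each item was handed to `producer_send`; whether Pulsar acknowledged it depends on what that call awaits.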

  The core issue is that the Publisher's asyncio.Queue acts as an in-memory buffer, and nothing guarantees that the buffered messages reach Pulsar when the connection closes abruptly.
