Skip to content

Commit 46330fc

Browse files
committed
ingestion: tweak thread shutdown handling
1 parent 0b9a810 commit 46330fc

File tree

1 file changed

+5
-7
lines changed

1 file changed

+5
-7
lines changed

src/polar_sdk/ingestion/_base.py

+5-7
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import logging
55
import queue
66
import threading
7-
import time
87
from collections.abc import Callable
98
from typing import TYPE_CHECKING, Union
109

@@ -55,8 +54,7 @@ def __init__(
5554
self._queue = queue.Queue["EventsModelTypedDict"](maxsize=max_queue_size)
5655

5756
self._thread = threading.Thread(target=self._worker, daemon=True)
58-
self._thread_running = threading.Event()
59-
self._thread_running.set()
57+
self._should_close_thread = threading.Event()
6058
self._thread.start()
6159

6260
atexit.register(self.close)
@@ -125,7 +123,7 @@ def close(self) -> None:
125123
It's called automatically on program exit.
126124
"""
127125
logger.debug("Shutting down, flushing remaining events...")
128-
self._thread_running.clear()
126+
self._should_close_thread.set()
129127

130128
# Try to flush remaining events
131129
try:
@@ -134,20 +132,20 @@ def close(self) -> None:
134132
logger.error("Error during shutdown flush: %s", e)
135133

136134
if self._thread.is_alive():
137-
self._thread.join(timeout=5.0)
135+
self._thread.join(timeout=1.0)
138136

139137
self._stack.close()
140138
logger.debug("Shutdown complete")
141139

142140
def _worker(self) -> None:
143141
logger.debug("Worker thread started")
144-
while self._thread_running.is_set():
142+
while not self._should_close_thread.is_set():
145143
try:
146144
self.flush(self.max_batch_size)
147145
except Exception as e:
148146
logger.error("Error in worker thread: %s", e)
149147

150-
time.sleep(self.flush_interval)
148+
self._should_close_thread.wait(timeout=self.flush_interval)
151149

152150
def _send_batch(self, events: list["EventsModelTypedDict"]) -> None:
153151
response = self._client.events.ingest(request={"events": events})

0 commit comments

Comments
 (0)