8
8
import time
9
9
from datetime import datetime , timezone
10
10
from concurrent .futures import ThreadPoolExecutor
11
- from typing import Any
11
+ from typing import Any , Optional , TYPE_CHECKING
12
+
13
+ if TYPE_CHECKING :
14
+ from .telemetry import TelemetryManager
12
15
13
16
from mcpcat_api import ApiClient , Configuration , EventsApi
14
17
from mcpcat .modules .constants import EVENT_ID_PREFIX , MCPCAT_API_URL
@@ -30,21 +33,21 @@ def __init__(self, api_client=None):
30
33
self .max_retries = 3
31
34
self .max_queue_size = 10000 # Prevent unbounded growth
32
35
self .concurrency = 5 # Max parallel requests
33
-
36
+
34
37
# Allow injection of api_client for testing
35
38
if api_client is None :
36
39
config = Configuration (host = MCPCAT_API_URL )
37
40
api_client_instance = ApiClient (configuration = config )
38
41
self .api_client = EventsApi (api_client = api_client_instance )
39
42
else :
40
43
self .api_client = api_client
41
-
44
+
42
45
self ._shutdown = False
43
46
self ._shutdown_event = threading .Event ()
44
-
47
+
45
48
# Thread pool for processing events
46
49
self .executor = ThreadPoolExecutor (max_workers = self .concurrency )
47
-
50
+
48
51
# Start worker thread
49
52
self .worker_thread = threading .Thread (target = self ._worker , daemon = True )
50
53
self .worker_thread .start ()
@@ -68,7 +71,7 @@ def _worker(self) -> None:
68
71
try :
69
72
# Wait for an event with timeout
70
73
event = self .queue .get (timeout = 0.1 )
71
-
74
+
72
75
# Submit event processing to thread pool
73
76
# The executor will queue it if all workers are busy
74
77
try :
@@ -80,7 +83,7 @@ def _worker(self) -> None:
80
83
self .queue .put_nowait (event )
81
84
except queue .Full :
82
85
write_to_log (f"Could not requeue event { event .id or 'unknown' } - queue full" )
83
-
86
+
84
87
except queue .Empty :
85
88
continue
86
89
except Exception as e :
@@ -105,7 +108,20 @@ def _process_event(self, event: UnredactedEvent) -> None:
105
108
106
109
if event :
107
110
event .id = event .id or generate_prefixed_ksuid ("evt" )
108
- self ._send_event (event )
111
+
112
+ # Export to telemetry backends if configured (non-blocking)
113
+ if _telemetry_manager :
114
+ try :
115
+ _telemetry_manager .export (event )
116
+ except Exception as e :
117
+ write_to_log (f"Telemetry export submission failed: { e } " )
118
+
119
+ # Send to MCPCat API only if project_id exists
120
+ if event .project_id :
121
+ self ._send_event (event )
122
+ elif not _telemetry_manager :
123
+ # Only warn if we have neither MCPCat nor telemetry configured
124
+ write_to_log ("Warning: Event has no project_id and no telemetry exporters configured" )
109
125
110
126
def _send_event (self , event : Event , retries : int = 0 ) -> None :
111
127
"""Send event to API."""
@@ -164,22 +180,39 @@ def destroy(self) -> None:
164
180
write_to_log (f"Shutdown complete. { remaining } events were not processed." )
165
181
166
182
183
# Module-level telemetry manager; None means telemetry export is disabled.
_telemetry_manager: Optional['TelemetryManager'] = None


def set_telemetry_manager(manager: Optional['TelemetryManager']) -> None:
    """
    Install (or clear) the process-wide telemetry manager.

    Args:
        manager: TelemetryManager instance, or None to disable telemetry.
    """
    global _telemetry_manager
    _telemetry_manager = manager
    if not manager:
        # Nothing to announce when telemetry is being disabled.
        return
    write_to_log(f"Telemetry manager set with {manager.get_exporter_count()} exporter(s)")
198
+
199
+
167
200
# Global event queue instance
168
201
event_queue = EventQueue ()
169
202
def _shutdown_handler(signum, frame):
    """Handle SIGINT/SIGTERM: drain queued events once, then hard-exit.

    Args:
        signum: Signal number delivered by the OS.
        frame: Current stack frame (unused; required by the signal API).
    """
    write_to_log("Received shutdown signal, gracefully shutting down...")

    # Restore default handlers first so a second signal terminates the
    # process immediately instead of re-entering this handler.
    for sig in (signal.SIGINT, signal.SIGTERM):
        signal.signal(sig, signal.SIG_DFL)

    # Flush/stop the event queue before terminating.
    event_queue.destroy()

    # os._exit skips atexit hooks and finalizers; acceptable here because
    # graceful cleanup has already run.
    os._exit(0)
185
218
@@ -215,15 +248,16 @@ def publish_event(server: Any, event: UnredactedEvent) -> None:
215
248
session_info = get_session_info (server , data )
216
249
217
250
# Create full event with all required fields
218
- # Merge event data with session info
251
+ # Merge event data with session info
219
252
event_data = event .model_dump (exclude_none = True )
220
253
session_data = session_info .model_dump (exclude_none = True )
221
-
254
+
255
+ # Merge data, ensuring project_id from data takes precedence
222
256
merged_data = {** event_data , ** session_data }
223
-
257
+ merged_data ['project_id' ] = data .project_id # Override with tracking data's project_id
258
+
224
259
full_event = UnredactedEvent (
225
260
** merged_data ,
226
- project_id = data .project_id ,
227
261
redaction_fn = data .options .redact_sensitive_information ,
228
262
)
229
263
0 commit comments