7
7
import time
8
8
from contextlib import closing
9
9
from enum import Enum
10
- from typing import Dict , List # noqa: F401
10
+ from typing import Dict , List , Tuple # noqa: F401
11
11
12
12
import pymysql
13
13
19
19
from datadog_checks .mysql .cursor import CommenterDictCursor
20
20
21
21
from .util import DatabaseConfigurationError , connect_with_session_variables , get_truncation_state , warning_with_tags
22
+ import uuid
22
23
23
24
try :
24
25
import datadog_agent
130
131
)
131
132
"""
132
133
134
+ # TiDB specific constants
135
+ TIDB_ACTIVITY_QUERY_LIMIT = 100
136
+
137
+ # TiDB specific activity query
138
+ TIDB_ACTIVITY_QUERY = """\
139
+ SELECT
140
+ ID as processlist_id,
141
+ USER as processlist_user,
142
+ HOST as processlist_host,
143
+ DB as processlist_db,
144
+ COMMAND as processlist_command,
145
+ STATE as processlist_state,
146
+ INFO as sql_text,
147
+ TIME as query_time,
148
+ MEM as memory_usage,
149
+ TxnStart as txn_start_time
150
+ FROM INFORMATION_SCHEMA.CLUSTER_PROCESSLIST
151
+ WHERE
152
+ COMMAND != 'Sleep'
153
+ AND INFO IS NOT NULL
154
+ AND INFO != ''
155
+ -- Exclude our own monitoring queries
156
+ AND INFO NOT LIKE '%CLUSTER_PROCESSLIST%'
157
+ AND INFO NOT LIKE '%datadog-agent%'
158
+ -- Exclude other system queries
159
+ AND INFO NOT LIKE '%INFORMATION_SCHEMA%'
160
+ AND INFO NOT LIKE '%performance_schema%'
161
+ ORDER BY TIME DESC
162
+ LIMIT {}
163
+ """ .format (TIDB_ACTIVITY_QUERY_LIMIT )
164
+
133
165
134
166
class MySQLVersion (Enum ):
135
167
# 8.0
@@ -183,6 +215,12 @@ def run_job(self):
183
215
'Waiting for events_waits_current availability to be determined by the check, skipping run.'
184
216
)
185
217
if self ._check .events_wait_current_enabled is False :
218
+ # Use TiDB-specific activity collection
219
+ if self ._check ._get_is_tidb (self ._db ):
220
+ self ._log .debug ("TiDB detected, using TiDB-specific activity collection" )
221
+ self ._collect_tidb_activity ()
222
+ return
223
+
186
224
azure_deployment_type = self ._config .cloud_metadata .get ("azure" , {}).get ("deployment_type" )
187
225
if azure_deployment_type != "flexible_server" :
188
226
self ._check .record_warning (
@@ -201,6 +239,168 @@ def run_job(self):
201
239
self ._check_version ()
202
240
self ._collect_activity ()
203
241
242
+ @tracked_method (agent_check_getter = agent_check_getter )
243
+ def _collect_tidb_activity (self ):
244
+ # type: () -> None
245
+ """Collect activity data from TiDB CLUSTER_PROCESSLIST"""
246
+ tags = [t for t in self ._tags if not t .startswith ('dd.internal' )]
247
+
248
+ with closing (self ._get_db_connection ().cursor (CommenterDictCursor )) as cursor :
249
+ rows = self ._get_tidb_activity (cursor )
250
+ rows = self ._normalize_tidb_rows (rows )
251
+ event = self ._create_tidb_activity_event (rows , tags )
252
+ payload = json .dumps (event , default = self ._json_event_encoding )
253
+ self ._check .database_monitoring_query_activity (payload )
254
+ self ._check .histogram (
255
+ "dd.mysql.activity.collect_activity.payload_size" ,
256
+ len (payload ),
257
+ tags = tags + self ._check ._get_debug_tags (),
258
+ )
259
+
260
+ @tracked_method (agent_check_getter = agent_check_getter , track_result_length = True )
261
+ def _get_tidb_activity (self , cursor ):
262
+ # type: (pymysql.cursor) -> List[Dict[str]]
263
+ """Execute TiDB activity query"""
264
+ self ._log .debug ("Running TiDB activity query [%s]" , TIDB_ACTIVITY_QUERY )
265
+ cursor .execute (TIDB_ACTIVITY_QUERY )
266
+ return cursor .fetchall ()
267
+
268
+ def _derive_tidb_wait_event (self , state ):
269
+ # type: (str) -> Tuple[str, str]
270
+ """
271
+ Derive wait event and wait event group from TiDB processlist state.
272
+ Returns (wait_event, wait_event_group)
273
+ """
274
+ if not state :
275
+ return 'CPU' , 'CPU'
276
+
277
+ state_lower = state .lower ()
278
+
279
+ # Map TiDB states to wait events
280
+ if 'autocommit' in state_lower :
281
+ return 'CPU' , 'CPU'
282
+ elif 'wait' in state_lower or 'lock' in state_lower :
283
+ return 'Lock' , 'Lock'
284
+ elif 'syncing' in state_lower or 'sync' in state_lower :
285
+ return 'Synch' , 'Synch'
286
+ else :
287
+ # Default to CPU for active queries
288
+ return 'CPU' , 'CPU'
289
+
290
+
291
+ def _normalize_tidb_rows (self , rows ):
292
+ # type: (List[Dict[str]]) -> List[Dict[str]]
293
+ """Normalize TiDB activity rows to match expected format"""
294
+ normalized_rows = []
295
+ estimated_size = 0
296
+
297
+ for row in rows :
298
+ # Generate unique identifiers for TiDB
299
+ thread_id = row .get ('processlist_id' , 0 )
300
+
301
+ # Derive wait event from state
302
+ state = row .get ('processlist_state' , '' )
303
+ wait_event , wait_event_group = self ._derive_tidb_wait_event (state )
304
+
305
+ # Convert TiDB fields to match MySQL activity format
306
+ normalized_row = {
307
+ 'thread_id' : thread_id ,
308
+ 'processlist_id' : row .get ('processlist_id' ),
309
+ 'processlist_user' : row .get ('processlist_user' ),
310
+ 'processlist_host' : row .get ('processlist_host' ),
311
+ 'processlist_db' : row .get ('processlist_db' ),
312
+ 'processlist_command' : row .get ('processlist_command' ),
313
+ 'processlist_state' : row .get ('processlist_state' ),
314
+ 'sql_text' : row .get ('sql_text' ),
315
+ 'query_time' : row .get ('query_time' , 0 ),
316
+ 'memory_usage' : row .get ('memory_usage' , 0 ),
317
+ 'txn_start_time' : row .get ('txn_start_time' ),
318
+ # Derived wait events
319
+ 'wait_event' : wait_event ,
320
+ 'wait_event_type' : wait_event_group ,
321
+ }
322
+
323
+ # Add query truncation state
324
+ if normalized_row ['sql_text' ] is not None :
325
+ normalized_row ['query_truncated' ] = get_truncation_state (normalized_row ['sql_text' ]).value
326
+
327
+ # Obfuscate the query
328
+ normalized_row = self ._obfuscate_and_sanitize_row (normalized_row )
329
+
330
+ estimated_size += self ._get_estimated_row_size_bytes (normalized_row )
331
+ if estimated_size > MySQLActivity .MAX_PAYLOAD_BYTES :
332
+ return normalized_rows
333
+
334
+ normalized_rows .append (normalized_row )
335
+
336
+ return normalized_rows
337
+
338
+ def _create_tidb_activity_event (self , active_sessions , tags ):
339
+ # type: (List[Dict[str]], List[str]) -> Dict[str]
340
+ """Create activity event payload for TiDB"""
341
+ # Convert rows to MySQL-compatible activity format
342
+ mysql_activity = []
343
+
344
+ for row in active_sessions :
345
+ # Calculate timing information
346
+ # Use milliseconds to avoid overflow issues
347
+ current_time_ms = int (time .time () * 1000 )
348
+ query_time_s = row .get ('query_time' , 0 )
349
+ query_time_ms = int (query_time_s * 1000 ) if query_time_s else 0
350
+ event_start_ms = max (0 , current_time_ms - query_time_ms )
351
+
352
+ # Generate event IDs based on thread_id and timestamp
353
+ event_id = hash (str (row ['thread_id' ]) + str (current_time_ms )) % (2 ** 31 ) # Keep it positive and reasonable
354
+
355
+ activity = {
356
+ # Essential identifiers
357
+ 'thread_id' : row ['thread_id' ],
358
+ 'processlist_id' : row ['processlist_id' ],
359
+ 'processlist_user' : row ['processlist_user' ],
360
+ 'processlist_host' : row ['processlist_host' ],
361
+ 'processlist_db' : row ['processlist_db' ],
362
+ 'processlist_command' : row ['processlist_command' ],
363
+ 'processlist_state' : row ['processlist_state' ],
364
+ 'sql_text' : row .get ('sql_text' ),
365
+ 'current_schema' : row .get ('processlist_db' ),
366
+ 'query_signature' : row .get ('query_signature' ),
367
+ 'dd_commands' : row .get ('dd_commands' , []),
368
+ 'dd_tables' : row .get ('dd_tables' , []),
369
+ 'dd_comments' : row .get ('dd_comments' , []),
370
+ 'query_truncated' : row .get ('query_truncated' ),
371
+ # Event identifiers
372
+ 'event_id' : event_id ,
373
+ 'end_event_id' : event_id , # Same as event_id for TiDB
374
+ # Timing information
375
+ 'event_timer_start' : event_start_ms * 1000000 , # Convert to nanoseconds
376
+ 'event_timer_end' : current_time_ms * 1000000 , # Convert to nanoseconds
377
+ 'lock_time' : 0 , # TiDB doesn't provide lock time in CLUSTER_PROCESSLIST
378
+ # Wait event info
379
+ 'wait_event' : row .get ('wait_event' , 'CPU' ),
380
+ 'wait_timer_start' : event_start_ms * 1000000 , # Same as event timer
381
+ 'wait_timer_end' : current_time_ms * 1000000 ,
382
+ # Additional MySQL compatibility fields
383
+ 'object_name' : None , # TiDB doesn't track file operations
384
+ 'object_type' : None ,
385
+ 'operation' : None ,
386
+ 'source' : '' ,
387
+ }
388
+
389
+ mysql_activity .append (activity )
390
+
391
+ return {
392
+ "host" : self ._check .reported_hostname ,
393
+ "ddagentversion" : datadog_agent .get_version (),
394
+ "ddsource" : "mysql" ,
395
+ "dbm_type" : "activity" ,
396
+ "collection_interval" : self .collection_interval ,
397
+ "ddtags" : tags ,
398
+ "timestamp" : time .time () * 1000 ,
399
+ "cloud_metadata" : self ._config .cloud_metadata ,
400
+ 'service' : self ._config .service ,
401
+ "mysql_activity" : mysql_activity ,
402
+ }
403
+
204
404
def _check_version (self ):
205
405
# type: () -> None
206
406
if self ._check .version .version_compatible ((8 ,)):
0 commit comments