Skip to content

Commit 070a7d1

Browse files
authored
tracing: remove asyncresource from kafka instrumentations (#5703)
* fix kafka asyncstorage
1 parent 63f4c03 commit 070a7d1

File tree

12 files changed

+201
-142
lines changed

12 files changed

+201
-142
lines changed

.github/workflows/appsec.yml

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,3 +302,35 @@ jobs:
302302
- uses: ./.github/actions/node/active-lts
303303
- run: yarn test:appsec:plugins:ci
304304
- uses: codecov/codecov-action@ad3126e916f78f00edff4ed0317cf185271ccc2d # v5.4.2
305+
306+
kafka:
307+
runs-on: ubuntu-latest
308+
services:
309+
kafka:
310+
image: apache/kafka-native:3.8.0-rc2
311+
env:
312+
KAFKA_PROCESS_ROLES: broker,controller
313+
KAFKA_NODE_ID: '1'
314+
KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
315+
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@127.0.0.1:9093
316+
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
317+
KAFKA_CLUSTER_ID: r4zt_wrqTRuT7W2NJsB_GA
318+
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://127.0.0.1:9092
319+
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
320+
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
321+
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: '1'
322+
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: '0'
323+
ports:
324+
- 9092:9092
325+
- 9093:9093
326+
env:
327+
PLUGINS: kafkajs
328+
SERVICES: kafka
329+
steps:
330+
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
331+
- uses: ./.github/actions/node/oldest-maintenance-lts
332+
- uses: ./.github/actions/install
333+
- run: yarn test:appsec:plugins:ci
334+
- uses: ./.github/actions/node/active-lts
335+
- run: yarn test:appsec:plugins:ci
336+
- uses: codecov/codecov-action@ad3126e916f78f00edff4ed0317cf185271ccc2d # v5.4.2

packages/datadog-instrumentations/src/confluentinc-kafka-javascript.js

Lines changed: 86 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,7 @@
22

33
const {
44
addHook,
5-
channel,
6-
AsyncResource
5+
channel
76
} = require('./helpers/instrument')
87
const shimmer = require('../../datadog-shimmer')
98

@@ -32,7 +31,6 @@ const disabledHeaderWeakSet = new WeakSet()
3231
// we need to store the offset per partition per topic for the consumer to track offsets for DSM
3332
const latestConsumerOffsets = new Map()
3433

35-
// Customize the instrumentation for Confluent Kafka JavaScript
3634
addHook({ name: '@confluentinc/kafka-javascript', versions: ['>=1.0.0'] }, (module) => {
3735
// Hook native module classes first
3836
instrumentBaseModule(module)
@@ -62,23 +60,25 @@ function instrumentBaseModule (module) {
6260

6361
const brokers = this.globalConfig?.['bootstrap.servers']
6462

65-
const asyncResource = new AsyncResource('bound-anonymous-fn')
66-
return asyncResource.runInAsyncScope(() => {
67-
try {
68-
channels.producerStart.publish({
69-
topic,
70-
messages: [{ key, value: message }],
71-
bootstrapServers: brokers
72-
})
63+
const ctx = {
64+
topic,
65+
messages: [{ key, value: message }],
66+
bootstrapServers: brokers
67+
}
7368

74-
const result = produce.apply(this, arguments)
69+
return channels.producerStart.runStores(ctx, () => {
70+
try {
71+
const headers = convertHeaders(ctx.messages[0].headers)
72+
const result = produce.apply(this, [topic, partition, message, key, timestamp, opaque, headers])
7573

76-
channels.producerCommit.publish()
77-
channels.producerFinish.publish()
74+
ctx.result = result
75+
channels.producerCommit.publish(ctx)
76+
channels.producerFinish.publish(ctx)
7877
return result
7978
} catch (error) {
80-
channels.producerError.publish(error)
81-
channels.producerFinish.publish()
79+
ctx.error = error
80+
channels.producerError.publish(ctx)
81+
channels.producerFinish.publish(ctx)
8282
throw error
8383
}
8484
})
@@ -110,32 +110,39 @@ function instrumentBaseModule (module) {
110110
callback = numMessages
111111
}
112112

113+
const ctx = {
114+
groupId
115+
}
113116
// Handle callback-based consumption
114117
if (typeof callback === 'function') {
115118
return consume.call(this, numMessages, function wrappedCallback (err, messages) {
116119
if (messages && messages.length > 0) {
117120
messages.forEach(message => {
118-
channels.consumerStart.publish({
119-
topic: message?.topic,
120-
partition: message?.partition,
121-
message,
122-
groupId
123-
})
121+
ctx.topic = message?.topic
122+
ctx.partition = message?.partition
123+
ctx.message = message
124+
125+
// TODO: We should be using publish here instead of runStores but we need bindStart to be called
126+
channels.consumerStart.runStores(ctx, () => {})
124127
updateLatestOffset(message?.topic, message?.partition, message?.offset, groupId)
125128
})
126129
}
127130

128131
if (err) {
129-
channels.consumerError.publish(err)
132+
ctx.error = err
133+
channels.consumerError.publish(ctx)
130134
}
131135

132136
try {
133137
const result = callback.apply(this, arguments)
134-
channels.consumerFinish.publish()
138+
if (messages && messages.length > 0) {
139+
channels.consumerFinish.publish(ctx)
140+
}
135141
return result
136142
} catch (error) {
137-
channels.consumerError.publish(error)
138-
channels.consumerFinish.publish()
143+
ctx.error = error
144+
channels.consumerError.publish(ctx)
145+
channels.consumerFinish.publish(ctx)
139146
throw error
140147
}
141148
})
@@ -204,45 +211,44 @@ function instrumentKafkaJS (kafkaJS) {
204211
return send.apply(this, arguments)
205212
}
206213

207-
const asyncResource = new AsyncResource('bound-anonymous-fn')
208-
return asyncResource.runInAsyncScope(() => {
209-
try {
210-
channels.producerStart.publish({
211-
topic: payload?.topic,
212-
messages: payload?.messages || [],
213-
bootstrapServers: kafka._ddBrokers,
214-
disableHeaderInjection: disabledHeaderWeakSet.has(producer)
215-
})
214+
const ctx = {
215+
topic: payload?.topic,
216+
messages: payload?.messages || [],
217+
bootstrapServers: kafka._ddBrokers,
218+
disableHeaderInjection: disabledHeaderWeakSet.has(producer)
219+
}
216220

221+
return channels.producerStart.runStores(ctx, () => {
222+
try {
217223
const result = send.apply(this, arguments)
218224

219-
result.then(
220-
asyncResource.bind(res => {
221-
channels.producerCommit.publish(res)
222-
channels.producerFinish.publish()
223-
}),
224-
asyncResource.bind(err => {
225-
if (err) {
226-
// Fixes bug where we would inject message headers for kafka brokers
227-
// that don't support headers (version <0.11). On the error, we disable
228-
// header injection. Unfortunately the error name / type is not more specific.
229-
// This approach is implemented by other tracers as well.
230-
if (err.name === 'KafkaJSError' && err.type === 'ERR_UNKNOWN') {
231-
disabledHeaderWeakSet.add(producer)
232-
log.error('Kafka Broker responded with UNKNOWN_SERVER_ERROR (-1). ' +
233-
'Please look at broker logs for more information. ' +
234-
'Tracer message header injection for Kafka is disabled.')
235-
}
236-
channels.producerError.publish(err)
225+
result.then((res) => {
226+
ctx.result = res
227+
channels.producerCommit.publish(ctx)
228+
channels.producerFinish.publish(ctx)
229+
}, (err) => {
230+
if (err) {
231+
// Fixes bug where we would inject message headers for kafka brokers
232+
// that don't support headers (version <0.11). On the error, we disable
233+
// header injection. Unfortunately the error name / type is not more specific.
234+
// This approach is implemented by other tracers as well.
235+
if (err.name === 'KafkaJSError' && err.type === 'ERR_UNKNOWN') {
236+
disabledHeaderWeakSet.add(producer)
237+
log.error('Kafka Broker responded with UNKNOWN_SERVER_ERROR (-1). ' +
238+
'Please look at broker logs for more information. ' +
239+
'Tracer message header injection for Kafka is disabled.')
237240
}
238-
channels.producerFinish.publish()
239-
})
240-
)
241+
ctx.error = err
242+
channels.producerError.publish(ctx)
243+
}
244+
channels.producerFinish.publish(ctx)
245+
})
241246

242247
return result
243248
} catch (e) {
244-
channels.producerError.publish(e)
245-
channels.producerFinish.publish()
249+
ctx.error = e
250+
channels.producerError.publish(ctx)
251+
channels.producerFinish.publish(ctx)
246252
throw e
247253
}
248254
})
@@ -350,33 +356,37 @@ function wrapKafkaCallback (callback, { startCh, commitCh, finishCh, errorCh },
350356
return function wrappedKafkaCallback (payload) {
351357
const commitPayload = getPayload(payload)
352358

353-
const asyncResource = new AsyncResource('bound-anonymous-fn')
354-
return asyncResource.runInAsyncScope(() => {
355-
startCh.publish(commitPayload)
359+
const ctx = {
360+
extractedArgs: commitPayload
361+
}
356362

363+
return startCh.runStores(ctx, () => {
357364
updateLatestOffset(commitPayload?.topic, commitPayload?.partition, commitPayload?.offset, commitPayload?.groupId)
358365

359366
try {
360367
const result = callback.apply(this, arguments)
361368

362369
if (result && typeof result.then === 'function') {
363370
return result
364-
.then(asyncResource.bind(res => {
365-
finishCh.publish()
371+
.then((res) => {
372+
ctx.result = res
373+
finishCh.publish(ctx)
366374
return res
367-
}))
368-
.catch(asyncResource.bind(err => {
369-
errorCh.publish(err)
370-
finishCh.publish()
375+
})
376+
.catch((err) => {
377+
ctx.error = err
378+
errorCh.publish(ctx)
379+
finishCh.publish(ctx)
371380
throw err
372-
}))
381+
})
373382
} else {
374-
finishCh.publish()
383+
finishCh.publish(ctx)
375384
return result
376385
}
377386
} catch (error) {
378-
errorCh.publish(error)
379-
finishCh.publish()
387+
ctx.error = error
388+
errorCh.publish(ctx)
389+
finishCh.publish(ctx)
380390
throw error
381391
}
382392
})
@@ -404,3 +414,8 @@ function updateLatestOffset (topic, partition, offset, groupId) {
404414
function getLatestOffsets () {
405415
return [...latestConsumerOffsets.values()]
406416
}
417+
418+
function convertHeaders (headers) {
  // Flatten a headers object into an array of single-entry objects
  // ({ key: value }), coercing each key and value to a string via
  // .toString() — one array element per header entry.
  const headerList = []
  for (const [headerKey, headerValue] of Object.entries(headers)) {
    headerList.push({ [headerKey.toString()]: headerValue.toString() })
  }
  return headerList
}

0 commit comments

Comments
 (0)