diff --git a/dd-java-agent/instrumentation/spark-executor/src/main/java/datadog/trace/instrumentation/spark/SparkExecutorDecorator.java b/dd-java-agent/instrumentation/spark-executor/src/main/java/datadog/trace/instrumentation/spark/SparkExecutorDecorator.java index 0f9519c696c..ed18143a0db 100644 --- a/dd-java-agent/instrumentation/spark-executor/src/main/java/datadog/trace/instrumentation/spark/SparkExecutorDecorator.java +++ b/dd-java-agent/instrumentation/spark-executor/src/main/java/datadog/trace/instrumentation/spark/SparkExecutorDecorator.java @@ -1,6 +1,7 @@ package datadog.trace.instrumentation.spark; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import datadog.trace.bootstrap.instrumentation.api.AgentTracer; import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; import datadog.trace.bootstrap.instrumentation.decorator.BaseDecorator; import org.apache.spark.executor.Executor; @@ -27,6 +28,8 @@ protected CharSequence component() { } public void onTaskStart(AgentSpan span, Executor.TaskRunner taskRunner) { + AgentTracer.get().getDataStreamsMonitoring().addGlobalTag("service_type:spark_task"); + span.setTag("task_id", taskRunner.taskId()); span.setTag("task_thread_name", taskRunner.threadName()); } diff --git a/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark212Listener.java b/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark212Listener.java index fdae211077e..93c05682bfb 100644 --- a/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark212Listener.java +++ b/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark212Listener.java @@ -1,5 +1,6 @@ package datadog.trace.instrumentation.spark; +import datadog.trace.bootstrap.instrumentation.api.AgentTracer; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -19,6 +20,7 @@ public class DatadogSpark212Listener extends AbstractDatadogSparkListener { public DatadogSpark212Listener(SparkConf sparkConf, String appId, String sparkVersion) { super(sparkConf, appId, sparkVersion); + AgentTracer.get().getDataStreamsMonitoring().addGlobalTag("service_type:spark_app"); } @Override diff --git a/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark213Listener.java b/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark213Listener.java index 115cdcbb9b0..3fe2db2dfe9 100644 --- a/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark213Listener.java +++ b/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark213Listener.java @@ -1,5 +1,6 @@ package datadog.trace.instrumentation.spark; +import datadog.trace.bootstrap.instrumentation.api.AgentTracer; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -19,6 +20,7 @@ public class DatadogSpark213Listener extends AbstractDatadogSparkListener { public DatadogSpark213Listener(SparkConf sparkConf, String appId, String sparkVersion) { super(sparkConf, appId, sparkVersion); + AgentTracer.get().getDataStreamsMonitoring().addGlobalTag("service_type:spark_app"); } @Override diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java index fcf844baf0a..f98f5116c2f 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java @@ -44,6 +44,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; @@ -78,6 +79,7 @@ public class DefaultDataStreamsMonitoring implements DataStreamsMonitoring, Even private volatile boolean configSupportsDataStreams = false; private final ConcurrentHashMap schemaSamplers; private static final ThreadLocal serviceNameOverride = new ThreadLocal<>(); + private static final Set globalTags = ConcurrentHashMap.newKeySet(); public DefaultDataStreamsMonitoring( Config config, @@ -111,7 +113,11 @@ public DefaultDataStreamsMonitoring( traceConfigSupplier, config.getWellKnownTags(), new MsgPackDatastreamsPayloadWriter( - sink, config.getWellKnownTags(), DDTraceCoreInfo.VERSION, config.getPrimaryTag()), + sink, + config.getWellKnownTags(), + DDTraceCoreInfo.VERSION, + config.getPrimaryTag(), + globalTags), Config.get().getDataStreamsBucketDurationNanoseconds()); } @@ -196,6 +202,11 @@ public void clearThreadServiceName() { serviceNameOverride.remove(); } + @Override + public void addGlobalTag(String tag) { + globalTags.add(tag); + } + private static String getThreadServiceName() { return serviceNameOverride.get(); } diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java index 2baa8943de0..d20dc967021 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java @@ -12,6 +12,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Set; public class MsgPackDatastreamsPayloadWriter implements DatastreamsPayloadWriter { private static final byte[] ENV = "Env".getBytes(ISO_8859_1); @@ -42,15 +43,21 @@ public class MsgPackDatastreamsPayloadWriter implements DatastreamsPayloadWriter private final WellKnownTags wellKnownTags; private final byte[] tracerVersionValue; private final byte[] primaryTagValue; + private final Set globalTags; public MsgPackDatastreamsPayloadWriter( - Sink sink, WellKnownTags wellKnownTags, String tracerVersion, String primaryTag) { + Sink sink, + WellKnownTags wellKnownTags, + String tracerVersion, + String primaryTag, + Set globalTags) { buffer = new GrowableBuffer(INITIAL_CAPACITY); writer = new MsgPackWriter(buffer); this.sink = sink; this.wellKnownTags = wellKnownTags; tracerVersionValue = tracerVersion.getBytes(ISO_8859_1); primaryTagValue = primaryTag == null ? new byte[0] : primaryTag.getBytes(ISO_8859_1); + this.globalTags = globalTags; } public void reset() { @@ -148,7 +155,7 @@ private void writeBucket(StatsBucket bucket, Writable packer) { Collection groups = bucket.getGroups(); packer.startArray(groups.size()); for (StatsGroup group : groups) { - boolean firstNode = group.getEdgeTags().isEmpty(); + boolean firstNode = group.getEdgeTags().isEmpty() && globalTags.isEmpty(); packer.startMap(firstNode ? 5 : 6); @@ -175,10 +182,13 @@ private void writeBucket(StatsBucket bucket, Writable packer) { if (!firstNode) { /* 6 */ packer.writeUTF8(EDGE_TAGS); - packer.startArray(group.getEdgeTags().size()); + packer.startArray(group.getEdgeTags().size() + globalTags.size()); for (String tag : group.getEdgeTags()) { packer.writeString(tag, null); } + for (String tag : globalTags) { + packer.writeString(tag, null); + } } } } @@ -189,10 +199,13 @@ private void writeBacklogs(Collection, Long>> backlogs, W for (Map.Entry, Long> entry : backlogs) { packer.startMap(2); packer.writeUTF8(BACKLOG_TAGS); - packer.startArray(entry.getKey().size()); + packer.startArray(entry.getKey().size() + globalTags.size()); for (String tag : entry.getKey()) { packer.writeString(tag, null); } + for (String tag : globalTags) { + packer.writeString(tag, null); + } packer.writeUTF8(BACKLOG_VALUE); packer.writeLong(entry.getValue()); } diff --git a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy index f3a435cec1e..29e0708e4a7 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy @@ -134,6 +134,7 @@ class DataStreamsWritingTest extends DDCoreSpecification { when: def dataStreams = new DefaultDataStreamsMonitoring(fakeConfig, sharedCommObjects, timeSource, { traceConfig }) + dataStreams.addGlobalTag("global:value") dataStreams.start() dataStreams.add(new StatsPoint([], 9, 0, 10, timeSource.currentTimeNanos, 0, 0, 0, null)) dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 5, timeSource.currentTimeNanos, 0, 0, 0, null)) @@ -186,37 +187,33 @@ class DataStreamsWritingTest extends DDCoreSpecification { assert unpacker.unpackString() == "Stats" assert unpacker.unpackArrayHeader() == 2 // 2 groups in first bucket - Set availableSizes = [5, 6] // we don't know the order the groups will be reported 2.times { - int mapHeaderSize = unpacker.unpackMapHeader() - assert availableSizes.remove(mapHeaderSize) - if (mapHeaderSize == 5) { // empty topic group - assert unpacker.unpackString() == "PathwayLatency" - unpacker.skipValue() - assert unpacker.unpackString() == "EdgeLatency" - unpacker.skipValue() - assert unpacker.unpackString() == "PayloadSize" - unpacker.skipValue() - assert unpacker.unpackString() == "Hash" - assert unpacker.unpackLong() == 9 + assert unpacker.unpackMapHeader() == 6 + + assert unpacker.unpackString() == "PathwayLatency" + unpacker.skipValue() + assert unpacker.unpackString() == "EdgeLatency" + unpacker.skipValue() + assert unpacker.unpackString() == "PayloadSize" + unpacker.skipValue() + assert unpacker.unpackString() == "Hash" + def hashValue = unpacker.unpackLong() + if (hashValue == 9) { assert unpacker.unpackString() == "ParentHash" assert unpacker.unpackLong() == 0 - } else { //other group - assert unpacker.unpackString() == "PathwayLatency" - unpacker.skipValue() - assert unpacker.unpackString() == "EdgeLatency" - unpacker.skipValue() - assert unpacker.unpackString() == "PayloadSize" - unpacker.skipValue() - assert unpacker.unpackString() == "Hash" - assert unpacker.unpackLong() == 1 + assert unpacker.unpackString() == "EdgeTags" + assert unpacker.unpackArrayHeader() == 1 + assert unpacker.unpackString() == "global:value" + } else { + assert hashValue == 1 assert unpacker.unpackString() == "ParentHash" assert unpacker.unpackLong() == 2 assert unpacker.unpackString() == "EdgeTags" - assert unpacker.unpackArrayHeader() == 3 + assert unpacker.unpackArrayHeader() == 4 assert unpacker.unpackString() == "type:testType" assert unpacker.unpackString() == "group:testGroup" assert unpacker.unpackString() == "topic:testTopic" + assert unpacker.unpackString() == "global:value" } } @@ -225,10 +222,11 @@ class DataStreamsWritingTest extends DDCoreSpecification { assert unpacker.unpackArrayHeader() == 1 assert unpacker.unpackMapHeader() == 2 assert unpacker.unpackString() == "Tags" - assert unpacker.unpackArrayHeader() == 3 + assert unpacker.unpackArrayHeader() == 4 assert unpacker.unpackString() == "partition:1" assert unpacker.unpackString() == "topic:testTopic" assert unpacker.unpackString() == "type:kafka_produce" + assert unpacker.unpackString() == "global:value" assert unpacker.unpackString() == "Value" assert unpacker.unpackLong() == 130 @@ -256,10 +254,11 @@ class DataStreamsWritingTest extends DDCoreSpecification { assert unpacker.unpackString() == "ParentHash" assert unpacker.unpackLong() == (hash == 1 ? 2 : 4) assert unpacker.unpackString() == "EdgeTags" - assert unpacker.unpackArrayHeader() == 3 + assert unpacker.unpackArrayHeader() == 4 assert unpacker.unpackString() == "type:testType" assert unpacker.unpackString() == "group:testGroup" assert unpacker.unpackString() == (hash == 1 ? "topic:testTopic" : "topic:testTopic2") + assert unpacker.unpackString() == "global:value" } assert unpacker.unpackString() == "ProductMask" diff --git a/internal-api/src/main/java/datadog/trace/api/datastreams/AgentDataStreamsMonitoring.java b/internal-api/src/main/java/datadog/trace/api/datastreams/AgentDataStreamsMonitoring.java index e6ddac36bac..5b8a4d648d0 100644 --- a/internal-api/src/main/java/datadog/trace/api/datastreams/AgentDataStreamsMonitoring.java +++ b/internal-api/src/main/java/datadog/trace/api/datastreams/AgentDataStreamsMonitoring.java @@ -46,4 +46,7 @@ public interface AgentDataStreamsMonitoring extends DataStreamsCheckpointer { /** clearThreadServiceName clears up service name override for Thread.currentThread() */ void clearThreadServiceName(); + + /** addGlobalTag allows adding global non-hashable tags to DSM payloads */ + void addGlobalTag(String tag); } diff --git a/internal-api/src/main/java/datadog/trace/api/datastreams/NoopDataStreamsMonitoring.java b/internal-api/src/main/java/datadog/trace/api/datastreams/NoopDataStreamsMonitoring.java index 020b492639d..c73053c8b12 100644 --- a/internal-api/src/main/java/datadog/trace/api/datastreams/NoopDataStreamsMonitoring.java +++ b/internal-api/src/main/java/datadog/trace/api/datastreams/NoopDataStreamsMonitoring.java @@ -47,6 +47,9 @@ public void setThreadServiceName(String serviceName) {} @Override public void clearThreadServiceName() {} + @Override + public void addGlobalTag(String tag) {} + @Override public void setConsumeCheckpoint(String type, String source, DataStreamsContextCarrier carrier) {}