From eb9456db03108a47b3d7ec03c74a6903aaeebb9e Mon Sep 17 00:00:00 2001 From: Igor Kravchenko Date: Mon, 19 May 2025 17:16:35 -0500 Subject: [PATCH 1/8] Added addGlobalTag method --- .../DefaultDataStreamsMonitoring.java | 18 +++++++++--------- .../MsgPackDatastreamsPayloadWriter.java | 17 +++++++++++++---- .../AgentDataStreamsMonitoring.java | 3 +++ .../datastreams/NoopDataStreamsMonitoring.java | 3 +++ 4 files changed, 28 insertions(+), 13 deletions(-) diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java index fcf844baf0a..2cf01b8e4ee 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java @@ -36,14 +36,8 @@ import datadog.trace.core.DDSpan; import datadog.trace.core.DDTraceCoreInfo; import datadog.trace.util.AgentTaskScheduler; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; + +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; @@ -78,6 +72,7 @@ public class DefaultDataStreamsMonitoring implements DataStreamsMonitoring, Even private volatile boolean configSupportsDataStreams = false; private final ConcurrentHashMap schemaSamplers; private static final ThreadLocal serviceNameOverride = new ThreadLocal<>(); + private static final Set globalTags = ConcurrentHashMap.newKeySet(); public DefaultDataStreamsMonitoring( Config config, @@ -111,7 +106,7 @@ public DefaultDataStreamsMonitoring( traceConfigSupplier, config.getWellKnownTags(), new MsgPackDatastreamsPayloadWriter( - sink, config.getWellKnownTags(), DDTraceCoreInfo.VERSION, config.getPrimaryTag()), + sink, config.getWellKnownTags(), DDTraceCoreInfo.VERSION, config.getPrimaryTag(), globalTags), Config.get().getDataStreamsBucketDurationNanoseconds()); } @@ -196,6 +191,11 @@ public void clearThreadServiceName() { serviceNameOverride.remove(); } + @Override + public void addGlobalTag(String tag) { + globalTags.add(tag); + } + private static String getThreadServiceName() { return serviceNameOverride.get(); } diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java index 2baa8943de0..e16713bc81b 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java @@ -12,6 +12,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Set; public class MsgPackDatastreamsPayloadWriter implements DatastreamsPayloadWriter { private static final byte[] ENV = "Env".getBytes(ISO_8859_1); @@ -42,15 +43,17 @@ public class MsgPackDatastreamsPayloadWriter implements DatastreamsPayloadWriter private final WellKnownTags wellKnownTags; private final byte[] tracerVersionValue; private final byte[] primaryTagValue; + private final Set globalTags; public MsgPackDatastreamsPayloadWriter( - Sink sink, WellKnownTags wellKnownTags, String tracerVersion, String primaryTag) { + Sink sink, WellKnownTags wellKnownTags, String tracerVersion, String primaryTag, Set globalTags) { buffer = new GrowableBuffer(INITIAL_CAPACITY); writer = new MsgPackWriter(buffer); this.sink = sink; this.wellKnownTags = wellKnownTags; tracerVersionValue = tracerVersion.getBytes(ISO_8859_1); primaryTagValue = primaryTag == null ? new byte[0] : primaryTag.getBytes(ISO_8859_1); + this.globalTags = globalTags; } public void reset() { @@ -148,7 +151,7 @@ private void writeBucket(StatsBucket bucket, Writable packer) { Collection groups = bucket.getGroups(); packer.startArray(groups.size()); for (StatsGroup group : groups) { - boolean firstNode = group.getEdgeTags().isEmpty(); + boolean firstNode = group.getEdgeTags().isEmpty() && globalTags.isEmpty(); packer.startMap(firstNode ? 5 : 6); @@ -175,10 +178,13 @@ private void writeBucket(StatsBucket bucket, Writable packer) { if (!firstNode) { /* 6 */ packer.writeUTF8(EDGE_TAGS); - packer.startArray(group.getEdgeTags().size()); + packer.startArray(group.getEdgeTags().size() + globalTags.size()); for (String tag : group.getEdgeTags()) { packer.writeString(tag, null); } + for (String tag : globalTags) { + packer.writeString(tag, null); + } } } } @@ -189,10 +195,13 @@ private void writeBacklogs(Collection, Long>> backlogs, W for (Map.Entry, Long> entry : backlogs) { packer.startMap(2); packer.writeUTF8(BACKLOG_TAGS); - packer.startArray(entry.getKey().size()); + packer.startArray(entry.getKey().size() + globalTags.size()); for (String tag : entry.getKey()) { packer.writeString(tag, null); } + for (String tag : globalTags) { + packer.writeString(tag, null); + } packer.writeUTF8(BACKLOG_VALUE); packer.writeLong(entry.getValue()); } diff --git a/internal-api/src/main/java/datadog/trace/api/datastreams/AgentDataStreamsMonitoring.java b/internal-api/src/main/java/datadog/trace/api/datastreams/AgentDataStreamsMonitoring.java index e6ddac36bac..5b8a4d648d0 100644 --- a/internal-api/src/main/java/datadog/trace/api/datastreams/AgentDataStreamsMonitoring.java +++ b/internal-api/src/main/java/datadog/trace/api/datastreams/AgentDataStreamsMonitoring.java @@ -46,4 +46,7 @@ public interface AgentDataStreamsMonitoring extends DataStreamsCheckpointer { /** clearThreadServiceName clears up service name override for Thread.currentThread() */ void clearThreadServiceName(); + + /** addGlobalTag allows adding global non-hashable tags to DSM payloads */ + void addGlobalTag(String tag); } diff --git a/internal-api/src/main/java/datadog/trace/api/datastreams/NoopDataStreamsMonitoring.java b/internal-api/src/main/java/datadog/trace/api/datastreams/NoopDataStreamsMonitoring.java index 020b492639d..c73053c8b12 100644 --- a/internal-api/src/main/java/datadog/trace/api/datastreams/NoopDataStreamsMonitoring.java +++ b/internal-api/src/main/java/datadog/trace/api/datastreams/NoopDataStreamsMonitoring.java @@ -47,6 +47,9 @@ public void setThreadServiceName(String serviceName) {} @Override public void clearThreadServiceName() {} + @Override + public void addGlobalTag(String tag) {} + @Override public void setConsumeCheckpoint(String type, String source, DataStreamsContextCarrier carrier) {} From 5efd832ef84ca59a6692ab11f0d3fa09291883f4 Mon Sep 17 00:00:00 2001 From: Igor Kravchenko Date: Mon, 19 May 2025 17:33:31 -0500 Subject: [PATCH 2/8] Added service_type tags --- .../instrumentation/spark/SparkExecutorDecorator.java | 3 +++ .../spark/DatadogSpark212Listener.java | 3 +++ .../spark/DatadogSpark213Listener.java | 3 +++ .../datastreams/DefaultDataStreamsMonitoring.java | 11 +++++++++-- 4 files changed, 18 insertions(+), 2 deletions(-) diff --git a/dd-java-agent/instrumentation/spark-executor/src/main/java/datadog/trace/instrumentation/spark/SparkExecutorDecorator.java b/dd-java-agent/instrumentation/spark-executor/src/main/java/datadog/trace/instrumentation/spark/SparkExecutorDecorator.java index 0f9519c696c..ed18143a0db 100644 --- a/dd-java-agent/instrumentation/spark-executor/src/main/java/datadog/trace/instrumentation/spark/SparkExecutorDecorator.java +++ b/dd-java-agent/instrumentation/spark-executor/src/main/java/datadog/trace/instrumentation/spark/SparkExecutorDecorator.java @@ -1,6 +1,7 @@ package datadog.trace.instrumentation.spark; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import datadog.trace.bootstrap.instrumentation.api.AgentTracer; import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; import datadog.trace.bootstrap.instrumentation.decorator.BaseDecorator; import org.apache.spark.executor.Executor; @@ -27,6 +28,8 @@ protected CharSequence component() { } public void onTaskStart(AgentSpan span, Executor.TaskRunner taskRunner) { + AgentTracer.get().getDataStreamsMonitoring().addGlobalTag("service_type:spark_task"); + span.setTag("task_id", taskRunner.taskId()); span.setTag("task_thread_name", taskRunner.threadName()); } diff --git a/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark212Listener.java b/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark212Listener.java index fdae211077e..bd1e9b4af80 100644 --- a/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark212Listener.java +++ b/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark212Listener.java @@ -3,6 +3,8 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; + +import datadog.trace.bootstrap.instrumentation.api.AgentTracer; import org.apache.spark.SparkConf; import org.apache.spark.scheduler.SparkListenerJobStart; import org.apache.spark.scheduler.StageInfo; @@ -19,6 +21,7 @@ public class DatadogSpark212Listener extends AbstractDatadogSparkListener { public DatadogSpark212Listener(SparkConf sparkConf, String appId, String sparkVersion) { super(sparkConf, appId, sparkVersion); + AgentTracer.get().getDataStreamsMonitoring().addGlobalTag("service_type:spark_app"); } @Override diff --git a/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark213Listener.java b/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark213Listener.java index 115cdcbb9b0..64281f01afb 100644 --- a/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark213Listener.java +++ b/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark213Listener.java @@ -3,6 +3,8 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; + +import datadog.trace.bootstrap.instrumentation.api.AgentTracer; import org.apache.spark.SparkConf; import org.apache.spark.scheduler.SparkListenerJobStart; import org.apache.spark.scheduler.StageInfo; @@ -19,6 +21,7 @@ public class DatadogSpark213Listener extends AbstractDatadogSparkListener { public DatadogSpark213Listener(SparkConf sparkConf, String appId, String sparkVersion) { super(sparkConf, appId, sparkVersion); + AgentTracer.get().getDataStreamsMonitoring().addGlobalTag("service_type:spark_app"); } @Override diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java index 2cf01b8e4ee..d1a521892b6 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java @@ -36,8 +36,15 @@ import datadog.trace.core.DDSpan; import datadog.trace.core.DDTraceCoreInfo; import datadog.trace.util.AgentTaskScheduler; - -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; From baeff396165d5d35961c7232790a0707c24fda79 Mon Sep 17 00:00:00 2001 From: Igor Kravchenko Date: Mon, 19 May 2025 17:34:58 -0500 Subject: [PATCH 3/8] Spotless apply --- .../core/datastreams/DefaultDataStreamsMonitoring.java | 6 +++++- .../core/datastreams/MsgPackDatastreamsPayloadWriter.java | 6 +++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java index d1a521892b6..f98f5116c2f 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java @@ -113,7 +113,11 @@ public DefaultDataStreamsMonitoring( traceConfigSupplier, config.getWellKnownTags(), new MsgPackDatastreamsPayloadWriter( - sink, config.getWellKnownTags(), DDTraceCoreInfo.VERSION, config.getPrimaryTag(), globalTags), + sink, + config.getWellKnownTags(), + DDTraceCoreInfo.VERSION, + config.getPrimaryTag(), + globalTags), Config.get().getDataStreamsBucketDurationNanoseconds()); } diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java index e16713bc81b..d20dc967021 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java @@ -46,7 +46,11 @@ public class MsgPackDatastreamsPayloadWriter implements DatastreamsPayloadWriter private final Set globalTags; public MsgPackDatastreamsPayloadWriter( - Sink sink, WellKnownTags wellKnownTags, String tracerVersion, String primaryTag, Set globalTags) { + Sink sink, + WellKnownTags wellKnownTags, + String tracerVersion, + String primaryTag, + Set globalTags) { buffer = new GrowableBuffer(INITIAL_CAPACITY); writer = new MsgPackWriter(buffer); this.sink = sink; From c1b257dffc9a3a38cca7e0e20bb1e962c86c9ed6 Mon Sep 17 00:00:00 2001 From: Igor Kravchenko Date: Tue, 20 May 2025 09:37:15 -0500 Subject: [PATCH 4/8] Spotless apply --- .../trace/instrumentation/spark/DatadogSpark213Listener.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark213Listener.java b/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark213Listener.java index 64281f01afb..3fe2db2dfe9 100644 --- a/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark213Listener.java +++ b/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark213Listener.java @@ -1,10 +1,9 @@ package datadog.trace.instrumentation.spark; +import datadog.trace.bootstrap.instrumentation.api.AgentTracer; import java.util.ArrayList; import java.util.Collection; import java.util.List; - -import datadog.trace.bootstrap.instrumentation.api.AgentTracer; import org.apache.spark.SparkConf; import org.apache.spark.scheduler.SparkListenerJobStart; import org.apache.spark.scheduler.StageInfo; From 3b97b2cd8542ef53a267997b6be15d25804a648f Mon Sep 17 00:00:00 2001 From: Igor Kravchenko Date: Tue, 20 May 2025 09:57:06 -0500 Subject: [PATCH 5/8] Spotless apply, again --- .../trace/instrumentation/spark/DatadogSpark212Listener.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark212Listener.java b/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark212Listener.java index bd1e9b4af80..93c05682bfb 100644 --- a/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark212Listener.java +++ b/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark212Listener.java @@ -1,10 +1,9 @@ package datadog.trace.instrumentation.spark; +import datadog.trace.bootstrap.instrumentation.api.AgentTracer; import java.util.ArrayList; import java.util.Collection; import java.util.List; - -import datadog.trace.bootstrap.instrumentation.api.AgentTracer; import org.apache.spark.SparkConf; import org.apache.spark.scheduler.SparkListenerJobStart; import org.apache.spark.scheduler.StageInfo; From 4f7254e8aa0d53841291f9f7de17b2bc09538f5c Mon Sep 17 00:00:00 2001 From: Igor Kravchenko Date: Tue, 20 May 2025 13:26:13 -0500 Subject: [PATCH 6/8] Added temporary debug print statements --- .../trace/instrumentation/spark/SparkExecutorDecorator.java | 1 + .../trace/instrumentation/spark/DatadogSpark212Listener.java | 1 + .../trace/instrumentation/spark/DatadogSpark213Listener.java | 1 + .../trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java | 2 ++ 4 files changed, 5 insertions(+) diff --git a/dd-java-agent/instrumentation/spark-executor/src/main/java/datadog/trace/instrumentation/spark/SparkExecutorDecorator.java b/dd-java-agent/instrumentation/spark-executor/src/main/java/datadog/trace/instrumentation/spark/SparkExecutorDecorator.java index ed18143a0db..c525c532650 100644 --- a/dd-java-agent/instrumentation/spark-executor/src/main/java/datadog/trace/instrumentation/spark/SparkExecutorDecorator.java +++ b/dd-java-agent/instrumentation/spark-executor/src/main/java/datadog/trace/instrumentation/spark/SparkExecutorDecorator.java @@ -28,6 +28,7 @@ protected CharSequence component() { } public void onTaskStart(AgentSpan span, Executor.TaskRunner taskRunner) { + System.out.println("### Setting global tag in task start"); AgentTracer.get().getDataStreamsMonitoring().addGlobalTag("service_type:spark_task"); span.setTag("task_id", taskRunner.taskId()); diff --git a/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark212Listener.java b/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark212Listener.java index 93c05682bfb..7b283b9ed09 100644 --- a/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark212Listener.java +++ b/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark212Listener.java @@ -20,6 +20,7 @@ public class DatadogSpark212Listener extends AbstractDatadogSparkListener { public DatadogSpark212Listener(SparkConf sparkConf, String appId, String sparkVersion) { super(sparkConf, appId, sparkVersion); + System.out.println("### Setting global tag in listener"); AgentTracer.get().getDataStreamsMonitoring().addGlobalTag("service_type:spark_app"); } diff --git a/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark213Listener.java b/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark213Listener.java index 3fe2db2dfe9..4f02054602a 100644 --- a/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark213Listener.java +++ b/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark213Listener.java @@ -20,6 +20,7 @@ public class DatadogSpark213Listener extends AbstractDatadogSparkListener { public DatadogSpark213Listener(SparkConf sparkConf, String appId, String sparkVersion) { super(sparkConf, appId, sparkVersion); + System.out.println("### Setting global tag in listener"); AgentTracer.get().getDataStreamsMonitoring().addGlobalTag("service_type:spark_app"); } diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java index d20dc967021..55aa244757d 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java @@ -187,6 +187,7 @@ private void writeBucket(StatsBucket bucket, Writable packer) { packer.writeString(tag, null); } for (String tag : globalTags) { + System.out.println("### writing global tag for stats " + tag); packer.writeString(tag, null); } } @@ -204,6 +205,7 @@ private void writeBacklogs(Collection, Long>> backlogs, W packer.writeString(tag, null); } for (String tag : globalTags) { + System.out.println("### writing global tag for backlogs " + tag); packer.writeString(tag, null); } packer.writeUTF8(BACKLOG_VALUE); From ee7ee6410501b2ab38dd14a20d8d21095132856f Mon Sep 17 00:00:00 2001 From: Igor Kravchenko Date: Tue, 20 May 2025 16:14:28 -0500 Subject: [PATCH 7/8] Removed debug logs --- .../trace/instrumentation/spark/SparkExecutorDecorator.java | 1 - .../trace/instrumentation/spark/DatadogSpark212Listener.java | 1 - .../trace/instrumentation/spark/DatadogSpark213Listener.java | 1 - .../trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java | 2 -- 4 files changed, 5 deletions(-) diff --git a/dd-java-agent/instrumentation/spark-executor/src/main/java/datadog/trace/instrumentation/spark/SparkExecutorDecorator.java b/dd-java-agent/instrumentation/spark-executor/src/main/java/datadog/trace/instrumentation/spark/SparkExecutorDecorator.java index c525c532650..ed18143a0db 100644 --- a/dd-java-agent/instrumentation/spark-executor/src/main/java/datadog/trace/instrumentation/spark/SparkExecutorDecorator.java +++ b/dd-java-agent/instrumentation/spark-executor/src/main/java/datadog/trace/instrumentation/spark/SparkExecutorDecorator.java @@ -28,7 +28,6 @@ protected CharSequence component() { } public void onTaskStart(AgentSpan span, Executor.TaskRunner taskRunner) { - System.out.println("### Setting global tag in task start"); AgentTracer.get().getDataStreamsMonitoring().addGlobalTag("service_type:spark_task"); span.setTag("task_id", taskRunner.taskId()); diff --git a/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark212Listener.java b/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark212Listener.java index 7b283b9ed09..93c05682bfb 100644 --- a/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark212Listener.java +++ b/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark212Listener.java @@ -20,7 +20,6 @@ public class DatadogSpark212Listener extends AbstractDatadogSparkListener { public DatadogSpark212Listener(SparkConf sparkConf, String appId, String sparkVersion) { super(sparkConf, appId, sparkVersion); - System.out.println("### Setting global tag in listener"); AgentTracer.get().getDataStreamsMonitoring().addGlobalTag("service_type:spark_app"); } diff --git a/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark213Listener.java b/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark213Listener.java index 4f02054602a..3fe2db2dfe9 100644 --- a/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark213Listener.java +++ b/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/DatadogSpark213Listener.java @@ -20,7 +20,6 @@ public class DatadogSpark213Listener extends AbstractDatadogSparkListener { public DatadogSpark213Listener(SparkConf sparkConf, String appId, String sparkVersion) { super(sparkConf, appId, sparkVersion); - System.out.println("### Setting global tag in listener"); AgentTracer.get().getDataStreamsMonitoring().addGlobalTag("service_type:spark_app"); } diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java index 55aa244757d..d20dc967021 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java @@ -187,7 +187,6 @@ private void writeBucket(StatsBucket bucket, Writable packer) { packer.writeString(tag, null); } for (String tag : globalTags) { - System.out.println("### writing global tag for stats " + tag); packer.writeString(tag, null); } } @@ -205,7 +204,6 @@ private void writeBacklogs(Collection, Long>> backlogs, W packer.writeString(tag, null); } for (String tag : globalTags) { - System.out.println("### writing global tag for backlogs " + tag); packer.writeString(tag, null); } packer.writeUTF8(BACKLOG_VALUE); From c1b1b5c5b2ffc5fdf1b4218c9ab639ab7f354785 Mon Sep 17 00:00:00 2001 From: Igor Kravchenko Date: Wed, 21 May 2025 15:29:52 -0500 Subject: [PATCH 8/8] Added global tags to tests --- .../datastreams/DataStreamsWritingTest.groovy | 47 +++++++++---------- 1 file changed, 23 insertions(+), 24 deletions(-) diff --git a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy index f3a435cec1e..29e0708e4a7 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DataStreamsWritingTest.groovy @@ -134,6 +134,7 @@ class DataStreamsWritingTest extends DDCoreSpecification { when: def dataStreams = new DefaultDataStreamsMonitoring(fakeConfig, sharedCommObjects, timeSource, { traceConfig }) + dataStreams.addGlobalTag("global:value") dataStreams.start() dataStreams.add(new StatsPoint([], 9, 0, 10, timeSource.currentTimeNanos, 0, 0, 0, null)) dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 5, timeSource.currentTimeNanos, 0, 0, 0, null)) @@ -186,37 +187,33 @@ class DataStreamsWritingTest extends DDCoreSpecification { assert unpacker.unpackString() == "Stats" assert unpacker.unpackArrayHeader() == 2 // 2 groups in first bucket - Set availableSizes = [5, 6] // we don't know the order the groups will be reported 2.times { - int mapHeaderSize = unpacker.unpackMapHeader() - assert availableSizes.remove(mapHeaderSize) - if (mapHeaderSize == 5) { // empty topic group - assert unpacker.unpackString() == "PathwayLatency" - unpacker.skipValue() - assert unpacker.unpackString() == "EdgeLatency" - unpacker.skipValue() - assert unpacker.unpackString() == "PayloadSize" - unpacker.skipValue() - assert unpacker.unpackString() == "Hash" - assert unpacker.unpackLong() == 9 + assert unpacker.unpackMapHeader() == 6 + + assert unpacker.unpackString() == "PathwayLatency" + unpacker.skipValue() + assert unpacker.unpackString() == "EdgeLatency" + unpacker.skipValue() + assert unpacker.unpackString() == "PayloadSize" + unpacker.skipValue() + assert unpacker.unpackString() == "Hash" + def hashValue = unpacker.unpackLong() + if (hashValue == 9) { assert unpacker.unpackString() == "ParentHash" assert unpacker.unpackLong() == 0 - } else { //other group - assert unpacker.unpackString() == "PathwayLatency" - unpacker.skipValue() - assert unpacker.unpackString() == "EdgeLatency" - unpacker.skipValue() - assert unpacker.unpackString() == "PayloadSize" - unpacker.skipValue() - assert unpacker.unpackString() == "Hash" - assert unpacker.unpackLong() == 1 + assert unpacker.unpackString() == "EdgeTags" + assert unpacker.unpackArrayHeader() == 1 + assert unpacker.unpackString() == "global:value" + } else { + assert hashValue == 1 assert unpacker.unpackString() == "ParentHash" assert unpacker.unpackLong() == 2 assert unpacker.unpackString() == "EdgeTags" - assert unpacker.unpackArrayHeader() == 3 + assert unpacker.unpackArrayHeader() == 4 assert unpacker.unpackString() == "type:testType" assert unpacker.unpackString() == "group:testGroup" assert unpacker.unpackString() == "topic:testTopic" + assert unpacker.unpackString() == "global:value" } } @@ -225,10 +222,11 @@ class DataStreamsWritingTest extends DDCoreSpecification { assert unpacker.unpackArrayHeader() == 1 assert unpacker.unpackMapHeader() == 2 assert unpacker.unpackString() == "Tags" - assert unpacker.unpackArrayHeader() == 3 + assert unpacker.unpackArrayHeader() == 4 assert unpacker.unpackString() == "partition:1" assert unpacker.unpackString() == "topic:testTopic" assert unpacker.unpackString() == "type:kafka_produce" + assert unpacker.unpackString() == "global:value" assert unpacker.unpackString() == "Value" assert unpacker.unpackLong() == 130 @@ -256,10 +254,11 @@ class DataStreamsWritingTest extends DDCoreSpecification { assert unpacker.unpackString() == "ParentHash" assert unpacker.unpackLong() == (hash == 1 ? 2 : 4) assert unpacker.unpackString() == "EdgeTags" - assert unpacker.unpackArrayHeader() == 3 + assert unpacker.unpackArrayHeader() == 4 assert unpacker.unpackString() == "type:testType" assert unpacker.unpackString() == "group:testGroup" assert unpacker.unpackString() == (hash == 1 ? "topic:testTopic" : "topic:testTopic2") + assert unpacker.unpackString() == "global:value" } assert unpacker.unpackString() == "ProductMask"