Global tags support for Data Streams #8852

Status: Open. The PR wants to merge 10 commits into base: master.
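This change adds global tag support to Data Streams Monitoring (DSM): an instrumentation can register key:value tags once, and the payload writer then appends them to every stats group's EdgeTags list and every backlog's Tags list. Per the new interface Javadoc they are non-hashable, so they do not feed into pathway hashes. The Spark instrumentation below registers service_type:spark_app from the listeners and service_type:spark_task from the executor decorator. A minimal registration sketch mirroring the calls in the diff (the tag value here is illustrative, not from the PR):

import datadog.trace.bootstrap.instrumentation.api.AgentTracer;

public class DsmGlobalTagExample {
  public static void main(String[] args) {
    // Appended to the EdgeTags and backlog Tags arrays of every DSM payload
    // serialized after this call; does not affect pathway hashes.
    AgentTracer.get().getDataStreamsMonitoring().addGlobalTag("env:illustrative");
  }
}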
--- File 1: Spark executor task decorator (datadog.trace.instrumentation.spark) ---

@@ -1,6 +1,7 @@
 package datadog.trace.instrumentation.spark;

 import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
+import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
 import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
 import datadog.trace.bootstrap.instrumentation.decorator.BaseDecorator;
 import org.apache.spark.executor.Executor;
@@ -27,6 +28,8 @@ protected CharSequence component() {
   }

   public void onTaskStart(AgentSpan span, Executor.TaskRunner taskRunner) {
+    AgentTracer.get().getDataStreamsMonitoring().addGlobalTag("service_type:spark_task");
+
     span.setTag("task_id", taskRunner.taskId());
     span.setTag("task_thread_name", taskRunner.threadName());
   }
--- File 2: DatadogSpark212Listener.java ---

@@ -1,5 +1,6 @@
 package datadog.trace.instrumentation.spark;

+import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -19,6 +20,7 @@
 public class DatadogSpark212Listener extends AbstractDatadogSparkListener {
   public DatadogSpark212Listener(SparkConf sparkConf, String appId, String sparkVersion) {
     super(sparkConf, appId, sparkVersion);
+    AgentTracer.get().getDataStreamsMonitoring().addGlobalTag("service_type:spark_app");
   }

   @Override
--- File 3: DatadogSpark213Listener.java ---

@@ -1,5 +1,6 @@
 package datadog.trace.instrumentation.spark;

+import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -19,6 +20,7 @@
 public class DatadogSpark213Listener extends AbstractDatadogSparkListener {
   public DatadogSpark213Listener(SparkConf sparkConf, String appId, String sparkVersion) {
     super(sparkConf, appId, sparkVersion);
+    AgentTracer.get().getDataStreamsMonitoring().addGlobalTag("service_type:spark_app");
   }

   @Override
--- File 4: DefaultDataStreamsMonitoring.java ---

@@ -44,6 +44,7 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
@@ -78,6 +79,7 @@ public class DefaultDataStreamsMonitoring implements DataStreamsMonitoring, Even
   private volatile boolean configSupportsDataStreams = false;
   private final ConcurrentHashMap<String, SchemaSampler> schemaSamplers;
   private static final ThreadLocal<String> serviceNameOverride = new ThreadLocal<>();
+  private static final Set<String> globalTags = ConcurrentHashMap.newKeySet();

   public DefaultDataStreamsMonitoring(
       Config config,
@@ -111,7 +113,11 @@ public DefaultDataStreamsMonitoring(
         traceConfigSupplier,
         config.getWellKnownTags(),
         new MsgPackDatastreamsPayloadWriter(
-            sink, config.getWellKnownTags(), DDTraceCoreInfo.VERSION, config.getPrimaryTag()),
+            sink,
+            config.getWellKnownTags(),
+            DDTraceCoreInfo.VERSION,
+            config.getPrimaryTag(),
+            globalTags),
         Config.get().getDataStreamsBucketDurationNanoseconds());
   }

@@ -196,6 +202,11 @@ public void clearThreadServiceName() {
     serviceNameOverride.remove();
   }

+  @Override
+  public void addGlobalTag(String tag) {
+    globalTags.add(tag);
+  }
+
   private static String getThreadServiceName() {
     return serviceNameOverride.get();
   }
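A note on the storage choice above: globalTags is created with ConcurrentHashMap.newKeySet(), which returns a thread-safe Set, so instrumentations running on different threads can register tags concurrently and duplicate registrations collapse into a single entry. A self-contained sketch of those semantics (class name is illustrative):

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class GlobalTagSetDemo {
  // Same construction as the diff: a concurrent, duplicate-free set of tags.
  private static final Set<String> globalTags = ConcurrentHashMap.newKeySet();

  public static void main(String[] args) {
    globalTags.add("service_type:spark_app");
    globalTags.add("service_type:spark_app"); // duplicate registration is a no-op
    System.out.println(globalTags); // [service_type:spark_app]
  }
}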
--- File 5: MsgPackDatastreamsPayloadWriter.java ---

@@ -12,6 +12,7 @@
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;

 public class MsgPackDatastreamsPayloadWriter implements DatastreamsPayloadWriter {
   private static final byte[] ENV = "Env".getBytes(ISO_8859_1);
@@ -42,15 +43,21 @@ public class MsgPackDatastreamsPayloadWriter implements DatastreamsPayloadWriter
   private final WellKnownTags wellKnownTags;
   private final byte[] tracerVersionValue;
   private final byte[] primaryTagValue;
+  private final Set<String> globalTags;

   public MsgPackDatastreamsPayloadWriter(
-      Sink sink, WellKnownTags wellKnownTags, String tracerVersion, String primaryTag) {
+      Sink sink,
+      WellKnownTags wellKnownTags,
+      String tracerVersion,
+      String primaryTag,
+      Set<String> globalTags) {
     buffer = new GrowableBuffer(INITIAL_CAPACITY);
     writer = new MsgPackWriter(buffer);
     this.sink = sink;
     this.wellKnownTags = wellKnownTags;
     tracerVersionValue = tracerVersion.getBytes(ISO_8859_1);
     primaryTagValue = primaryTag == null ? new byte[0] : primaryTag.getBytes(ISO_8859_1);
+    this.globalTags = globalTags;
   }

   public void reset() {
@@ -148,7 +155,7 @@ private void writeBucket(StatsBucket bucket, Writable packer) {
     Collection<StatsGroup> groups = bucket.getGroups();
     packer.startArray(groups.size());
     for (StatsGroup group : groups) {
-      boolean firstNode = group.getEdgeTags().isEmpty();
+      boolean firstNode = group.getEdgeTags().isEmpty() && globalTags.isEmpty();

       packer.startMap(firstNode ? 5 : 6);

@@ -175,10 +182,13 @@ private void writeBucket(StatsBucket bucket, Writable packer) {
       if (!firstNode) {
         /* 6 */
         packer.writeUTF8(EDGE_TAGS);
-        packer.startArray(group.getEdgeTags().size());
+        packer.startArray(group.getEdgeTags().size() + globalTags.size());
         for (String tag : group.getEdgeTags()) {
           packer.writeString(tag, null);
         }
+        for (String tag : globalTags) {
+          packer.writeString(tag, null);
+        }
       }
     }
   }
@@ -189,10 +199,13 @@ private void writeBacklogs(Collection<Map.Entry<List<String>, Long>> backlogs, W
     for (Map.Entry<List<String>, Long> entry : backlogs) {
       packer.startMap(2);
       packer.writeUTF8(BACKLOG_TAGS);
-      packer.startArray(entry.getKey().size());
+      packer.startArray(entry.getKey().size() + globalTags.size());
       for (String tag : entry.getKey()) {
         packer.writeString(tag, null);
       }
+      for (String tag : globalTags) {
+        packer.writeString(tag, null);
+      }
       packer.writeUTF8(BACKLOG_VALUE);
       packer.writeLong(entry.getValue());
     }
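The size arithmetic above exists because MsgPack arrays are length-prefixed: the writer must declare the final element count before emitting any elements, so global tags are counted into the array header and then appended after the group's own tags. A hedged sketch of the same pattern using the public msgpack-core library (the PR itself uses Datadog's internal MsgPackWriter; the method names below are msgpack-core's, not the PR's):

import java.io.IOException;
import java.util.List;
import java.util.Set;
import org.msgpack.core.MessageBufferPacker;
import org.msgpack.core.MessagePack;

public class EdgeTagsSketch {
  static void writeEdgeTags(MessageBufferPacker packer, List<String> edgeTags, Set<String> globalTags)
      throws IOException {
    // Declare the combined element count up front, as MsgPack requires.
    packer.packArrayHeader(edgeTags.size() + globalTags.size());
    for (String tag : edgeTags) {
      packer.packString(tag); // the group's own edge tags first
    }
    for (String tag : globalTags) {
      packer.packString(tag); // then the registered global tags
    }
  }

  public static void main(String[] args) throws IOException {
    MessageBufferPacker packer = MessagePack.newDefaultBufferPacker();
    writeEdgeTags(packer, List.of("topic:testTopic"), Set.of("global:value"));
    System.out.println(packer.toByteArray().length + " bytes"); // array header + 2 strings
  }
}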
--- File 6: DataStreamsWritingTest.groovy ---

@@ -134,6 +134,7 @@ class DataStreamsWritingTest extends DDCoreSpecification {

     when:
     def dataStreams = new DefaultDataStreamsMonitoring(fakeConfig, sharedCommObjects, timeSource, { traceConfig })
+    dataStreams.addGlobalTag("global:value")
     dataStreams.start()
     dataStreams.add(new StatsPoint([], 9, 0, 10, timeSource.currentTimeNanos, 0, 0, 0, null))
     dataStreams.add(new StatsPoint(["type:testType", "group:testGroup", "topic:testTopic"], 1, 2, 5, timeSource.currentTimeNanos, 0, 0, 0, null))
@@ -186,37 +187,33 @@ class DataStreamsWritingTest extends DDCoreSpecification {
     assert unpacker.unpackString() == "Stats"
     assert unpacker.unpackArrayHeader() == 2 // 2 groups in first bucket

-    Set availableSizes = [5, 6] // we don't know the order the groups will be reported
     2.times {
-      int mapHeaderSize = unpacker.unpackMapHeader()
-      assert availableSizes.remove(mapHeaderSize)
-      if (mapHeaderSize == 5) { // empty topic group
-        assert unpacker.unpackString() == "PathwayLatency"
-        unpacker.skipValue()
-        assert unpacker.unpackString() == "EdgeLatency"
-        unpacker.skipValue()
-        assert unpacker.unpackString() == "PayloadSize"
-        unpacker.skipValue()
-        assert unpacker.unpackString() == "Hash"
-        assert unpacker.unpackLong() == 9
+      assert unpacker.unpackMapHeader() == 6
+
+      assert unpacker.unpackString() == "PathwayLatency"
+      unpacker.skipValue()
+      assert unpacker.unpackString() == "EdgeLatency"
+      unpacker.skipValue()
+      assert unpacker.unpackString() == "PayloadSize"
+      unpacker.skipValue()
+      assert unpacker.unpackString() == "Hash"
+      def hashValue = unpacker.unpackLong()
+      if (hashValue == 9) {
         assert unpacker.unpackString() == "ParentHash"
         assert unpacker.unpackLong() == 0
-      } else { //other group
-        assert unpacker.unpackString() == "PathwayLatency"
-        unpacker.skipValue()
-        assert unpacker.unpackString() == "EdgeLatency"
-        unpacker.skipValue()
-        assert unpacker.unpackString() == "PayloadSize"
-        unpacker.skipValue()
-        assert unpacker.unpackString() == "Hash"
-        assert unpacker.unpackLong() == 1
+        assert unpacker.unpackString() == "EdgeTags"
+        assert unpacker.unpackArrayHeader() == 1
+        assert unpacker.unpackString() == "global:value"
+      } else {
+        assert hashValue == 1
         assert unpacker.unpackString() == "ParentHash"
         assert unpacker.unpackLong() == 2
         assert unpacker.unpackString() == "EdgeTags"
-        assert unpacker.unpackArrayHeader() == 3
+        assert unpacker.unpackArrayHeader() == 4
         assert unpacker.unpackString() == "type:testType"
         assert unpacker.unpackString() == "group:testGroup"
         assert unpacker.unpackString() == "topic:testTopic"
+        assert unpacker.unpackString() == "global:value"
       }
     }

@@ -225,10 +222,11 @@ class DataStreamsWritingTest extends DDCoreSpecification {
     assert unpacker.unpackArrayHeader() == 1
     assert unpacker.unpackMapHeader() == 2
     assert unpacker.unpackString() == "Tags"
-    assert unpacker.unpackArrayHeader() == 3
+    assert unpacker.unpackArrayHeader() == 4
     assert unpacker.unpackString() == "partition:1"
     assert unpacker.unpackString() == "topic:testTopic"
     assert unpacker.unpackString() == "type:kafka_produce"
+    assert unpacker.unpackString() == "global:value"
     assert unpacker.unpackString() == "Value"
     assert unpacker.unpackLong() == 130

@@ -256,10 +254,11 @@ class DataStreamsWritingTest extends DDCoreSpecification {
     assert unpacker.unpackString() == "ParentHash"
     assert unpacker.unpackLong() == (hash == 1 ? 2 : 4)
     assert unpacker.unpackString() == "EdgeTags"
-    assert unpacker.unpackArrayHeader() == 3
+    assert unpacker.unpackArrayHeader() == 4
     assert unpacker.unpackString() == "type:testType"
     assert unpacker.unpackString() == "group:testGroup"
     assert unpacker.unpackString() == (hash == 1 ? "topic:testTopic" : "topic:testTopic2")
+    assert unpacker.unpackString() == "global:value"
   }

   assert unpacker.unpackString() == "ProductMask"
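Why the reworked assertions always expect a map header of 6: with a global tag registered, the writer's firstNode check (group.getEdgeTags().isEmpty() && globalTags.isEmpty()) is false even for the group with no edge tags of its own, so every group now serializes an EdgeTags field. A self-contained restatement of that rule, assuming the same semantics as the diff:

import java.util.List;
import java.util.Set;

public class GroupMapSizeRule {
  static int groupMapSize(List<String> edgeTags, Set<String> globalTags) {
    // Mirrors the writer: EdgeTags is omitted only when both tag sources are empty.
    boolean firstNode = edgeTags.isEmpty() && globalTags.isEmpty();
    return firstNode ? 5 : 6; // PathwayLatency, EdgeLatency, PayloadSize, Hash, ParentHash[, EdgeTags]
  }

  public static void main(String[] args) {
    System.out.println(groupMapSize(List.of(), Set.of("global:value"))); // 6, as the test asserts
    System.out.println(groupMapSize(List.of(), Set.of()));               // 5, the pre-PR behavior
  }
}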
--- File 7: AgentDataStreamsMonitoring.java ---

@@ -46,4 +46,7 @@ public interface AgentDataStreamsMonitoring extends DataStreamsCheckpointer {

   /** clearThreadServiceName clears up service name override for Thread.currentThread() */
   void clearThreadServiceName();
+
+  /** addGlobalTag allows adding global non-hashable tags to DSM payloads */
+  void addGlobalTag(String tag);
 }
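The interface now carries both a thread-scoped override (setThreadServiceName / clearThreadServiceName) and the new process-wide addGlobalTag. A hedged sketch contrasting the two scopes, assuming the AgentTracer accessor chain used elsewhere in the diff; both values are illustrative:

import datadog.trace.bootstrap.instrumentation.api.AgentDataStreamsMonitoring;
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;

public class DsmScopes {
  public static void main(String[] args) {
    AgentDataStreamsMonitoring dsm = AgentTracer.get().getDataStreamsMonitoring();

    // Thread-scoped: overrides the service name for Thread.currentThread()
    // only, and must be cleared explicitly.
    dsm.setThreadServiceName("my-service");
    dsm.clearThreadServiceName();

    // Process-wide: appended to every serialized DSM tag list from now on.
    dsm.addGlobalTag("deployment:canary");
  }
}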
--- File 8: no-op AgentDataStreamsMonitoring implementation ---

@@ -47,6 +47,9 @@ public void setThreadServiceName(String serviceName) {}
   @Override
   public void clearThreadServiceName() {}

+  @Override
+  public void addGlobalTag(String tag) {}
+
   @Override
   public void setConsumeCheckpoint(String type, String source, DataStreamsContextCarrier carrier) {}
