Skip to content

Commit be1dd36

Browse files
sdwr98Spikhalskiy
authored andcommitted
Addresses temporalio#258 - OpenTracing support
Enhances the ContextPropagator API by providing more lifecycle methods, and adds an implementation of OpenTracing
1 parent 86189b2 commit be1dd36

16 files changed

+1018
-413
lines changed

temporal-sdk/build.gradle

+4
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,14 @@ dependencies {
4242
api project(':temporal-serviceclient')
4343
api group: 'com.google.code.gson', name: 'gson', version: '2.8.6'
4444
api group: 'io.micrometer', name: 'micrometer-core', version: '1.6.3'
45+
api group: 'io.opentracing', name: 'opentracing-api', version: '0.33.0'
4546

4647
implementation group: 'com.google.guava', name: 'guava', version: '30.1-jre'
4748
implementation group: 'com.cronutils', name: 'cron-utils', version: '9.1.3'
4849
implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.12.1'
4950
implementation group: 'com.fasterxml.jackson.datatype', name: 'jackson-datatype-jsr310', version: '2.12.1'
51+
implementation group: 'io.opentracing', name: 'opentracing-util', version: '0.33.0'
52+
5053
if (!JavaVersion.current().isJava8()) {
5154
implementation 'javax.annotation:javax.annotation-api:1.3.2'
5255
}
@@ -55,6 +58,7 @@ dependencies {
5558
testImplementation group: 'ch.qos.logback', name: 'logback-classic', version: '1.2.3'
5659
testImplementation group: 'com.googlecode.junit-toolbox', name: 'junit-toolbox', version: '2.4'
5760
testImplementation group: 'junit', name: 'junit', version: '4.13.1'
61+
testImplementation group: 'io.opentracing', name: 'opentracing-mock', version: '0.33.0'
5862
}
5963

6064
configurations.all {

temporal-sdk/src/main/java/io/temporal/common/context/ContextPropagator.java

+29-2
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,12 @@
2323
import java.util.Map;
2424

2525
/**
26-
* Context Propagators are used to propagate information from workflow to activity, workflow to
27-
* child workflow, and workflow to child thread (using {@link io.temporal.workflow.Async}).
26+
* Context Propagators are used to propagate information from workflow stub to workflow, workflow to
27+
* activity, workflow to child workflow, and workflow to child thread (using {@link
28+
* io.temporal.workflow.Async}).
29+
*
30+
* <p>It is important to note that all threads share one ContextPropagator instance, so your
31+
* implementation <b>must</b> be thread-safe and store any state in ThreadLocal variables.
2832
*
2933
* <p>A sample <code>ContextPropagator</code> that copies all {@link org.slf4j.MDC} entries starting
3034
* with a given prefix along the code path looks like this:
@@ -126,4 +130,27 @@ public interface ContextPropagator {
126130

127131
/** Sets the current context */
128132
void setCurrentContext(Object context);
133+
134+
/**
135+
* This is a lifecycle method, called after the context has been propagated to the
136+
* workflow/activity thread but the workflow/activity has not yet started.
137+
*/
138+
default void setUp() {
139+
// No-op
140+
}
141+
142+
/** This is a lifecycle method, called after the workflow/activity has completed. */
143+
default void finish() {
144+
// No-op
145+
}
146+
147+
/**
148+
* This is a lifecycle method, called when the workflow/activity finishes by throwing an unhandled
149+
* exception. {@link #finish()} is called after this method.
150+
*
151+
* @param t The unhandled exception that caused the workflow/activity to terminate
152+
*/
153+
default void onError(Throwable t) {
154+
// No-op
155+
}
129156
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
/*
2+
* Copyright (C) 2020 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
9+
* use this file except in compliance with the License. A copy of the License is
10+
* located at
11+
*
12+
* http://aws.amazon.com/apache2.0
13+
*
14+
* or in the "license" file accompanying this file. This file is distributed on
15+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
16+
* express or implied. See the License for the specific language governing
17+
* permissions and limitations under the License.
18+
*/
19+
20+
package io.temporal.common.context;
21+
22+
import io.opentracing.Scope;
23+
import io.opentracing.Span;
24+
import io.opentracing.SpanContext;
25+
import io.opentracing.Tracer;
26+
import io.opentracing.log.Fields;
27+
import io.opentracing.propagation.Format;
28+
import io.opentracing.propagation.TextMap;
29+
import io.opentracing.tag.Tags;
30+
import io.opentracing.util.GlobalTracer;
31+
import io.temporal.api.common.v1.Payload;
32+
import io.temporal.common.converter.DataConverter;
33+
import io.temporal.internal.logging.LoggerTag;
34+
import java.util.HashMap;
35+
import java.util.Iterator;
36+
import java.util.Map;
37+
import org.slf4j.Logger;
38+
import org.slf4j.LoggerFactory;
39+
import org.slf4j.MDC;
40+
41+
/** Support for OpenTracing spans */
42+
public class OpenTracingContextPropagator implements ContextPropagator {
43+
44+
private static final Logger log = LoggerFactory.getLogger(OpenTracingContextPropagator.class);
45+
46+
private static final ThreadLocal<SpanContext> currentOpenTracingSpanContext = new ThreadLocal<>();
47+
private static final ThreadLocal<Span> currentOpenTracingSpan = new ThreadLocal<>();
48+
private static final ThreadLocal<Scope> currentOpenTracingScope = new ThreadLocal<>();
49+
50+
public static SpanContext getCurrentOpenTracingSpanContext() {
51+
return currentOpenTracingSpanContext.get();
52+
}
53+
54+
public static void setCurrentOpenTracingSpanContext(SpanContext ctx) {
55+
if (ctx != null) {
56+
currentOpenTracingSpanContext.set(ctx);
57+
}
58+
}
59+
60+
@Override
61+
public String getName() {
62+
return "OpenTracing";
63+
}
64+
65+
@Override
66+
public Map<String, Payload> serializeContext(Object context) {
67+
Map<String, Payload> serializedContext = new HashMap<>();
68+
Map<String, String> contextMap = (Map<String, String>) context;
69+
if (contextMap != null) {
70+
for (Map.Entry<String, String> entry : contextMap.entrySet()) {
71+
serializedContext.put(
72+
entry.getKey(), DataConverter.getDefaultInstance().toPayload(entry.getValue()).get());
73+
}
74+
}
75+
return serializedContext;
76+
}
77+
78+
@Override
79+
public Object deserializeContext(Map<String, Payload> context) {
80+
Map<String, String> contextMap = new HashMap<>();
81+
for (Map.Entry<String, Payload> entry : context.entrySet()) {
82+
contextMap.put(
83+
entry.getKey(),
84+
DataConverter.getDefaultInstance()
85+
.fromPayload(entry.getValue(), String.class, String.class));
86+
}
87+
return contextMap;
88+
}
89+
90+
@Override
91+
public Object getCurrentContext() {
92+
Tracer currentTracer = GlobalTracer.get();
93+
Span currentSpan = currentTracer.scopeManager().activeSpan();
94+
if (currentSpan != null) {
95+
HashMapTextMap contextTextMap = new HashMapTextMap();
96+
currentTracer.inject(currentSpan.context(), Format.Builtin.TEXT_MAP, contextTextMap);
97+
return contextTextMap.getBackingMap();
98+
} else {
99+
return null;
100+
}
101+
}
102+
103+
@Override
104+
public void setCurrentContext(Object context) {
105+
Tracer currentTracer = GlobalTracer.get();
106+
Map<String, String> contextAsMap = (Map<String, String>) context;
107+
if (contextAsMap != null) {
108+
HashMapTextMap contextTextMap = new HashMapTextMap(contextAsMap);
109+
setCurrentOpenTracingSpanContext(
110+
currentTracer.extract(Format.Builtin.TEXT_MAP, contextTextMap));
111+
}
112+
}
113+
114+
@Override
115+
public void setUp() {
116+
Tracer openTracingTracer = GlobalTracer.get();
117+
Tracer.SpanBuilder builder =
118+
openTracingTracer
119+
.buildSpan("cadence.workflow")
120+
.withTag("resource.name", MDC.get(LoggerTag.WORKFLOW_TYPE));
121+
if (getCurrentOpenTracingSpanContext() != null) {
122+
builder.asChildOf(getCurrentOpenTracingSpanContext());
123+
}
124+
Span span = builder.start();
125+
openTracingTracer.activateSpan(span);
126+
currentOpenTracingSpan.set(span);
127+
Scope scope = openTracingTracer.activateSpan(span);
128+
currentOpenTracingScope.set(scope);
129+
}
130+
131+
@Override
132+
public void onError(Throwable t) {
133+
Span span = currentOpenTracingSpan.get();
134+
if (span != null) {
135+
Tags.ERROR.set(span, true);
136+
Map<String, Object> errorData = new HashMap<>();
137+
errorData.put(Fields.EVENT, "error");
138+
if (t != null) {
139+
errorData.put(Fields.ERROR_OBJECT, t);
140+
errorData.put(Fields.MESSAGE, t.getMessage());
141+
}
142+
span.log(errorData);
143+
}
144+
}
145+
146+
@Override
147+
public void finish() {
148+
Scope currentScope = currentOpenTracingScope.get();
149+
Span currentSpan = currentOpenTracingSpan.get();
150+
if (currentScope != null) {
151+
currentScope.close();
152+
}
153+
if (currentSpan != null) {
154+
currentSpan.finish();
155+
}
156+
currentOpenTracingScope.remove();
157+
currentOpenTracingSpan.remove();
158+
currentOpenTracingSpanContext.remove();
159+
}
160+
161+
/** Just check for other instances of the same class */
162+
@Override
163+
public boolean equals(Object obj) {
164+
if (obj == null) {
165+
return false;
166+
}
167+
if (this == obj) {
168+
return true;
169+
}
170+
return this.getClass().equals(obj.getClass());
171+
}
172+
173+
@Override
174+
public int hashCode() {
175+
return this.getClass().hashCode();
176+
}
177+
178+
private class HashMapTextMap implements TextMap {
179+
private final HashMap<String, String> backingMap = new HashMap<>();
180+
181+
public HashMapTextMap() {
182+
// Noop
183+
}
184+
185+
public HashMapTextMap(Map<String, String> spanData) {
186+
backingMap.putAll(spanData);
187+
}
188+
189+
@Override
190+
public Iterator<Map.Entry<String, String>> iterator() {
191+
return backingMap.entrySet().iterator();
192+
}
193+
194+
@Override
195+
public void put(String key, String value) {
196+
backingMap.put(key, value);
197+
}
198+
199+
public HashMap<String, String> getBackingMap() {
200+
return backingMap;
201+
}
202+
}
203+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/*
2+
* Copyright (C) 2020 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
9+
* use this file except in compliance with the License. A copy of the License is
10+
* located at
11+
*
12+
* http://aws.amazon.com/apache2.0
13+
*
14+
* or in the "license" file accompanying this file. This file is distributed on
15+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
16+
* express or implied. See the License for the specific language governing
17+
* permissions and limitations under the License.
18+
*/
19+
20+
package io.temporal.internal.context;
21+
22+
import io.temporal.common.context.ContextPropagator;
23+
import java.util.HashMap;
24+
import java.util.List;
25+
import java.util.Map;
26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
28+
29+
/** This class holds the current set of context propagators */
30+
public abstract class AbstractContextThreadLocal {
31+
32+
private static final Logger log = LoggerFactory.getLogger(AbstractContextThreadLocal.class);
33+
34+
/**
35+
* Returns the context propagators for the current thread
36+
*
37+
* @return
38+
*/
39+
protected abstract List<ContextPropagator> getPropagatorsForThread();
40+
41+
/** Sets the context propagators for this thread */
42+
public abstract void setContextPropagators(List<ContextPropagator> contextPropagators);
43+
44+
public List<ContextPropagator> getContextPropagators() {
45+
return getPropagatorsForThread();
46+
}
47+
48+
public Map<String, Object> getCurrentContextForPropagation() {
49+
Map<String, Object> contextData = new HashMap<>();
50+
for (ContextPropagator propagator : getPropagatorsForThread()) {
51+
contextData.put(propagator.getName(), propagator.getCurrentContext());
52+
}
53+
return contextData;
54+
}
55+
56+
/**
57+
* Injects the context data into the thread for each configured context propagator
58+
*
59+
* @param contextData The context data received from the server
60+
*/
61+
public void propagateContextToCurrentThread(Map<String, Object> contextData) {
62+
if (contextData == null || contextData.isEmpty()) {
63+
return;
64+
}
65+
for (ContextPropagator propagator : getPropagatorsForThread()) {
66+
if (contextData.containsKey(propagator.getName())) {
67+
propagator.setCurrentContext(contextData.get(propagator.getName()));
68+
}
69+
}
70+
}
71+
72+
/** Calls {@link ContextPropagator#setUp()} for each propagator */
73+
public void setUpContextPropagators() {
74+
for (ContextPropagator propagator : getPropagatorsForThread()) {
75+
try {
76+
propagator.setUp();
77+
} catch (Throwable t) {
78+
// Don't let an error in one propagator block the others
79+
log.error("Error calling setUp() on a contextpropagator", t);
80+
}
81+
}
82+
}
83+
84+
/**
85+
* Calls {@link ContextPropagator#onError(Throwable)} for each propagator
86+
*
87+
* @param t The Throwable that caused the workflow/activity to finish
88+
*/
89+
public void onErrorContextPropagators(Throwable t) {
90+
for (ContextPropagator propagator : getPropagatorsForThread()) {
91+
try {
92+
propagator.onError(t);
93+
} catch (Throwable t1) {
94+
// Don't let an error in one propagator block the others
95+
log.error("Error calling onError() on a contextpropagator", t1);
96+
}
97+
}
98+
}
99+
100+
/** Calls {@link ContextPropagator#finish()} for each propagator */
101+
public void finishContextPropagators() {
102+
for (ContextPropagator propagator : getPropagatorsForThread()) {
103+
try {
104+
propagator.finish();
105+
} catch (Throwable t) {
106+
// Don't let an error in one propagator block the others
107+
log.error("Error calling finish() on a contextpropagator", t);
108+
}
109+
}
110+
}
111+
}

0 commit comments

Comments
 (0)