Skip to content

Commit f952f3b

Browse files
Extract Vert.x json body response schemas
1 parent ba4e70f commit f952f3b

File tree

12 files changed

+281
-2
lines changed

12 files changed

+281
-2
lines changed

dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/AppSecRequestContext.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ public class AppSecRequestContext implements DataBundle, Closeable {
105105
private boolean reqDataPublished;
106106
private boolean rawReqBodyPublished;
107107
private boolean convertedReqBodyPublished;
108+
private boolean responseBodyPublished;
108109
private boolean respDataPublished;
109110
private boolean pathParamsPublished;
110111
private volatile Map<String, String> derivatives;
@@ -502,6 +503,14 @@ public void setConvertedReqBodyPublished(boolean convertedReqBodyPublished) {
502503
this.convertedReqBodyPublished = convertedReqBodyPublished;
503504
}
504505

506+
public boolean isResponseBodyPublished() {
507+
return responseBodyPublished;
508+
}
509+
510+
public void setResponseBodyPublished(final boolean responseBodyPublished) {
511+
this.responseBodyPublished = responseBodyPublished;
512+
}
513+
505514
public boolean isRespDataPublished() {
506515
return respDataPublished;
507516
}

dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/GatewayBridge.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ public class GatewayBridge {
9696
private volatile DataSubscriberInfo initialReqDataSubInfo;
9797
private volatile DataSubscriberInfo rawRequestBodySubInfo;
9898
private volatile DataSubscriberInfo requestBodySubInfo;
99+
private volatile DataSubscriberInfo responseBodySubInfo;
99100
private volatile DataSubscriberInfo pathParamsSubInfo;
100101
private volatile DataSubscriberInfo respDataSubInfo;
101102
private volatile DataSubscriberInfo grpcServerMethodSubInfo;
@@ -135,6 +136,7 @@ public void init() {
135136
subscriptionService.registerCallback(EVENTS.requestMethodUriRaw(), this::onRequestMethodUriRaw);
136137
subscriptionService.registerCallback(EVENTS.requestBodyStart(), this::onRequestBodyStart);
137138
subscriptionService.registerCallback(EVENTS.requestBodyDone(), this::onRequestBodyDone);
139+
subscriptionService.registerCallback(EVENTS.responseBody(), this::onResponseBody);
138140
subscriptionService.registerCallback(
139141
EVENTS.requestClientSocketAddress(), this::onRequestClientSocketAddress);
140142
subscriptionService.registerCallback(
@@ -174,6 +176,7 @@ public void reset() {
174176
initialReqDataSubInfo = null;
175177
rawRequestBodySubInfo = null;
176178
requestBodySubInfo = null;
179+
responseBodySubInfo = null;
177180
pathParamsSubInfo = null;
178181
respDataSubInfo = null;
179182
grpcServerMethodSubInfo = null;
@@ -627,6 +630,40 @@ private Flow<Void> onRequestBodyDone(RequestContext ctx_, StoredBodySupplier sup
627630
}
628631
}
629632

633+
private Flow<Void> onResponseBody(RequestContext ctx_, Object obj) {
634+
AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC);
635+
if (ctx == null) {
636+
return NoopFlow.INSTANCE;
637+
}
638+
639+
if (ctx.isResponseBodyPublished()) {
640+
log.debug(
641+
"Response body already published; will ignore new value of type {}", obj.getClass());
642+
return NoopFlow.INSTANCE;
643+
}
644+
ctx.setResponseBodyPublished(true);
645+
646+
while (true) {
647+
DataSubscriberInfo subInfo = responseBodySubInfo;
648+
if (subInfo == null) {
649+
subInfo = producerService.getDataSubscribers(KnownAddresses.RESPONSE_BODY_OBJECT);
650+
responseBodySubInfo = subInfo;
651+
}
652+
if (subInfo == null || subInfo.isEmpty()) {
653+
return NoopFlow.INSTANCE;
654+
}
655+
// TODO: review schema extraction limits
656+
Object converted = ObjectIntrospection.convert(obj, ctx);
657+
DataBundle bundle = new SingletonDataBundle<>(KnownAddresses.RESPONSE_BODY_OBJECT, converted);
658+
try {
659+
GatewayContext gwCtx = new GatewayContext(false);
660+
return producerService.publishDataEvent(subInfo, ctx, bundle, gwCtx);
661+
} catch (ExpiredSubscriberInfoException e) {
662+
responseBodySubInfo = null;
663+
}
664+
}
665+
}
666+
630667
private Flow<Void> onRequestPathParams(RequestContext ctx_, Map<String, ?> data) {
631668
AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC);
632669
if (ctx == null || ctx.isPathParamsPublished()) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package datadog.trace.instrumentation.vertx_4_0.server;
2+
3+
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
4+
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
5+
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
6+
7+
import com.google.auto.service.AutoService;
8+
import datadog.trace.agent.tooling.Instrumenter;
9+
import datadog.trace.agent.tooling.InstrumenterModule;
10+
import datadog.trace.agent.tooling.muzzle.Reference;
11+
import io.vertx.ext.web.impl.RoutingContextImpl;
12+
13+
/**
14+
* @see RoutingContextImpl#getBodyAsJson(int)
15+
* @see RoutingContextImpl#getBodyAsJsonArray(int)
16+
*/
17+
@AutoService(InstrumenterModule.class)
18+
public class RoutingContextInstrumentation extends InstrumenterModule.AppSec
19+
implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice {
20+
21+
public RoutingContextInstrumentation() {
22+
super("vertx", "vertx-4.0");
23+
}
24+
25+
@Override
26+
public Reference[] additionalMuzzleReferences() {
27+
return new Reference[] {VertxVersionMatcher.HTTP_1X_SERVER_RESPONSE};
28+
}
29+
30+
@Override
31+
public String instrumentedType() {
32+
return "io.vertx.ext.web.RoutingContext";
33+
}
34+
35+
@Override
36+
public void methodAdvice(MethodTransformer transformer) {
37+
transformer.applyAdvice(
38+
named("json").and(takesArguments(1)).and(takesArgument(0, Object.class)),
39+
packageName + ".RoutingContextJsonResponseAdvice");
40+
}
41+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package datadog.trace.instrumentation.vertx_4_0.server;
2+
3+
import static datadog.trace.api.gateway.Events.EVENTS;
4+
5+
import datadog.appsec.api.blocking.BlockingException;
6+
import datadog.trace.advice.ActiveRequestContext;
7+
import datadog.trace.advice.RequiresRequestContext;
8+
import datadog.trace.api.gateway.BlockResponseFunction;
9+
import datadog.trace.api.gateway.CallbackProvider;
10+
import datadog.trace.api.gateway.Flow;
11+
import datadog.trace.api.gateway.RequestContext;
12+
import datadog.trace.api.gateway.RequestContextSlot;
13+
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
14+
import io.vertx.core.json.JsonObject;
15+
import java.util.function.BiFunction;
16+
import net.bytebuddy.asm.Advice;
17+
18+
@RequiresRequestContext(RequestContextSlot.APPSEC)
19+
class RoutingContextJsonResponseAdvice {
20+
21+
@Advice.OnMethodEnter(suppress = Throwable.class)
22+
static void before(
23+
@Advice.Argument(0) Object source, @ActiveRequestContext RequestContext reqCtx) {
24+
25+
if (source == null) {
26+
return;
27+
}
28+
29+
Object object = source;
30+
if (object instanceof JsonObject) {
31+
object = ((JsonObject) object).getMap();
32+
}
33+
34+
CallbackProvider cbp = AgentTracer.get().getCallbackProvider(RequestContextSlot.APPSEC);
35+
BiFunction<RequestContext, Object, Flow<Void>> callback =
36+
cbp.getCallback(EVENTS.responseBody());
37+
if (callback == null) {
38+
return;
39+
}
40+
41+
Flow<Void> flow = callback.apply(reqCtx, object);
42+
Flow.Action action = flow.getAction();
43+
if (action instanceof Flow.Action.RequestBlockingAction) {
44+
BlockResponseFunction blockResponseFunction = reqCtx.getBlockResponseFunction();
45+
if (blockResponseFunction == null) {
46+
return;
47+
}
48+
Flow.Action.RequestBlockingAction rba = (Flow.Action.RequestBlockingAction) action;
49+
blockResponseFunction.tryCommitBlockingResponse(
50+
reqCtx.getTraceSegment(),
51+
rba.getStatusCode(),
52+
rba.getBlockingContentType(),
53+
rba.getExtraHeaders());
54+
55+
throw new BlockingException("Blocked request (for RoutingContext/json)");
56+
}
57+
}
58+
}

dd-java-agent/instrumentation/vertx-web-4.0/src/test/groovy/server/VertxHttpServerForkedTest.groovy

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,11 @@ class VertxHttpServerForkedTest extends HttpServerTest<Vertx> {
8383
true
8484
}
8585

86+
@Override
87+
boolean testResponseBodyJson() {
88+
true
89+
}
90+
8691
@Override
8792
boolean testBlocking() {
8893
true

dd-java-agent/instrumentation/vertx-web-4.0/src/test/java/server/VertxTestServer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,8 @@ public void start(final Promise<Void> startPromise) {
127127
BODY_JSON,
128128
() -> {
129129
JsonObject json = ctx.getBodyAsJson();
130-
ctx.response().setStatusCode(BODY_JSON.getStatus()).end(json.toString());
130+
ctx.response().setStatusCode(BODY_JSON.getStatus());
131+
ctx.json(json);
131132
}));
132133
router
133134
.route(QUERY_ENCODED_BOTH.getRawPath())

dd-java-agent/instrumentation/vertx-web-5.0/src/test/groovy/server/VertxHttpServerForkedTest.groovy

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,11 @@ class VertxHttpServerForkedTest extends HttpServerTest<Vertx> {
6767
true
6868
}
6969

70+
@Override
71+
boolean testResponseBodyJson() {
72+
true
73+
}
74+
7075
@Override
7176
boolean testBodyUrlencoded() {
7277
true

dd-java-agent/instrumentation/vertx-web-5.0/src/test/java/server/VertxTestServer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,8 @@ public void start(final Promise<Void> startPromise) {
118118
BODY_JSON,
119119
() -> {
120120
JsonObject json = ctx.body().asJsonObject();
121-
ctx.response().setStatusCode(BODY_JSON.getStatus()).end(json.toString());
121+
ctx.response().setStatusCode(BODY_JSON.getStatus());
122+
ctx.json(json);
122123
}));
123124
router
124125
.route(QUERY_ENCODED_BOTH.getRawPath())

dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/base/HttpServerTest.groovy

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@ abstract class HttpServerTest<SERVER> extends WithHttpServer<SERVER> {
135135
ss.registerCallback(events.requestBodyStart(), callbacks.requestBodyStartCb)
136136
ss.registerCallback(events.requestBodyDone(), callbacks.requestBodyEndCb)
137137
ss.registerCallback(events.requestBodyProcessed(), callbacks.requestBodyObjectCb)
138+
ss.registerCallback(events.responseBody(), callbacks.responseBodyObjectCb)
138139
ss.registerCallback(events.responseStarted(), callbacks.responseStartedCb)
139140
ss.registerCallback(events.responseHeader(), callbacks.responseHeaderCb)
140141
ss.registerCallback(events.responseHeaderDone(), callbacks.responseHeaderDoneCb)
@@ -335,6 +336,7 @@ abstract class HttpServerTest<SERVER> extends WithHttpServer<SERVER> {
335336
false
336337
}
337338

339+
338340
boolean isRequestBodyNoStreaming() {
339341
// if true, plain text request body tests expect the requestBodyProcessed
340342
// callback to tbe called, not requestBodyStart/requestBodyDone
@@ -353,6 +355,10 @@ abstract class HttpServerTest<SERVER> extends WithHttpServer<SERVER> {
353355
false
354356
}
355357

358+
boolean testResponseBodyJson() {
359+
false
360+
}
361+
356362
boolean testBlocking() {
357363
false
358364
}
@@ -1581,6 +1587,39 @@ abstract class HttpServerTest<SERVER> extends WithHttpServer<SERVER> {
15811587
true | 'text/html;q=0.8, application/json;q=0.9'
15821588
}
15831589

1590+
void 'test instrumentation gateway json response body'() {
1591+
setup:
1592+
assumeTrue(testResponseBodyJson())
1593+
def request = request(
1594+
BODY_JSON, 'POST',
1595+
RequestBody.create(MediaType.get('application/json'), '{"a": "x"}'))
1596+
.build()
1597+
def response = client.newCall(request).execute()
1598+
if (isDataStreamsEnabled()) {
1599+
TEST_DATA_STREAMS_WRITER.waitForGroups(1)
1600+
}
1601+
1602+
expect:
1603+
response.body().charStream().text == BODY_JSON.body
1604+
1605+
when:
1606+
TEST_WRITER.waitForTraces(1)
1607+
1608+
then:
1609+
TEST_WRITER.get(0).any {
1610+
it.getTag('response.body') == '[a:[x]]'
1611+
}
1612+
1613+
and:
1614+
if (isDataStreamsEnabled()) {
1615+
StatsGroup first = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == 0 }
1616+
verifyAll(first) {
1617+
edgeTags.containsAll(DSM_EDGE_TAGS)
1618+
edgeTags.size() == DSM_EDGE_TAGS.size()
1619+
}
1620+
}
1621+
}
1622+
15841623
@Flaky(value = "https://github.com/DataDog/dd-trace-java/issues/4681", suites = ["GrizzlyAsyncTest", "GrizzlyTest"])
15851624
def 'test blocking of request with json response'() {
15861625
setup:
@@ -2450,6 +2489,31 @@ abstract class HttpServerTest<SERVER> extends WithHttpServer<SERVER> {
24502489
}
24512490
} as BiFunction<RequestContext, Object, Flow<Void>>)
24522491

2492+
final BiFunction<RequestContext, Object, Flow<Void>> responseBodyObjectCb =
2493+
({ RequestContext rqCtxt, Object obj ->
2494+
if (obj instanceof Map) {
2495+
obj = obj.collectEntries {
2496+
[
2497+
it.key,
2498+
(it.value instanceof Iterable || it.value instanceof String[]) ? it.value : [it.value]
2499+
]
2500+
}
2501+
} else if (!(obj instanceof String) && !(obj instanceof List)) {
2502+
obj = obj.properties
2503+
.findAll { it.key != 'class' }
2504+
.collectEntries { [it.key, it.value instanceof Iterable ? it.value : [it.value]] }
2505+
}
2506+
rqCtxt.traceSegment.setTagTop('response.body', obj as String)
2507+
Context context = rqCtxt.getData(RequestContextSlot.APPSEC)
2508+
if (context.responseBlock) {
2509+
new RbaFlow(
2510+
new Flow.Action.RequestBlockingAction(413, BlockingContentType.JSON)
2511+
)
2512+
} else {
2513+
Flow.ResultFlow.empty()
2514+
}
2515+
} as BiFunction<RequestContext, Object, Flow<Void>>)
2516+
24532517
final BiFunction<RequestContext, Integer, Flow<Void>> responseStartedCb =
24542518
({ RequestContext rqCtxt, Integer resultCode ->
24552519
Context context = rqCtxt.getData(RequestContextSlot.APPSEC)

dd-smoke-tests/vertx-4.2/application/src/main/java/datadog/vertx_4_2/MainVerticle.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import io.vertx.core.VertxOptions;
77
import io.vertx.core.http.HttpServerOptions;
88
import io.vertx.ext.web.Router;
9+
import io.vertx.ext.web.handler.BodyHandler;
910
import java.math.BigInteger;
1011
import java.util.concurrent.ThreadLocalRandom;
1112

@@ -51,6 +52,15 @@ public void start(Promise<Void> startPromise) throws Exception {
5152
.setStatusCode(Integer.parseInt(ctx.request().getParam("status_code")))
5253
.end("EXECUTED"));
5354

55+
router.route("/api_security/response").handler(BodyHandler.create());
56+
router
57+
.route("/api_security/response")
58+
.handler(
59+
ctx -> {
60+
ctx.response().setStatusCode(200);
61+
ctx.json(ctx.getBodyAsJson());
62+
});
63+
5464
vertx
5565
.createHttpServer(new HttpServerOptions().setHandle100ContinueAutomatically(true))
5666
.requestHandler(

0 commit comments

Comments
 (0)