Skip to content

Alternative API Security sampling algorithm #8961

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions benchmark/load/petclinic/benchmark.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@
"JAVA_OPTS": "-javaagent:${TRACER} -Ddd.appsec.enabled=true"
}
},
"apisec": {
"env": {
"VARIANT": "apisec",
"JAVA_OPTS": "-javaagent:${TRACER} -Ddd.appsec.enabled=true -Ddd.api-security.enabled=true"
}
},
"iast": {
"env": {
"VARIANT": "iast",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package com.datadog.appsec;

import com.datadog.appsec.api.security.ApiSecurityProcessor;
import com.datadog.appsec.api.security.ApiSecuritySampler;
import com.datadog.appsec.api.security.ApiSecuritySamplerImpl;
import com.datadog.appsec.api.security.AppSecSpanPostProcessor;
import com.datadog.appsec.blocking.BlockingServiceImpl;
import com.datadog.appsec.config.AppSecConfigService;
import com.datadog.appsec.config.AppSecConfigServiceImpl;
import com.datadog.appsec.config.TraceSegmentPostProcessor;
import com.datadog.appsec.ddwaf.WAFModule;
import com.datadog.appsec.event.EventDispatcher;
import com.datadog.appsec.event.ReplaceableEventProducerService;
Expand All @@ -23,7 +23,6 @@
import datadog.trace.api.telemetry.ProductChange;
import datadog.trace.api.telemetry.ProductChangeCollector;
import datadog.trace.bootstrap.ActiveSubsystems;
import datadog.trace.bootstrap.instrumentation.api.SpanPostProcessor;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -69,18 +68,6 @@ private static void doStart(SubscriptionService gw, SharedCommunicationObjects s
EventDispatcher eventDispatcher = new EventDispatcher();
REPLACEABLE_EVENT_PRODUCER.replaceEventProducerService(eventDispatcher);

ApiSecuritySampler requestSampler;
if (Config.get().isApiSecurityEnabled()) {
requestSampler = new ApiSecuritySamplerImpl();
// When DD_API_SECURITY_ENABLED=true, ths post-processor is set even when AppSec is inactive.
// This should be low overhead since the post-processor exits early if there's no AppSec
// context.
SpanPostProcessor.Holder.INSTANCE =
new AppSecSpanPostProcessor(requestSampler, REPLACEABLE_EVENT_PRODUCER);
} else {
requestSampler = new ApiSecuritySampler.NoOp();
}

ConfigurationPoller configurationPoller = sco.configurationPoller(config);
// may throw and abort startup
APP_SEC_CONFIG_SERVICE =
Expand All @@ -90,11 +77,15 @@ private static void doStart(SubscriptionService gw, SharedCommunicationObjects s

sco.createRemaining(config);

TraceSegmentPostProcessor apiSecurityPostProcessor =
Config.get().isApiSecurityEnabled()
? new ApiSecurityProcessor(new ApiSecuritySampler(), REPLACEABLE_EVENT_PRODUCER)
: null;
GatewayBridge gatewayBridge =
new GatewayBridge(
gw,
REPLACEABLE_EVENT_PRODUCER,
requestSampler,
apiSecurityPostProcessor,
APP_SEC_CONFIG_SERVICE.getTraceSegmentPostProcessors());

loadModules(eventDispatcher, sco.monitoring);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package com.datadog.appsec.api.security;

import com.datadog.appsec.config.TraceSegmentPostProcessor;
import com.datadog.appsec.event.EventProducerService;
import com.datadog.appsec.event.ExpiredSubscriberInfoException;
import com.datadog.appsec.event.data.DataBundle;
import com.datadog.appsec.event.data.KnownAddresses;
import com.datadog.appsec.event.data.SingletonDataBundle;
import com.datadog.appsec.gateway.AppSecRequestContext;
import com.datadog.appsec.gateway.GatewayContext;
import com.datadog.appsec.report.AppSecEvent;
import datadog.trace.api.Config;
import datadog.trace.api.ProductTraceSource;
import datadog.trace.api.internal.TraceSegment;
import datadog.trace.bootstrap.instrumentation.api.Tags;
import java.util.Collection;
import java.util.Collections;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ApiSecurityProcessor implements TraceSegmentPostProcessor {

private static final Logger log = LoggerFactory.getLogger(ApiSecurityProcessor.class);
private final ApiSecuritySampler sampler;
private final EventProducerService producerService;

public ApiSecurityProcessor(ApiSecuritySampler sampler, EventProducerService producerService) {
this.sampler = sampler;
this.producerService = producerService;
}

@Override
public void processTraceSegment(
TraceSegment segment, AppSecRequestContext ctx, Collection<AppSecEvent> collectedEvents) {
if (segment == null || ctx == null) {
return;
}
if (!sampler.sample(ctx)) {
log.debug("Request not sampled, skipping API security post-processing");
return;
}
log.debug("Request sampled, processing API security post-processing");
extractSchemas(ctx, segment);
}

private void extractSchemas(
final @Nonnull AppSecRequestContext ctx, final @Nonnull TraceSegment traceSegment) {
final EventProducerService.DataSubscriberInfo sub =
producerService.getDataSubscribers(KnownAddresses.WAF_CONTEXT_PROCESSOR);
if (sub == null || sub.isEmpty()) {
log.debug("No subscribers for schema extraction");
return;
}

final DataBundle bundle =
new SingletonDataBundle<>(
KnownAddresses.WAF_CONTEXT_PROCESSOR, Collections.singletonMap("extract-schema", true));
try {
GatewayContext gwCtx = new GatewayContext(false);
producerService.publishDataEvent(sub, ctx, bundle, gwCtx);
// TODO: Perhaps do this if schemas have actually been extracted (check when committing
// derivatives)
traceSegment.setTagTop(Tags.ASM_KEEP, true);
if (!Config.get().isApmTracingEnabled()) {
traceSegment.setTagTop(Tags.PROPAGATED_TRACE_SOURCE, ProductTraceSource.ASM);
}
} catch (ExpiredSubscriberInfoException e) {
log.debug("Subscriber info expired", e);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,35 +1,218 @@
package com.datadog.appsec.api.security;

import com.datadog.appsec.gateway.AppSecRequestContext;
import javax.annotation.Nonnull;
import datadog.trace.util.AgentTaskScheduler;
import java.util.Random;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

public interface ApiSecuritySampler {
/**
* Prepare a request context for later sampling decision. This method should be called at request
* end, and is thread-safe. If a request can potentially be sampled, this method will return true.
* If this method returns true, the caller MUST call {@link #releaseOne()} once the context is not
* needed anymore.
*/
boolean preSampleRequest(final @Nonnull AppSecRequestContext ctx);
/**
* Internal map for API Security sampling. See "[RFC-1021] API Security Sampling Algorithm for
* thread-based concurrency".
*/
public class ApiSecuritySampler {

/** Get the final sampling decision. This method is NOT required to be thread-safe. */
boolean sampleRequest(AppSecRequestContext ctx);
private static final int DEFAULT_MAX_ITEM_COUNT = 4096;
private static final int DEFAULT_INTERVAL_SECONDS = 30;

/** Release one permit for the sampler. This must be called after processing a span. */
void releaseOne();
private final MonotonicClock clock;
private final Executor executor;
private final int intervalSeconds;
private final AtomicReference<Table> table;
private final AtomicBoolean rebuild = new AtomicBoolean(false);
private final long zero;
private final long maxItemCount;

final class NoOp implements ApiSecuritySampler {
@Override
public boolean preSampleRequest(@Nonnull AppSecRequestContext ctx) {
public ApiSecuritySampler() {
this(
DEFAULT_MAX_ITEM_COUNT,
DEFAULT_INTERVAL_SECONDS,
new Random().nextLong(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Vulnerability

Use of insecure random values (...read more)

Functions as Math.random() and objects like java.util.Random() do not provide strong enough randomness. Consider using java.security.SecureRandom() instead.

View in Datadog  Leave us feedback  Documentation

new DefaultMonotonicClock(),
AgentTaskScheduler.INSTANCE);
}

public ApiSecuritySampler(
final int maxItemCount,
final int intervalSeconds,
final long zero,
final MonotonicClock clock,
Executor executor) {
table = new AtomicReference<>(new Table(maxItemCount));
this.maxItemCount = maxItemCount;
this.intervalSeconds = intervalSeconds;
this.zero = zero;
this.clock = clock != null ? clock : new DefaultMonotonicClock();
this.executor = executor != null ? executor : AgentTaskScheduler.INSTANCE;
}

public boolean sample(AppSecRequestContext ctx) {
final String route = ctx.getRoute();
if (route == null) {
return false;
}

@Override
public boolean sampleRequest(AppSecRequestContext ctx) {
final String method = ctx.getMethod();
if (method == null) {
return false;
}
final int statusCode = ctx.getResponseStatus();
if (statusCode <= 0) {
return false;
}
final long hash = computeApiHash(route, method, statusCode);
return sample(hash);
}

public boolean sample(long key) {
if (key == 0L) {
key = zero;
}
final int now = clock.now();
final Table table = this.table.get();
Table.FindSlotResult findSlotResult;
while (true) {
findSlotResult = table.findSlot(key);
if (!findSlotResult.exists) {
final int newCount = table.count.incrementAndGet();
if (newCount > maxItemCount && rebuild.compareAndSet(false, true)) {
runRebuild();
}
if (newCount > maxItemCount * 2) {
table.count.decrementAndGet();
return false;
}
if (!findSlotResult.entry.key.compareAndSet(0, key)) {
if (findSlotResult.entry.key.get() == key) {
// Another thread just added this entry
return false;
}
// This entry was just claimed for another key, try another slot.
table.count.decrementAndGet();
continue;
}
final long newEntryData = buildDataEntry(now, now);
if (findSlotResult.entry.data.compareAndSet(0, newEntryData)) {
return true;
} else {
return false;
}
}
break;
}
long curData = findSlotResult.entry.data.get();
final int stime = getStime(curData);
final int deadline = now - intervalSeconds;
if (stime <= deadline) {
final long newData = buildDataEntry(now, now);
while (!findSlotResult.entry.data.compareAndSet(curData, newData)) {
curData = findSlotResult.entry.data.get();
if (getStime(curData) == getAtime(curData)) {
// Another thread just issued a keep decision
return false;
}
if (getStime(curData) > now) {
// Another thread is in our fugure, but did not issue a keep decision.
return true;
}
}
return true;
}
final long newData = buildDataEntry(getStime(curData), now);
while (getAtime(curData) < now) {
if (!findSlotResult.entry.data.compareAndSet(curData, newData)) {
curData = findSlotResult.entry.data.get();
}
}
return false;
}

private void runRebuild() {
// TODO
}

private static class Table {
private final Entry[] table;
private final AtomicInteger count = new AtomicInteger(0);
private final int maxItemCount;

public Table(int maxItemCount) {
this.maxItemCount = maxItemCount;
final int size = 2 * maxItemCount + 1;
table = new Entry[size];
for (int i = 0; i < size; i++) {
table[i] = new Entry();
}
}

public FindSlotResult findSlot(final long key) {
final int startIndex = (int) (key % (2L * maxItemCount));
int index = startIndex;
do {
final Entry slot = table[index];
final long slotKey = slot.key.get();
if (slotKey == key) {
return new FindSlotResult(slot, true);
} else if (slotKey == 0L) {
return new FindSlotResult(slot, false);
}
index++;
if (index >= table.length) {
index = 0;
}
} while (index != startIndex);
return new FindSlotResult(table[(int) (maxItemCount * 2)], false);
}

static class FindSlotResult {
public final Entry entry;
public final boolean exists;

public FindSlotResult(final Entry entry, final boolean exists) {
this.entry = entry;
this.exists = exists;
}
}

static class Entry {
private final AtomicLong key = new AtomicLong(0L);
private final AtomicLong data = new AtomicLong(0L);
}
}

interface MonotonicClock {
int now();
}

static class DefaultMonotonicClock implements MonotonicClock {
@Override
public void releaseOne() {}
public int now() {
return (int) (System.nanoTime() / 1_000_000);
}
}

long buildDataEntry(final int stime, final int atime) {
long result = stime;
result <<= 32;
result |= atime & 0xFFFFFFFFL;
return result;
}

int getStime(final long data) {
return (int) (data >> 32);
}

int getAtime(final long data) {
return (int) (data & 0xFFFFFFFFL);
}

private long computeApiHash(final String route, final String method, final int statusCode) {
long result = 17;
result = 31 * result + route.hashCode();
result = 31 * result + method.hashCode();
result = 31 * result + statusCode;
return result;
}
}
Loading
Loading