Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 88 additions & 17 deletions src/main/java/org/prebid/server/cache/CoreCacheService.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.client.utils.URIBuilder;
import org.prebid.server.auction.model.AuctionContext;
import org.prebid.server.auction.model.BidInfo;
import org.prebid.server.auction.model.CachedDebugLog;
Expand All @@ -22,6 +23,7 @@
import org.prebid.server.cache.model.DebugHttpCall;
import org.prebid.server.cache.proto.request.bid.BidCacheRequest;
import org.prebid.server.cache.proto.request.bid.BidPutObject;
import org.prebid.server.cache.proto.response.CacheErrorResponse;
import org.prebid.server.cache.proto.response.bid.BidCacheResponse;
import org.prebid.server.cache.proto.response.bid.CacheObject;
import org.prebid.server.cache.utils.CacheServiceUtil;
Expand All @@ -45,6 +47,7 @@
import org.prebid.server.vertx.httpclient.HttpClient;
import org.prebid.server.vertx.httpclient.model.HttpClientResponse;

import java.net.URISyntaxException;
import java.net.URL;
import java.time.Clock;
import java.util.ArrayList;
Expand All @@ -66,6 +69,8 @@ public class CoreCacheService {
private static final String BID_WURL_ATTRIBUTE = "wurl";
private static final String TRACE_INFO_SEPARATOR = "-";
private static final int MAX_DATACENTER_REGION_LENGTH = 4;
private static final String UUID_QUERY_PARAMETER = "uuid";
private static final String CH_QUERY_PARAMETER = "ch";

private final HttpClient httpClient;
private final URL externalEndpointUrl;
Expand Down Expand Up @@ -186,29 +191,32 @@ private Future<BidCacheResponse> makeRequest(BidCacheRequest bidCacheRequest,
cacheHeaders,
mapper.encodeToString(bidCacheRequest),
remainingTimeout)
.map(response -> toBidCacheResponse(
.map(response -> processVtrackWriteCacheResponse(
response.getStatusCode(), response.getBody(), bidCount, accountId, startTime))
.recover(exception -> failResponse(exception, accountId, startTime));
.recover(exception -> failVtrackCacheWriteResponse(exception, accountId, startTime));
}

private Future<BidCacheResponse> failResponse(Throwable exception, String accountId, long startTime) {
metrics.updateCacheRequestFailedTime(accountId, clock.millis() - startTime);
private BidCacheResponse processVtrackWriteCacheResponse(int statusCode,
String responseBody,
int bidCount,
String accountId,
long startTime) {

logger.warn("Error occurred while interacting with cache service: {}", exception.getMessage());
logger.debug("Error occurred while interacting with cache service", exception);

return Future.failedFuture(exception);
final BidCacheResponse bidCacheResponse = toBidCacheResponse(statusCode, responseBody, bidCount);
metrics.updateVtrackCacheWriteRequestTime(accountId, clock.millis() - startTime, MetricName.ok);
return bidCacheResponse;
}

public Future<BidCacheResponse> cachePutObjects(List<BidPutObject> bidPutObjects,
Boolean isEventsEnabled,
Set<String> biddersAllowingVastUpdate,
String accountId,
Integer accountTtl,
String integration,
Timeout timeout) {

final List<CachedCreative> cachedCreatives =
updatePutObjects(bidPutObjects, isEventsEnabled, biddersAllowingVastUpdate, accountId, integration);
final List<CachedCreative> cachedCreatives = updatePutObjects(
bidPutObjects, isEventsEnabled, biddersAllowingVastUpdate, accountId, accountTtl, integration);

updateCreativeMetrics(accountId, cachedCreatives);

Expand All @@ -219,6 +227,7 @@ private List<CachedCreative> updatePutObjects(List<BidPutObject> bidPutObjects,
Boolean isEventsEnabled,
Set<String> allowedBidders,
String accountId,
Integer accountTtl,
String integration) {

return bidPutObjects.stream()
Expand All @@ -233,6 +242,7 @@ private List<CachedCreative> updatePutObjects(List<BidPutObject> bidPutObjects,
putObject,
accountId,
integration))
.ttlseconds(ObjectUtils.min(putObject.getTtlseconds(), accountTtl))
.build())
.map(payload -> CachedCreative.of(payload, creativeSizeFromTextNode(payload.getValue())))
.toList();
Expand Down Expand Up @@ -343,8 +353,8 @@ private CacheServiceResult processResponseOpenrtb(HttpClientResponse response,
externalEndpointUrl.toString(), httpRequest, httpResponse, startTime);
final BidCacheResponse bidCacheResponse;
try {
bidCacheResponse = toBidCacheResponse(
responseStatusCode, response.getBody(), bidCount, accountId, startTime);
bidCacheResponse = toBidCacheResponse(responseStatusCode, response.getBody(), bidCount);
metrics.updateAuctionCacheRequestTime(accountId, clock.millis() - startTime, MetricName.ok);
} catch (PreBidException e) {
return CacheServiceResult.of(httpCall, e, Collections.emptyMap());
}
Expand All @@ -361,7 +371,7 @@ private CacheServiceResult failResponseOpenrtb(Throwable exception,
logger.warn("Error occurred while interacting with cache service: {}", exception.getMessage());
logger.debug("Error occurred while interacting with cache service", exception);

metrics.updateCacheRequestFailedTime(accountId, clock.millis() - startTime);
metrics.updateAuctionCacheRequestTime(accountId, clock.millis() - startTime, MetricName.err);

final DebugHttpCall httpCall = makeDebugHttpCall(externalEndpointUrl.toString(), request, null, startTime);
return CacheServiceResult.of(httpCall, exception, Collections.emptyMap());
Expand Down Expand Up @@ -460,9 +470,7 @@ private String generateWinUrl(String bidId,

private BidCacheResponse toBidCacheResponse(int statusCode,
String responseBody,
int bidCount,
String accountId,
long startTime) {
int bidCount) {

if (statusCode != 200) {
throw new PreBidException("HTTP status code " + statusCode);
Expand All @@ -480,7 +488,6 @@ private BidCacheResponse toBidCacheResponse(int statusCode,
throw new PreBidException("The number of response cache objects doesn't match with bids");
}

metrics.updateCacheRequestSuccessTime(accountId, clock.millis() - startTime);
return bidCacheResponse;
}

Expand Down Expand Up @@ -627,4 +634,68 @@ private static String normalizeDatacenterRegion(String datacenterRegion) {
? trimmedDatacenterRegion.substring(0, MAX_DATACENTER_REGION_LENGTH)
: trimmedDatacenterRegion;
}

public Future<HttpClientResponse> getCachedObject(String key, String ch, Timeout timeout) {
final long remainingTimeout = timeout.remaining();
if (remainingTimeout <= 0) {
return Future.failedFuture(new TimeoutException("Timeout has been exceeded"));
}

final URL endpointUrl = ObjectUtils.firstNonNull(internalEndpointUrl, externalEndpointUrl);
final String url;
try {
final URIBuilder uriBuilder = new URIBuilder(endpointUrl.toString());
uriBuilder.addParameter(UUID_QUERY_PARAMETER, key);
if (StringUtils.isNotBlank(ch)) {
uriBuilder.addParameter(CH_QUERY_PARAMETER, ch);
}
url = uriBuilder.build().toString();
} catch (URISyntaxException e) {
return Future.failedFuture(new IllegalArgumentException("Configured cache url is malformed", e));
}

final long startTime = clock.millis();
return httpClient.get(url, cacheHeaders, remainingTimeout)
.map(response -> processVtrackReadResponse(response, startTime))
.recover(exception -> failVtrackCacheReadResponse(exception, startTime));
}

private HttpClientResponse processVtrackReadResponse(HttpClientResponse response, long startTime) {
final int statusCode = response.getStatusCode();
final String body = response.getBody();

if (statusCode == 200) {
metrics.updateVtrackCacheReadRequestTime(clock.millis() - startTime, MetricName.ok);
return response;
}

try {
final CacheErrorResponse errorResponse = mapper.decodeValue(body, CacheErrorResponse.class);
metrics.updateVtrackCacheReadRequestTime(clock.millis() - startTime, MetricName.err);
return HttpClientResponse.of(statusCode, response.getHeaders(), errorResponse.getMessage());
} catch (DecodeException e) {
throw new PreBidException("Cannot parse response: " + body, e);
}
}

private <T> Future<T> failVtrackCacheWriteResponse(Throwable exception, String accountId, long startTime) {
if (exception instanceof PreBidException) {
metrics.updateVtrackCacheWriteRequestTime(accountId, clock.millis() - startTime, MetricName.err);
}
return failResponse(exception);
}

private <T> Future<T> failVtrackCacheReadResponse(Throwable exception, long startTime) {
if (exception instanceof PreBidException) {
metrics.updateVtrackCacheReadRequestTime(clock.millis() - startTime, MetricName.err);
}
return failResponse(exception);
}

private static <T> Future<T> failResponse(Throwable exception) {
logger.warn("Error occurred while interacting with cache service: {}", exception.getMessage());
logger.debug("Error occurred while interacting with cache service", exception);

return Future.failedFuture(exception);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package org.prebid.server.cache.proto.response;

import lombok.Builder;
import lombok.Value;

@Value
@Builder
public class CacheErrorResponse {

String error;

Integer status;

String path;

String message;

Long timestamp;
}
105 changes: 105 additions & 0 deletions src/main/java/org/prebid/server/handler/GetVtrackHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package org.prebid.server.handler;

import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.vertx.core.AsyncResult;
import io.vertx.core.MultiMap;
import io.vertx.core.http.HttpMethod;
import io.vertx.ext.web.RoutingContext;
import org.apache.commons.lang3.StringUtils;
import org.prebid.server.cache.CoreCacheService;
import org.prebid.server.execution.timeout.Timeout;
import org.prebid.server.execution.timeout.TimeoutFactory;
import org.prebid.server.log.Logger;
import org.prebid.server.log.LoggerFactory;
import org.prebid.server.model.Endpoint;
import org.prebid.server.util.HttpUtil;
import org.prebid.server.vertx.httpclient.model.HttpClientResponse;
import org.prebid.server.vertx.verticles.server.HttpEndpoint;
import org.prebid.server.vertx.verticles.server.application.ApplicationResource;

import java.util.Collections;
import java.util.List;
import java.util.Objects;

public class GetVtrackHandler implements ApplicationResource {

private static final Logger logger = LoggerFactory.getLogger(GetVtrackHandler.class);

private static final String UUID_PARAMETER = "uuid";
private static final String CH_PARAMETER = "ch";

private final long defaultTimeout;
private final CoreCacheService coreCacheService;
private final TimeoutFactory timeoutFactory;

public GetVtrackHandler(long defaultTimeout, CoreCacheService coreCacheService, TimeoutFactory timeoutFactory) {
this.defaultTimeout = defaultTimeout;
this.coreCacheService = Objects.requireNonNull(coreCacheService);
this.timeoutFactory = Objects.requireNonNull(timeoutFactory);
}

@Override
public List<HttpEndpoint> endpoints() {
return Collections.singletonList(HttpEndpoint.of(HttpMethod.GET, Endpoint.vtrack.value()));
}

@Override
public void handle(RoutingContext routingContext) {
final String uuid = routingContext.request().getParam(UUID_PARAMETER);
final String ch = routingContext.request().getParam(CH_PARAMETER);
if (StringUtils.isBlank(uuid)) {
respondWith(
routingContext,
HttpResponseStatus.BAD_REQUEST,
"'%s' is a required query parameter and can't be empty".formatted(UUID_PARAMETER));
return;
}

final Timeout timeout = timeoutFactory.create(defaultTimeout);

coreCacheService.getCachedObject(uuid, ch, timeout)
.onComplete(asyncCache -> handleCacheResult(asyncCache, routingContext));
}

private static void respondWithServerError(RoutingContext routingContext, Throwable exception) {
logger.error("Error occurred while sending request to cache", exception);
respondWith(routingContext, HttpResponseStatus.INTERNAL_SERVER_ERROR,
"%s: %s".formatted("Error occurred while sending request to cache", exception.getMessage()));
}

private static void respondWith(RoutingContext routingContext,
HttpResponseStatus status,
MultiMap headers,
String body) {

HttpUtil.executeSafely(routingContext, Endpoint.vtrack,
response -> {
headers.forEach(response::putHeader);
response.setStatusCode(status.code())
.end(body);
});
}

private static void respondWith(RoutingContext routingContext, HttpResponseStatus status, String body) {
HttpUtil.executeSafely(routingContext, Endpoint.vtrack,
response -> response
.putHeader(HttpUtil.CONTENT_TYPE_HEADER, HttpHeaderValues.APPLICATION_JSON)
.setStatusCode(status.code())
.end(body));
}

private void handleCacheResult(AsyncResult<HttpClientResponse> async, RoutingContext routingContext) {
if (async.failed()) {
respondWithServerError(routingContext, async.cause());
} else {
final HttpClientResponse response = async.result();
final HttpResponseStatus status = HttpResponseStatus.valueOf(response.getStatusCode());
if (status == HttpResponseStatus.OK) {
respondWith(routingContext, status, response.getHeaders(), response.getBody());
} else {
respondWith(routingContext, status, response.getBody());
}
}
}
}
Loading