Skip to content

HTTP-122 Retry for source lookup table #148

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 22 commits into from
May 6, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.util.Collection;

import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.FunctionContext;

/**
* A client that is used to get enrichment data from external component.
Expand All @@ -15,4 +16,10 @@ public interface PollingClient<T> {
* @return an optional result of data lookup.
*/
Collection<T> pull(RowData lookupRow);

/**
* Initialize the client.
* @param ctx function context
*/
void open(FunctionContext ctx);
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
import com.getindata.connectors.http.internal.status.HttpResponseChecker;
import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryConfig;
import io.github.resilience4j.retry.RetryRegistry;
import lombok.Builder;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.metrics.MetricGroup;

import java.io.IOException;
import java.net.http.HttpClient;
Expand All @@ -19,26 +21,38 @@
public class HttpClientWithRetry {

private final HttpClient httpClient;
private final RetryConfig retryConfig;
@Getter
private final HttpResponseChecker responseChecker;
private final Retry retry;

@Builder
HttpClientWithRetry(HttpClient httpClient,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am wondering what happens with OIDC, the short lived bearer token may need to be regenerated if the retries occur after the token has expired). Is this regeneration check done for the retries?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The request is created only once, but OIDC processor (responsible for setting bearer token in request) is called on every retry.

RetryConfig retryConfig,
HttpResponseChecker responseChecker) {
this.httpClient = httpClient;
this.responseChecker = responseChecker;
this.retryConfig = RetryConfig.from(retryConfig)
retryConfig = RetryConfig.from(retryConfig)
.retryExceptions(IOException.class, RetryHttpRequestException.class)
.build();
this.retry = RetryRegistry.ofDefaults().retry("http-lookup-connector", retryConfig);
}

public void registerMetrics(MetricGroup metrics){
var group = metrics.addGroup("http_lookup_connector");
group.gauge("failed_calls_with_retry_attempt_count",
() -> retry.getMetrics().getNumberOfFailedCallsWithRetryAttempt());
group.gauge("failed_calls_without_retry_attempt_count",
() -> retry.getMetrics().getNumberOfFailedCallsWithoutRetryAttempt());
group.gauge("success_calls_with_retry_attempt",
() -> retry.getMetrics().getNumberOfSuccessfulCallsWithRetryAttempt());
group.gauge("success_calls_without_retry_attempt",
() -> retry.getMetrics().getNumberOfSuccessfulCallsWithoutRetryAttempt());
}

public <T> HttpResponse<T> send(
Supplier<HttpRequest> requestSupplier,
HttpResponse.BodyHandler<T> responseBodyHandler
) throws IOException, InterruptedException, HttpStatusCodeValidationFailedException {
var retry = Retry.of("http-lookup-connector", retryConfig);
try {
try {
return Retry.decorateCheckedSupplier(retry, () -> task(requestSupplier, responseBodyHandler)).apply();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package com.getindata.connectors.http.internal.table.lookup;

import java.util.Collection;
import java.util.concurrent.atomic.AtomicInteger;

import com.getindata.connectors.http.internal.PollingClient;
import com.getindata.connectors.http.internal.PollingClientFactory;
import com.getindata.connectors.http.internal.utils.SerializationSchemaUtils;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -12,9 +12,8 @@
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.LookupFunction;

import com.getindata.connectors.http.internal.PollingClient;
import com.getindata.connectors.http.internal.PollingClientFactory;
import com.getindata.connectors.http.internal.utils.SerializationSchemaUtils;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicInteger;

@Slf4j
public class HttpTableLookupFunction extends LookupFunction {
Expand Down Expand Up @@ -59,6 +58,8 @@ public void open(FunctionContext context) throws Exception {

context.getMetricGroup()
.gauge("http-table-lookup-call-counter", () -> localHttpCallCounter.intValue());

client.open(context);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.util.ConfigurationException;
import org.apache.flink.util.StringUtils;

Expand Down Expand Up @@ -84,6 +85,11 @@ public JavaNetHttpPollingClient(
validateIgnoredResponseCodes(this.httpClient.getResponseChecker());
}

public void open(FunctionContext context) {
httpClient.registerMetrics(context.getMetricGroup());
}


@Override
public Collection<RowData> pull(RowData lookupRow) {
try {
Expand Down
Loading