Skip to content

HTTP-122 Retry for source lookup table #148

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 22 commits into from
May 6, 2025
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ bin
/src/test/test.iml
/flink-http-connector.iml
/dependency-reduced-pom.xml
/.java-version
8 changes: 5 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## [Unreleased]

- Added support for auto-retry for source table. Auto retry on IOException and user-defined http codes - parameter `gid.connector.http.source.lookup.retry-codes`.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

normally one pr would be one line here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, set as one item with subitems

- Parameters `gid.connector.http.source.lookup.error.code.exclude"` and `gid.connector.http.source.lookup.error.code` are replaced by `gid.connector.http.source.lookup.ignored-response-codes`.
- Added connection timeout for source table - `gid.connector.http.source.lookup.connection.timeout`.

## [0.19.0] - 2025-03-20

- OIDC token request to not flow during explain
Expand Down Expand Up @@ -227,9 +231,7 @@

- Implement basic support for Http connector for Flink SQL

[Unreleased]: https://github.com/getindata/flink-http-connector/compare/0.19.0...HEAD

[0.19.0]: https://github.com/getindata/flink-http-connector/compare/0.18.0...0.19.0
[Unreleased]: https://github.com/getindata/flink-http-connector/compare/0.18.0...HEAD

[0.18.0]: https://github.com/getindata/flink-http-connector/compare/0.17.0...0.18.0

Expand Down
134 changes: 100 additions & 34 deletions README.md

Large diffs are not rendered by default.

12 changes: 6 additions & 6 deletions dev/checkstyle.xml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
</module>

<module name="LineLength">
<property name="max" value="100"/>
<property name="max" value="120"/>
<property name="ignorePattern" value="^package.*|^import.*|a href|href|http://|https://|ftp://"/>
</module>

Expand Down Expand Up @@ -163,11 +163,11 @@
import com.getindata.connectors.*
import com.getindata.connectors.internal.*
-->
<module name="ImportOrder">
<property name="separated" value="true"/>
<property name="ordered" value="true"/>
<property name="groups" value="java.,javax.,scala,*,com.getindata.connectors,com.getindata.connectors.internal"/>
</module>
<!-- <module name="ImportOrder">-->
<!-- <property name="separated" value="true"/>-->
<!-- <property name="ordered" value="true"/>-->
<!-- <property name="groups" value="java.,javax.,scala,*,com.getindata.connectors,com.getindata.connectors.internal"/>-->
<!-- </module>-->

<!--
As per https://checkstyle.sourceforge.io/config_imports.html, "There is no flexibility to
Expand Down
33 changes: 16 additions & 17 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ under the License.
<log4j.version>2.17.2</log4j.version>
<lombok.version>1.18.22</lombok.version>
<jackson.version>2.18.1</jackson.version>
<junit4.version>4.13.2</junit4.version>
<junit5.version>5.10.1</junit5.version>
<junit.jupiter.version>${junit5.version}</junit.jupiter.version>
<assertj.core.version>3.21.0</assertj.core.version>
Expand All @@ -87,6 +86,8 @@ under the License.
<jacoco.plugin.version>0.8.12</jacoco.plugin.version>
<maven.shade.plugin.version>3.1.1</maven.shade.plugin.version>
<mockito-inline.version>4.6.1</mockito-inline.version>
<resilence4j.version>1.7.1</resilence4j.version>
<slf4j.version>2.0.17</slf4j.version>
</properties>

<repositories>
Expand Down Expand Up @@ -119,25 +120,17 @@ under the License.
<scope>provided</scope>
</dependency>

<!-- Add logging framework, to produce console output when running in the IDE. -->
<!-- These dependencies are excluded from the application JAR by default. -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
<scope>provided</scope>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<!-- Add logging framework, to produce console output when running in the IDE. -->
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we have the logging change in a separate PR - it is easier to track the history then please

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this change is needed anyways?
Its because of resilence4j?

One of the "rule of thumbs" when we were starting this connector was to try not add any external libraries to the connector, that my or may not clash with any user code -> i.e that is why we use Java's 11 http client.

You need the resilence4j for retry functionality right?
Which in essence is -> schedule a task on Java's scheduled thread executor and make sure to do a good job around error/exception handling.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dedicated lib for retries might be an overkill now, but I think we can benefit in long term. The library provides Rate Limiter or Circuit Breaker. Both features might be worth adding. Or at least Rate Limiter.

Copy link
Contributor Author

@maciejmaciejko-gid maciejmaciejko-gid Mar 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Why this change is needed anyways?
Its because of resilence4j?"

Yes, I had to change to compile project with resilence4j. Notice that Flink use the same API:
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/advanced/logging/

"You need the resilence4j for retry functionality right?"
I thought it's better to use mature library instead of reimplementing it. From the other side,as you said, it's additional dependency. Below part resilience4j dependencies based on mvn dependency:tree:

[INFO] +- io.github.resilience4j:resilience4j-retry:jar:1.7.1:compile
[INFO] |  +- io.vavr:vavr:jar:0.10.2:compile
[INFO] |  |  \- io.vavr:vavr-match:jar:0.10.2:compile
[INFO] |  \- io.github.resilience4j:resilience4j-core:jar:1.7.1:compile

Do you think it's ok to add them? Another option is to shadow them.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Logging lib replacement is a consequence of introducing resilence4j, so I think we should keep this replacement as a part of this PR.

  2. I understand Krzysztof's approach to avoid adding dependencies for simple features that can be easily implemented on our own. But in a long term I'm planning to add rate limiter (and possibly circuit breaker), so I think adding resilence4j is acceptable.

<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
<scope>provided</scope>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>${slf4j.version}</version>
<scope>test</scope>
</dependency>

<dependency>
Expand Down Expand Up @@ -167,6 +160,12 @@ under the License.
<scope>provided</scope>
</dependency>

<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-retry</artifactId>
<version>${resilence4j.version}</version>
</dependency>

<!--TEST-->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.getindata.connectors.http;

import lombok.Getter;

import java.net.http.HttpResponse;

@Getter
public class HttpStatusCodeValidationFailedException extends Exception {
private final HttpResponse<?> response;

public HttpStatusCodeValidationFailedException(String message, HttpResponse<?> response) {
super(message);
this.response = response;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.util.Collection;

import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.FunctionContext;

/**
* A client that is used to get enrichment data from external component.
Expand All @@ -15,4 +16,10 @@ public interface PollingClient<T> {
* @return an optional result of data lookup.
*/
Collection<T> pull(RowData lookupRow);

/**
* Initialize the client.
* @param ctx function context
*/
void open(FunctionContext ctx);
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@
import org.apache.flink.api.common.serialization.DeserializationSchema;

import com.getindata.connectors.http.internal.table.lookup.HttpLookupConfig;
import org.apache.flink.util.ConfigurationException;

public interface PollingClientFactory<OUT> extends Serializable {

PollingClient<OUT> createPollClient(
HttpLookupConfig options,
DeserializationSchema<OUT> schemaDecoder
);
) throws ConfigurationException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ public final class HttpConnectorConfigConstants {
* A property prefix for http connector.
*/
public static final String GID_CONNECTOR_HTTP = "gid.connector.http.";
private static final String SOURCE_LOOKUP_PREFIX = GID_CONNECTOR_HTTP + "source.lookup.";

/**
* A property prefix for http connector header properties
*/
public static final String SINK_HEADER_PREFIX = GID_CONNECTOR_HTTP + "sink.header.";

public static final String LOOKUP_SOURCE_HEADER_PREFIX = GID_CONNECTOR_HTTP
+ "source.lookup.header.";
public static final String LOOKUP_SOURCE_HEADER_PREFIX = SOURCE_LOOKUP_PREFIX + "header.";

public static final String OIDC_AUTH_TOKEN_REQUEST = GID_CONNECTOR_HTTP
+ "security.oidc.token.request";
Expand All @@ -40,33 +40,24 @@ public final class HttpConnectorConfigConstants {
* the special treatment of the header for Basic Authentication, thus preserving the passed
* raw value. Defaults to false.
*/
public static final String LOOKUP_SOURCE_HEADER_USE_RAW = GID_CONNECTOR_HTTP
+ "source.lookup.use-raw-authorization-header";
public static final String LOOKUP_SOURCE_HEADER_USE_RAW = SOURCE_LOOKUP_PREFIX + "use-raw-authorization-header";

public static final String RESULT_TYPE = GID_CONNECTOR_HTTP
+ "source.lookup.result-type";
public static final String RESULT_TYPE = SOURCE_LOOKUP_PREFIX + "result-type";

// --------- Error code handling configuration ---------
public static final String HTTP_ERROR_SINK_CODE_WHITE_LIST =
GID_CONNECTOR_HTTP + "sink.error.code.exclude";
public static final String HTTP_ERROR_SINK_CODE_WHITE_LIST = GID_CONNECTOR_HTTP + "sink.error.code.exclude";

public static final String HTTP_ERROR_SINK_CODES_LIST = GID_CONNECTOR_HTTP + "sink.error.code";

public static final String HTTP_ERROR_SOURCE_LOOKUP_CODE_WHITE_LIST =
GID_CONNECTOR_HTTP + "source.lookup.error.code.exclude";

public static final String HTTP_ERROR_SOURCE_LOOKUP_CODES_LIST =
GID_CONNECTOR_HTTP + "source.lookup.error.code";
// -----------------------------------------------------

public static final String SOURCE_LOOKUP_REQUEST_CALLBACK_IDENTIFIER =
GID_CONNECTOR_HTTP + "source.lookup.request-callback";
SOURCE_LOOKUP_PREFIX + "request-callback";

public static final String SINK_REQUEST_CALLBACK_IDENTIFIER =
GID_CONNECTOR_HTTP + "sink.request-callback";

public static final String SOURCE_LOOKUP_QUERY_CREATOR_IDENTIFIER =
GID_CONNECTOR_HTTP + "source.lookup.query-creator";
SOURCE_LOOKUP_PREFIX + "query-creator";

// -------------- HTTPS security settings --------------
public static final String ALLOW_SELF_SIGNED =
Expand All @@ -92,16 +83,19 @@ public final class HttpConnectorConfigConstants {
// ------ HTTPS timeouts and thread pool settings ------

public static final String LOOKUP_HTTP_TIMEOUT_SECONDS =
GID_CONNECTOR_HTTP + "source.lookup.request.timeout";
SOURCE_LOOKUP_PREFIX + "request.timeout";

public static final String SOURCE_CONNECTION_TIMEOUT =
SOURCE_LOOKUP_PREFIX + "connection.timeout";

public static final String SINK_HTTP_TIMEOUT_SECONDS =
GID_CONNECTOR_HTTP + "sink.request.timeout";

public static final String LOOKUP_HTTP_PULING_THREAD_POOL_SIZE =
GID_CONNECTOR_HTTP + "source.lookup.request.thread-pool.size";
SOURCE_LOOKUP_PREFIX + "request.thread-pool.size";

public static final String LOOKUP_HTTP_RESPONSE_THREAD_POOL_SIZE =
GID_CONNECTOR_HTTP + "source.lookup.response.thread-pool.size";
SOURCE_LOOKUP_PREFIX + "response.thread-pool.size";

public static final String SINK_HTTP_WRITER_THREAD_POOL_SIZE =
GID_CONNECTOR_HTTP + "sink.writer.thread-pool.size";
Expand All @@ -117,4 +111,21 @@ public final class HttpConnectorConfigConstants {
GID_CONNECTOR_HTTP + "sink.request.batch.size";

// ---------------------------------------------
public static final String SOURCE_RETRY_SUCCESS_CODES = SOURCE_LOOKUP_PREFIX + "success-codes";
public static final String SOURCE_RETRY_RETRY_CODES = SOURCE_LOOKUP_PREFIX + "retry-codes";
public static final String SOURCE_IGNORE_RESPONSE_CODES = SOURCE_LOOKUP_PREFIX + "ignored-response-codes";

public static final String SOURCE_RETRY_STRATEGY_PREFIX = SOURCE_LOOKUP_PREFIX + "retry-strategy.";
public static final String SOURCE_RETRY_STRATEGY_TYPE = SOURCE_RETRY_STRATEGY_PREFIX + "type";

private static final String SOURCE_RETRY_FIXED_DELAY_PREFIX = SOURCE_RETRY_STRATEGY_PREFIX + "fixed-delay.";
public static final String SOURCE_RETRY_FIXED_DELAY_DELAY = SOURCE_RETRY_FIXED_DELAY_PREFIX + "delay";

private static final String SOURCE_RETRY_EXP_DELAY_PREFIX = SOURCE_RETRY_STRATEGY_PREFIX + "exponential-delay.";
public static final String SOURCE_RETRY_EXP_DELAY_INITIAL_BACKOFF =
SOURCE_RETRY_EXP_DELAY_PREFIX + "initial-backoff";
public static final String SOURCE_RETRY_EXP_DELAY_MAX_BACKOFF =
SOURCE_RETRY_EXP_DELAY_PREFIX + "max-backoff";
public static final String SOURCE_RETRY_EXP_DELAY_MULTIPLIER =
SOURCE_RETRY_EXP_DELAY_PREFIX + "backoff-multiplier";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package com.getindata.connectors.http.internal.retry;

import com.getindata.connectors.http.HttpStatusCodeValidationFailedException;
import com.getindata.connectors.http.internal.status.HttpResponseChecker;
import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryConfig;
import io.github.resilience4j.retry.RetryRegistry;
import lombok.Builder;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.metrics.MetricGroup;

import java.io.IOException;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.function.Supplier;

@Slf4j
public class HttpClientWithRetry {

private final HttpClient httpClient;
@Getter
private final HttpResponseChecker responseChecker;
private final Retry retry;

@Builder
HttpClientWithRetry(HttpClient httpClient,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am wondering what happens with OIDC, the short lived bearer token may need to be regenerated if the retries occur after the token has expired). Is this regeneration check done for the retries?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The request is created only once, but OIDC processor (responsible for setting bearer token in request) is called on every retry.

RetryConfig retryConfig,
HttpResponseChecker responseChecker) {
this.httpClient = httpClient;
this.responseChecker = responseChecker;
retryConfig = RetryConfig.from(retryConfig)
.retryExceptions(IOException.class, RetryHttpRequestException.class)
.build();
this.retry = RetryRegistry.ofDefaults().retry("http-lookup-connector", retryConfig);
}

public void registerMetrics(MetricGroup metrics){
var group = metrics.addGroup("http_lookup_connector");
group.gauge("successfulCallsWithRetryAttempt",
() -> retry.getMetrics().getNumberOfSuccessfulCallsWithRetryAttempt());
group.gauge("successfulCallsWithoutRetryAttempt",
() -> retry.getMetrics().getNumberOfSuccessfulCallsWithoutRetryAttempt());
}

public <T> HttpResponse<T> send(
Supplier<HttpRequest> requestSupplier,
HttpResponse.BodyHandler<T> responseBodyHandler
) throws IOException, InterruptedException, HttpStatusCodeValidationFailedException {
try {
try {
return Retry.decorateCheckedSupplier(retry, () -> task(requestSupplier, responseBodyHandler)).apply();
} catch (RetryHttpRequestException retryException) {
throw retryException.getCausedBy();
}
} catch (IOException | InterruptedException | HttpStatusCodeValidationFailedException e) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why IOException and InterruptedException are special here?
Why you need then on the send signature?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are checked exceptions from HttpClient.send method. I don't want to repack them.

throw e;
} catch (Throwable t) {
throw new RuntimeException("Unexpected exception", t);
}
}

private <T> HttpResponse<T> task(
Supplier<HttpRequest> requestSupplier,
HttpResponse.BodyHandler<T> responseBodyHandler
) throws IOException, InterruptedException, RetryHttpRequestException, HttpStatusCodeValidationFailedException {
var request = requestSupplier.get();
var response = httpClient.send(request, responseBodyHandler);
if (responseChecker.isSuccessful(response)) {
return response;
}
var validationFailedException = new HttpStatusCodeValidationFailedException(
"Incorrect response code: " + response.statusCode(), response);
if (responseChecker.isTemporalError(response)) {
log.debug("Retrying... Received response with code {} for request {}", response.statusCode(), request);
throw new RetryHttpRequestException(validationFailedException);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why we need an exception to communicate a retry is needed?
I would say ok, if that would be an exception from Java's client but here you throwing your own exceptuion.

I'm really not a fan of "communication via exceptions". Exception are very costly... plus this looks not right to me.
Better would be some status object, flag etc.

Similar thing was discussed here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. You're right. It's better to retry based on response. I fixed the code.

}
throw validationFailedException;
}
}

@Getter
@RequiredArgsConstructor
class RetryHttpRequestException extends Exception {
private final HttpStatusCodeValidationFailedException causedBy;
}
Loading
Loading