HTTP-122 Retry for source lookup table #148
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,3 +13,4 @@ bin | |
/src/test/test.iml | ||
/flink-http-connector.iml | ||
/dependency-reduced-pom.xml | ||
/.java-version |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,6 +2,10 @@ | |
|
||
## [Unreleased] | ||
|
||
- Added support for auto-retry for source table. Auto retry on IOException and user-defined http codes - parameter `gid.connector.http.source.lookup.retry-codes`. | ||
Review comment: Normally one PR would be one line here.
Reply: OK, set as one item with subitems.
||
- Parameters `gid.connector.http.source.lookup.error.code.exclude` and `gid.connector.http.source.lookup.error.code` are replaced by `gid.connector.http.source.lookup.ignored-response-codes`. | ||
- Added connection timeout for source table - `gid.connector.http.source.lookup.connection.timeout`. | ||
|
||
## [0.19.0] - 2025-03-20 | ||
|
||
- OIDC token request to not flow during explain | ||
|
@@ -227,9 +231,7 @@ | |
|
||
- Implement basic support for Http connector for Flink SQL | ||
|
||
[Unreleased]: https://github.com/getindata/flink-http-connector/compare/0.19.0...HEAD | ||
|
||
[0.19.0]: https://github.com/getindata/flink-http-connector/compare/0.18.0...0.19.0 | ||
[Unreleased]: https://github.com/getindata/flink-http-connector/compare/0.18.0...HEAD | ||
|
||
|
||
[0.18.0]: https://github.com/getindata/flink-http-connector/compare/0.17.0...0.18.0 | ||
|
||
|
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -78,7 +78,6 @@ under the License. | |
<log4j.version>2.17.2</log4j.version> | ||
<lombok.version>1.18.22</lombok.version> | ||
<jackson.version>2.18.1</jackson.version> | ||
<junit4.version>4.13.2</junit4.version> | ||
<junit5.version>5.10.1</junit5.version> | ||
<junit.jupiter.version>${junit5.version}</junit.jupiter.version> | ||
<assertj.core.version>3.21.0</assertj.core.version> | ||
|
@@ -87,6 +86,8 @@ under the License. | |
<jacoco.plugin.version>0.8.12</jacoco.plugin.version> | ||
<maven.shade.plugin.version>3.1.1</maven.shade.plugin.version> | ||
<mockito-inline.version>4.6.1</mockito-inline.version> | ||
<resilence4j.version>1.7.1</resilence4j.version> | ||
<slf4j.version>2.0.17</slf4j.version> | ||
</properties> | ||
|
||
<repositories> | ||
|
@@ -119,25 +120,17 @@ under the License. | |
<scope>provided</scope> | ||
</dependency> | ||
|
||
<!-- Add logging framework, to produce console output when running in the IDE. --> | ||
<!-- These dependencies are excluded from the application JAR by default. --> | ||
<dependency> | ||
<groupId>org.apache.logging.log4j</groupId> | ||
<artifactId>log4j-slf4j-impl</artifactId> | ||
<version>${log4j.version}</version> | ||
<scope>provided</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.apache.logging.log4j</groupId> | ||
<artifactId>log4j-api</artifactId> | ||
<version>${log4j.version}</version> | ||
<scope>provided</scope> | ||
<groupId>org.slf4j</groupId> | ||
<artifactId>slf4j-api</artifactId> | ||
<version>${slf4j.version}</version> | ||
</dependency> | ||
<!-- Add logging framework, to produce console output when running in the IDE. --> | ||
Review comment: Can we have the logging change in a separate PR, please? It is easier to track the history then.
Review comment: Why is this change needed anyway? One of the rules of thumb when we were starting this connector was to try not to add any external libraries that may or may not clash with user code; that is why we use the Java 11 HTTP client. You need resilience4j for the retry functionality, right?
Reply: A dedicated library for retries might be overkill now, but I think we can benefit in the long term. The library provides a Rate Limiter and a Circuit Breaker. Both features might be worth adding, or at least the Rate Limiter.
Reply: "Why is this change needed anyway?" Yes, I had to change it to compile the project with resilience4j. Notice that Flink uses the same API: "You need resilience4j for the retry functionality, right?"
Do you think it's OK to add them? Another option is to shadow them.
|
||
<dependency> | ||
<groupId>org.apache.logging.log4j</groupId> | ||
<artifactId>log4j-core</artifactId> | ||
<version>${log4j.version}</version> | ||
<scope>provided</scope> | ||
<groupId>org.slf4j</groupId> | ||
<artifactId>slf4j-simple</artifactId> | ||
<version>${slf4j.version}</version> | ||
<scope>test</scope> | ||
</dependency> | ||
|
||
<dependency> | ||
|
@@ -167,6 +160,12 @@ under the License. | |
<scope>provided</scope> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>io.github.resilience4j</groupId> | ||
<artifactId>resilience4j-retry</artifactId> | ||
<version>${resilence4j.version}</version> | ||
</dependency> | ||
|
||
<!--TEST--> | ||
<dependency> | ||
<groupId>org.apache.httpcomponents</groupId> | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
package com.getindata.connectors.http; | ||
|
||
import lombok.Getter; | ||
|
||
import java.net.http.HttpResponse; | ||
|
||
@Getter | ||
public class HttpStatusCodeValidationFailedException extends Exception { | ||
private final HttpResponse<?> response; | ||
|
||
public HttpStatusCodeValidationFailedException(String message, HttpResponse<?> response) { | ||
super(message); | ||
this.response = response; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,88 @@ | ||
package com.getindata.connectors.http.internal.retry; | ||
|
||
import com.getindata.connectors.http.HttpStatusCodeValidationFailedException; | ||
import com.getindata.connectors.http.internal.status.HttpResponseChecker; | ||
import io.github.resilience4j.retry.Retry; | ||
import io.github.resilience4j.retry.RetryConfig; | ||
import io.github.resilience4j.retry.RetryRegistry; | ||
import lombok.Builder; | ||
import lombok.Getter; | ||
import lombok.RequiredArgsConstructor; | ||
import lombok.extern.slf4j.Slf4j; | ||
import org.apache.flink.metrics.MetricGroup; | ||
|
||
import java.io.IOException; | ||
import java.net.http.HttpClient; | ||
import java.net.http.HttpRequest; | ||
import java.net.http.HttpResponse; | ||
import java.util.function.Supplier; | ||
|
||
@Slf4j | ||
public class HttpClientWithRetry { | ||
|
||
private final HttpClient httpClient; | ||
@Getter | ||
private final HttpResponseChecker responseChecker; | ||
private final Retry retry; | ||
|
||
@Builder | ||
HttpClientWithRetry(HttpClient httpClient, | ||
Review comment: I am wondering what happens with OIDC. The short-lived bearer token may need to be regenerated if the retries occur after the token has expired. Is this regeneration check done for the retries?
Reply: The request is created only once, but the OIDC processor (responsible for setting the bearer token in the request) is called on every retry.
||
RetryConfig retryConfig, | ||
HttpResponseChecker responseChecker) { | ||
this.httpClient = httpClient; | ||
this.responseChecker = responseChecker; | ||
retryConfig = RetryConfig.from(retryConfig) | ||
.retryExceptions(IOException.class, RetryHttpRequestException.class) | ||
.build(); | ||
this.retry = RetryRegistry.ofDefaults().retry("http-lookup-connector", retryConfig); | ||
} | ||
|
||
public void registerMetrics(MetricGroup metrics) { | |
var group = metrics.addGroup("http_lookup_connector"); | ||
group.gauge("successfulCallsWithRetryAttempt", | ||
() -> retry.getMetrics().getNumberOfSuccessfulCallsWithRetryAttempt()); | ||
group.gauge("successfulCallsWithoutRetryAttempt", | ||
() -> retry.getMetrics().getNumberOfSuccessfulCallsWithoutRetryAttempt()); | ||
} | ||
|
||
public <T> HttpResponse<T> send( | ||
Supplier<HttpRequest> requestSupplier, | ||
HttpResponse.BodyHandler<T> responseBodyHandler | ||
) throws IOException, InterruptedException, HttpStatusCodeValidationFailedException { | ||
try { | ||
try { | ||
return Retry.decorateCheckedSupplier(retry, () -> task(requestSupplier, responseBodyHandler)).apply(); | ||
} catch (RetryHttpRequestException retryException) { | ||
throw retryException.getCausedBy(); | ||
} | ||
} catch (IOException | InterruptedException | HttpStatusCodeValidationFailedException e) { | ||
Review comment: Why are IOException and InterruptedException special here?
Reply: These are checked exceptions from
||
throw e; | ||
} catch (Throwable t) { | ||
throw new RuntimeException("Unexpected exception", t); | ||
} | ||
} | ||
|
||
private <T> HttpResponse<T> task( | ||
Supplier<HttpRequest> requestSupplier, | ||
HttpResponse.BodyHandler<T> responseBodyHandler | ||
) throws IOException, InterruptedException, RetryHttpRequestException, HttpStatusCodeValidationFailedException { | ||
var request = requestSupplier.get(); | ||
var response = httpClient.send(request, responseBodyHandler); | ||
if (responseChecker.isSuccessful(response)) { | ||
return response; | ||
} | ||
var validationFailedException = new HttpStatusCodeValidationFailedException( | ||
"Incorrect response code: " + response.statusCode(), response); | ||
if (responseChecker.isTemporalError(response)) { | ||
log.debug("Retrying... Received response with code {} for request {}", response.statusCode(), request); | ||
throw new RetryHttpRequestException(validationFailedException); | ||
Review comment: Why do we need an exception to communicate that a retry is needed? I'm really not a fan of "communication via exceptions". Exceptions are very costly, and this does not look right to me. A similar thing was discussed here.
Reply: Good catch, you're right. It's better to retry based on the response. I fixed the code.
||
} | ||
throw validationFailedException; | ||
} | ||
} | ||
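The OIDC thread above rests on the fact that `task` calls `requestSupplier.get()` on every attempt, so whatever logic sets the bearer token runs again before each retry. The following is a minimal, standard-library-only sketch of that idea, not the connector's implementation: `PerAttemptRequestSketch`, `nextToken`, `buildRequest`, `authHeadersSeen`, and the URL are hypothetical names chosen for illustration. For simplicity the sketch rebuilds the whole request on each call, whereas the reply above says the connector creates the request once and only re-applies the OIDC processor.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class PerAttemptRequestSketch {

    // Hypothetical stand-in for an OIDC token provider: every call yields a new token.
    static final AtomicInteger TOKEN_COUNTER = new AtomicInteger();

    static String nextToken() {
        return "token-" + TOKEN_COUNTER.incrementAndGet();
    }

    // Builds a request that carries whichever token is current at call time.
    // The URL is a placeholder; no network call is made in this sketch.
    static HttpRequest buildRequest() {
        return HttpRequest.newBuilder(URI.create("http://localhost:8080/lookup"))
                .header("Authorization", "Bearer " + nextToken())
                .GET()
                .build();
    }

    // Simulates a retry loop: the supplier is invoked once per attempt,
    // so the header-setting logic runs again before every attempt.
    static List<String> authHeadersSeen(Supplier<HttpRequest> requestSupplier, int attempts) {
        List<String> seen = new ArrayList<>();
        for (int i = 0; i < attempts; i++) {
            HttpRequest request = requestSupplier.get();
            seen.add(request.headers().firstValue("Authorization").orElse(""));
        }
        return seen;
    }

    public static void main(String[] args) {
        // prints [Bearer token-1, Bearer token-2, Bearer token-3]
        System.out.println(authHeadersSeen(PerAttemptRequestSketch::buildRequest, 3));
    }
}
```

Passing a `Supplier<HttpRequest>` rather than a ready-made `HttpRequest` is what makes the per-attempt refresh possible; a request captured once outside the retry loop would carry the same token on every attempt.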
|
||
@Getter | ||
@RequiredArgsConstructor | ||
class RetryHttpRequestException extends Exception { | ||
private final HttpStatusCodeValidationFailedException causedBy; | ||
} |
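The review thread above ends with the decision to retry based on the response rather than on a wrapper exception such as `RetryHttpRequestException`. The revised code is not shown in this diff, so the following is only a standard-library sketch of what a result-based retry decision could look like. `ResultBasedRetrySketch` and `callWithRetry` are hypothetical names, status codes stand in for full responses, and wait intervals between attempts are omitted. Resilience4j's `RetryConfig` builder offers a `retryOnResult` predicate for the same purpose, which may be what the fix uses, but that is an assumption.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.IntPredicate;
import java.util.function.IntSupplier;

class ResultBasedRetrySketch {

    // Invokes the call until its status is no longer retryable or attempts run out.
    // Returns the last status observed; no exception is used for control flow.
    static int callWithRetry(IntSupplier call, IntPredicate isRetryable, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        int status = call.getAsInt();
        for (int attempt = 1; attempt < maxAttempts && isRetryable.test(status); attempt++) {
            status = call.getAsInt();
        }
        return status;
    }

    public static void main(String[] args) {
        // Simulated server: two temporary failures, then success.
        Iterator<Integer> responses = List.of(503, 503, 200).iterator();
        int status = callWithRetry(() -> responses.next(), code -> code == 503, 5);
        System.out.println(status); // prints 200
    }
}
```

With this shape, the caller inspects the final status and raises a validation error only once, after retries are exhausted or a non-retryable status arrives.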