|
7 | 7 |
|
8 | 8 | import com.github.tomakehurst.wiremock.WireMockServer;
|
9 | 9 | import com.github.tomakehurst.wiremock.client.MappingBuilder;
|
| 10 | +import com.github.tomakehurst.wiremock.http.Fault; |
10 | 11 | import com.github.tomakehurst.wiremock.matching.RequestPatternBuilder;
|
| 12 | +import com.github.tomakehurst.wiremock.stubbing.Scenario; |
11 | 13 | import com.github.tomakehurst.wiremock.stubbing.StubMapping;
|
12 | 14 | import org.apache.flink.api.common.serialization.DeserializationSchema;
|
13 | 15 | import org.apache.flink.api.common.serialization.SerializationSchema;
|
@@ -94,6 +96,7 @@ public void setUp() {
|
94 | 96 | int[][] lookupKey = {{}};
|
95 | 97 | this.dynamicTableSourceContext = new LookupRuntimeProviderContext(lookupKey);
|
96 | 98 |
|
| 99 | + wireMockServer.resetAll(); |
97 | 100 | this.lookupRowData = GenericRowData.of(
|
98 | 101 | StringData.fromString("1"),
|
99 | 102 | StringData.fromString("2")
|
@@ -290,6 +293,45 @@ public void shouldConnectWithBasicAuth(String authorizationHeaderValue,
|
290 | 293 | assertThat(nestedDetailsRow.getString(0).toString()).isEqualTo("$1,729.34");
|
291 | 294 | }
|
292 | 295 |
|
| 296 | + @Test |
| 297 | + void shouldRetryOnIOExceptionAndSucceedOnSecondAttempt() { |
| 298 | + // GIVEN |
| 299 | + this.stubMapping = setUpServerStubForIOExceptionOnFirstAttempt(); |
| 300 | + Properties properties = new Properties(); |
| 301 | + properties.setProperty( |
| 302 | + HttpConnectorConfigConstants.LOOKUP_HTTP_MAX_RETRIES, |
| 303 | + "3" |
| 304 | + ); |
| 305 | + JavaNetHttpPollingClient pollingClient = setUpPollingClient( |
| 306 | + getBaseUrl(), properties, setUpGetRequestFactory(properties)); |
| 307 | + |
| 308 | + // WHEN |
| 309 | + Optional<RowData> poll = pollingClient.pull(lookupRowData); |
| 310 | + |
| 311 | + // THEN |
| 312 | + wireMockServer.verify(2, RequestPatternBuilder.forCustomMatcher(stubMapping.getRequest())); |
| 313 | + |
| 314 | + assertThat(poll.isPresent()).isTrue(); |
| 315 | + } |
| 316 | + |
| 317 | + private StubMapping setUpServerStubForIOExceptionOnFirstAttempt() { |
| 318 | + wireMockServer.stubFor( |
| 319 | + get(urlEqualTo(ENDPOINT + "?id=1&uuid=2")) |
| 320 | + .inScenario("Retry Scenario") |
| 321 | + .whenScenarioStateIs(Scenario.STARTED) // Initial state |
| 322 | + .willReturn(aResponse() |
| 323 | + .withFault(Fault.CONNECTION_RESET_BY_PEER)) // Fail the first request |
| 324 | + .willSetStateTo("Second Attempt")); // Set the next state |
| 325 | + |
| 326 | + return wireMockServer.stubFor( |
| 327 | + get(urlEqualTo(ENDPOINT + "?id=1&uuid=2")) |
| 328 | + .inScenario("Retry Scenario") |
| 329 | + .whenScenarioStateIs("Second Attempt") // When the state is "Second Attempt" |
| 330 | + .willReturn(aResponse() |
| 331 | + .withStatus(200) |
| 332 | + .withBody(readTestFile(SAMPLES_FOLDER + "HttpResult.json")))); |
| 333 | + } |
| 334 | + |
293 | 335 | private String getBaseUrl() {
|
294 | 336 | return wireMockServer.baseUrl() + ENDPOINT;
|
295 | 337 | }
|
|
0 commit comments