27
27
@ Slf4j
28
28
public class JavaNetHttpPollingClient implements PollingClient <RowData > {
29
29
30
+ public static final String DEFAULT_REQUEST_MAX_RETRIES = "3" ;
31
+
32
+ public static final String DEFAULT_REQUEST_RETRY_TIMEOUT_MS = "1000" ;
33
+
30
34
private final HttpClient httpClient ;
31
35
32
36
private final HttpStatusCodeChecker statusCodeChecker ;
@@ -37,6 +41,10 @@ public class JavaNetHttpPollingClient implements PollingClient<RowData> {
37
41
38
42
private final HttpPostRequestCallback <HttpLookupSourceRequestEntry > httpPostRequestCallback ;
39
43
44
+ protected final int httpRequestMaxRetries ;
45
+
46
+ protected final int httpRequestRetryTimeoutMs ;
47
+
40
48
public JavaNetHttpPollingClient (
41
49
HttpClient httpClient ,
42
50
DeserializationSchema <RowData > responseBodyDecoder ,
@@ -62,6 +70,20 @@ public JavaNetHttpPollingClient(
62
70
.build ();
63
71
64
72
this .statusCodeChecker = new ComposeHttpStatusCodeChecker (checkerConfig );
73
+
74
+ this .httpRequestMaxRetries = Integer .parseInt (
75
+ options .getProperties ().getProperty (
76
+ HttpConnectorConfigConstants .LOOKUP_HTTP_MAX_RETRIES ,
77
+ DEFAULT_REQUEST_MAX_RETRIES
78
+ )
79
+ );
80
+
81
+ this .httpRequestRetryTimeoutMs = Integer .parseInt (
82
+ options .getProperties ().getProperty (
83
+ HttpConnectorConfigConstants .LOOKUP_HTTP_RETRY_TIMEOUT_MS ,
84
+ DEFAULT_REQUEST_RETRY_TIMEOUT_MS
85
+ )
86
+ );
65
87
}
66
88
67
89
@ Override
@@ -74,15 +96,43 @@ public Optional<RowData> pull(RowData lookupRow) {
74
96
}
75
97
}
76
98
77
- // TODO Add Retry Policy And configure TimeOut from properties
78
- private Optional <RowData > queryAndProcess (RowData lookupData ) throws Exception {
79
-
99
+ private Optional <RowData > queryAndProcess (RowData lookupData ) {
80
100
HttpLookupSourceRequestEntry request = requestFactory .buildLookupRequest (lookupData );
81
- HttpResponse <String > response = httpClient .send (
82
- request .getHttpRequest (),
83
- BodyHandlers .ofString ()
84
- );
85
- return processHttpResponse (response , request );
101
+ HttpResponse <String > response = null ;
102
+
103
+ int retryCount = 0 ;
104
+
105
+ while (retryCount < this .httpRequestMaxRetries ) {
106
+ try {
107
+ response = httpClient .send (
108
+ request .getHttpRequest (),
109
+ BodyHandlers .ofString ()
110
+ );
111
+ break ;
112
+ } catch (IOException e ) {
113
+ log .error ("IOException during HTTP request. Retrying..." , e );
114
+ retryCount ++;
115
+ if (retryCount == this .httpRequestMaxRetries ) {
116
+ log .error ("Maximum retries reached. Aborting..." );
117
+ return Optional .empty ();
118
+ }
119
+ try {
120
+ Thread .sleep (this .httpRequestRetryTimeoutMs );
121
+ } catch (InterruptedException ie ) {
122
+ Thread .currentThread ().interrupt ();
123
+ }
124
+ } catch (InterruptedException e ) {
125
+ Thread .currentThread ().interrupt ();
126
+ log .error ("HTTP request interrupted. Aborting..." , e );
127
+ return Optional .empty ();
128
+ }
129
+ }
130
+ try {
131
+ return processHttpResponse (response , request );
132
+ } catch (IOException e ) {
133
+ log .error ("IOException during HTTP response processing." , e );
134
+ return Optional .empty ();
135
+ }
86
136
}
87
137
88
138
private Optional <RowData > processHttpResponse (
0 commit comments