Skip to content

Commit 4261a21

Browse files
Adding connection timeout override for 10 seconds (#1273)
* Adding connection timeout override for 10 seconds Signed-off-by: Sachin Sriramagiri <srirasac@amazon.com> * chaining request config builder options Signed-off-by: Sachin Sriramagiri <srirasac@amazon.com> --------- Signed-off-by: Sachin Sriramagiri <srirasac@amazon.com> (cherry picked from commit 9b5c6ed) Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent 8668b93 commit 4261a21

File tree

4 files changed

+21
-2
lines changed

4 files changed

+21
-2
lines changed

docs/index.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -556,6 +556,7 @@ In the index mapping, the `_meta` and `properties`field stores meta and schema i
556556
- `spark.flint.index.checkpointLocation.rootDir`: default is None. Flint will create a default checkpoint location in format of '<rootDir>/<indexName>/<UUID>' to isolate checkpoint data.
557557
- `spark.flint.index.checkpoint.mandatory`: default is true.
558558
- `spark.datasource.flint.socket_timeout_millis`: default value is 60000.
559+
- `spark.datasource.flint.connection_timeout_millis`: default value is 10000.
559560
- `spark.datasource.flint.request.completionDelayMillis`: Time to wait in milliseconds after request is complete. Applied after index creation. Default value is 2000 if using aoss service, otherwise 0.
560561
- `spark.flint.monitor.initialDelaySeconds`: Initial delay in seconds before starting the monitoring task. Default value is 15.
561562
- `spark.flint.monitor.intervalSeconds`: Interval in seconds for scheduling the monitoring task. Default value is 60.

flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,10 @@ public class FlintOptions implements Serializable {
8787

8888
public static final int DEFAULT_SOCKET_TIMEOUT_MILLIS = 60000;
8989

90+
public static final String CONNECTION_TIMEOUT_MILLIS = "connection_timeout_millis";
91+
92+
public static final int DEFAULT_CONNECTION_TIMEOUT_MILLIS = 10000;
93+
9094
public static final int DEFAULT_INACTIVITY_LIMIT_MILLIS = 3 * 60 * 1000;
9195

9296
public static final String REQUEST_COMPLETION_DELAY_MILLIS = "request.completionDelayMillis";
@@ -207,6 +211,10 @@ public int getSocketTimeoutMillis() {
207211
return Integer.parseInt(options.getOrDefault(SOCKET_TIMEOUT_MILLIS, String.valueOf(DEFAULT_SOCKET_TIMEOUT_MILLIS)));
208212
}
209213

214+
public int getConnectionTimeoutMillis() {
215+
return Integer.parseInt(options.getOrDefault(CONNECTION_TIMEOUT_MILLIS, String.valueOf(DEFAULT_CONNECTION_TIMEOUT_MILLIS)));
216+
}
217+
210218
public int getRequestCompletionDelayMillis() {
211219
int defaultValue = SERVICE_NAME_AOSS.equals(getServiceName())
212220
? DEFAULT_AOSS_REQUEST_COMPLETION_DELAY_MILLIS

flint-core/src/main/scala/org/opensearch/flint/core/storage/RequestConfigurator.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
/**
1313
* allows override default socket timeout in RestClientBuilder.DEFAULT_SOCKET_TIMEOUT_MILLIS
14+
* allows override of default connection timeout in RestClientBuilder.DEFAULT_CONNECT_TIMEOUT_MILLIS
1415
*/
1516
public class RequestConfigurator implements RestClientBuilder.RequestConfigCallback {
1617

@@ -22,8 +23,9 @@ public RequestConfigurator(FlintOptions options) {
2223

2324
@Override
2425
public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder) {
25-
// Set the socket timeout in milliseconds
26-
return requestConfigBuilder.setSocketTimeout(options.getSocketTimeoutMillis());
26+
// Set the connection and socket timeouts in milliseconds
27+
return requestConfigBuilder.setConnectTimeout(options.getConnectionTimeoutMillis())
28+
.setSocketTimeout(options.getSocketTimeoutMillis());
2729
}
2830
}
2931

flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,13 @@ object FlintSparkConf {
251251
.datasourceOption()
252252
.doc("socket duration in milliseconds")
253253
.createWithDefault(String.valueOf(FlintOptions.DEFAULT_SOCKET_TIMEOUT_MILLIS))
254+
255+
val CONNECTION_TIMEOUT_MILLIS =
256+
FlintConfig(s"spark.datasource.flint.${FlintOptions.CONNECTION_TIMEOUT_MILLIS}")
257+
.datasourceOption()
258+
.doc("connection duration in milliseconds")
259+
.createWithDefault(String.valueOf(FlintOptions.DEFAULT_CONNECTION_TIMEOUT_MILLIS))
260+
254261
val REQUEST_COMPLETION_DELAY_MILLIS =
255262
FlintConfig(s"spark.datasource.flint.${FlintOptions.REQUEST_COMPLETION_DELAY_MILLIS}")
256263
.datasourceOption()
@@ -408,6 +415,7 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable
408415
USERNAME,
409416
PASSWORD,
410417
SOCKET_TIMEOUT_MILLIS,
418+
CONNECTION_TIMEOUT_MILLIS,
411419
JOB_TYPE,
412420
REPL_INACTIVITY_TIMEOUT_MILLIS,
413421
BATCH_BYTES)

0 commit comments

Comments
 (0)