Skip to content

Commit e018c73

Browse files
vikasvb90fahadshamiinsta
authored andcommitted
Stream read pool and default s3 timeouts tuning (opensearch-project#10912)
Signed-off-by: vikasvb90 <vikasvb@amazon.com>
1 parent f98d964 commit e018c73

File tree

6 files changed

+51
-19
lines changed

6 files changed

+51
-19
lines changed

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3ClientSettings.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ final class S3ClientSettings {
177177
static final Setting.AffixSetting<TimeValue> REQUEST_TIMEOUT_SETTING = Setting.affixKeySetting(
178178
PREFIX,
179179
"request_timeout",
180-
key -> Setting.timeSetting(key, TimeValue.timeValueMinutes(2), Property.NodeScope)
180+
key -> Setting.timeSetting(key, TimeValue.timeValueMinutes(5), Property.NodeScope)
181181
);
182182

183183
/** The connection timeout for connecting to s3. */
@@ -198,14 +198,14 @@ final class S3ClientSettings {
198198
static final Setting.AffixSetting<Integer> MAX_CONNECTIONS_SETTING = Setting.affixKeySetting(
199199
PREFIX,
200200
"max_connections",
201-
key -> Setting.intSetting(key, 100, Property.NodeScope)
201+
key -> Setting.intSetting(key, 500, Property.NodeScope)
202202
);
203203

204204
/** Connection acquisition timeout for new connections to S3. */
205205
static final Setting.AffixSetting<TimeValue> CONNECTION_ACQUISITION_TIMEOUT = Setting.affixKeySetting(
206206
PREFIX,
207207
"connection_acquisition_timeout",
208-
key -> Setting.timeSetting(key, TimeValue.timeValueMinutes(2), Property.NodeScope)
208+
key -> Setting.timeSetting(key, TimeValue.timeValueMinutes(15), Property.NodeScope)
209209
);
210210

211211
/** The maximum pending connections to S3. */

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -99,23 +99,32 @@ public S3RepositoryPlugin(final Settings settings, final Path configPath) {
9999
@Override
100100
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
101101
List<ExecutorBuilder<?>> executorBuilders = new ArrayList<>();
102-
int halfProcMaxAt5 = halfAllocatedProcessorsMaxFive(allocatedProcessors(settings));
102+
int halfProc = halfNumberOfProcessors(allocatedProcessors(settings));
103103
executorBuilders.add(
104104
new FixedExecutorBuilder(settings, URGENT_FUTURE_COMPLETION, urgentPoolCount(settings), 10_000, URGENT_FUTURE_COMPLETION)
105105
);
106-
executorBuilders.add(new ScalingExecutorBuilder(URGENT_STREAM_READER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
106+
executorBuilders.add(new ScalingExecutorBuilder(URGENT_STREAM_READER, 1, halfProc, TimeValue.timeValueMinutes(5)));
107107
executorBuilders.add(
108-
new FixedExecutorBuilder(settings, PRIORITY_FUTURE_COMPLETION, priorityPoolCount(settings), 10_000, PRIORITY_FUTURE_COMPLETION)
108+
new ScalingExecutorBuilder(PRIORITY_FUTURE_COMPLETION, 1, allocatedProcessors(settings), TimeValue.timeValueMinutes(5))
109109
);
110-
executorBuilders.add(new ScalingExecutorBuilder(PRIORITY_STREAM_READER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
110+
executorBuilders.add(new ScalingExecutorBuilder(PRIORITY_STREAM_READER, 1, halfProc, TimeValue.timeValueMinutes(5)));
111111

112-
executorBuilders.add(new FixedExecutorBuilder(settings, FUTURE_COMPLETION, normalPoolCount(settings), 10_000, FUTURE_COMPLETION));
113-
executorBuilders.add(new ScalingExecutorBuilder(STREAM_READER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
112+
executorBuilders.add(
113+
new ScalingExecutorBuilder(FUTURE_COMPLETION, 1, allocatedProcessors(settings), TimeValue.timeValueMinutes(5))
114+
);
115+
executorBuilders.add(
116+
new ScalingExecutorBuilder(
117+
STREAM_READER,
118+
allocatedProcessors(settings),
119+
4 * allocatedProcessors(settings),
120+
TimeValue.timeValueMinutes(5)
121+
)
122+
);
114123
return executorBuilders;
115124
}
116125

117-
static int halfAllocatedProcessorsMaxFive(final int allocatedProcessors) {
118-
return boundedBy((allocatedProcessors + 1) / 2, 1, 5);
126+
static int halfNumberOfProcessors(int numberOfProcessors) {
127+
return (numberOfProcessors + 1) / 2;
119128
}
120129

121130
S3RepositoryPlugin(final Settings settings, final Path configPath, final S3Service service, final S3AsyncService s3AsyncService) {

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/StatsMetricPublisher.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
import software.amazon.awssdk.metrics.MetricPublisher;
1313
import software.amazon.awssdk.metrics.MetricRecord;
1414

15+
import org.apache.logging.log4j.LogManager;
16+
import org.apache.logging.log4j.Logger;
1517
import org.opensearch.common.blobstore.BlobStore;
1618

1719
import java.time.Duration;
@@ -21,6 +23,7 @@
2123

2224
public class StatsMetricPublisher {
2325

26+
private static final Logger LOGGER = LogManager.getLogger(StatsMetricPublisher.class);
2427
private final Stats stats = new Stats();
2528

2629
private final Map<BlobStore.Metric, Stats> extendedStats = new HashMap<>() {
@@ -35,6 +38,7 @@ public class StatsMetricPublisher {
3538
public MetricPublisher listObjectsMetricPublisher = new MetricPublisher() {
3639
@Override
3740
public void publish(MetricCollection metricCollection) {
41+
LOGGER.debug(() -> "List objects request metrics: " + metricCollection);
3842
for (MetricRecord<?> metricRecord : metricCollection) {
3943
switch (metricRecord.metric().name()) {
4044
case "ApiCallDuration":
@@ -64,6 +68,7 @@ public void close() {}
6468
public MetricPublisher deleteObjectsMetricPublisher = new MetricPublisher() {
6569
@Override
6670
public void publish(MetricCollection metricCollection) {
71+
LOGGER.debug(() -> "Delete objects request metrics: " + metricCollection);
6772
for (MetricRecord<?> metricRecord : metricCollection) {
6873
switch (metricRecord.metric().name()) {
6974
case "ApiCallDuration":
@@ -93,6 +98,7 @@ public void close() {}
9398
public MetricPublisher getObjectMetricPublisher = new MetricPublisher() {
9499
@Override
95100
public void publish(MetricCollection metricCollection) {
101+
LOGGER.debug(() -> "Get object request metrics: " + metricCollection);
96102
for (MetricRecord<?> metricRecord : metricCollection) {
97103
switch (metricRecord.metric().name()) {
98104
case "ApiCallDuration":
@@ -122,6 +128,7 @@ public void close() {}
122128
public MetricPublisher putObjectMetricPublisher = new MetricPublisher() {
123129
@Override
124130
public void publish(MetricCollection metricCollection) {
131+
LOGGER.debug(() -> "Put object request metrics: " + metricCollection);
125132
for (MetricRecord<?> metricRecord : metricCollection) {
126133
switch (metricRecord.metric().name()) {
127134
case "ApiCallDuration":
@@ -151,6 +158,7 @@ public void close() {}
151158
public MetricPublisher multipartUploadMetricCollector = new MetricPublisher() {
152159
@Override
153160
public void publish(MetricCollection metricCollection) {
161+
LOGGER.debug(() -> "Multi-part request metrics: " + metricCollection);
154162
for (MetricRecord<?> metricRecord : metricCollection) {
155163
switch (metricRecord.metric().name()) {
156164
case "ApiCallDuration":

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncPartsHandler.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.opensearch.common.io.InputStreamContainer;
2626
import org.opensearch.core.common.unit.ByteSizeUnit;
2727
import org.opensearch.repositories.s3.SocketAccess;
28+
import org.opensearch.repositories.s3.StatsMetricPublisher;
2829
import org.opensearch.repositories.s3.io.CheckedContainer;
2930

3031
import java.io.BufferedInputStream;
@@ -55,6 +56,7 @@ public class AsyncPartsHandler {
5556
* @param completedParts Reference of completed parts
5657
* @param inputStreamContainers Checksum containers
5758
* @return list of completable futures
59+
* @param statsMetricPublisher sdk metric publisher
5860
* @throws IOException thrown in case of an IO error
5961
*/
6062
public static List<CompletableFuture<CompletedPart>> uploadParts(
@@ -66,7 +68,8 @@ public static List<CompletableFuture<CompletedPart>> uploadParts(
6668
StreamContext streamContext,
6769
String uploadId,
6870
AtomicReferenceArray<CompletedPart> completedParts,
69-
AtomicReferenceArray<CheckedContainer> inputStreamContainers
71+
AtomicReferenceArray<CheckedContainer> inputStreamContainers,
72+
StatsMetricPublisher statsMetricPublisher
7073
) throws IOException {
7174
List<CompletableFuture<CompletedPart>> futures = new ArrayList<>();
7275
for (int partIdx = 0; partIdx < streamContext.getNumberOfParts(); partIdx++) {
@@ -77,6 +80,7 @@ public static List<CompletableFuture<CompletedPart>> uploadParts(
7780
.partNumber(partIdx + 1)
7881
.key(uploadRequest.getKey())
7982
.uploadId(uploadId)
83+
.overrideConfiguration(o -> o.addMetricPublisher(statsMetricPublisher.multipartUploadMetricCollector))
8084
.contentLength(inputStreamContainer.getContentLength());
8185
if (uploadRequest.doRemoteDataIntegrityCheck()) {
8286
uploadPartRequestBuilder.checksumAlgorithm(ChecksumAlgorithm.CRC32);

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,14 @@ private void uploadInParts(
146146
handleException(returnFuture, () -> "Failed to initiate multipart upload", throwable);
147147
} else {
148148
log.debug(() -> "Initiated new multipart upload, uploadId: " + createMultipartUploadResponse.uploadId());
149-
doUploadInParts(s3AsyncClient, uploadRequest, streamContext, returnFuture, createMultipartUploadResponse.uploadId());
149+
doUploadInParts(
150+
s3AsyncClient,
151+
uploadRequest,
152+
streamContext,
153+
returnFuture,
154+
createMultipartUploadResponse.uploadId(),
155+
statsMetricPublisher
156+
);
150157
}
151158
});
152159
}
@@ -156,7 +163,8 @@ private void doUploadInParts(
156163
UploadRequest uploadRequest,
157164
StreamContext streamContext,
158165
CompletableFuture<Void> returnFuture,
159-
String uploadId
166+
String uploadId,
167+
StatsMetricPublisher statsMetricPublisher
160168
) {
161169

162170
// The list of completed parts must be sorted
@@ -174,7 +182,8 @@ private void doUploadInParts(
174182
streamContext,
175183
uploadId,
176184
completedParts,
177-
inputStreamContainers
185+
inputStreamContainers,
186+
statsMetricPublisher
178187
);
179188
} catch (Exception ex) {
180189
try {
@@ -198,7 +207,7 @@ private void doUploadInParts(
198207
}
199208
return null;
200209
})
201-
.thenCompose(ignore -> completeMultipartUpload(s3AsyncClient, uploadRequest, uploadId, completedParts))
210+
.thenCompose(ignore -> completeMultipartUpload(s3AsyncClient, uploadRequest, uploadId, completedParts, statsMetricPublisher))
202211
.handle(handleExceptionOrResponse(s3AsyncClient, uploadRequest, returnFuture, uploadId))
203212
.exceptionally(throwable -> {
204213
handleException(returnFuture, () -> "Unexpected exception occurred", throwable);
@@ -245,7 +254,8 @@ private CompletableFuture<CompleteMultipartUploadResponse> completeMultipartUplo
245254
S3AsyncClient s3AsyncClient,
246255
UploadRequest uploadRequest,
247256
String uploadId,
248-
AtomicReferenceArray<CompletedPart> completedParts
257+
AtomicReferenceArray<CompletedPart> completedParts,
258+
StatsMetricPublisher statsMetricPublisher
249259
) {
250260

251261
log.debug(() -> new ParameterizedMessage("Sending completeMultipartUploadRequest, uploadId: {}", uploadId));
@@ -254,6 +264,7 @@ private CompletableFuture<CompleteMultipartUploadResponse> completeMultipartUplo
254264
.bucket(uploadRequest.getBucket())
255265
.key(uploadRequest.getKey())
256266
.uploadId(uploadId)
267+
.overrideConfiguration(o -> o.addMetricPublisher(statsMetricPublisher.multipartUploadMetricCollector))
257268
.multipartUpload(CompletedMultipartUpload.builder().parts(parts).build())
258269
.build();
259270

plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3ClientSettingsTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,10 +70,10 @@ public void testThereIsADefaultClientByDefault() {
7070
assertThat(defaultSettings.protocol, is(Protocol.HTTPS));
7171
assertThat(defaultSettings.proxySettings, is(ProxySettings.NO_PROXY_SETTINGS));
7272
assertThat(defaultSettings.readTimeoutMillis, is(50 * 1000));
73-
assertThat(defaultSettings.requestTimeoutMillis, is(120 * 1000));
73+
assertThat(defaultSettings.requestTimeoutMillis, is(5 * 60 * 1000));
7474
assertThat(defaultSettings.connectionTimeoutMillis, is(10 * 1000));
7575
assertThat(defaultSettings.connectionTTLMillis, is(5 * 1000));
76-
assertThat(defaultSettings.maxConnections, is(100));
76+
assertThat(defaultSettings.maxConnections, is(500));
7777
assertThat(defaultSettings.maxRetries, is(3));
7878
assertThat(defaultSettings.throttleRetries, is(true));
7979
}

0 commit comments

Comments
 (0)