Skip to content

Commit d0f4937

Browse files
committed
Reapply "[Remote Store] Permit backed futures to prevent timeouts during uploa…" (opensearch-project#13663)
This reverts commit 9de5c4f.
1 parent de3cd87 commit d0f4937

26 files changed

+1555
-132
lines changed

plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3BlobStoreRepositoryTests.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -312,7 +312,22 @@ protected S3Repository createRepository(
312312
ClusterService clusterService,
313313
RecoverySettings recoverySettings
314314
) {
315-
return new S3Repository(metadata, registry, service, clusterService, recoverySettings, null, null, null, null, null, false) {
315+
return new S3Repository(
316+
metadata,
317+
registry,
318+
service,
319+
clusterService,
320+
recoverySettings,
321+
null,
322+
null,
323+
null,
324+
null,
325+
null,
326+
false,
327+
null,
328+
null,
329+
null
330+
) {
316331

317332
@Override
318333
public BlobStore blobStore() {
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.repositories.s3;
10+
11+
import java.util.HashMap;
12+
import java.util.Map;
13+
import java.util.concurrent.atomic.AtomicInteger;
14+
import java.util.concurrent.atomic.AtomicLong;
15+
16+
/**
 * Generic stats of repository-s3 plugin.
 * <p>
 * Holds the current queue sizes and acquired-permit counts for the normal and low priority
 * upload paths in atomic counters, together with the capacities they are measured against,
 * and reports each of them as an integer utilization percentage via {@link #stats()}.
 */
public class GenericStatsMetricPublisher {

    private final AtomicLong normalPriorityQSize = new AtomicLong();
    private final AtomicInteger normalPriorityPermits = new AtomicInteger();
    private final AtomicLong lowPriorityQSize = new AtomicLong();
    private final AtomicInteger lowPriorityPermits = new AtomicInteger();
    private final long normalPriorityQCapacity;
    private final int maxNormalPriorityPermits;
    private final long lowPriorityQCapacity;
    private final int maxLowPriorityPermits;

    /**
     * Creates a publisher with all counters starting at zero.
     *
     * @param normalPriorityQCapacity  denominator used for the normal priority queue utilization
     * @param maxNormalPriorityPermits denominator used for the normal priority permit utilization
     * @param lowPriorityQCapacity     denominator used for the low priority queue utilization
     * @param maxLowPriorityPermits    denominator used for the low priority permit utilization
     */
    public GenericStatsMetricPublisher(
        long normalPriorityQCapacity,
        int maxNormalPriorityPermits,
        long lowPriorityQCapacity,
        int maxLowPriorityPermits
    ) {
        this.normalPriorityQCapacity = normalPriorityQCapacity;
        this.maxNormalPriorityPermits = maxNormalPriorityPermits;
        this.lowPriorityQCapacity = lowPriorityQCapacity;
        this.maxLowPriorityPermits = maxLowPriorityPermits;
    }

    /**
     * Adds {@code qSize} to the tracked normal priority queue size.
     *
     * @param qSize delta to apply; the value is added to the counter, it does not replace it
     */
    public void updateNormalPriorityQSize(long qSize) {
        normalPriorityQSize.addAndGet(qSize);
    }

    /**
     * Adds {@code qSize} to the tracked low priority queue size.
     *
     * @param qSize delta to apply; the value is added to the counter, it does not replace it
     */
    public void updateLowPriorityQSize(long qSize) {
        lowPriorityQSize.addAndGet(qSize);
    }

    /**
     * Moves the acquired normal priority permit count by one.
     *
     * @param increment {@code true} to add one, {@code false} to subtract one
     */
    public void updateNormalPermits(boolean increment) {
        if (increment) {
            normalPriorityPermits.incrementAndGet();
        } else {
            normalPriorityPermits.decrementAndGet();
        }
    }

    /**
     * Moves the acquired low priority permit count by one.
     *
     * @param increment {@code true} to add one, {@code false} to subtract one
     */
    public void updateLowPermits(boolean increment) {
        if (increment) {
            lowPriorityPermits.incrementAndGet();
        } else {
            lowPriorityPermits.decrementAndGet();
        }
    }

    /**
     * @return the current value of the normal priority queue size counter
     */
    public long getNormalPriorityQSize() {
        return normalPriorityQSize.get();
    }

    /**
     * @return the current value of the normal priority permit counter
     */
    public int getAcquiredNormalPriorityPermits() {
        return normalPriorityPermits.get();
    }

    /**
     * @return the current value of the low priority queue size counter
     */
    public long getLowPriorityQSize() {
        return lowPriorityQSize.get();
    }

    /**
     * @return the current value of the low priority permit counter
     */
    public int getAcquiredLowPriorityPermits() {
        return lowPriorityPermits.get();
    }

    /**
     * Builds a fresh map with the four utilization percentages (integer division, so values
     * are truncated). Each counter is read independently; the map is not a single atomic
     * snapshot across counters.
     *
     * @return a new mutable map keyed by {@code NormalPriorityQUtilization},
     *         {@code LowPriorityQUtilization}, {@code NormalPriorityPermitsUtilization} and
     *         {@code LowPriorityPermitsUtilization}
     */
    Map<String, Long> stats() {
        final Map<String, Long> results = new HashMap<>();
        results.put("NormalPriorityQUtilization", utilizationPercent(normalPriorityQSize.get(), normalPriorityQCapacity));
        results.put("LowPriorityQUtilization", utilizationPercent(lowPriorityQSize.get(), lowPriorityQCapacity));
        results.put("NormalPriorityPermitsUtilization", utilizationPercent(normalPriorityPermits.get(), maxNormalPriorityPermits));
        results.put("LowPriorityPermitsUtilization", utilizationPercent(lowPriorityPermits.get(), maxLowPriorityPermits));
        return results;
    }

    /**
     * Computes {@code used * 100 / capacity}, reporting 0 when the capacity is zero so that a
     * zero-sized queue or permit pool cannot make stats collection fail with an
     * {@link ArithmeticException}.
     */
    private static long utilizationPercent(long used, long capacity) {
        if (capacity == 0) {
            return 0L;
        }
        return (used * 100) / capacity;
    }
}

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java

Lines changed: 51 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@
8888
import org.opensearch.core.common.Strings;
8989
import org.opensearch.core.common.unit.ByteSizeUnit;
9090
import org.opensearch.core.common.unit.ByteSizeValue;
91+
import org.opensearch.repositories.s3.async.SizeBasedBlockingQ;
9192
import org.opensearch.repositories.s3.async.UploadRequest;
9293
import org.opensearch.repositories.s3.utils.HttpRangeUtils;
9394

@@ -193,7 +194,14 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
193194
blobStore.isUploadRetryEnabled()
194195
);
195196
try {
196-
if (uploadRequest.getContentLength() > ByteSizeUnit.GB.toBytes(10) && blobStore.isRedirectLargeUploads()) {
197+
// If file size is greater than the queue capacity then SizeBasedBlockingQ will always reject the upload.
198+
// Therefore, redirecting it to slow client.
199+
if ((uploadRequest.getWritePriority() == WritePriority.LOW
200+
&& blobStore.getLowPrioritySizeBasedBlockingQ().isMaxCapacityBelowContentLength(uploadRequest.getContentLength()) == false)
201+
|| (uploadRequest.getWritePriority() != WritePriority.HIGH
202+
&& uploadRequest.getWritePriority() != WritePriority.URGENT
203+
&& blobStore.getNormalPrioritySizeBasedBlockingQ()
204+
.isMaxCapacityBelowContentLength(uploadRequest.getContentLength()) == false)) {
197205
StreamContext streamContext = SocketAccess.doPrivileged(
198206
() -> writeContext.getStreamProvider(uploadRequest.getContentLength())
199207
);
@@ -232,23 +240,55 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
232240
} else {
233241
s3AsyncClient = amazonS3Reference.get().client();
234242
}
235-
CompletableFuture<Void> completableFuture = blobStore.getAsyncTransferManager()
236-
.uploadObject(s3AsyncClient, uploadRequest, streamContext, blobStore.getStatsMetricPublisher());
237-
completableFuture.whenComplete((response, throwable) -> {
238-
if (throwable == null) {
239-
completionListener.onResponse(response);
240-
} else {
241-
Exception ex = throwable instanceof Error ? new Exception(throwable) : (Exception) throwable;
242-
completionListener.onFailure(ex);
243-
}
244-
});
243+
244+
if (writeContext.getWritePriority() == WritePriority.URGENT
245+
|| writeContext.getWritePriority() == WritePriority.HIGH
246+
|| blobStore.isPermitBackedTransferEnabled() == false) {
247+
createFileCompletableFuture(s3AsyncClient, uploadRequest, streamContext, completionListener);
248+
} else if (writeContext.getWritePriority() == WritePriority.LOW) {
249+
blobStore.getLowPrioritySizeBasedBlockingQ()
250+
.produce(
251+
new SizeBasedBlockingQ.Item(
252+
writeContext.getFileSize(),
253+
() -> createFileCompletableFuture(s3AsyncClient, uploadRequest, streamContext, completionListener)
254+
)
255+
);
256+
} else if (writeContext.getWritePriority() == WritePriority.NORMAL) {
257+
blobStore.getNormalPrioritySizeBasedBlockingQ()
258+
.produce(
259+
new SizeBasedBlockingQ.Item(
260+
writeContext.getFileSize(),
261+
() -> createFileCompletableFuture(s3AsyncClient, uploadRequest, streamContext, completionListener)
262+
)
263+
);
264+
} else {
265+
throw new IllegalStateException("Cannot perform upload for other priority types.");
266+
}
245267
}
246268
} catch (Exception e) {
247269
logger.info("exception error from blob container for file {}", writeContext.getFileName());
248270
throw new IOException(e);
249271
}
250272
}
251273

274+
private CompletableFuture<Void> createFileCompletableFuture(
275+
S3AsyncClient s3AsyncClient,
276+
UploadRequest uploadRequest,
277+
StreamContext streamContext,
278+
ActionListener<Void> completionListener
279+
) {
280+
CompletableFuture<Void> completableFuture = blobStore.getAsyncTransferManager()
281+
.uploadObject(s3AsyncClient, uploadRequest, streamContext, blobStore.getStatsMetricPublisher());
282+
return completableFuture.whenComplete((response, throwable) -> {
283+
if (throwable == null) {
284+
completionListener.onResponse(response);
285+
} else {
286+
Exception ex = throwable instanceof Error ? new Exception(throwable) : (Exception) throwable;
287+
completionListener.onFailure(ex);
288+
}
289+
});
290+
}
291+
252292
@ExperimentalApi
253293
@Override
254294
public void readBlobAsync(String blobName, ActionListener<ReadContext> listener) {

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.opensearch.core.common.unit.ByteSizeValue;
4646
import org.opensearch.repositories.s3.async.AsyncExecutorContainer;
4747
import org.opensearch.repositories.s3.async.AsyncTransferManager;
48+
import org.opensearch.repositories.s3.async.SizeBasedBlockingQ;
4849

4950
import java.io.IOException;
5051
import java.util.Collections;
@@ -56,6 +57,7 @@
5657
import static org.opensearch.repositories.s3.S3Repository.BUFFER_SIZE_SETTING;
5758
import static org.opensearch.repositories.s3.S3Repository.BULK_DELETE_SIZE;
5859
import static org.opensearch.repositories.s3.S3Repository.CANNED_ACL_SETTING;
60+
import static org.opensearch.repositories.s3.S3Repository.PERMIT_BACKED_TRANSFER_ENABLED;
5961
import static org.opensearch.repositories.s3.S3Repository.REDIRECT_LARGE_S3_UPLOAD;
6062
import static org.opensearch.repositories.s3.S3Repository.SERVER_SIDE_ENCRYPTION_SETTING;
6163
import static org.opensearch.repositories.s3.S3Repository.STORAGE_CLASS_SETTING;
@@ -77,6 +79,8 @@ class S3BlobStore implements BlobStore {
7779

7880
private volatile boolean uploadRetryEnabled;
7981

82+
private volatile boolean permitBackedTransferEnabled;
83+
8084
private volatile boolean serverSideEncryption;
8185

8286
private volatile ObjectCannedACL cannedACL;
@@ -94,6 +98,9 @@ class S3BlobStore implements BlobStore {
9498
private final AsyncExecutorContainer priorityExecutorBuilder;
9599
private final AsyncExecutorContainer normalExecutorBuilder;
96100
private final boolean multipartUploadEnabled;
101+
private final SizeBasedBlockingQ normalPrioritySizeBasedBlockingQ;
102+
private final SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ;
103+
private final GenericStatsMetricPublisher genericStatsMetricPublisher;
97104

98105
S3BlobStore(
99106
S3Service service,
@@ -109,7 +116,10 @@ class S3BlobStore implements BlobStore {
109116
AsyncTransferManager asyncTransferManager,
110117
AsyncExecutorContainer urgentExecutorBuilder,
111118
AsyncExecutorContainer priorityExecutorBuilder,
112-
AsyncExecutorContainer normalExecutorBuilder
119+
AsyncExecutorContainer normalExecutorBuilder,
120+
SizeBasedBlockingQ normalPrioritySizeBasedBlockingQ,
121+
SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ,
122+
GenericStatsMetricPublisher genericStatsMetricPublisher
113123
) {
114124
this.service = service;
115125
this.s3AsyncService = s3AsyncService;
@@ -128,6 +138,10 @@ class S3BlobStore implements BlobStore {
128138
// Settings to initialize blobstore with.
129139
this.redirectLargeUploads = REDIRECT_LARGE_S3_UPLOAD.get(repositoryMetadata.settings());
130140
this.uploadRetryEnabled = UPLOAD_RETRY_ENABLED.get(repositoryMetadata.settings());
141+
this.normalPrioritySizeBasedBlockingQ = normalPrioritySizeBasedBlockingQ;
142+
this.lowPrioritySizeBasedBlockingQ = lowPrioritySizeBasedBlockingQ;
143+
this.genericStatsMetricPublisher = genericStatsMetricPublisher;
144+
this.permitBackedTransferEnabled = PERMIT_BACKED_TRANSFER_ENABLED.get(repositoryMetadata.settings());
131145
}
132146

133147
@Override
@@ -141,6 +155,7 @@ public void reload(RepositoryMetadata repositoryMetadata) {
141155
this.bulkDeletesSize = BULK_DELETE_SIZE.get(repositoryMetadata.settings());
142156
this.redirectLargeUploads = REDIRECT_LARGE_S3_UPLOAD.get(repositoryMetadata.settings());
143157
this.uploadRetryEnabled = UPLOAD_RETRY_ENABLED.get(repositoryMetadata.settings());
158+
this.permitBackedTransferEnabled = PERMIT_BACKED_TRANSFER_ENABLED.get(repositoryMetadata.settings());
144159
}
145160

146161
@Override
@@ -168,6 +183,10 @@ public boolean isUploadRetryEnabled() {
168183
return uploadRetryEnabled;
169184
}
170185

186+
/**
 * Returns the current value of the permit-backed transfer flag. The flag is read from the
 * {@code PERMIT_BACKED_TRANSFER_ENABLED} repository setting in the constructor and again in
 * {@code reload}.
 */
public boolean isPermitBackedTransferEnabled() {
187+
return permitBackedTransferEnabled;
188+
}
189+
171190
public String bucket() {
172191
return bucket;
173192
}
@@ -184,6 +203,14 @@ public int getBulkDeletesSize() {
184203
return bulkDeletesSize;
185204
}
186205

206+
/**
 * Returns the normal priority {@code SizeBasedBlockingQ} that was supplied to the constructor.
 */
public SizeBasedBlockingQ getNormalPrioritySizeBasedBlockingQ() {
207+
return normalPrioritySizeBasedBlockingQ;
208+
}
209+
210+
/**
 * Returns the low priority {@code SizeBasedBlockingQ} that was supplied to the constructor.
 */
public SizeBasedBlockingQ getLowPrioritySizeBasedBlockingQ() {
211+
return lowPrioritySizeBasedBlockingQ;
212+
}
213+
187214
@Override
188215
public BlobContainer blobContainer(BlobPath path) {
189216
return new S3BlobContainer(path, this);
@@ -201,7 +228,9 @@ public void close() throws IOException {
201228

202229
/**
 * Returns the blob store stats as a flat map: the entries of
 * {@code statsMetricPublisher.getStats().toMap()} with the entries of
 * {@code genericStatsMetricPublisher.stats()} added into the same map.
 */
@Override
203230
public Map<String, Long> stats() {
204-
return statsMetricPublisher.getStats().toMap();
231+
Map<String, Long> stats = statsMetricPublisher.getStats().toMap();
232+
stats.putAll(genericStatsMetricPublisher.stats());
233+
return stats;
205234
}
206235

207236
@Override
@@ -211,6 +240,7 @@ public Map<Metric, Map<String, Long>> extendedStats() {
211240
}
212241
Map<Metric, Map<String, Long>> extendedStats = new HashMap<>();
213242
statsMetricPublisher.getExtendedStats().forEach((k, v) -> extendedStats.put(k, v.toMap()));
243+
extendedStats.put(Metric.GENERIC_STATS, genericStatsMetricPublisher.stats());
214244
return extendedStats;
215245
}
216246

0 commit comments

Comments
 (0)