Skip to content

Commit 9de5c4f

Browse files
authored
Revert "[Remote Store] Permit backed futures to prevent timeouts during upload" (#13663)
This reverts commit 06945b2.
1 parent 06945b2 commit 9de5c4f

26 files changed

+132
-1555
lines changed

plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3BlobStoreRepositoryTests.java

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -312,22 +312,7 @@ protected S3Repository createRepository(
312312
ClusterService clusterService,
313313
RecoverySettings recoverySettings
314314
) {
315-
return new S3Repository(
316-
metadata,
317-
registry,
318-
service,
319-
clusterService,
320-
recoverySettings,
321-
null,
322-
null,
323-
null,
324-
null,
325-
null,
326-
false,
327-
null,
328-
null,
329-
null
330-
) {
315+
return new S3Repository(metadata, registry, service, clusterService, recoverySettings, null, null, null, null, null, false) {
331316

332317
@Override
333318
public BlobStore blobStore() {

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/GenericStatsMetricPublisher.java

Lines changed: 0 additions & 90 deletions
This file was deleted.

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java

Lines changed: 11 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,6 @@
8888
import org.opensearch.core.common.Strings;
8989
import org.opensearch.core.common.unit.ByteSizeUnit;
9090
import org.opensearch.core.common.unit.ByteSizeValue;
91-
import org.opensearch.repositories.s3.async.SizeBasedBlockingQ;
9291
import org.opensearch.repositories.s3.async.UploadRequest;
9392
import org.opensearch.repositories.s3.utils.HttpRangeUtils;
9493

@@ -194,14 +193,7 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
194193
blobStore.isUploadRetryEnabled()
195194
);
196195
try {
197-
// If file size is greater than the queue capacity then SizeBasedBlockingQ will always reject the upload.
198-
// Therefore, redirecting it to slow client.
199-
if ((uploadRequest.getWritePriority() == WritePriority.LOW
200-
&& blobStore.getLowPrioritySizeBasedBlockingQ().isMaxCapacityBelowContentLength(uploadRequest.getContentLength()) == false)
201-
|| (uploadRequest.getWritePriority() != WritePriority.HIGH
202-
&& uploadRequest.getWritePriority() != WritePriority.URGENT
203-
&& blobStore.getNormalPrioritySizeBasedBlockingQ()
204-
.isMaxCapacityBelowContentLength(uploadRequest.getContentLength()) == false)) {
196+
if (uploadRequest.getContentLength() > ByteSizeUnit.GB.toBytes(10) && blobStore.isRedirectLargeUploads()) {
205197
StreamContext streamContext = SocketAccess.doPrivileged(
206198
() -> writeContext.getStreamProvider(uploadRequest.getContentLength())
207199
);
@@ -240,55 +232,23 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
240232
} else {
241233
s3AsyncClient = amazonS3Reference.get().client();
242234
}
243-
244-
if (writeContext.getWritePriority() == WritePriority.URGENT
245-
|| writeContext.getWritePriority() == WritePriority.HIGH
246-
|| blobStore.isPermitBackedTransferEnabled() == false) {
247-
createFileCompletableFuture(s3AsyncClient, uploadRequest, streamContext, completionListener);
248-
} else if (writeContext.getWritePriority() == WritePriority.LOW) {
249-
blobStore.getLowPrioritySizeBasedBlockingQ()
250-
.produce(
251-
new SizeBasedBlockingQ.Item(
252-
writeContext.getFileSize(),
253-
() -> createFileCompletableFuture(s3AsyncClient, uploadRequest, streamContext, completionListener)
254-
)
255-
);
256-
} else if (writeContext.getWritePriority() == WritePriority.NORMAL) {
257-
blobStore.getNormalPrioritySizeBasedBlockingQ()
258-
.produce(
259-
new SizeBasedBlockingQ.Item(
260-
writeContext.getFileSize(),
261-
() -> createFileCompletableFuture(s3AsyncClient, uploadRequest, streamContext, completionListener)
262-
)
263-
);
264-
} else {
265-
throw new IllegalStateException("Cannot perform upload for other priority types.");
266-
}
235+
CompletableFuture<Void> completableFuture = blobStore.getAsyncTransferManager()
236+
.uploadObject(s3AsyncClient, uploadRequest, streamContext, blobStore.getStatsMetricPublisher());
237+
completableFuture.whenComplete((response, throwable) -> {
238+
if (throwable == null) {
239+
completionListener.onResponse(response);
240+
} else {
241+
Exception ex = throwable instanceof Error ? new Exception(throwable) : (Exception) throwable;
242+
completionListener.onFailure(ex);
243+
}
244+
});
267245
}
268246
} catch (Exception e) {
269247
logger.info("exception error from blob container for file {}", writeContext.getFileName());
270248
throw new IOException(e);
271249
}
272250
}
273251

274-
private CompletableFuture<Void> createFileCompletableFuture(
275-
S3AsyncClient s3AsyncClient,
276-
UploadRequest uploadRequest,
277-
StreamContext streamContext,
278-
ActionListener<Void> completionListener
279-
) {
280-
CompletableFuture<Void> completableFuture = blobStore.getAsyncTransferManager()
281-
.uploadObject(s3AsyncClient, uploadRequest, streamContext, blobStore.getStatsMetricPublisher());
282-
return completableFuture.whenComplete((response, throwable) -> {
283-
if (throwable == null) {
284-
completionListener.onResponse(response);
285-
} else {
286-
Exception ex = throwable instanceof Error ? new Exception(throwable) : (Exception) throwable;
287-
completionListener.onFailure(ex);
288-
}
289-
});
290-
}
291-
292252
@ExperimentalApi
293253
@Override
294254
public void readBlobAsync(String blobName, ActionListener<ReadContext> listener) {

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java

Lines changed: 2 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@
4545
import org.opensearch.core.common.unit.ByteSizeValue;
4646
import org.opensearch.repositories.s3.async.AsyncExecutorContainer;
4747
import org.opensearch.repositories.s3.async.AsyncTransferManager;
48-
import org.opensearch.repositories.s3.async.SizeBasedBlockingQ;
4948

5049
import java.io.IOException;
5150
import java.util.Collections;
@@ -57,7 +56,6 @@
5756
import static org.opensearch.repositories.s3.S3Repository.BUFFER_SIZE_SETTING;
5857
import static org.opensearch.repositories.s3.S3Repository.BULK_DELETE_SIZE;
5958
import static org.opensearch.repositories.s3.S3Repository.CANNED_ACL_SETTING;
60-
import static org.opensearch.repositories.s3.S3Repository.PERMIT_BACKED_TRANSFER_ENABLED;
6159
import static org.opensearch.repositories.s3.S3Repository.REDIRECT_LARGE_S3_UPLOAD;
6260
import static org.opensearch.repositories.s3.S3Repository.SERVER_SIDE_ENCRYPTION_SETTING;
6361
import static org.opensearch.repositories.s3.S3Repository.STORAGE_CLASS_SETTING;
@@ -79,8 +77,6 @@ class S3BlobStore implements BlobStore {
7977

8078
private volatile boolean uploadRetryEnabled;
8179

82-
private volatile boolean permitBackedTransferEnabled;
83-
8480
private volatile boolean serverSideEncryption;
8581

8682
private volatile ObjectCannedACL cannedACL;
@@ -98,9 +94,6 @@ class S3BlobStore implements BlobStore {
9894
private final AsyncExecutorContainer priorityExecutorBuilder;
9995
private final AsyncExecutorContainer normalExecutorBuilder;
10096
private final boolean multipartUploadEnabled;
101-
private final SizeBasedBlockingQ normalPrioritySizeBasedBlockingQ;
102-
private final SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ;
103-
private final GenericStatsMetricPublisher genericStatsMetricPublisher;
10497

10598
S3BlobStore(
10699
S3Service service,
@@ -116,10 +109,7 @@ class S3BlobStore implements BlobStore {
116109
AsyncTransferManager asyncTransferManager,
117110
AsyncExecutorContainer urgentExecutorBuilder,
118111
AsyncExecutorContainer priorityExecutorBuilder,
119-
AsyncExecutorContainer normalExecutorBuilder,
120-
SizeBasedBlockingQ normalPrioritySizeBasedBlockingQ,
121-
SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ,
122-
GenericStatsMetricPublisher genericStatsMetricPublisher
112+
AsyncExecutorContainer normalExecutorBuilder
123113
) {
124114
this.service = service;
125115
this.s3AsyncService = s3AsyncService;
@@ -138,10 +128,6 @@ class S3BlobStore implements BlobStore {
138128
// Settings to initialize blobstore with.
139129
this.redirectLargeUploads = REDIRECT_LARGE_S3_UPLOAD.get(repositoryMetadata.settings());
140130
this.uploadRetryEnabled = UPLOAD_RETRY_ENABLED.get(repositoryMetadata.settings());
141-
this.normalPrioritySizeBasedBlockingQ = normalPrioritySizeBasedBlockingQ;
142-
this.lowPrioritySizeBasedBlockingQ = lowPrioritySizeBasedBlockingQ;
143-
this.genericStatsMetricPublisher = genericStatsMetricPublisher;
144-
this.permitBackedTransferEnabled = PERMIT_BACKED_TRANSFER_ENABLED.get(repositoryMetadata.settings());
145131
}
146132

147133
@Override
@@ -155,7 +141,6 @@ public void reload(RepositoryMetadata repositoryMetadata) {
155141
this.bulkDeletesSize = BULK_DELETE_SIZE.get(repositoryMetadata.settings());
156142
this.redirectLargeUploads = REDIRECT_LARGE_S3_UPLOAD.get(repositoryMetadata.settings());
157143
this.uploadRetryEnabled = UPLOAD_RETRY_ENABLED.get(repositoryMetadata.settings());
158-
this.permitBackedTransferEnabled = PERMIT_BACKED_TRANSFER_ENABLED.get(repositoryMetadata.settings());
159144
}
160145

161146
@Override
@@ -183,10 +168,6 @@ public boolean isUploadRetryEnabled() {
183168
return uploadRetryEnabled;
184169
}
185170

186-
public boolean isPermitBackedTransferEnabled() {
187-
return permitBackedTransferEnabled;
188-
}
189-
190171
public String bucket() {
191172
return bucket;
192173
}
@@ -203,14 +184,6 @@ public int getBulkDeletesSize() {
203184
return bulkDeletesSize;
204185
}
205186

206-
public SizeBasedBlockingQ getNormalPrioritySizeBasedBlockingQ() {
207-
return normalPrioritySizeBasedBlockingQ;
208-
}
209-
210-
public SizeBasedBlockingQ getLowPrioritySizeBasedBlockingQ() {
211-
return lowPrioritySizeBasedBlockingQ;
212-
}
213-
214187
@Override
215188
public BlobContainer blobContainer(BlobPath path) {
216189
return new S3BlobContainer(path, this);
@@ -228,9 +201,7 @@ public void close() throws IOException {
228201

229202
@Override
230203
public Map<String, Long> stats() {
231-
Map<String, Long> stats = statsMetricPublisher.getStats().toMap();
232-
stats.putAll(genericStatsMetricPublisher.stats());
233-
return stats;
204+
return statsMetricPublisher.getStats().toMap();
234205
}
235206

236207
@Override
@@ -240,7 +211,6 @@ public Map<Metric, Map<String, Long>> extendedStats() {
240211
}
241212
Map<Metric, Map<String, Long>> extendedStats = new HashMap<>();
242213
statsMetricPublisher.getExtendedStats().forEach((k, v) -> extendedStats.put(k, v.toMap()));
243-
extendedStats.put(Metric.GENERIC_STATS, genericStatsMetricPublisher.stats());
244214
return extendedStats;
245215
}
246216

0 commit comments

Comments (0)