Skip to content

Commit 14673de

Browse files
Add async blob read and download support using multiple streams (#9592) (#9696)
(cherry picked from commit 6765b16) Signed-off-by: Kunal Kotwani <kkotwani@amazon.com> Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent 2e0bc40 commit 14673de

File tree

15 files changed

+745
-0
lines changed

15 files changed

+745
-0
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2828
- [Remote State] Create service to publish cluster state to remote store ([#9160](https://github.com/opensearch-project/OpenSearch/pull/9160))
2929
- Core crypto library to perform encryption and decryption of source content ([#8466](https://github.com/opensearch-project/OpenSearch/pull/8466))
3030
- Expose DelimitedTermFrequencyTokenFilter to allow providing term frequencies along with terms ([#9479](https://github.com/opensearch-project/OpenSearch/pull/9479))
31+
- APIs for performing async blob reads and async downloads from the repository using multiple streams ([#9592](https://github.com/opensearch-project/OpenSearch/issues/9592))
3132

3233
### Dependencies
3334
- Bump `org.apache.logging.log4j:log4j-core` from 2.17.1 to 2.20.0 ([#8307](https://github.com/opensearch-project/OpenSearch/pull/8307))

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
import org.opensearch.common.blobstore.BlobStoreException;
7070
import org.opensearch.common.blobstore.DeleteResult;
7171
import org.opensearch.common.blobstore.VerifyingMultiStreamBlobContainer;
72+
import org.opensearch.common.blobstore.stream.read.ReadContext;
7273
import org.opensearch.common.blobstore.stream.write.WriteContext;
7374
import org.opensearch.common.blobstore.stream.write.WritePriority;
7475
import org.opensearch.common.blobstore.support.AbstractBlobContainer;
@@ -211,6 +212,11 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
211212
}
212213
}
213214

215+
@Override
216+
public void readBlobAsync(String blobName, ActionListener<ReadContext> listener) {
217+
throw new UnsupportedOperationException();
218+
}
219+
214220
// package private for testing
215221
long getLargeBlobThresholdInBytes() {
216222
return blobStore.bufferSizeInBytes();

plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
6262
import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Iterable;
6363

64+
import org.opensearch.action.support.PlainActionFuture;
6465
import org.opensearch.common.blobstore.BlobContainer;
6566
import org.opensearch.common.blobstore.BlobMetadata;
6667
import org.opensearch.common.blobstore.BlobPath;
@@ -881,6 +882,17 @@ public void onFailure(Exception e) {}
881882
}
882883
}
883884

885+
public void testAsyncBlobDownload() {
886+
final S3BlobStore blobStore = mock(S3BlobStore.class);
887+
final BlobPath blobPath = mock(BlobPath.class);
888+
final String blobName = "test-blob";
889+
890+
final UnsupportedOperationException e = expectThrows(UnsupportedOperationException.class, () -> {
891+
final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore);
892+
blobContainer.readBlobAsync(blobName, new PlainActionFuture<>());
893+
});
894+
}
895+
884896
public void testListBlobsByPrefixInLexicographicOrderWithNegativeLimit() throws IOException {
885897
testListBlobsByPrefixInLexicographicOrder(-5, 0, BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC);
886898
}

server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsVerifyingBlobContainer.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.opensearch.common.blobstore.VerifyingMultiStreamBlobContainer;
1515
import org.opensearch.common.blobstore.fs.FsBlobContainer;
1616
import org.opensearch.common.blobstore.fs.FsBlobStore;
17+
import org.opensearch.common.blobstore.stream.read.ReadContext;
1718
import org.opensearch.common.blobstore.stream.write.WriteContext;
1819
import org.opensearch.common.io.InputStreamContainer;
1920
import org.opensearch.core.action.ActionListener;
@@ -24,6 +25,8 @@
2425
import java.nio.file.Files;
2526
import java.nio.file.Path;
2627
import java.nio.file.StandardOpenOption;
28+
import java.util.ArrayList;
29+
import java.util.List;
2730
import java.util.concurrent.CountDownLatch;
2831
import java.util.concurrent.TimeUnit;
2932
import java.util.concurrent.atomic.AtomicLong;
@@ -114,6 +117,27 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
114117

115118
}
116119

120+
@Override
121+
public void readBlobAsync(String blobName, ActionListener<ReadContext> listener) {
122+
new Thread(() -> {
123+
try {
124+
long contentLength = listBlobs().get(blobName).length();
125+
long partSize = contentLength / 10;
126+
int numberOfParts = (int) ((contentLength % partSize) == 0 ? contentLength / partSize : (contentLength / partSize) + 1);
127+
List<InputStreamContainer> blobPartStreams = new ArrayList<>();
128+
for (int partNumber = 0; partNumber < numberOfParts; partNumber++) {
129+
long offset = partNumber * partSize;
130+
InputStreamContainer blobPartStream = new InputStreamContainer(readBlob(blobName, offset, partSize), partSize, offset);
131+
blobPartStreams.add(blobPartStream);
132+
}
133+
ReadContext blobReadContext = new ReadContext(contentLength, blobPartStreams, null);
134+
listener.onResponse(blobReadContext);
135+
} catch (Exception e) {
136+
listener.onFailure(e);
137+
}
138+
}).start();
139+
}
140+
117141
private boolean isSegmentFile(String filename) {
118142
return !filename.endsWith(".tlog") && !filename.endsWith(".ckp");
119143
}

server/src/main/java/org/opensearch/common/blobstore/VerifyingMultiStreamBlobContainer.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,15 @@
88

99
package org.opensearch.common.blobstore;
1010

11+
import org.opensearch.common.annotation.ExperimentalApi;
12+
import org.opensearch.common.blobstore.stream.read.ReadContext;
13+
import org.opensearch.common.blobstore.stream.read.listener.ReadContextListener;
1114
import org.opensearch.common.blobstore.stream.write.WriteContext;
1215
import org.opensearch.core.action.ActionListener;
16+
import org.opensearch.threadpool.ThreadPool;
1317

1418
import java.io.IOException;
19+
import java.nio.file.Path;
1520

1621
/**
1722
* An extension of {@link BlobContainer} that adds {@link VerifyingMultiStreamBlobContainer#asyncBlobUpload} to allow
@@ -31,4 +36,25 @@ public interface VerifyingMultiStreamBlobContainer extends BlobContainer {
3136
* @throws IOException if any of the input streams could not be read, or the target blob could not be written to
3237
*/
3338
void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> completionListener) throws IOException;
39+
40+
/**
41+
* Creates an async callback of a {@link ReadContext} containing the multipart streams for a specified blob within the container.
42+
* @param blobName The name of the blob for which the {@link ReadContext} needs to be fetched.
43+
* @param listener Async listener for {@link ReadContext} object which serves the input streams and other metadata for the blob
44+
*/
45+
@ExperimentalApi
46+
void readBlobAsync(String blobName, ActionListener<ReadContext> listener);
47+
48+
/**
49+
* Asynchronously downloads the blob to the specified location using an executor from the thread pool.
50+
* @param blobName The name of the blob for which needs to be downloaded.
51+
* @param fileLocation The path on local disk where the blob needs to be downloaded.
52+
* @param threadPool The threadpool instance which will provide the executor for performing a multipart download.
53+
* @param completionListener Listener which will be notified when the download is complete.
54+
*/
55+
@ExperimentalApi
56+
default void asyncBlobDownload(String blobName, Path fileLocation, ThreadPool threadPool, ActionListener<String> completionListener) {
57+
ReadContextListener readContextListener = new ReadContextListener(blobName, fileLocation, threadPool, completionListener);
58+
readBlobAsync(blobName, readContextListener);
59+
}
3460
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */

package org.opensearch.common.blobstore.stream.read;

import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.io.InputStreamContainer;

import java.util.List;

/**
 * ReadContext is used to encapsulate all data needed by <code>BlobContainer#readBlobAsync</code>
 */
@ExperimentalApi
public class ReadContext {
    // Total size of the blob in bytes.
    private final long blobSize;
    // Streams covering the individual parts of the blob; immutable snapshot (see constructor).
    private final List<InputStreamContainer> partStreams;
    // Checksum of the blob; may be null when the store does not provide one.
    private final String blobChecksum;

    /**
     * Creates a read context for a blob.
     * @param blobSize total size of the blob in bytes
     * @param partStreams streams for the parts of the blob; defensively copied so later
     *                    mutation of the caller's list cannot corrupt this context
     * @param blobChecksum checksum of the blob, or null if unavailable
     */
    public ReadContext(long blobSize, List<InputStreamContainer> partStreams, String blobChecksum) {
        this.blobSize = blobSize;
        // List.copyOf returns an unmodifiable snapshot; previously the internal list was the
        // caller's own (mutable) list and was also exposed directly via getPartStreams().
        this.partStreams = List.copyOf(partStreams);
        this.blobChecksum = blobChecksum;
    }

    /** @return checksum of the blob, or null if the store provided none */
    public String getBlobChecksum() {
        return blobChecksum;
    }

    /** @return number of part streams that make up the blob */
    public int getNumberOfParts() {
        return partStreams.size();
    }

    /** @return total size of the blob in bytes */
    public long getBlobSize() {
        return blobSize;
    }

    /** @return unmodifiable list of the part streams */
    public List<InputStreamContainer> getPartStreams() {
        return partStreams;
    }
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */

package org.opensearch.common.blobstore.stream.read.listener;

import org.opensearch.common.annotation.InternalApi;
import org.opensearch.core.action.ActionListener;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * FileCompletionListener listens for completion of fetch on all the streams for a file, where
 * individual streams are handled using {@link FilePartWriter}. The {@link FilePartWriter}(s)
 * hold a reference to the file completion listener to be notified.
 */
@InternalApi
class FileCompletionListener implements ActionListener<Integer> {

    // Total number of parts expected for the file.
    private final int numberOfParts;
    // Name of the file reported to the completion listener on success.
    private final String fileName;
    // Thread-safe count of parts that have finished so far.
    private final AtomicInteger completedPartsCount;
    // Downstream listener notified once all parts are done, or on the first failure.
    private final ActionListener<String> completionListener;

    public FileCompletionListener(int numberOfParts, String fileName, ActionListener<String> completionListener) {
        this.numberOfParts = numberOfParts;
        this.fileName = fileName;
        this.completionListener = completionListener;
        this.completedPartsCount = new AtomicInteger();
    }

    /**
     * Records one completed part; fires the downstream listener when the last part finishes.
     */
    @Override
    public void onResponse(Integer unused) {
        final int partsDoneSoFar = completedPartsCount.incrementAndGet();
        if (partsDoneSoFar == numberOfParts) {
            completionListener.onResponse(fileName);
        }
    }

    /**
     * Propagates a part failure directly to the downstream listener.
     */
    @Override
    public void onFailure(Exception e) {
        completionListener.onFailure(e);
    }
}
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */

package org.opensearch.common.blobstore.stream.read.listener;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.annotation.InternalApi;
import org.opensearch.common.io.Channels;
import org.opensearch.common.io.InputStreamContainer;
import org.opensearch.core.action.ActionListener;

import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * FilePartWriter transfers the provided stream into the specified file path using a {@link FileChannel}
 * instance. It performs offset based writes to the file and notifies the {@link FileCompletionListener} on completion.
 */
@InternalApi
class FilePartWriter implements Runnable {

    // Index of this part, reported to the completion listener on success.
    private final int partNumber;
    // Source stream plus its offset within the target file.
    private final InputStreamContainer blobPartStreamContainer;
    // Destination file; all parts write into this same file at their own offsets.
    private final Path fileLocation;
    // Shared flag: set once any part fails, so remaining parts skip their writes.
    private final AtomicBoolean anyPartStreamFailed;
    private final ActionListener<Integer> fileCompletionListener;
    private static final Logger logger = LogManager.getLogger(FilePartWriter.class);

    // 8 MB buffer for transfer.
    // Fixed: was 8 * 1024 * 2024 (~15.8 MB), which contradicted this comment.
    private static final int BUFFER_SIZE = 8 * 1024 * 1024;

    public FilePartWriter(
        int partNumber,
        InputStreamContainer blobPartStreamContainer,
        Path fileLocation,
        AtomicBoolean anyPartStreamFailed,
        ActionListener<Integer> fileCompletionListener
    ) {
        this.partNumber = partNumber;
        this.blobPartStreamContainer = blobPartStreamContainer;
        this.fileLocation = fileLocation;
        this.anyPartStreamFailed = anyPartStreamFailed;
        this.fileCompletionListener = fileCompletionListener;
    }

    @Override
    public void run() {
        // Ensures no writes to the file if any stream fails.
        if (anyPartStreamFailed.get() == false) {
            try (FileChannel outputFileChannel = FileChannel.open(fileLocation, StandardOpenOption.WRITE, StandardOpenOption.CREATE)) {
                try (InputStream inputStream = blobPartStreamContainer.getInputStream()) {
                    long streamOffset = blobPartStreamContainer.getOffset();
                    final byte[] buffer = new byte[BUFFER_SIZE];
                    int bytesRead;
                    // Offset-based writes let multiple parts fill disjoint regions of the
                    // same file concurrently without coordination.
                    while ((bytesRead = inputStream.read(buffer)) != -1) {
                        Channels.writeToChannel(buffer, 0, bytesRead, outputFileChannel, streamOffset);
                        streamOffset += bytesRead;
                    }
                }
            } catch (IOException e) {
                processFailure(e);
                return;
            }
            fileCompletionListener.onResponse(partNumber);
        }
    }

    /**
     * Cleans up the partially-written file and notifies the completion listener exactly once
     * (the first failing part wins; subsequent failures only attempt cleanup).
     */
    void processFailure(Exception e) {
        try {
            Files.deleteIfExists(fileLocation);
        } catch (IOException ex) {
            // Die silently
            // Pass the exception as the throwable argument (not via a placeholder) so the
            // stack trace is logged.
            logger.info("Failed to delete file {} on stream failure", fileLocation, ex);
        }
        if (anyPartStreamFailed.getAndSet(true) == false) {
            fileCompletionListener.onFailure(e);
        }
    }
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */

package org.opensearch.common.blobstore.stream.read.listener;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.annotation.InternalApi;
import org.opensearch.common.blobstore.stream.read.ReadContext;
import org.opensearch.core.action.ActionListener;
import org.opensearch.threadpool.ThreadPool;

import java.nio.file.Path;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * ReadContextListener orchestrates the async file fetch from the {@link org.opensearch.common.blobstore.BlobContainer}
 * using a {@link ReadContext} callback. On response, it spawns off the download using multiple streams which are
 * spread across a {@link ThreadPool} executor.
 */
@InternalApi
public class ReadContextListener implements ActionListener<ReadContext> {

    // Blob name, also reported to the completion listener when the download finishes.
    private final String fileName;
    // Local destination path for the downloaded blob.
    private final Path fileLocation;
    // Pool providing the GENERIC executor on which part writers run.
    private final ThreadPool threadPool;
    private final ActionListener<String> completionListener;
    private static final Logger logger = LogManager.getLogger(ReadContextListener.class);

    public ReadContextListener(String fileName, Path fileLocation, ThreadPool threadPool, ActionListener<String> completionListener) {
        this.fileName = fileName;
        this.fileLocation = fileLocation;
        this.threadPool = threadPool;
        this.completionListener = completionListener;
    }

    /**
     * Fans the blob's part streams out as {@link FilePartWriter} tasks on the GENERIC executor.
     * The shared failure flag and {@link FileCompletionListener} coordinate the outcome.
     */
    @Override
    public void onResponse(ReadContext readContext) {
        logger.trace("Streams received for blob {}", fileName);
        final int partCount = readContext.getNumberOfParts();
        final AtomicBoolean partFailureFlag = new AtomicBoolean();
        final FileCompletionListener partsListener = new FileCompletionListener(partCount, fileName, completionListener);

        int partIndex = 0;
        while (partIndex < partCount) {
            final FilePartWriter partWriter = new FilePartWriter(
                partIndex,
                readContext.getPartStreams().get(partIndex),
                fileLocation,
                partFailureFlag,
                partsListener
            );
            threadPool.executor(ThreadPool.Names.GENERIC).submit(partWriter);
            partIndex++;
        }
    }

    /**
     * Forwards a fetch failure straight to the completion listener.
     */
    @Override
    public void onFailure(Exception e) {
        completionListener.onFailure(e);
    }
}

0 commit comments

Comments
 (0)