Skip to content

Commit c42ada8

Browse files
authored
Handle null partSize in OnDemandBlockSnapshotIndexInput (opensearch-project#9470)
The `partSize()` value can be null if the underlying repository implementation does not implement file chunking. Signed-off-by: Andrew Ross <andrross@amazon.com>
1 parent 0c839c3 commit c42ada8

File tree

3 files changed

+54
-10
lines changed

3 files changed

+54
-10
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
162162
- Fix flaky ResourceAwareTasksTests.testBasicTaskResourceTracking test ([#8993](https://github.com/opensearch-project/OpenSearch/pull/8993))
163163
- Fix memory leak when using Zstd Dictionary ([#9403](https://github.com/opensearch-project/OpenSearch/pull/9403))
164164
- Fix range reads in respository-s3 ([9512](https://github.com/opensearch-project/OpenSearch/issues/9512))
165+
- Handle null partSize in OnDemandBlockSnapshotIndexInput ([#9291](https://github.com/opensearch-project/OpenSearch/issues/9291))
165166

166167
### Security
167168

server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInput.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,6 @@
88

99
package org.opensearch.index.store.remote.file;
1010

11-
import org.apache.logging.log4j.LogManager;
12-
import org.apache.logging.log4j.Logger;
1311
import org.apache.lucene.store.FSDirectory;
1412
import org.apache.lucene.store.IndexInput;
1513
import org.opensearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo;
@@ -26,8 +24,6 @@
2624
* @opensearch.internal
2725
*/
2826
public class OnDemandBlockSnapshotIndexInput extends OnDemandBlockIndexInput {
29-
private static final Logger logger = LogManager.getLogger(OnDemandBlockSnapshotIndexInput.class);
30-
3127
/**
3228
* Where this class fetches IndexInput parts from
3329
*/
@@ -48,7 +44,7 @@ public class OnDemandBlockSnapshotIndexInput extends OnDemandBlockIndexInput {
4844
protected final String fileName;
4945

5046
/**
51-
* part size in bytes
47+
* Maximum size in bytes of snapshot file parts.
5248
*/
5349
protected final long partSize;
5450

@@ -104,7 +100,15 @@ public OnDemandBlockSnapshotIndexInput(
104100
super(builder);
105101
this.transferManager = transferManager;
106102
this.fileInfo = fileInfo;
107-
this.partSize = fileInfo.partSize().getBytes();
103+
if (fileInfo.partSize() != null) {
104+
this.partSize = fileInfo.partSize().getBytes();
105+
} else {
106+
// Repository implementations can define a size at which to split files
107+
// into multiple objects in the repository. If partSize() is null, then
108+
// no splitting happens, so default to Long.MAX_VALUE here to have the
109+
// same effect. See {@code BlobStoreRepository#chunkSize()}.
110+
this.partSize = Long.MAX_VALUE;
111+
}
108112
this.fileName = fileInfo.physicalName();
109113
this.directory = directory;
110114
this.originalFileSize = fileInfo.length();
@@ -131,6 +135,10 @@ protected IndexInput fetchBlock(int blockId) throws IOException {
131135

132136
final long blockStart = getBlockStart(blockId);
133137
final long blockEnd = blockStart + getActualBlockSize(blockId);
138+
139+
// If the snapshot file is chunked, we must account for this by
140+
// choosing the appropriate file part and updating the position
141+
// accordingly.
134142
final int part = (int) (blockStart / partSize);
135143
final long partStart = part * partSize;
136144

server/src/test/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInputTests.java

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import org.apache.lucene.store.SimpleFSLockFactory;
2020
import org.apache.lucene.util.Constants;
2121
import org.apache.lucene.util.Version;
22+
import org.opensearch.common.lucene.store.ByteArrayIndexInput;
23+
import org.opensearch.core.common.unit.ByteSizeUnit;
2224
import org.opensearch.core.common.unit.ByteSizeValue;
2325
import org.opensearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
2426
import org.opensearch.index.store.StoreFileMetadata;
@@ -31,9 +33,12 @@
3133
import java.io.IOException;
3234
import java.nio.file.Path;
3335

36+
import static org.mockito.ArgumentMatchers.argThat;
3437
import static org.mockito.Mockito.any;
3538
import static org.mockito.Mockito.doAnswer;
3639
import static org.mockito.Mockito.mock;
40+
import static org.mockito.Mockito.verify;
41+
import static org.mockito.Mockito.when;
3742

3843
@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class)
3944
public class OnDemandBlockSnapshotIndexInputTests extends OpenSearchTestCase {
@@ -43,7 +48,6 @@ public class OnDemandBlockSnapshotIndexInputTests extends OpenSearchTestCase {
4348
private static final String FILE_NAME = "File_Name";
4449
private static final String BLOCK_FILE_PREFIX = FILE_NAME;
4550
private static final boolean IS_CLONE = false;
46-
private static final ByteSizeValue BYTE_SIZE_VALUE = new ByteSizeValue(1L);
4751
private static final int FILE_SIZE = 29360128;
4852
private TransferManager transferManager;
4953
private LockFactory lockFactory;
@@ -74,7 +78,38 @@ public void test4MBBlock() throws Exception {
7478
runAllTestsFor(22);
7579
}
7680

77-
public void runAllTestsFor(int blockSizeShift) throws Exception {
81+
public void testChunkedRepository() throws IOException {
82+
final long blockSize = new ByteSizeValue(1, ByteSizeUnit.KB).getBytes();
83+
final long repositoryChunkSize = new ByteSizeValue(2, ByteSizeUnit.KB).getBytes();
84+
final long fileSize = new ByteSizeValue(3, ByteSizeUnit.KB).getBytes();
85+
86+
when(transferManager.fetchBlob(any())).thenReturn(new ByteArrayIndexInput("test", new byte[(int) blockSize]));
87+
try (
88+
FSDirectory directory = new MMapDirectory(path, lockFactory);
89+
IndexInput indexInput = new OnDemandBlockSnapshotIndexInput(
90+
OnDemandBlockIndexInput.builder()
91+
.resourceDescription(RESOURCE_DESCRIPTION)
92+
.offset(BLOCK_SNAPSHOT_FILE_OFFSET)
93+
.length(FILE_SIZE)
94+
.blockSizeShift((int) (Math.log(blockSize) / Math.log(2)))
95+
.isClone(IS_CLONE),
96+
new BlobStoreIndexShardSnapshot.FileInfo(
97+
FILE_NAME,
98+
new StoreFileMetadata(FILE_NAME, fileSize, "", Version.LATEST),
99+
new ByteSizeValue(repositoryChunkSize)
100+
),
101+
directory,
102+
transferManager
103+
)
104+
) {
105+
// Seek to the position past the first repository chunk
106+
indexInput.seek(repositoryChunkSize);
107+
}
108+
// Verify the second chunk is requested (i.e. ".part1")
109+
verify(transferManager).fetchBlob(argThat(request -> request.getBlobName().equals("File_Name.part1")));
110+
}
111+
112+
private void runAllTestsFor(int blockSizeShift) throws Exception {
78113
final OnDemandBlockSnapshotIndexInput blockedSnapshotFile = createOnDemandBlockSnapshotIndexInput(blockSizeShift);
79114
final int blockSize = 1 << blockSizeShift;
80115
TestGroup.testGetBlock(blockedSnapshotFile, blockSize, FILE_SIZE);
@@ -106,7 +141,7 @@ private OnDemandBlockSnapshotIndexInput createOnDemandBlockSnapshotIndexInput(in
106141
fileInfo = new BlobStoreIndexShardSnapshot.FileInfo(
107142
FILE_NAME,
108143
new StoreFileMetadata(FILE_NAME, FILE_SIZE, "", Version.LATEST),
109-
BYTE_SIZE_VALUE
144+
null
110145
);
111146

112147
int blockSize = 1 << blockSizeShift;
@@ -182,7 +217,7 @@ private void initBlockFiles(int blockSize, FSDirectory fsDirectory) {
182217

183218
}
184219

185-
public static class TestGroup {
220+
private static class TestGroup {
186221

187222
public static void testGetBlock(OnDemandBlockSnapshotIndexInput blockedSnapshotFile, int blockSize, int fileSize) {
188223
// block 0

0 commit comments

Comments
 (0)