Skip to content

Commit 7bd1291

Browse files
authored
[Remote Translog] Use InputStream that supports mark and reset while uploading translog files (opensearch-project#5868)
* Use stream that supports mark and reset for translog upload Signed-off-by: Sachin Kale <kalsac@amazon.com>
1 parent 6da16e1 commit 7bd1291

File tree

2 files changed

+84
-11
lines changed

2 files changed

+84
-11
lines changed

server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,18 +9,19 @@
99
package org.opensearch.index.translog.transfer;
1010

1111
import org.opensearch.common.Nullable;
12-
import org.opensearch.common.io.stream.BytesStreamInput;
13-
import org.opensearch.common.io.stream.InputStreamStreamInput;
12+
import org.opensearch.common.lucene.store.ByteArrayIndexInput;
13+
import org.opensearch.common.lucene.store.InputStreamIndexInput;
1414
import org.opensearch.core.internal.io.IOUtils;
15-
import org.opensearch.index.translog.BufferedChecksumStreamInput;
1615

16+
import java.io.BufferedInputStream;
1717
import java.io.Closeable;
1818
import java.io.IOException;
1919
import java.io.InputStream;
2020
import java.nio.channels.Channels;
2121
import java.nio.channels.FileChannel;
2222
import java.nio.file.Path;
2323
import java.nio.file.StandardOpenOption;
24+
import java.util.Arrays;
2425
import java.util.Objects;
2526

2627
/**
@@ -66,11 +67,8 @@ public long getContentLength() throws IOException {
6667

6768
public InputStream inputStream() throws IOException {
6869
return fileChannel != null
69-
? new BufferedChecksumStreamInput(
70-
new InputStreamStreamInput(Channels.newInputStream(fileChannel), fileChannel.size()),
71-
path.toString()
72-
)
73-
: new BufferedChecksumStreamInput(new BytesStreamInput(content), name);
70+
? new BufferedInputStream(Channels.newInputStream(fileChannel))
71+
: new InputStreamIndexInput(new ByteArrayIndexInput(this.name, content), content.length);
7472
}
7573

7674
@Override
@@ -83,9 +81,7 @@ public boolean equals(Object o) {
8381
if (this == o) return true;
8482
if (o == null || getClass() != o.getClass()) return false;
8583
FileSnapshot other = (FileSnapshot) o;
86-
return Objects.equals(this.name, other.name)
87-
&& Objects.equals(this.content, other.content)
88-
&& Objects.equals(this.path, other.path);
84+
return Objects.equals(this.name, other.name) && Arrays.equals(this.content, other.content) && Objects.equals(this.path, other.path);
8985
}
9086

9187
@Override
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index.translog.transfer;
10+
11+
import org.junit.After;
12+
import org.opensearch.test.OpenSearchTestCase;
13+
14+
import java.io.IOException;
15+
import java.nio.file.Files;
16+
import java.nio.file.Path;
17+
18+
public class FileSnapshotTests extends OpenSearchTestCase {
19+
20+
FileSnapshot fileSnapshot;
21+
22+
@After
23+
public void tearDown() throws Exception {
24+
super.tearDown();
25+
fileSnapshot.close();
26+
}
27+
28+
public void testFileSnapshotPath() throws IOException {
29+
Path file = createTempFile();
30+
Files.writeString(file, "hello");
31+
fileSnapshot = new FileSnapshot.TransferFileSnapshot(file, 12);
32+
33+
assertFileSnapshotProperties(file);
34+
35+
try (FileSnapshot sameFileSnapshot = new FileSnapshot.TransferFileSnapshot(file, 12)) {
36+
assertEquals(sameFileSnapshot, fileSnapshot);
37+
}
38+
39+
try (FileSnapshot sameFileDiffPTSnapshot = new FileSnapshot.TransferFileSnapshot(file, 34)) {
40+
assertNotEquals(sameFileDiffPTSnapshot, fileSnapshot);
41+
}
42+
}
43+
44+
public void testFileSnapshotContent() throws IOException {
45+
Path file = createTempFile();
46+
Files.writeString(file, "hello");
47+
fileSnapshot = new FileSnapshot.TransferFileSnapshot(file.getFileName().toString(), Files.readAllBytes(file), 23);
48+
49+
assertFileSnapshotProperties(file);
50+
51+
try (
52+
FileSnapshot sameFileSnapshot = new FileSnapshot.TransferFileSnapshot(
53+
file.getFileName().toString(),
54+
Files.readAllBytes(file),
55+
23
56+
)
57+
) {
58+
assertEquals(sameFileSnapshot, fileSnapshot);
59+
}
60+
61+
try (
62+
FileSnapshot anotherFileSnapshot = new FileSnapshot.TransferFileSnapshot(
63+
file.getFileName().toString(),
64+
Files.readAllBytes(createTempFile()),
65+
23
66+
)
67+
) {
68+
assertNotEquals(anotherFileSnapshot, fileSnapshot);
69+
}
70+
}
71+
72+
private void assertFileSnapshotProperties(Path file) throws IOException {
73+
assertEquals(file.getFileName().toString(), fileSnapshot.getName());
74+
assertEquals(Files.size(file), fileSnapshot.getContentLength());
75+
assertTrue(fileSnapshot.inputStream().markSupported());
76+
}
77+
}

0 commit comments

Comments
 (0)