Skip to content

Commit 20ad1d8

Browse files
[Remote Store] Add Lock Manager in Remote Segment Store to persist data (#6787)
Signed-off-by: Harish Bhakuni <hbhakuni@amazon.com>
1 parent 96f2ffa commit 20ad1d8

22 files changed

+1297
-39
lines changed
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index.store;
10+
11+
import org.apache.lucene.store.DataInput;
12+
import org.apache.lucene.store.OutputStreamIndexOutput;
13+
import org.opensearch.common.blobstore.BlobContainer;
14+
import org.opensearch.common.io.stream.BytesStreamOutput;
15+
16+
import java.io.IOException;
17+
import java.io.InputStream;
18+
19+
/**
20+
* Class for output to a file in a {@link RemoteBufferedOutputDirectory}. This is right now used only for writing locks
21+
* in remote store. in the future, we can use it for other operations as well.
22+
* The current limitation of this is we keep all the file content in memory till we call close(),
23+
* So this class should be used to write small files (in MBs).
24+
* TODO: extend the class to continously write to the store if content size in buffer gets higher than a specific size.
25+
* @see RemoteBufferedOutputDirectory
26+
*
27+
* @opensearch.internal
28+
*/
29+
public class RemoteBufferedIndexOutput extends RemoteIndexOutput {
30+
private final BytesStreamOutput out;
31+
private final OutputStreamIndexOutput indexOutputBuffer;
32+
// visible for testing
33+
static final int BUFFER_SIZE = 4096;
34+
35+
public RemoteBufferedIndexOutput(String name, BlobContainer blobContainer, int bufferSize) {
36+
super(name, blobContainer);
37+
out = new BytesStreamOutput();
38+
indexOutputBuffer = new OutputStreamIndexOutput(name, name, out, bufferSize);
39+
}
40+
41+
public RemoteBufferedIndexOutput(String name, BlobContainer blobContainer) {
42+
this(name, blobContainer, BUFFER_SIZE);
43+
}
44+
45+
// Visible for testing
46+
RemoteBufferedIndexOutput(String name, BlobContainer blobContainer, BytesStreamOutput out, OutputStreamIndexOutput indexOutputBuffer) {
47+
super(name, blobContainer);
48+
this.out = out;
49+
this.indexOutputBuffer = indexOutputBuffer;
50+
}
51+
52+
@Override
53+
public void copyBytes(DataInput input, long numBytes) throws IOException {
54+
indexOutputBuffer.copyBytes(input, numBytes);
55+
}
56+
57+
/**
58+
* when we trigger close() to close the stream, we will first flush the buffer to output stream and then write all
59+
* data to blob container and close the output stream.
60+
*
61+
*/
62+
@Override
63+
public void close() throws IOException {
64+
65+
try (final BytesStreamOutput outStream = out; InputStream stream = out.bytes().streamInput()) {
66+
indexOutputBuffer.close();
67+
blobContainer.writeBlob(getName(), stream, out.bytes().length(), false);
68+
}
69+
70+
}
71+
72+
/**
73+
* This method will write Bytes to the stream we are maintaining.
74+
*
75+
*/
76+
@Override
77+
public void writeByte(byte b) throws IOException {
78+
indexOutputBuffer.writeByte(b);
79+
}
80+
81+
/**
82+
* This method will write a byte array to the stream we are maintaining.
83+
*
84+
*/
85+
@Override
86+
public void writeBytes(byte[] byteArray, int offset, int length) throws IOException {
87+
indexOutputBuffer.writeBytes(byteArray, offset, length);
88+
}
89+
90+
/**
91+
* This method will return the file pointer to the current position in the stream.
92+
*
93+
*/
94+
@Override
95+
public long getFilePointer() {
96+
return indexOutputBuffer.getFilePointer();
97+
}
98+
99+
/**
100+
* This method will return checksum
101+
*
102+
*/
103+
@Override
104+
public long getChecksum() throws IOException {
105+
return indexOutputBuffer.getChecksum();
106+
}
107+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index.store;
10+
11+
import org.apache.lucene.store.IOContext;
12+
import org.apache.lucene.store.IndexOutput;
13+
import org.opensearch.common.blobstore.BlobContainer;
14+
15+
/**
16+
* A {@code RemoteBufferedOutputDirectory} is an extension of RemoteDirectory which also provides an abstraction layer
17+
* for storing a list of files to a remote store.
18+
* Additionally, with this implementation, creation of new files is also allowed.
19+
* A remoteDirectory contains only files (no sub-folder hierarchy).
20+
*
21+
* @opensearch.internal
22+
*/
23+
public class RemoteBufferedOutputDirectory extends RemoteDirectory {
24+
public RemoteBufferedOutputDirectory(BlobContainer blobContainer) {
25+
super(blobContainer);
26+
}
27+
28+
@Override
29+
public IndexOutput createOutput(String name, IOContext context) {
30+
return new RemoteBufferedIndexOutput(name, this.blobContainer);
31+
}
32+
}

server/src/main/java/org/opensearch/index/store/RemoteDirectory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
*/
3636
public class RemoteDirectory extends Directory {
3737

38-
private final BlobContainer blobContainer;
38+
protected final BlobContainer blobContainer;
3939

4040
public RemoteDirectory(BlobContainer blobContainer) {
4141
this.blobContainer = blobContainer;

server/src/main/java/org/opensearch/index/store/RemoteIndexOutput.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
*/
2929
public class RemoteIndexOutput extends IndexOutput {
3030

31-
private final BlobContainer blobContainer;
31+
protected final BlobContainer blobContainer;
3232

3333
public RemoteIndexOutput(String name, BlobContainer blobContainer) {
3434
super(name, name);

0 commit comments

Comments
 (0)