Skip to content

Commit 4df347c

Browse files
Add ZSTD compression for snapshotting (#2996)
Changes: - Added ZSTD compressor for snapshotting - 2 JSON repository settings: - readonly - compression were moved into the BlobStoreRepository class and removed from other repos classes where they were used. Signed-off-by: Andrey Pleskach <ples@aiven.io>
1 parent e3740f7 commit 4df347c

File tree

30 files changed

+729
-459
lines changed

30 files changed

+729
-459
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
4141
- Bump `com.netflix.nebula:nebula-publishing-plugin` from 19.2.0 to 20.3.0
4242
- Bump `com.diffplug.spotless` from 6.17.0 to 6.18.0
4343
- Bump `io.opencensus:opencensus-api` from 0.18.0 to 0.31.1 ([#7291](https://github.com/opensearch-project/OpenSearch/pull/7291))
44+
- Add `com.github.luben:zstd-jni` version 1.5.5-3 ([#2996](https://github.com/opensearch-project/OpenSearch/pull/2996))
4445

4546
### Changed
4647
- [CCR] Add getHistoryOperationsFromTranslog method to fetch the history snapshot from translogs ([#3948](https://github.com/opensearch-project/OpenSearch/pull/3948))
@@ -50,6 +51,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
5051
- Change http code for DecommissioningFailedException from 500 to 400 ([#5283](https://github.com/opensearch-project/OpenSearch/pull/5283))
5152
- Improve summary error message for invalid setting updates ([#4792](https://github.com/opensearch-project/OpenSearch/pull/4792))
5253
- Reduce memory copy in zstd compression ([#7681](https://github.com/opensearch-project/OpenSearch/pull/7681))
54+
- Add ZSTD compression for snapshotting ([#2996](https://github.com/opensearch-project/OpenSearch/pull/2996))
5355

5456
### Deprecated
5557

buildSrc/version.properties

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,3 +57,6 @@ bytebuddy = 1.14.3
5757

5858
# benchmark dependencies
5959
jmh = 1.35
60+
61+
# compression
62+
zstd = 1.5.5-3

distribution/tools/plugin-cli/build.gradle

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,9 +82,6 @@ thirdPartyAudit.ignoreViolations(
8282
)
8383

8484
thirdPartyAudit.ignoreMissingClasses(
85-
'com.github.luben.zstd.BufferPool',
86-
'com.github.luben.zstd.ZstdInputStream',
87-
'com.github.luben.zstd.ZstdOutputStream',
8885
'org.brotli.dec.BrotliInputStream',
8986
'org.objectweb.asm.AnnotationVisitor',
9087
'org.objectweb.asm.Attribute',

modules/transport-netty4/build.gradle

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,6 @@ thirdPartyAudit {
185185
'org.slf4j.LoggerFactory',
186186
'org.slf4j.spi.LocationAwareLogger',
187187

188-
'com.github.luben.zstd.Zstd',
189188
'com.google.protobuf.nano.CodedOutputByteBufferNano',
190189
'com.google.protobuf.nano.MessageNano',
191190
'com.jcraft.jzlib.Deflater',

plugins/repository-azure/src/main/java/org/opensearch/repositories/azure/AzureRepository.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -100,8 +100,6 @@ public static final class Repository {
100100
MAX_CHUNK_SIZE,
101101
Property.NodeScope
102102
);
103-
public static final Setting<Boolean> COMPRESS_SETTING = Setting.boolSetting("compress", false, Property.NodeScope);
104-
public static final Setting<Boolean> READONLY_SETTING = Setting.boolSetting("readonly", false, Property.NodeScope);
105103
}
106104

107105
private final BlobPath basePath;
@@ -118,7 +116,7 @@ public AzureRepository(
118116
) {
119117
super(
120118
metadata,
121-
Repository.COMPRESS_SETTING.get(metadata.settings()),
119+
COMPRESS_SETTING.get(metadata.settings()),
122120
namedXContentRegistry,
123121
clusterService,
124122
recoverySettings,
@@ -142,8 +140,8 @@ public AzureRepository(
142140
// If the user explicitly did not define a readonly value, we set it by ourselves depending on the location mode setting.
143141
// For secondary_only setting, the repository should be read only
144142
final LocationMode locationMode = Repository.LOCATION_MODE_SETTING.get(metadata.settings());
145-
if (Repository.READONLY_SETTING.exists(metadata.settings())) {
146-
this.readonly = Repository.READONLY_SETTING.get(metadata.settings());
143+
if (READONLY_SETTING.exists(metadata.settings())) {
144+
this.readonly = READONLY_SETTING.get(metadata.settings());
147145
} else {
148146
this.readonly = locationMode == LocationMode.SECONDARY_ONLY;
149147
}

plugins/repository-gcs/src/main/java/org/opensearch/repositories/gcs/GoogleCloudStorageRepository.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@
5050
import java.util.function.Function;
5151

5252
import static org.opensearch.common.settings.Setting.Property;
53-
import static org.opensearch.common.settings.Setting.boolSetting;
5453
import static org.opensearch.common.settings.Setting.byteSizeSetting;
5554
import static org.opensearch.common.settings.Setting.simpleString;
5655

@@ -70,7 +69,6 @@ class GoogleCloudStorageRepository extends MeteredBlobStoreRepository {
7069

7170
static final Setting<String> BUCKET = simpleString("bucket", Property.NodeScope, Property.Dynamic);
7271
static final Setting<String> BASE_PATH = simpleString("base_path", Property.NodeScope, Property.Dynamic);
73-
static final Setting<Boolean> COMPRESS = boolSetting("compress", false, Property.NodeScope, Property.Dynamic);
7472
static final Setting<ByteSizeValue> CHUNK_SIZE = byteSizeSetting(
7573
"chunk_size",
7674
MAX_CHUNK_SIZE,
@@ -94,7 +92,14 @@ class GoogleCloudStorageRepository extends MeteredBlobStoreRepository {
9492
final ClusterService clusterService,
9593
final RecoverySettings recoverySettings
9694
) {
97-
super(metadata, getSetting(COMPRESS, metadata), namedXContentRegistry, clusterService, recoverySettings, buildLocation(metadata));
95+
super(
96+
metadata,
97+
getSetting(COMPRESS_SETTING, metadata),
98+
namedXContentRegistry,
99+
clusterService,
100+
recoverySettings,
101+
buildLocation(metadata)
102+
);
98103
this.storageService = storageService;
99104

100105
String basePath = BASE_PATH.get(metadata.settings());

plugins/repository-hdfs/src/main/java/org/opensearch/repositories/hdfs/HdfsRepository.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ public HdfsRepository(
8383
final ClusterService clusterService,
8484
final RecoverySettings recoverySettings
8585
) {
86-
super(metadata, metadata.settings().getAsBoolean("compress", false), namedXContentRegistry, clusterService, recoverySettings);
86+
super(metadata, COMPRESS_SETTING.get(metadata.settings()), namedXContentRegistry, clusterService, recoverySettings);
8787

8888
this.environment = environment;
8989
this.chunkSize = metadata.settings().getAsBytesSize("chunk_size", null);

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -156,12 +156,6 @@ class S3Repository extends MeteredBlobStoreRepository {
156156
new ByteSizeValue(5, ByteSizeUnit.TB)
157157
);
158158

159-
/**
160-
* When set to true metadata files are stored in compressed format. This setting doesn’t affect index
161-
* files that are already compressed by default. Defaults to false.
162-
*/
163-
static final Setting<Boolean> COMPRESS_SETTING = Setting.boolSetting("compress", false);
164-
165159
/**
166160
* Sets the S3 storage class type for the backup files. Values may be standard, reduced_redundancy,
167161
* standard_ia, onezone_ia and intelligent_tiering. Defaults to standard.

plugins/transport-nio/build.gradle

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,6 @@ thirdPartyAudit {
111111
'org.slf4j.LoggerFactory',
112112
'org.slf4j.spi.LocationAwareLogger',
113113

114-
'com.github.luben.zstd.Zstd',
115114
'com.google.protobuf.nano.CodedOutputByteBufferNano',
116115
'com.google.protobuf.nano.MessageNano',
117116
'com.jcraft.jzlib.Deflater',

sandbox/plugins/custom-codecs/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ opensearchplugin {
2121
}
2222

2323
dependencies {
24-
api "com.github.luben:zstd-jni:1.5.5-1"
24+
api "com.github.luben:zstd-jni:${versions.zstd}"
2525
}
2626

2727
yamlRestTest.enabled = false;

sandbox/plugins/custom-codecs/licenses/zstd-jni-1.5.5-1.jar.sha1

Lines changed: 0 additions & 1 deletion
This file was deleted.

server/build.gradle

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,9 @@ dependencies {
150150
api "com.google.protobuf:protobuf-java:${versions.protobuf}"
151151
api "jakarta.annotation:jakarta.annotation-api:${versions.jakarta_annotation}"
152152

153+
//zstd
154+
api "com.github.luben:zstd-jni:${versions.zstd}"
155+
153156
testImplementation(project(":test:framework")) {
154157
// tests use the locally compiled version of server
155158
exclude group: 'org.opensearch', module: 'server'
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
488dd9b15c9e8cf87d857f65f5cd6359c2853381

server/src/main/java/org/opensearch/common/compress/CompressorFactory.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,14 @@
4747
*/
4848
public class CompressorFactory {
4949

50-
public static final Compressor COMPRESSOR = new DeflateCompressor();
50+
public static final Compressor DEFLATE_COMPRESSOR = new DeflateCompressor();
51+
52+
@Deprecated
53+
public static final Compressor COMPRESSOR = DEFLATE_COMPRESSOR;
54+
55+
public static final Compressor ZSTD_COMPRESSOR = new ZstdCompressor();
56+
57+
public static final Compressor NONE_COMPRESSOR = new NoneCompressor();
5158

5259
public static boolean isCompressed(BytesReference bytes) {
5360
return compressor(bytes) != null;
@@ -61,6 +68,9 @@ public static Compressor compressor(BytesReference bytes) {
6168
// as a xcontent, we have a problem
6269
assert XContentHelper.xContentType(bytes) == null;
6370
return COMPRESSOR;
71+
} else if (ZSTD_COMPRESSOR.isCompressed(bytes)) {
72+
assert XContentHelper.xContentType(bytes) == null;
73+
return ZSTD_COMPRESSOR;
6474
}
6575

6676
XContentType contentType = XContentHelper.xContentType(bytes);
@@ -81,7 +91,6 @@ private static boolean isAncient(BytesReference bytes) {
8191

8292
/**
8393
* Uncompress the provided data, data can be detected as compressed using {@link #isCompressed(BytesReference)}.
84-
* @throws NullPointerException a NullPointerException will be thrown when bytes is null
8594
*/
8695
public static BytesReference uncompressIfNeeded(BytesReference bytes) throws IOException {
8796
Compressor compressor = compressor(Objects.requireNonNull(bytes, "the BytesReference must not be null"));
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.common.compress;
10+
11+
/**
12+
* Supported compression types
13+
*
14+
* @opensearch.internal
15+
*/
16+
public enum CompressorType {
17+
18+
DEFLATE {
19+
@Override
20+
public Compressor compressor() {
21+
return CompressorFactory.DEFLATE_COMPRESSOR;
22+
}
23+
},
24+
25+
ZSTD {
26+
@Override
27+
public Compressor compressor() {
28+
return CompressorFactory.ZSTD_COMPRESSOR;
29+
}
30+
},
31+
32+
NONE {
33+
@Override
34+
public Compressor compressor() {
35+
return CompressorFactory.NONE_COMPRESSOR;
36+
}
37+
};
38+
39+
public abstract Compressor compressor();
40+
}

server/src/main/java/org/opensearch/common/compress/DeflateCompressor.java

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -157,16 +157,9 @@ public InputStream threadLocalInputStream(InputStream in) throws IOException {
157157
* @return decompressing stream
158158
*/
159159
public static InputStream inputStream(InputStream in, boolean threadLocal) throws IOException {
160-
final byte[] headerBytes = new byte[HEADER.length];
161-
int len = 0;
162-
while (len < headerBytes.length) {
163-
final int read = in.read(headerBytes, len, headerBytes.length - len);
164-
if (read == -1) {
165-
break;
166-
}
167-
len += read;
168-
}
169-
if (len != HEADER.length || Arrays.equals(headerBytes, HEADER) == false) {
160+
final byte[] header = in.readNBytes(HEADER.length);
161+
162+
if (Arrays.equals(header, HEADER) == false) {
170163
throw new IllegalArgumentException("Input stream is not compressed with DEFLATE!");
171164
}
172165

@@ -252,9 +245,11 @@ public BytesReference uncompress(BytesReference bytesReference) throws IOExcepti
252245
} finally {
253246
inflater.reset();
254247
}
255-
final BytesReference res = buffer.copyBytes();
256-
buffer.reset();
257-
return res;
248+
try {
249+
return buffer.copyBytes();
250+
} finally {
251+
buffer.reset();
252+
}
258253
}
259254

260255
// Reusable Deflater reference. Note: This is a separate instance from the one used for the compressing stream wrapper because we
@@ -271,8 +266,10 @@ public BytesReference compress(BytesReference bytesReference) throws IOException
271266
} finally {
272267
deflater.reset();
273268
}
274-
final BytesReference res = buffer.copyBytes();
275-
buffer.reset();
276-
return res;
269+
try {
270+
return buffer.copyBytes();
271+
} finally {
272+
buffer.reset();
273+
}
277274
}
278275
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.common.compress;
10+
11+
import org.opensearch.common.bytes.BytesReference;
12+
13+
import java.io.IOException;
14+
import java.io.InputStream;
15+
import java.io.OutputStream;
16+
17+
/**
18+
* {@link Compressor} no compressor implementation.
19+
*
20+
* @opensearch.internal
21+
*/
22+
public class NoneCompressor implements Compressor {
23+
@Override
24+
public boolean isCompressed(BytesReference bytes) {
25+
return false;
26+
}
27+
28+
@Override
29+
public int headerLength() {
30+
return 0;
31+
}
32+
33+
@Override
34+
public InputStream threadLocalInputStream(InputStream in) throws IOException {
35+
return in;
36+
}
37+
38+
@Override
39+
public OutputStream threadLocalOutputStream(OutputStream out) throws IOException {
40+
return out;
41+
}
42+
43+
@Override
44+
public BytesReference uncompress(BytesReference bytesReference) throws IOException {
45+
return bytesReference;
46+
}
47+
48+
@Override
49+
public BytesReference compress(BytesReference bytesReference) throws IOException {
50+
return bytesReference;
51+
}
52+
53+
}

0 commit comments

Comments
 (0)