-
Notifications
You must be signed in to change notification settings - Fork 3.5k
PQ: Add support for event-level compression using ZStandard (ZSTD) #18121
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
f47aca2
39c0b5c
1ed5530
a480ebf
772da72
b368e49
e90a425
31c69d6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
package org.logstash.ackedqueue; | ||
|
||
import com.github.luben.zstd.Zstd; | ||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
|
||
/** | ||
* Subclasses of {@link AbstractZstdAwareCompressionCodec} are {@link CompressionCodec}s that are capable | ||
* of detecting and decompressing deflate-compressed events. When decoding byte sequences that are <em>NOT</em> | ||
* deflate-compressed, the given bytes are emitted verbatim. | ||
*/ | ||
abstract class AbstractZstdAwareCompressionCodec implements CompressionCodec { | ||
// log from the concrete class | ||
protected final Logger logger = LogManager.getLogger(this.getClass()); | ||
|
||
@Override | ||
public byte[] decode(byte[] data) { | ||
if (!isZstd(data)) { | ||
return data; | ||
} | ||
try { | ||
final byte[] decoded = Zstd.decompress(data); | ||
logger.trace("decoded {} -> {}", data.length, decoded.length); | ||
return decoded; | ||
} catch (Exception e) { | ||
throw new RuntimeException("Exception while decoding", e); | ||
} | ||
} | ||
|
||
private static final byte[] ZSTD_FRAME_MAGIC = { (byte) 0x28, (byte) 0xB5, (byte) 0x2F, (byte) 0xFD }; | ||
|
||
static boolean isZstd(byte[] data) { | ||
if (data.length < 4) { return false; } | ||
|
||
for (int i = 0; i < 4; i++) { | ||
if (data[i] != ZSTD_FRAME_MAGIC[i]) { return false; } | ||
} | ||
|
||
return true; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
package org.logstash.ackedqueue; | ||
|
||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
|
||
public interface CompressionCodec { | ||
Logger LOGGER = LogManager.getLogger(CompressionCodec.class); | ||
|
||
byte[] encode(byte[] data); | ||
byte[] decode(byte[] data); | ||
|
||
/** | ||
* The {@link CompressionCodec#NOOP} is a {@link CompressionCodec} that | ||
* does nothing when encoding and decoding. It is only meant to be activated | ||
* as a safety-latch in the event of compression being broken. | ||
*/ | ||
CompressionCodec NOOP = new CompressionCodec() { | ||
@Override | ||
public byte[] encode(byte[] data) { | ||
return data; | ||
} | ||
|
||
@Override | ||
public byte[] decode(byte[] data) { | ||
return data; | ||
} | ||
}; | ||
|
||
static CompressionCodec fromConfigValue(final String configValue) { | ||
return fromConfigValue(configValue, LOGGER); | ||
} | ||
|
||
static CompressionCodec fromConfigValue(final String configValue, final Logger logger) { | ||
return switch (configValue) { | ||
case "disabled" -> { | ||
logger.warn("compression support has been disabled"); | ||
yield CompressionCodec.NOOP; | ||
} | ||
case "none" -> { | ||
logger.info("compression support is enabled (read-only)"); | ||
yield ZstdAwareCompressionCodec.getInstance(); | ||
} | ||
case "speed" -> { | ||
logger.info("compression support is enabled (goal: speed)"); | ||
yield new ZstdEnabledCompressionCodec(ZstdEnabledCompressionCodec.Goal.SPEED); | ||
} | ||
case "balanced" -> { | ||
logger.info("compression support is enabled (goal: balanced)"); | ||
yield new ZstdEnabledCompressionCodec(ZstdEnabledCompressionCodec.Goal.BALANCED); | ||
} | ||
case "size" -> { | ||
logger.info("compression support is enabled (goal: size)"); | ||
yield new ZstdEnabledCompressionCodec(ZstdEnabledCompressionCodec.Goal.SIZE); | ||
} | ||
default -> throw new IllegalArgumentException(String.format("Unsupported compression setting `%s`", configValue)); | ||
}; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,6 +31,7 @@ public class SettingsImpl implements Settings { | |
private final int checkpointMaxAcks; | ||
private final int checkpointMaxWrites; | ||
private final boolean checkpointRetry; | ||
private final CompressionCodec compressionCodec; | ||
|
||
public static Builder builder(final Settings settings) { | ||
return new BuilderImpl(settings); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just a suggestion, this Builder refactoring could have been a separate PR as it doesn't require the compression settings at all and is still a significant part of the changeset in this PR. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Another reason to introduce this change ASAP: yet another parameter is coming in https://github.com/elastic/logstash/pull/18000/files There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. pared off as #18180 |
||
|
@@ -49,6 +50,7 @@ private SettingsImpl(final BuilderImpl builder) { | |
this.checkpointMaxAcks = builder.checkpointMaxAcks; | ||
this.checkpointMaxWrites = builder.checkpointMaxWrites; | ||
this.checkpointRetry = builder.checkpointRetry; | ||
this.compressionCodec = builder.compressionCodec; | ||
} | ||
|
||
@Override | ||
|
@@ -91,6 +93,11 @@ public boolean getCheckpointRetry() { | |
return this.checkpointRetry; | ||
} | ||
|
||
@Override | ||
public CompressionCodec getCompressionCodec() { | ||
return this.compressionCodec; | ||
} | ||
|
||
/** | ||
* Default implementation for Setting's Builder | ||
* */ | ||
|
@@ -140,6 +147,8 @@ private static final class BuilderImpl implements Builder { | |
|
||
private boolean checkpointRetry; | ||
|
||
private CompressionCodec compressionCodec; | ||
|
||
private BuilderImpl(final String dirForFiles) { | ||
this.dirForFiles = dirForFiles; | ||
this.elementClass = null; | ||
|
@@ -148,6 +157,7 @@ private BuilderImpl(final String dirForFiles) { | |
this.maxUnread = DEFAULT_MAX_UNREAD; | ||
this.checkpointMaxAcks = DEFAULT_CHECKPOINT_MAX_ACKS; | ||
this.checkpointMaxWrites = DEFAULT_CHECKPOINT_MAX_WRITES; | ||
this.compressionCodec = CompressionCodec.NOOP; | ||
this.checkpointRetry = false; | ||
} | ||
|
||
|
@@ -160,6 +170,7 @@ private BuilderImpl(final Settings settings) { | |
this.checkpointMaxAcks = settings.getCheckpointMaxAcks(); | ||
this.checkpointMaxWrites = settings.getCheckpointMaxWrites(); | ||
this.checkpointRetry = settings.getCheckpointRetry(); | ||
this.compressionCodec = settings.getCompressionCodec(); | ||
} | ||
|
||
@Override | ||
|
@@ -204,6 +215,12 @@ public Builder checkpointRetry(final boolean checkpointRetry) { | |
return this; | ||
} | ||
|
||
@Override | ||
public Builder compressionCodec(CompressionCodec compressionCodec) { | ||
this.compressionCodec = compressionCodec; | ||
return this; | ||
} | ||
|
||
@Override | ||
public Settings build() { | ||
return Settings.ensureValid(new SettingsImpl(this)); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
package org.logstash.ackedqueue; | ||
|
||
/** | ||
* A {@link ZstdAwareCompressionCodec} is an {@link CompressionCodec} that can decode deflate-compressed | ||
* bytes, but performs no compression when encoding. | ||
*/ | ||
class ZstdAwareCompressionCodec extends AbstractZstdAwareCompressionCodec { | ||
private static final ZstdAwareCompressionCodec INSTANCE = new ZstdAwareCompressionCodec(); | ||
|
||
static ZstdAwareCompressionCodec getInstance() { | ||
return INSTANCE; | ||
} | ||
|
||
@Override | ||
public byte[] encode(byte[] data) { | ||
return data; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
package org.logstash.ackedqueue; | ||
|
||
import com.github.luben.zstd.Zstd; | ||
|
||
/** | ||
* A {@link ZstdEnabledCompressionCodec} is a {@link CompressionCodec} that can decode deflate-compressed | ||
* bytes and performs deflate compression when encoding. | ||
*/ | ||
class ZstdEnabledCompressionCodec extends AbstractZstdAwareCompressionCodec implements CompressionCodec { | ||
public enum Goal { | ||
FASTEST(-7), | ||
SPEED(-1), | ||
BALANCED(3), | ||
HIGH(14), | ||
SIZE(22), | ||
; | ||
|
||
private int internalLevel; | ||
|
||
Goal(final int internalLevel) { | ||
this.internalLevel = internalLevel; | ||
} | ||
} | ||
|
||
private final int internalLevel; | ||
|
||
ZstdEnabledCompressionCodec(final Goal internalLevel) { | ||
this.internalLevel = internalLevel.internalLevel; | ||
} | ||
|
||
@Override | ||
public byte[] encode(byte[] data) { | ||
try { | ||
final byte[] encoded = Zstd.compress(data, internalLevel); | ||
logger.trace("encoded {} -> {}", data.length, encoded.length); | ||
return encoded; | ||
} catch (Exception e) { | ||
throw new RuntimeException("Exception while encoding", e); | ||
} | ||
} | ||
} |
Uh oh!
There was an error while loading. Please reload this page.