config/logstash.yml (4 additions, 0 deletions)
@@ -223,6 +223,10 @@
#
# queue.checkpoint.writes: 1024
#
# If using queue.type: persisted, the compression goal. Valid values are `none`, `speed`, `balanced`, and `size`.
# The default `none` writes new events uncompressed, but can still decompress previously-written events that were compressed.
#
# queue.compression: none
#
# ------------ Dead-Letter Queue Settings --------------
# Flag to turn on dead-letter queue.
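For illustration, a minimal `logstash.yml` enabling the new setting might look like this (a sketch assuming a release that includes this change):

```yaml
# Sketch: enable PQ compression alongside the persisted queue.
queue.type: persisted
queue.compression: balanced   # valid values: none, speed, balanced, size
```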
docker/data/logstash/env2yaml/env2yaml.go (1 addition, 0 deletions)
@@ -58,6 +58,7 @@ var validSettings = []string{
"queue.checkpoint.acks",
"queue.checkpoint.writes",
"queue.checkpoint.interval", // remove it for #17155
"queue.compression",
"queue.drain",
"dead_letter_queue.enable",
"dead_letter_queue.max_bytes",
docs/reference/logstash-settings-file.md (1 addition, 0 deletions)
@@ -68,6 +68,7 @@ The `logstash.yml` file includes these settings.
| `queue.checkpoint.acks` | The maximum number of ACKed events before forcing a checkpoint when persistent queues are enabled (`queue.type: persisted`). Specify `queue.checkpoint.acks: 0` to set this value to unlimited. | 1024 |
| `queue.checkpoint.writes` | The maximum number of written events before forcing a checkpoint when persistent queues are enabled (`queue.type: persisted`). Specify `queue.checkpoint.writes: 0` to set this value to unlimited. | 1024 |
| `queue.checkpoint.retry` | When enabled, Logstash will retry four times per attempted checkpoint write for any checkpoint writes that fail. Any subsequent errors are not retried. This is a workaround for failed checkpoint writes that have been seen only on the Windows platform and on filesystems with non-standard behavior such as SANs; it is not recommended except in those specific circumstances. (`queue.type: persisted`) | `true` |
| `queue.compression` {applies_to}`stack: ga 9.2` | Sets the compression goal for the persisted queue (`queue.type: persisted`), which reduces event size on disk at the cost of CPU usage. Possible values are `none`, `speed`, `balanced`, and `size`. | `none` |
| `queue.drain` | When enabled, Logstash waits until the persistent queue (`queue.type: persisted`) is drained before shutting down. | `false` |
| `dead_letter_queue.enable` | Flag to instruct Logstash to enable the DLQ feature supported by plugins. | `false` |
| `dead_letter_queue.max_bytes` | The maximum size of each dead letter queue. Entries will be dropped if they would increase the size of the dead letter queue beyond this setting. | `1024mb` |
docs/reference/persistent-queues.md (11 additions, 0 deletions)
@@ -1,4 +1,4 @@
---
mapped_pages:
- https://www.elastic.co/guide/en/logstash/current/persistent-queues.html
---
@@ -84,6 +84,17 @@
`queue.checkpoint.interval` {applies_to}`stack: deprecated 9.1`
: Sets the interval in milliseconds when a checkpoint is forced on the head page. Default is `1000`. Set to `0` to eliminate periodic checkpoints.

`queue.compression` {applies_to}`stack: ga 9.2`
: Sets the event compression level for the persistent queue (`queue.type: persisted`). Default is `none`. Possible values are:
* `none`: does not compress new events, but can still read compressed events
* `speed`: optimizes for the fastest compression operation
* `size`: optimizes for the smallest possible size on disk, at the cost of more CPU
* `balanced`: strikes a balance between the `speed` and `size` goals
:::{important}
Compression can be enabled for an existing PQ, but once compressed elements have been added to a PQ, that PQ can no longer be read by earlier Logstash releases that do not support compression.
If you need to downgrade Logstash after enabling compression, you must either delete the PQ or first run the pipeline with `queue.drain: true` to ensure that no compressed elements remain.
:::
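As a hedged sketch of the drain-before-downgrade procedure described in the note above (the setting names are the documented ones; exact rollout steps depend on your deployment):

```yaml
# Sketch: prepare an existing PQ for downgrade to a release without compression support.
queue.type: persisted
queue.compression: none   # stop writing new compressed events
queue.drain: true         # on shutdown, block until the queue is empty
```

Once Logstash shuts down cleanly with the queue drained, no compressed elements remain on disk and the older release can open the queue directory.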

## Configuration notes [pq-config-notes]

Every situation and environment is different, and the "ideal" configuration varies. If you optimize for performance, you may increase your risk of losing data. If you optimize for data protection, you may impact performance.
logstash-core/build.gradle (1 addition, 0 deletions)
@@ -239,6 +239,7 @@ dependencies {
implementation 'commons-codec:commons-codec:1.17.0' // transitively required by httpclient
// Jackson version moved to versions.yml in the project root (the JrJackson version is there too)
implementation "com.fasterxml.jackson.core:jackson-core:${jacksonVersion}"
implementation "com.github.luben:zstd-jni:1.5.7-4"
api "com.fasterxml.jackson.core:jackson-databind:${jacksonDatabindVersion}"
api "com.fasterxml.jackson.core:jackson-annotations:${jacksonVersion}"
implementation 'org.codehaus.janino:janino:3.1.0'
logstash-core/lib/logstash/environment.rb (1 addition, 0 deletions)
@@ -95,6 +95,7 @@ def self.as_java_range(r)
Setting::NumericSetting.new("queue.checkpoint.writes", 1024), # 0 is unlimited
Setting::NumericSetting.new("queue.checkpoint.interval", 1000), # remove it for #17155
Setting::BooleanSetting.new("queue.checkpoint.retry", true),
Setting::StringSetting.new("queue.compression", "none", true, %w(none speed balanced size disabled)),
Setting::BooleanSetting.new("dead_letter_queue.enable", false),
Setting::Bytes.new("dead_letter_queue.max_bytes", "1024mb"),
Setting::NumericSetting.new("dead_letter_queue.flush_interval", 5000),
logstash-core/lib/logstash/settings.rb (1 addition, 0 deletions)
@@ -69,6 +69,7 @@ def self.included(base)
"queue.checkpoint.interval", # remove it for #17155
"queue.checkpoint.writes",
"queue.checkpoint.retry",
"queue.compression",
"queue.drain",
"queue.max_bytes",
"queue.max_events",
logstash-core/spec/logstash/queue_factory_spec.rb (1 addition, 0 deletions)
@@ -30,6 +30,7 @@
LogStash::Setting::NumericSetting.new("queue.checkpoint.acks", 1024),
LogStash::Setting::NumericSetting.new("queue.checkpoint.writes", 1024),
LogStash::Setting::BooleanSetting.new("queue.checkpoint.retry", false),
LogStash::Setting::StringSetting.new("queue.compression", "none", true, %w(none speed balanced size disabled)),
LogStash::Setting::StringSetting.new("pipeline.id", pipeline_id),
LogStash::Setting::PositiveIntegerSetting.new("pipeline.batch.size", 125),
LogStash::Setting::PositiveIntegerSetting.new("pipeline.workers", LogStash::Config::CpuCoreStrategy.maximum)
AbstractZstdAwareCompressionCodec.java (new file)
@@ -0,0 +1,41 @@
package org.logstash.ackedqueue;

import com.github.luben.zstd.Zstd;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
* Subclasses of {@link AbstractZstdAwareCompressionCodec} are {@link CompressionCodec}s that are capable
* of detecting and decompressing zstd-compressed events. When decoding byte sequences that are <em>NOT</em>
* zstd-compressed, the given bytes are emitted verbatim.
*/
abstract class AbstractZstdAwareCompressionCodec implements CompressionCodec {
// log from the concrete class
protected final Logger logger = LogManager.getLogger(this.getClass());

@Override
public byte[] decode(byte[] data) {
if (!isZstd(data)) {
return data;
}
try {
final byte[] decoded = Zstd.decompress(data);
logger.trace("decoded {} -> {}", data.length, decoded.length);
return decoded;
} catch (Exception e) {
throw new RuntimeException("Exception while decoding", e);
}
}

private static final byte[] ZSTD_FRAME_MAGIC = { (byte) 0x28, (byte) 0xB5, (byte) 0x2F, (byte) 0xFD };

static boolean isZstd(byte[] data) {
if (data.length < 4) { return false; }

for (int i = 0; i < 4; i++) {
if (data[i] != ZSTD_FRAME_MAGIC[i]) { return false; }
}

return true;
}
}
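To make the detection behavior concrete, here is a small standalone sketch (not part of the PR) that checks the same four-byte zstd frame magic that `isZstd` uses; `Zstd.compress` comes from the `zstd-jni` dependency this PR adds:

```java
import com.github.luben.zstd.Zstd;
import java.nio.charset.StandardCharsets;

// Zstd frames begin with the little-endian magic 0xFD2FB528, i.e. bytes 28 B5 2F FD.
public class ZstdMagicDemo {
    static boolean startsWithZstdMagic(byte[] data) {
        return data.length >= 4
                && data[0] == (byte) 0x28 && data[1] == (byte) 0xB5
                && data[2] == (byte) 0x2F && data[3] == (byte) 0xFD;
    }

    public static void main(String[] args) {
        byte[] plain = "hello logstash".getBytes(StandardCharsets.UTF_8);
        byte[] compressed = Zstd.compress(plain, 3); // level 3 corresponds to the BALANCED goal

        System.out.println(startsWithZstdMagic(plain));      // false -> decode() passes through
        System.out.println(startsWithZstdMagic(compressed)); // true  -> decode() decompresses
    }
}
```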
CompressionCodec.java (new file)
@@ -0,0 +1,58 @@
package org.logstash.ackedqueue;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public interface CompressionCodec {
Logger LOGGER = LogManager.getLogger(CompressionCodec.class);

byte[] encode(byte[] data);
byte[] decode(byte[] data);

/**
* The {@link CompressionCodec#NOOP} is a {@link CompressionCodec} that
* does nothing when encoding and decoding. It is only meant to be activated
* as a safety-latch in the event of compression being broken.
*/
CompressionCodec NOOP = new CompressionCodec() {
@Override
public byte[] encode(byte[] data) {
return data;
}

@Override
public byte[] decode(byte[] data) {
return data;
}
};

static CompressionCodec fromConfigValue(final String configValue) {
return fromConfigValue(configValue, LOGGER);
}

static CompressionCodec fromConfigValue(final String configValue, final Logger logger) {
return switch (configValue) {
case "disabled" -> {
logger.warn("compression support has been disabled");
yield CompressionCodec.NOOP;
}
case "none" -> {
logger.info("compression support is enabled (read-only)");
yield ZstdAwareCompressionCodec.getInstance();
}
case "speed" -> {
logger.info("compression support is enabled (goal: speed)");
yield new ZstdEnabledCompressionCodec(ZstdEnabledCompressionCodec.Goal.SPEED);
}
case "balanced" -> {
logger.info("compression support is enabled (goal: balanced)");
yield new ZstdEnabledCompressionCodec(ZstdEnabledCompressionCodec.Goal.BALANCED);
}
case "size" -> {
logger.info("compression support is enabled (goal: size)");
yield new ZstdEnabledCompressionCodec(ZstdEnabledCompressionCodec.Goal.SIZE);
}
default -> throw new IllegalArgumentException(String.format("Unsupported compression setting `%s`", configValue));
};
}
}
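To make the mapping concrete, a short usage sketch (assumes `logstash-core` and Log4j on the classpath; `fromConfigValue` logs the chosen mode as shown above):

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import org.logstash.ackedqueue.CompressionCodec;

// Sketch: each `queue.compression` value resolves to a codec, and the
// read-only codec behind "none" can still decode what "speed" wrote.
public class CodecMappingDemo {
    public static void main(String[] args) {
        CompressionCodec writer = CompressionCodec.fromConfigValue("speed"); // compresses on encode
        CompressionCodec reader = CompressionCodec.fromConfigValue("none");  // encodes verbatim

        byte[] event = "{\"message\":\"hello\"}".getBytes(StandardCharsets.UTF_8);
        byte[] onDisk = writer.encode(event);    // zstd frame
        byte[] restored = reader.decode(onDisk); // detected and decompressed

        System.out.println(Arrays.equals(event, restored)); // true
    }
}
```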
Queue.java
@@ -83,6 +83,7 @@ public final class Queue implements Closeable {
// deserialization
private final Class<? extends Queueable> elementClass;
private final Method deserializeMethod;
private final CompressionCodec compressionCodec;

// thread safety
private final ReentrantLock lock = new ReentrantLock();
@@ -112,6 +113,7 @@ public Queue(Settings settings) {
this.maxBytes = settings.getQueueMaxBytes();
this.checkpointIO = new FileCheckpointIO(dirPath, settings.getCheckpointRetry());
this.elementClass = settings.getElementClass();
this.compressionCodec = settings.getCompressionCodec();
this.tailPages = new ArrayList<>();
this.unreadTailPages = new ArrayList<>();
this.closed = new AtomicBoolean(true); // not yet opened
@@ -414,7 +416,8 @@ public long write(Queueable element) throws IOException {
throw new QueueRuntimeException(QueueExceptionMessages.CANNOT_WRITE_TO_CLOSED_QUEUE);
}

byte[] data = element.serialize();
byte[] serializedBytes = element.serialize();
byte[] data = compressionCodec.encode(serializedBytes);

// the write strategy with regard to the isFull() state is to assume there is space for this element
// and write it, then after write verify if we just filled the queue and wait on the notFull condition
@@ -767,7 +770,8 @@ public CheckpointIO getCheckpointIO() {
*/
public Queueable deserialize(byte[] bytes) {
try {
return (Queueable)this.deserializeMethod.invoke(this.elementClass, bytes);
byte[] decodedBytes = compressionCodec.decode(bytes);
return (Queueable)this.deserializeMethod.invoke(this.elementClass, decodedBytes);
} catch (IllegalAccessException|InvocationTargetException e) {
throw new QueueRuntimeException("deserialize invocation error", e);
}
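One property of this hot path worth spelling out: because `decode` passes non-zstd bytes through verbatim, a queue that already contains uncompressed events keeps working after compression is enabled. A hedged sketch (the demo class is hypothetical; the codec API is from this PR):

```java
import java.nio.charset.StandardCharsets;
import org.logstash.ackedqueue.CompressionCodec;

// Sketch: a page may mix events written before and after enabling compression.
public class MixedQueueDemo {
    public static void main(String[] args) {
        CompressionCodec current = CompressionCodec.fromConfigValue("size");

        byte[] legacyOnDisk = "written before upgrade".getBytes(StandardCharsets.UTF_8); // never compressed
        byte[] freshOnDisk  = current.encode("written after upgrade".getBytes(StandardCharsets.UTF_8));

        // The same decode path handles both representations transparently.
        System.out.println(new String(current.decode(legacyOnDisk), StandardCharsets.UTF_8));
        System.out.println(new String(current.decode(freshOnDisk), StandardCharsets.UTF_8));
    }
}
```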
QueueFactoryExt.java
@@ -24,6 +24,9 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jruby.Ruby;
import org.jruby.RubyBasicObject;
import org.jruby.RubyClass;
@@ -63,6 +66,8 @@ public final class QueueFactoryExt extends RubyBasicObject {

private static final long serialVersionUID = 1L;

private static final Logger LOGGER = LogManager.getLogger(QueueFactoryExt.class);

public QueueFactoryExt(final Ruby runtime, final RubyClass metaClass) {
super(runtime, metaClass);
}
@@ -123,6 +128,13 @@ private static Settings extractQueueSettings(final IRubyObject settings) {
.checkpointMaxAcks(getSetting(context, settings, QUEUE_CHECKPOINT_ACKS).toJava(Integer.class))
.checkpointRetry(getSetting(context, settings, QUEUE_CHECKPOINT_RETRY).isTrue())
.queueMaxBytes(getSetting(context, settings, QUEUE_MAX_BYTES).toJava(Integer.class))
.compressionCodec(extractConfiguredCodec(settings))
.build();
}

private static CompressionCodec extractConfiguredCodec(final IRubyObject settings) {
final ThreadContext context = settings.getRuntime().getCurrentContext();
final String compressionSetting = getSetting(context, settings, QUEUE_COMPRESSION).asJavaString();
return CompressionCodec.fromConfigValue(compressionSetting, LOGGER);
}
}
Settings.java
@@ -44,6 +44,8 @@ public interface Settings {

boolean getCheckpointRetry();

CompressionCodec getCompressionCodec();

/**
* Validate and return the settings, or throw descriptive {@link QueueRuntimeException}
* @param settings the settings to validate
@@ -89,6 +91,8 @@ interface Builder {

Builder checkpointRetry(boolean checkpointRetry);

Builder compressionCodec(CompressionCodec compressionCodec);

Settings build();

}
SettingsImpl.java
@@ -31,6 +31,7 @@ public class SettingsImpl implements Settings {
private final int checkpointMaxAcks;
private final int checkpointMaxWrites;
private final boolean checkpointRetry;
private final CompressionCodec compressionCodec;

public static Builder builder(final Settings settings) {
return new BuilderImpl(settings);
[Review comment — Member] Just a suggestion, this Builder refactoring could have been a separate PR as it doesn't require the compression settings at all and is still a significant part of the changeset in this PR.

[Review comment — Member] Another reason to introduce this change ASAP: yet another parameter is coming in https://github.com/elastic/logstash/pull/18000/files

[Review comment — Member (Author)] pared off as #18180

@@ -49,6 +50,7 @@ private SettingsImpl(final BuilderImpl builder) {
this.checkpointMaxAcks = builder.checkpointMaxAcks;
this.checkpointMaxWrites = builder.checkpointMaxWrites;
this.checkpointRetry = builder.checkpointRetry;
this.compressionCodec = builder.compressionCodec;
}

@Override
@@ -91,6 +93,11 @@ public boolean getCheckpointRetry() {
return this.checkpointRetry;
}

@Override
public CompressionCodec getCompressionCodec() {
return this.compressionCodec;
}

/**
* Default implementation for Setting's Builder
* */
@@ -140,6 +147,8 @@ private static final class BuilderImpl implements Builder {

private boolean checkpointRetry;

private CompressionCodec compressionCodec;

private BuilderImpl(final String dirForFiles) {
this.dirForFiles = dirForFiles;
this.elementClass = null;
@@ -148,6 +157,7 @@ private BuilderImpl(final String dirForFiles) {
this.maxUnread = DEFAULT_MAX_UNREAD;
this.checkpointMaxAcks = DEFAULT_CHECKPOINT_MAX_ACKS;
this.checkpointMaxWrites = DEFAULT_CHECKPOINT_MAX_WRITES;
this.compressionCodec = CompressionCodec.NOOP;
this.checkpointRetry = false;
}

@@ -160,6 +170,7 @@ private BuilderImpl(final Settings settings) {
this.checkpointMaxAcks = settings.getCheckpointMaxAcks();
this.checkpointMaxWrites = settings.getCheckpointMaxWrites();
this.checkpointRetry = settings.getCheckpointRetry();
this.compressionCodec = settings.getCompressionCodec();
}

@Override
@@ -204,6 +215,12 @@ public Builder checkpointRetry(final boolean checkpointRetry) {
return this;
}

@Override
public Builder compressionCodec(CompressionCodec compressionCodec) {
this.compressionCodec = compressionCodec;
return this;
}

@Override
public Settings build() {
return Settings.ensureValid(new SettingsImpl(this));
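For orientation, this is roughly how the new hook composes with the existing copy-builder (a fragment, not a full program; `existingSettings` is assumed to come from the queue factory):

```java
// Sketch: derive queue settings with a compression codec attached.
Settings updated = SettingsImpl.builder(existingSettings)
        .compressionCodec(CompressionCodec.fromConfigValue("balanced"))
        .build(); // Settings.ensureValid(...) runs inside build()
```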
ZstdAwareCompressionCodec.java (new file)
@@ -0,0 +1,18 @@
package org.logstash.ackedqueue;

/**
* A {@link ZstdAwareCompressionCodec} is a {@link CompressionCodec} that can decode zstd-compressed
* bytes, but performs no compression when encoding.
*/
class ZstdAwareCompressionCodec extends AbstractZstdAwareCompressionCodec {
private static final ZstdAwareCompressionCodec INSTANCE = new ZstdAwareCompressionCodec();

static ZstdAwareCompressionCodec getInstance() {
return INSTANCE;
}

@Override
public byte[] encode(byte[] data) {
return data;
}
}
ZstdEnabledCompressionCodec.java (new file)
@@ -0,0 +1,41 @@
package org.logstash.ackedqueue;

import com.github.luben.zstd.Zstd;

/**
* A {@link ZstdEnabledCompressionCodec} is a {@link CompressionCodec} that can decode zstd-compressed
* bytes and performs zstd compression when encoding.
*/
class ZstdEnabledCompressionCodec extends AbstractZstdAwareCompressionCodec implements CompressionCodec {
public enum Goal {
FASTEST(-7),
SPEED(-1),
BALANCED(3),
HIGH(14),
SIZE(22),
;

private final int internalLevel;

Goal(final int internalLevel) {
this.internalLevel = internalLevel;
}
}

private final int internalLevel;

ZstdEnabledCompressionCodec(final Goal goal) {
this.internalLevel = goal.internalLevel;
}

@Override
public byte[] encode(byte[] data) {
try {
final byte[] encoded = Zstd.compress(data, internalLevel);
logger.trace("encoded {} -> {}", data.length, encoded.length);
return encoded;
} catch (Exception e) {
throw new RuntimeException("Exception while encoding", e);
}
}
}
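The `Goal` constants are thin wrappers over zstd levels: the exposed settings map to -1 (`SPEED`), 3 (`BALANCED`), and 22 (`SIZE`), while `FASTEST` and `HIGH` are defined but not reachable from `queue.compression`. A small sketch comparing the trade-off with `zstd-jni` directly:

```java
import com.github.luben.zstd.Zstd;
import java.nio.charset.StandardCharsets;

// Sketch: the exposed goals are plain zstd compression levels.
public class GoalLevelsDemo {
    public static void main(String[] args) {
        // Repetitive payload so the differences between levels are visible.
        byte[] sample = "{\"msg\":\"sample log line\"}".repeat(200).getBytes(StandardCharsets.UTF_8);

        for (int level : new int[] {-1, 3, 22}) { // SPEED, BALANCED, SIZE
            byte[] out = Zstd.compress(sample, level);
            System.out.printf("level %3d: %d -> %d bytes%n", level, sample.length, out.length);
        }
    }
}
```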
SettingKeyDefinitions.java
@@ -43,4 +43,6 @@ public class SettingKeyDefinitions {
public static final String QUEUE_CHECKPOINT_RETRY = "queue.checkpoint.retry";

public static final String QUEUE_MAX_BYTES = "queue.max_bytes";

public static final String QUEUE_COMPRESSION = "queue.compression";
}