4 changes: 4 additions & 0 deletions config/logstash.yml
@@ -223,6 +223,10 @@
#
# queue.checkpoint.writes: 1024
#
# If using queue.type: persisted, the compression goal. Valid values are `none`, `speed`, `balanced`, and `size`.
# The default `none` performs no compression on write, but can still decompress previously-written events that were compressed.
#
# queue.compression: none
#
# ------------ Dead-Letter Queue Settings --------------
# Flag to turn on dead-letter queue.
1 change: 1 addition & 0 deletions docker/data/logstash/env2yaml/env2yaml.go
@@ -58,6 +58,7 @@ var validSettings = []string{
"queue.checkpoint.acks",
"queue.checkpoint.writes",
"queue.checkpoint.interval", // remove it for #17155
"queue.compression",
"queue.drain",
"dead_letter_queue.enable",
"dead_letter_queue.max_bytes",
1 change: 1 addition & 0 deletions docs/reference/logstash-settings-file.md
@@ -68,6 +68,7 @@ The `logstash.yml` file includes these settings.
| `queue.checkpoint.acks` | The maximum number of ACKed events before forcing a checkpoint when persistent queues are enabled (`queue.type: persisted`). Specify `queue.checkpoint.acks: 0` to set this value to unlimited. | 1024 |
| `queue.checkpoint.writes` | The maximum number of written events before forcing a checkpoint when persistent queues are enabled (`queue.type: persisted`). Specify `queue.checkpoint.writes: 0` to set this value to unlimited. | 1024 |
| `queue.checkpoint.retry` | When enabled, Logstash will retry four times per attempted checkpoint write for any checkpoint writes that fail. Any subsequent errors are not retried. This is a workaround for failed checkpoint writes that have been seen only on the Windows platform or on filesystems with non-standard behavior such as SANs, and it is not recommended except in those specific circumstances. (`queue.type: persisted`) | `true` |
| `queue.compression` | Sets a compression goal for the persisted queue, allowing the pipeline to spend CPU to reduce the serialized size of events on disk. Acceptable values are `none`, `speed`, `balanced`, and `size`. | `none` |
| `queue.drain` | When enabled, Logstash waits until the persistent queue (`queue.type: persisted`) is drained before shutting down. | `false` |
| `dead_letter_queue.enable` | Flag to instruct Logstash to enable the DLQ feature supported by plugins. | `false` |
| `dead_letter_queue.max_bytes` | The maximum size of each dead letter queue. Entries will be dropped if they would increase the size of the dead letter queue beyond this setting. | `1024mb` |
5 changes: 5 additions & 0 deletions docs/reference/persistent-queues.md
@@ -81,6 +81,11 @@ If you want to define values for a specific pipeline, use [`pipelines.yml`](/ref

To avoid losing data in the persistent queue, you can set `queue.checkpoint.writes: 1` to force a checkpoint after each event is written. Keep in mind that disk writes have a resource cost. Setting this value to `1` ensures maximum durability, but can severely impact performance. See [Controlling durability](#durability-persistent-queues) to better understand the trade-offs.

`queue.compression`
: Sets the event compression goal for use with the persisted queue. Default is `none`. Acceptable values include:
* `speed`: optimize for fastest compression operation
* `size`: optimize for smallest size on disk, spending more CPU
* `balanced`: a balance between the `speed` and `size` settings
Comment on lines +84 to +88
Member


Can we provide some guidance here on what the users should be able to see in their metrics to decide when to turn this on?

For example, we could provide an example of observing mmap operations across multiple consecutive API calls to hot_threads, and/or a combination of low worker utilization and high queue backpressure.

Member Author


I'll be addressing this with #18107



## Configuration notes [pq-config-notes]
1 change: 1 addition & 0 deletions logstash-core/lib/logstash/environment.rb
@@ -91,6 +91,7 @@ module Environment
Setting::SettingNumeric.new("queue.checkpoint.writes", 1024), # 0 is unlimited
Setting::SettingNumeric.new("queue.checkpoint.interval", 1000), # remove it for #17155
Setting::Boolean.new("queue.checkpoint.retry", true),
Setting::SettingString.new("queue.compression", "none", true, %w(none speed balanced size disabled)),
Setting::Boolean.new("dead_letter_queue.enable", false),
Setting::Bytes.new("dead_letter_queue.max_bytes", "1024mb"),
Setting::SettingNumeric.new("dead_letter_queue.flush_interval", 5000),
1 change: 1 addition & 0 deletions logstash-core/lib/logstash/settings.rb
@@ -69,6 +69,7 @@ def self.included(base)
"queue.checkpoint.interval", # remove it for #17155
"queue.checkpoint.writes",
"queue.checkpoint.retry",
"queue.compression",
"queue.drain",
"queue.max_bytes",
"queue.max_events",
@@ -28,8 +28,17 @@
let(:output_strings) { [] }
let(:reject_memo_keys) { [:reject_memo_keys, :path, :queue, :writer_threads, :collector, :metric, :reader_threads, :output_strings] }

let(:queue_settings) do
LogStash::AckedQueue.file_settings_builder(path)
.capacity(page_capacity)
.checkpointMaxAcks(queue_checkpoint_acks)
.checkpointMaxWrites(queue_checkpoint_writes)
.queueMaxBytes(queue_capacity)
.build
end

let(:queue) do
described_class.new(path, page_capacity, 0, queue_checkpoint_acks, queue_checkpoint_writes, false, queue_capacity)
described_class.new(queue_settings)
end

let(:writer_threads) do
@@ -125,7 +125,18 @@ def threaded_read_client

context "WrappedAckedQueue" do
let(:path) { Stud::Temporary.directory }
let(:queue) { LogStash::WrappedAckedQueue.new(path, 1024, 10, 1024, 1024, false, 4096) }

let(:queue_settings) do
LogStash::AckedQueue.file_settings_builder(path)
.capacity(1024)
.maxUnread(10)
.checkpointMaxAcks(1024)
.checkpointMaxWrites(1024)
.queueMaxBytes(4096)
.build
Comment on lines +130 to +136
Member Author


Suggested change
LogStash::AckedQueue.file_settings_builder(path)
.capacity(1024)
.maxUnread(10)
.checkpointMaxAcks(1024)
.checkpointMaxWrites(1024)
.queueMaxBytes(4096)
.build
LogStash::AckedQueue.file_settings_builder(path)
.capacity(1024)
.maxUnread(10)
.checkpointMaxAcks(1024)
.checkpointMaxWrites(1024)
.queueMaxBytes(4096)
.build

end

let(:queue) { LogStash::WrappedAckedQueue.new(queue_settings) }

before do
read_client.set_events_metric(metric.namespace([:stats, :events]))
1 change: 1 addition & 0 deletions logstash-core/spec/logstash/queue_factory_spec.rb
@@ -30,6 +30,7 @@
LogStash::Setting::SettingNumeric.new("queue.checkpoint.acks", 1024),
LogStash::Setting::SettingNumeric.new("queue.checkpoint.writes", 1024),
LogStash::Setting::Boolean.new("queue.checkpoint.retry", false),
LogStash::Setting::SettingString.new("queue.compression", "none", true, %w(none speed balanced size disabled)),
LogStash::Setting::SettingString.new("pipeline.id", pipeline_id),
LogStash::Setting::PositiveInteger.new("pipeline.batch.size", 125),
LogStash::Setting::PositiveInteger.new("pipeline.workers", LogStash::Config::CpuCoreStrategy.maximum)
13 changes: 12 additions & 1 deletion logstash-core/spec/logstash/util/wrapped_acked_queue_spec.rb
@@ -53,7 +53,18 @@
let(:checkpoint_acks) { 1024 }
let(:checkpoint_writes) { 1024 }
let(:path) { Stud::Temporary.directory }
let(:queue) { LogStash::WrappedAckedQueue.new(path, page_capacity, max_events, checkpoint_acks, checkpoint_writes, false, max_bytes) }

let(:queue_settings) do
LogStash::AckedQueue.file_settings_builder(path)
.capacity(page_capacity)
.maxUnread(max_events)
.checkpointMaxAcks(checkpoint_acks)
.checkpointMaxWrites(checkpoint_writes)
.queueMaxBytes(max_bytes)
.build
end

let(:queue) { LogStash::WrappedAckedQueue.new(queue_settings) }

after do
queue.close
@@ -0,0 +1,120 @@
package org.logstash.ackedqueue;

import org.logstash.util.SetOnceReference;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.DataFormatException;
import java.util.zip.Inflater;

/**
* Subclasses of {@link AbstractDeflateAwareCompressionCodec} are {@link CompressionCodec}s that are capable
* of detecting and decompressing deflate-compressed events. When decoding byte sequences that are <em>NOT</em>
* deflate-compressed, the given bytes are emitted verbatim.
*/
abstract class AbstractDeflateAwareCompressionCodec implements CompressionCodec {

static final int BAOS_SHAREABLE_THRESHOLD_BYTES = 4096;

private final ThreadLocal<BufferedInflater> bufferedInflaterThreadLocal;

public AbstractDeflateAwareCompressionCodec() {
this.bufferedInflaterThreadLocal = ThreadLocal.withInitial(BufferedInflater::new);
}

@Override
public byte[] decode(byte[] data) {
if (!isDeflate(data)) {
return data;
}
final BufferedInflater bufferedInflater = bufferedInflaterThreadLocal.get();
try {
return bufferedInflater.decode(data);
} catch (IOException e) {
throw new RuntimeException("IOException while decoding", e);
}
}

static boolean isDeflate(byte[] data) {
if (data.length < 2) { return false; }

// parse first two bytes as big-endian short
short header = (short) (((data[0] & 0xFF) << 8) | (data[1] & 0xFF));

/*
* RFC-1950: ZLIB Compressed Data Format Specification version 3.3
* https://www.ietf.org/rfc/rfc1950.txt
* ┏━━━━ CMF ━━━━━┳━━━━━━━━━━ FLG ━━━━━━━━━━┓
* ┠─CINFO─┬──CM──╂─FLEVEL─┬─FDICT─┬─FCHECK─┨
* ┃ 0XXX │ 1000 ┃ XX │ 0 │ XXXXX ┃
* ┗━━━━━━━┷━━━━━━┻━━━━━━━━┷━━━━━━━┷━━━━━━━━┛
* CINFO: 0XXX // always LTE 7 (7 is 32KB window)
* CM: 1000 // always 8 for deflate
* FDICT: 0 // always unset (no dictionary)
*
*/// 0XXX_1000_XX_0_XXXXX
short mask = (short) 0b1000_1111_00_1_00000; // bits to keep
short flip = (short) 0b0000_1000_00_0_00000; // bits to flip
short goal = (short) 0b0000_0000_00_0_00000; // goal state
if (((header & mask) ^ flip) != goal) {
return false;
}

// additionally the FCHECK ensures that
// the big-endian header is a multiple of 31
return header % 31 == 0;
}

/**
* A {@link BufferedInflater} is a convenience wrapper around the complexities
* of managing an {@link Inflater}, an intermediate {@code byte[]} buffer, and
* a {@link ByteArrayOutputStream}. It enables internal reuse of small buffers
* to reduce allocations.
*/
static class BufferedInflater {
private final Inflater inflater;
private final byte[] intermediateBuffer;
private final SetOnceReference<ByteArrayOutputStream> reusableBaosRef;

public BufferedInflater() {
this.inflater = new Inflater();
this.intermediateBuffer = new byte[1024];
this.reusableBaosRef = SetOnceReference.unset();
}

public byte[] decode(final byte[] data) throws IOException {
final ByteArrayOutputStream baos = getBaos(data.length);
try {
inflater.setInput(data);

do {
if (inflater.needsInput()) {
throw new IOException(String.format("prematurely reached end of encoded value (%s/%s)", inflater.getBytesRead(), inflater.getTotalIn()));
}
try {
int count = inflater.inflate(intermediateBuffer);
baos.write(intermediateBuffer, 0, count);
} catch (DataFormatException e) {
throw new IOException("Failed to decode", e);
}
} while (!inflater.finished());

return baos.toByteArray();
} finally {
inflater.reset();
baos.reset();
}
}

public void release() {
inflater.end();
}

private ByteArrayOutputStream getBaos(final int encodedSize) {
if (encodedSize <= BAOS_SHAREABLE_THRESHOLD_BYTES) {
return this.reusableBaosRef.offerAndGet(() -> new ByteArrayOutputStream(BAOS_SHAREABLE_THRESHOLD_BYTES));
}
Comment on lines +114 to +116
Member Author


review note: if the thread never encounters a compressed payload that is small enough to safely use the shared (and therefore thread-permanent) BAOS, this branch is what prevents us from taking on the overhead of that shareable BAOS.

return new ByteArrayOutputStream(encodedSize);
}
}
}
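
To make the header probe above concrete, here is a minimal hedged sketch (not part of this changeset). It assumes a throwaway class in the same `org.logstash.ackedqueue` package, since `isDeflate` is package-private: `0x78 0x9C` is the RFC-1950 header that `Deflater` emits at its default level, while typical serialized event bytes fail the bit-pattern check or the FCHECK (mod-31) check.

package org.logstash.ackedqueue;

import java.nio.charset.StandardCharsets;

// Hypothetical illustration class (not in the PR): exercises the package-private
// AbstractDeflateAwareCompressionCodec.isDeflate header probe.
class DeflateHeaderProbeSketch {
    public static void main(String[] args) {
        // 0x78 0x9C: CM=8 (deflate), CINFO=7 (32KB window), FDICT=0, and 0x789C is a multiple of 31
        byte[] zlibHeader = {(byte) 0x78, (byte) 0x9C};
        // plain serialized bytes: '{' '"' == 0x7B 0x22, which fails the bit-pattern check
        byte[] plainBytes = "{\"message\":\"hi\"}".getBytes(StandardCharsets.UTF_8);

        System.out.println(AbstractDeflateAwareCompressionCodec.isDeflate(zlibHeader)); // true
        System.out.println(AbstractDeflateAwareCompressionCodec.isDeflate(plainBytes)); // false
    }
}
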
@@ -0,0 +1,60 @@
package org.logstash.ackedqueue;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.zip.Deflater;

public interface CompressionCodec {
Logger LOGGER = LogManager.getLogger(CompressionCodec.class);

byte[] encode(byte[] data);
byte[] decode(byte[] data);

/**
* The {@link CompressionCodec#NOOP} is a {@link CompressionCodec} that
* does nothing when encoding and decoding. It is only meant to be activated
* as a safety-latch in the event of compression being broken.
*/
CompressionCodec NOOP = new CompressionCodec() {
@Override
public byte[] encode(byte[] data) {
return data;
}

@Override
public byte[] decode(byte[] data) {
return data;
}
};

static CompressionCodec fromConfigValue(final String configValue) {
return fromConfigValue(configValue, LOGGER);
}

static CompressionCodec fromConfigValue(final String configValue, final Logger logger) {
return switch (configValue) {
case "disabled" -> {
logger.warn("compression support has been disabled");
yield CompressionCodec.NOOP;
}
case "none" -> {
logger.info("compression support is enabled (read-only)");
Member


Looking at a log entry saying "read-only" can confuse users into thinking that the PQ is in a read-only mode

Suggested change
logger.info("compression support is enabled (read-only)");
logger.info("compression support is enabled (decompression only)");

yield DeflateAwareCompressionCodec.getInstance();
}
case "speed" -> {
logger.info("compression support is enabled (goal: speed)");
Member


"Goal" is a very comp-sci term, and it conveys a sense of "we'll do our best, but no promises". I don't think it's necessary to pass this subjectivity on to users, so maybe something like:

Suggested change
logger.info("compression support is enabled (goal: speed)");
logger.info("Compression is enabled - level: \"speed\"");

Or logger.info("Compression level set to: speed");

yield new DeflateEnabledCompressionCodec(Deflater.BEST_SPEED);
}
case "balanced" -> {
logger.info("compression support is enabled (goal: balanced)");
yield new DeflateEnabledCompressionCodec(Deflater.DEFAULT_COMPRESSION);
}
case "size" -> {
logger.info("compression support is enabled (goal: size)");
yield new DeflateEnabledCompressionCodec(Deflater.BEST_COMPRESSION);
}
default -> throw new IllegalArgumentException(String.format("Unsupported compression setting `%s`", configValue));
};
}
}
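
As a usage illustration (not part of this changeset), the sketch below round-trips bytes through codecs obtained from `fromConfigValue`: data encoded under a compression goal such as `speed` stays readable through the default `none` codec, which writes verbatim but is deflate-aware when decoding. The class and variable names here are hypothetical, and the sketch assumes `DeflateEnabledCompressionCodec.encode` produces zlib-wrapped output as its name and `Deflater` usage suggest.

package org.logstash.ackedqueue;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical illustration class (not in the PR).
class CompressionCodecRoundTripSketch {
    public static void main(String[] args) {
        CompressionCodec writer = CompressionCodec.fromConfigValue("speed"); // deflate at Deflater.BEST_SPEED
        CompressionCodec reader = CompressionCodec.fromConfigValue("none");  // no compression on encode, deflate-aware decode

        byte[] original = "serialized event payload".getBytes(StandardCharsets.UTF_8);
        byte[] onDisk = writer.encode(original);  // compressed representation that would be persisted to a page
        byte[] restored = reader.decode(onDisk);  // header detected as deflate, bytes inflated

        System.out.println(Arrays.equals(original, restored)); // expected: true
    }
}
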
@@ -0,0 +1,18 @@
package org.logstash.ackedqueue;

/**
* A {@link DeflateAwareCompressionCodec} is a {@link CompressionCodec} that can decode deflate-compressed
* bytes, but performs no compression when encoding.
*/
class DeflateAwareCompressionCodec extends AbstractDeflateAwareCompressionCodec {
private static final DeflateAwareCompressionCodec INSTANCE = new DeflateAwareCompressionCodec();

static DeflateAwareCompressionCodec getInstance() {
return INSTANCE;
}

@Override
public byte[] encode(byte[] data) {
return data;
}
}