
Conversation

@yaauie yaauie (Member) commented Sep 4, 2025

Release notes

Adds support for event compression in the persisted queue, controlled by the per-pipeline queue.compression setting, which defaults to none.

What does this PR do?

Adds non-breaking support for event compression to the persisted queue, as
configured by a new per-pipeline setting queue.compression, which supports:

  • none (default): no compression is performed, but if compressed events are encountered in the queue they will be decompressed
  • speed: compression optimized for speed (minimal overhead, but less compression)
  • balanced: compression balancing speed against result size
  • size: compression optimized for maximum reduction of size (minimal size, but more resource-intensive)
  • disabled: compression support entirely disabled; if a pipeline is run in this configuration against a PQ that already contains unacked compressed events, the pipeline WILL crash.

This PR performs the necessary refactors as no-op stand-alone commits to make reviewing more straightforward. It is best reviewed in commit order.

Why is it important/What is the impact to the user?

Disk IO is often a performance bottleneck when using the PQ. This feature allows users to spend available CPU resources to reduce the size of events on disk, and therefore also the disk IO.

Checklist

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • I have made corresponding changes to the default configuration files (and/or docker env variables)
  • I have added tests that prove my fix is effective or that my feature works

Author's Checklist

  • [ ]

How to test this PR locally

  • Add an ndjson file named example-input.ndjson containing newline-delimited JSON events
  • Run Logstash with trace-logging enabled, using -S to set queue.type=persisted, queue.drain=true, and queue.compression=size:
    bin/logstash --log.level=trace \
    -Squeue.type=persisted \
    -Squeue.drain=true \
    -Squeue.compression=size \
    --config.string 'input { stdin { codec => json_lines } } output { sink {} }' < example-input.ndjson
    
  • Observe trace logs showing compression and decompression:
    [2025-09-04T22:36:07,055][TRACE][org.logstash.ackedqueue.Queue][main][454a7f73ae57dfec89e89329c9e0eba182f7780fd885c7bf2f17d8afba2bba67] serialized: 9000->3645
    [2025-09-04T22:36:07,056][TRACE][org.logstash.ackedqueue.Queue][main][454a7f73ae57dfec89e89329c9e0eba182f7780fd885c7bf2f17d8afba2bba67] serialized: 9448->3838
    [2025-09-04T22:36:07,057][TRACE][org.logstash.ackedqueue.Queue][main][454a7f73ae57dfec89e89329c9e0eba182f7780fd885c7bf2f17d8afba2bba67] serialized: 7160->2666
    [2025-09-04T22:36:07,059][TRACE][org.logstash.ackedqueue.Queue][main][454a7f73ae57dfec89e89329c9e0eba182f7780fd885c7bf2f17d8afba2bba67] serialized: 8642->3572
    [2025-09-04T22:36:07,060][TRACE][org.logstash.ackedqueue.Queue][main][454a7f73ae57dfec89e89329c9e0eba182f7780fd885c7bf2f17d8afba2bba67] serialized: 8714->3739
    [2025-09-04T22:36:07,061][TRACE][org.logstash.ackedqueue.Queue][main][454a7f73ae57dfec89e89329c9e0eba182f7780fd885c7bf2f17d8afba2bba67] serialized: 8048->3500
    [2025-09-04T22:36:07,063][TRACE][org.logstash.ackedqueue.Queue][main][454a7f73ae57dfec89e89329c9e0eba182f7780fd885c7bf2f17d8afba2bba67] serialized: 10007->3871
    

    ...

    [2025-09-04T22:36:09,428][TRACE][org.logstash.ackedqueue.Queue][main] deserialized: 3594->8060
    [2025-09-04T22:36:09,428][TRACE][org.logstash.ackedqueue.Queue][main] deserialized: 3571->8622
    [2025-09-04T22:36:09,428][TRACE][org.logstash.ackedqueue.Queue][main] deserialized: 3673->9051
    [2025-09-04T22:36:09,428][TRACE][org.logstash.ackedqueue.Queue][main] deserialized: 3538->8343
    [2025-09-04T22:36:09,428][TRACE][org.logstash.ackedqueue.Queue][main] deserialized: 3729->8943
    [2025-09-04T22:36:09,428][TRACE][org.logstash.ackedqueue.Queue][main] deserialized: 2683->7460
    [2025-09-04T22:36:09,428][TRACE][org.logstash.ackedqueue.Queue][main] deserialized: 3928->10059
    [2025-09-04T22:36:09,428][TRACE][org.logstash.ackedqueue.Queue][main] deserialized: 3563->8329
    [2025-09-04T22:36:09,428][TRACE][org.logstash.ackedqueue.Queue][main] deserialized: 2708->7285
    
  • Inspect the page(s) left behind with lsq-pagedump:
    2099    3851    0EB311A1        page.0  ZSTD(9552)
    2100    3961    D497496F        page.0  ZSTD(10416)
    2101    2667    59F1903D        page.0  ZSTD(6978)
    2102    3677    B442D62D        page.0  ZSTD(9006)
    2103    3748    3EEF1737        page.0  ZSTD(8791)
    2104    2746    DE697FE9        page.0  ZSTD(7903)
    

Related issues

Use cases

  • Constrained or metered disk IO
  • Limited disk capacity

The `ackedqueue.SettingsImpl` uses an _immutable_ builder, which makes
adding options cumbersome; each additional property requires modifying the
code for all existing options.

By introducing an API-internal temporary mutable builder, we can simplify
the process of creating an immutable copy that has a single component
modified.
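
A minimal sketch of the pattern (names are illustrative, not the actual `SettingsImpl` fields):

```java
// Illustrative copy-builder sketch; the class and field names here are hypothetical
// and do not mirror the actual ackedqueue.SettingsImpl API.
final class QueueSettingsSketch {
    private final long maxBytes;
    private final int checkpointMaxAcks;

    private QueueSettingsSketch(Builder builder) {
        this.maxBytes = builder.maxBytes;
        this.checkpointMaxAcks = builder.checkpointMaxAcks;
    }

    // Temporary mutable builder seeded from an existing immutable instance, so a caller
    // can change a single component without re-listing every other option.
    Builder toBuilder() {
        return new Builder(this);
    }

    static final class Builder {
        private long maxBytes = 1024L * 1024L * 1024L;
        private int checkpointMaxAcks = 1024;

        Builder() { }

        Builder(QueueSettingsSketch existing) {
            this.maxBytes = existing.maxBytes;
            this.checkpointMaxAcks = existing.checkpointMaxAcks;
        }

        Builder maxBytes(long maxBytes) { this.maxBytes = maxBytes; return this; }
        Builder checkpointMaxAcks(int acks) { this.checkpointMaxAcks = acks; return this; }

        QueueSettingsSketch build() { return new QueueSettingsSketch(this); }
    }
}

// Usage: copy everything from an existing instance, override only the piece that changes.
// QueueSettingsSketch updated = existing.toBuilder().checkpointMaxAcks(2048).build();
```
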
Adds non-breaking support for event compression to the persisted queue, as
configured by a new per-pipeline setting `queue.compression`, which supports:

 - `none` (default): no compression is performed, but if compressed events
                     are encountered in the queue they will be decompressed
 - `speed`: compression optimized for speed
 - `balanced`: compression balancing speed against result size
 - `size`: compression optimized for maximum reduction of size
 - `disabled`: compression support entirely disabled; if a pipeline is run
               in this configuration against a PQ that already contains
               unacked compressed events, the pipeline WILL crash.

To accomplish this, we then provide an abstract base implementation of the
CompressionCodec whose decode method is capable of _detecting_ and decoding
zstd-encoded payload while letting other payloads through unmodified.
The detection is done with an operation on the first four bytes of the
payload, so no additional context is needed.
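
As a concrete illustration of that detection (a sketch, not the PR's exact code; `zstdDecompress` below is a hypothetical stand-in for the real decompression call):

```java
// Every Zstandard frame begins with the magic number 0xFD2FB528 stored little-endian,
// i.e. the bytes 0x28 0xB5 0x2F 0xFD, so compressed payloads can be told apart from
// plain serialized events without any extra framing or metadata.
final class ZstdDetectionSketch {
    static boolean isZstdFrame(final byte[] payload) {
        return payload.length >= 4
                && payload[0] == (byte) 0x28
                && payload[1] == (byte) 0xB5
                && payload[2] == (byte) 0x2F
                && payload[3] == (byte) 0xFD;
    }

    // Non-zstd payloads pass through unmodified; only detected frames are decompressed.
    static byte[] decode(final byte[] payload) {
        return isZstdFrame(payload) ? zstdDecompress(payload) : payload;
    }

    // Hypothetical stand-in for the actual zstd decompression call.
    private static byte[] zstdDecompress(final byte[] payload) {
        throw new UnsupportedOperationException("stand-in for real zstd decompression");
    }
}
```
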

An instance of this zstd-aware compression codec is provided with a
pass-through encode operation when configured with `queue.compression: none`,
which is the default, ensuring that by default logstash is able to decode any
event that had previously been written.

We provide an additional implementation that is capable of _encoding_ events
with a configurable goal: speed, size, or a balance of the two.
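
One way such a goal could map to a zstd compression level (illustrative only; the concrete levels chosen by the PR are not shown in this conversation, so the numbers below are assumptions):

```java
import com.github.luben.zstd.Zstd;

final class GoalBasedEncoderSketch {
    enum Goal { SPEED, BALANCED, SIZE }

    static byte[] encode(final byte[] serialized, final Goal goal) {
        final int level = switch (goal) {
            case SPEED    -> 1;   // fastest, least reduction (assumed)
            case BALANCED -> 3;   // zstd's default level (assumed mapping)
            case SIZE     -> 19;  // near-maximum reduction, most CPU (assumed)
        };
        return Zstd.compress(serialized, level); // zstd-jni static API
    }
}
```
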
@yaauie yaauie added the enhancement, persistent queues, and backport-skip (Skip automated backport with mergify) labels on Sep 4, 2025
github-actions bot (Contributor) commented Sep 4, 2025

🤖 GitHub comments

Just comment with:

  • run docs-build : Re-trigger the docs validation. (use unformatted text in the comment!)

github-actions bot (Contributor) commented Sep 4, 2025

🔍 Preview links for changed docs

@jsvd jsvd (Member) left a comment

first pass, minor annotations, going to test this manually now.

settings.getQueueMaxBytes(), settings.getMaxUnread(), settings.getCheckpointMaxAcks(),
settings.getCheckpointMaxWrites(), settings.getCheckpointRetry()
);
return new BuilderImpl(settings);
Member

Just a suggestion, this Builder refactoring could have been a separate PR as it doesn't require the compression settings at all and is still a significant part of the changeset in this PR.

Member

Another reason to introduce this change ASAP: yet another parameter is coming in https://github.com/elastic/logstash/pull/18000/files

Member Author

pared off as #18180

byte[] serializedBytes = element.serialize();
byte[] data = compressionCodec.encode(serializedBytes);

logger.trace("serialized: {}->{}", serializedBytes.length, data.length);
Member

I'd suggest this to be moved to the zstd aware codec, as we don't want a flood of "serialized X -> X" in the trace logs when the noop is used.

import java.util.function.Supplier;

/**
* A {@link CleanerThreadLocal} is semantically the same as a {@link ThreadLocal}, except that a clean action
Member

Should we get this utility out of the PR into a separate one if we still want it? Right now it's only used by its own tests.
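
For context, a rough sketch of what such a utility could look like, inferred from the truncated javadoc above (a guess at the idea, not the PR's implementation):

```java
import java.lang.ref.Cleaner;
import java.util.function.Consumer;
import java.util.function.Supplier;

// A ThreadLocal whose per-thread values get a clean action registered with
// java.lang.ref.Cleaner, so native resources (e.g. zstd contexts) are released
// once the owning thread becomes unreachable.
final class CleanerThreadLocalSketch<T> {
    private static final Cleaner CLEANER = Cleaner.create();
    private final ThreadLocal<T> values;

    CleanerThreadLocalSketch(final Supplier<T> supplier, final Consumer<T> cleanAction) {
        this.values = ThreadLocal.withInitial(() -> {
            final T value = supplier.get();
            // Register against the owning thread; the runnable captures only the value,
            // so the clean action runs once the thread itself has been collected.
            CLEANER.register(Thread.currentThread(), () -> cleanAction.accept(value));
            return value;
        });
    }

    T get() {
        return values.get();
    }
}
```
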

Comment on lines 4 to 5
import org.logstash.util.CleanerThreadLocal;
import org.logstash.util.SetOnceReference;
Member

These are not used in the abstract class

Suggested change
import org.logstash.util.CleanerThreadLocal;
import org.logstash.util.SetOnceReference;

Comment on lines 7 to 11
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.lang.ref.Cleaner;
import java.util.zip.DataFormatException;
import java.util.zip.Inflater;
Member

Some more cleanup:

Suggested change
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.lang.ref.Cleaner;
import java.util.zip.DataFormatException;
import java.util.zip.Inflater;

@jsvd jsvd (Member) commented Sep 5, 2025

Looking at profiling, it seems that with Zstd.compress/decompress the instance spends nearly 9% of the time doing context initializations:

Screenshot 2025-09-05 at 18 49 12

profile: profile.html

Something worth investigating is the use of thread locals for the contexts.
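
A rough sketch of that idea, assuming zstd-jni's `ZstdCompressCtx`/`ZstdDecompressCtx` API (not the PR's final code): each thread pays the context-initialization cost once instead of on every compress/decompress call.

```java
import com.github.luben.zstd.ZstdCompressCtx;
import com.github.luben.zstd.ZstdDecompressCtx;

final class ReusableZstdContextsSketch {
    private static final ThreadLocal<ZstdCompressCtx> COMPRESS_CTX =
            ThreadLocal.withInitial(() -> {
                final ZstdCompressCtx ctx = new ZstdCompressCtx();
                ctx.setLevel(3); // level is an assumption; would come from the configured goal
                return ctx;
            });

    private static final ThreadLocal<ZstdDecompressCtx> DECOMPRESS_CTX =
            ThreadLocal.withInitial(ZstdDecompressCtx::new);

    static byte[] compress(final byte[] data) {
        return COMPRESS_CTX.get().compress(data);
    }

    static byte[] decompress(final byte[] data, final int originalSize) {
        return DECOMPRESS_CTX.get().decompress(data, originalSize);
    }
}
```
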

Co-authored-by: João Duarte <jsvd@users.noreply.github.com>
@elasticmachine (Collaborator)

💚 Build Succeeded

History

cc @yaauie
