PQ: Add support for event-level compression using ZStandard (ZSTD) #18121
Conversation
The `ackedqueue.SettingsImpl` uses an _immutable_ builder, which makes adding options cumbersome: each additional property requires modifying the code paths for all existing options. By introducing an api-internal temporary mutable builder, we can simplify the process of creating an immutable copy that has a single component modified.
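A rough sketch of the pattern being described, assuming a copy-seeded builder; the class and field names here (`QueueSettings`, `BuilderImpl`, the specific properties) are illustrative stand-ins, not Logstash's actual `ackedqueue.SettingsImpl` API:

```java
// Immutable settings object paired with a mutable builder. The key move is
// the BuilderImpl(settings) copy constructor: a caller can change one field
// without re-listing every existing option.
final class QueueSettings {
    private final long maxBytes;
    private final int checkpointMaxAcks;

    private QueueSettings(BuilderImpl builder) {
        this.maxBytes = builder.maxBytes;
        this.checkpointMaxAcks = builder.checkpointMaxAcks;
    }

    long getMaxBytes() { return maxBytes; }
    int getCheckpointMaxAcks() { return checkpointMaxAcks; }

    // Seed a mutable builder from an existing immutable instance.
    static BuilderImpl builderFrom(QueueSettings settings) {
        return new BuilderImpl(settings);
    }

    static final class BuilderImpl {
        private long maxBytes = 1024L * 1024L;   // illustrative defaults
        private int checkpointMaxAcks = 1024;

        BuilderImpl() {}

        BuilderImpl(QueueSettings settings) {
            this.maxBytes = settings.maxBytes;
            this.checkpointMaxAcks = settings.checkpointMaxAcks;
        }

        BuilderImpl maxBytes(long value) { this.maxBytes = value; return this; }
        BuilderImpl checkpointMaxAcks(int value) { this.checkpointMaxAcks = value; return this; }

        QueueSettings build() { return new QueueSettings(this); }
    }
}
```

With this shape, adding a new option only touches the new field, the copy constructor, and one setter, e.g. `QueueSettings.builderFrom(existing).maxBytes(2048L).build()`.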
Adds non-breaking support for event compression to the persisted queue, as configured by a new per-pipeline setting `queue.compression`, which supports:
- `none` (default): no compression is performed, but if compressed events are encountered in the queue they will be decompressed
- `speed`: compression optimized for speed
- `balanced`: compression balancing speed against result size
- `size`: compression optimized for maximum reduction of size
- `disabled`: compression support entirely disabled; if a pipeline is run in this configuration against a PQ that already contains unacked compressed events, the pipeline WILL crash.

To accomplish this, we then provide an abstract base implementation of the CompressionCodec whose decode method is capable of _detecting_ and decoding zstd-encoded payloads while letting other payloads through unmodified. The detection is done with an operation on the first four bytes of the payload, so no additional context is needed. An instance of this zstd-aware compression codec is provided with a pass-through encode operation when configured with `queue.compression: none`, which is the default, ensuring that by default logstash is able to decode any event that had previously been written. We provide an additional implementation that is capable of _encoding_ events with a configurable goal: speed, size, or a balance of the two.
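The four-byte detection described above presumably keys on zstd's frame magic number, which is 0xFD2FB528 stored little-endian, so a zstd frame begins with the bytes 0x28 0xB5 0x2F 0xFD. A minimal sketch of such a check (`ZstdDetector` is a hypothetical name, not the PR's actual codec class):

```java
// Detect whether a payload is a zstd frame by inspecting its first four
// bytes. Payloads that don't match are assumed to be plain serialized
// events and are passed through undecoded.
final class ZstdDetector {
    // zstd frame magic number 0xFD2FB528, little-endian byte order
    private static final byte[] ZSTD_MAGIC = {
        (byte) 0x28, (byte) 0xB5, (byte) 0x2F, (byte) 0xFD
    };

    static boolean isZstdFrame(byte[] payload) {
        if (payload == null || payload.length < ZSTD_MAGIC.length) {
            return false;
        }
        for (int i = 0; i < ZSTD_MAGIC.length; i++) {
            if (payload[i] != ZSTD_MAGIC[i]) {
                return false;
            }
        }
        return true;
    }
}
```

Because the check only reads the payload's own first bytes, no extra per-event metadata is needed to distinguish compressed from uncompressed records, which is what makes the feature non-breaking for existing queues.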
first pass, minor annotations, going to test this manually now.
```java
    settings.getQueueMaxBytes(), settings.getMaxUnread(), settings.getCheckpointMaxAcks(),
    settings.getCheckpointMaxWrites(), settings.getCheckpointRetry()
);
return new BuilderImpl(settings);
```
Just a suggestion: this Builder refactoring could have been a separate PR, as it doesn't require the compression settings at all and is still a significant part of the changeset in this PR.
Another reason to introduce this change ASAP: yet another parameter is coming in https://github.com/elastic/logstash/pull/18000/files
pared off as #18180
```java
byte[] serializedBytes = element.serialize();
byte[] data = compressionCodec.encode(serializedBytes);

logger.trace("serialized: {}->{}", serializedBytes.length, data.length);
```
I'd suggest this be moved to the zstd-aware codec, as we don't want a flood of "serialized X -> X" in the trace logs when the no-op is used.
```java
import java.util.function.Supplier;

/**
 * A {@link CleanerThreadLocal} is semantically the same as a {@link ThreadLocal}, except that a clean action
```
Should we pull this utility out of the PR into a separate one, if we still want it? Right now it's only used by its own tests.
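For context, a rough sketch of what a ThreadLocal-with-clean-action utility could look like; this is a hypothetical reconstruction from the javadoc fragment above, not the PR's actual `CleanerThreadLocal`:

```java
import java.lang.ref.Cleaner;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Semantically like ThreadLocal, except each lazily created value has a
// clean action registered with java.lang.ref.Cleaner, keyed to the owning
// thread: once the thread becomes unreachable, the action can release
// native or pooled resources held by that thread's value.
final class CleanerThreadLocalSketch<T> {
    private static final Cleaner CLEANER = Cleaner.create();

    private final Supplier<T> supplier;
    private final Consumer<T> cleanAction;
    private final ThreadLocal<T> delegate = ThreadLocal.withInitial(this::createAndRegister);

    CleanerThreadLocalSketch(Supplier<T> supplier, Consumer<T> cleanAction) {
        this.supplier = supplier;
        this.cleanAction = cleanAction;
    }

    private T createAndRegister() {
        final T value = supplier.get();
        final Consumer<T> action = this.cleanAction;
        // The cleaning lambda must not capture the referent (the thread),
        // only the value it needs to clean.
        CLEANER.register(Thread.currentThread(), () -> action.accept(value));
        return value;
    }

    T get() {
        return delegate.get();
    }
}
```

The design question the reviewer raises stands either way: a utility like this only earns its keep if something (e.g. per-thread compression contexts) actually holds resources worth cleaning.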
```java
import org.logstash.util.CleanerThreadLocal;
import org.logstash.util.SetOnceReference;
```
These are not used in the abstract class
```diff
- import org.logstash.util.CleanerThreadLocal;
- import org.logstash.util.SetOnceReference;
```
```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.lang.ref.Cleaner;
import java.util.zip.DataFormatException;
import java.util.zip.Inflater;
```
Some more cleanup:
```diff
- import java.io.ByteArrayOutputStream;
- import java.io.IOException;
- import java.lang.ref.Cleaner;
- import java.util.zip.DataFormatException;
- import java.util.zip.Inflater;
```
Looking at profiling, it seems that with Zstd.compress/decompress the instance spends nearly 9% of the time doing context initializations (profile: profile.html). Something worth investigating is the use of thread locals for the contexts.
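The thread-local-context idea floated above could look roughly like this. The real code would reuse zstd contexts (e.g. a per-thread compress context from a zstd binding); `java.util.zip.Deflater` is used here only as a stdlib stand-in with the same shape, i.e. an expensive-to-create context that is unsafe to share across threads but cheap to reset and reuse within one:

```java
import java.util.Arrays;
import java.util.zip.Deflater;

// One compression context per thread, created once and reset between uses,
// avoiding per-call context initialization cost.
final class PooledCompressor {
    private static final ThreadLocal<Deflater> DEFLATER =
        ThreadLocal.withInitial(() -> new Deflater(Deflater.BEST_SPEED));

    static byte[] compress(byte[] input) {
        final Deflater deflater = DEFLATER.get();
        deflater.reset();          // reuse the per-thread context
        deflater.setInput(input);
        deflater.finish();
        // Generous fixed buffer for this sketch; a real implementation
        // would grow the buffer if it fills before the deflater finishes.
        final byte[] buffer = new byte[input.length + 64];
        int total = 0;
        while (!deflater.finished() && total < buffer.length) {
            total += deflater.deflate(buffer, total, buffer.length - total);
        }
        return Arrays.copyOf(buffer, total);
    }
}
```

The trade-off to watch with this pattern is exactly what prompted the `CleanerThreadLocal` discussion above: per-thread native contexts need a release path when threads go away.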
Co-authored-by: João Duarte <jsvd@users.noreply.github.com>
💚 Build Succeeded
cc @yaauie
Release notes

Adds support for event compression in the persisted queue, controlled by the per-pipeline `queue.compression` setting, which defaults to `none`.

What does this PR do?
Adds non-breaking support for event compression to the persisted queue, as configured by a new per-pipeline setting `queue.compression`, which supports:
- `none` (default): no compression is performed, but if compressed events are encountered in the queue they will be decompressed
- `speed`: compression optimized for speed (minimal overhead, but less compression)
- `balanced`: compression balancing speed against result size
- `size`: compression optimized for maximum reduction of size (minimal size, but more resource-intensive)
- `disabled`: compression support entirely disabled; if a pipeline is run in this configuration against a PQ that already contains unacked compressed events, the pipeline WILL crash.

This PR does necessary refactors as no-op stand-alone commits to make reviewing more straight-forward. It is best reviewed in commit order.
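The setting values above might map onto compression goals roughly as follows. The actual zstd levels behind `speed`/`balanced`/`size` are not stated in this PR text, so the numbers below are purely illustrative (zstd levels range from 1, fastest, to 22, smallest); `CompressionGoal` is a hypothetical name, and `disabled` is omitted here since it bypasses the codec entirely:

```java
// Map queue.compression setting strings to an encoding goal with an
// illustrative zstd level. Level values are assumptions, not Logstash's.
enum CompressionGoal {
    NONE(0),        // pass-through encode, decode-only awareness
    SPEED(1),       // fastest zstd level (illustrative)
    BALANCED(3),    // zstd's default-ish middle ground (illustrative)
    SIZE(19);       // near-maximum compression (illustrative)

    private final int illustrativeZstdLevel;

    CompressionGoal(int level) { this.illustrativeZstdLevel = level; }

    int level() { return illustrativeZstdLevel; }

    static CompressionGoal fromSetting(String value) {
        switch (value) {
            case "none":     return NONE;
            case "speed":    return SPEED;
            case "balanced": return BALANCED;
            case "size":     return SIZE;
            default:
                throw new IllegalArgumentException("unsupported queue.compression: " + value);
        }
    }
}
```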
Why is it important/What is the impact to the user?
Disk IO is often a performance bottleneck when using the PQ. This feature allows users to spend available resources to reduce the size of events on disk, and therefore also the Disk IO.
Checklist
Author's Checklist
How to test this PR locally

- Create `example-input.ndjson` with event contents
- Use `-S` to set `queue.type=persisted`, `queue.drain=true`, and `queue.compression=size`:
- Inspect the resulting queue pages with `lsq-pagedump`:
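A sketch of the first step, plus the rest of the flow as comments. The exact Logstash invocation and `lsq-pagedump` usage were elided in the original, so only the input-generation step is concrete here; the `-S` settings are taken from the instructions above, and the event contents are made deliberately repetitive so compression has something to bite on:

```shell
# 1) Generate a compressible input file of repetitive JSON events.
for i in $(seq 1 1000); do
  printf '{"message": "hello world hello world hello world", "seq": %d}\n' "$i"
done > example-input.ndjson

wc -l example-input.ndjson

# 2) Run Logstash against it, per the instructions above (paths/flags as in
#    your checkout): -S queue.type=persisted -S queue.drain=true \
#    -S queue.compression=size
# 3) Inspect the resulting queue pages with lsq-pagedump to confirm the
#    events were written zstd-compressed.
```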
Related issues
Use cases