-
Notifications
You must be signed in to change notification settings - Fork 3.5k
PQ: Add support for event-level compression using deflate #17959
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
ddd9fb2
c48121c
826628c
3ff73ce
ad0632a
8fe0ccb
3b086ff
eb55a84
39d7ac2
13c5773
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -125,7 +125,18 @@ def threaded_read_client | |||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
context "WrappedAckedQueue" do | ||||||||||||||||||||||||||||||
let(:path) { Stud::Temporary.directory } | ||||||||||||||||||||||||||||||
let(:queue) { LogStash::WrappedAckedQueue.new(path, 1024, 10, 1024, 1024, false, 4096) } | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
let(:queue_settings) do | ||||||||||||||||||||||||||||||
LogStash::AckedQueue.file_settings_builder(path) | ||||||||||||||||||||||||||||||
.capacity(1024) | ||||||||||||||||||||||||||||||
.maxUnread(10) | ||||||||||||||||||||||||||||||
.checkpointMaxAcks(1024) | ||||||||||||||||||||||||||||||
.checkpointMaxWrites(1024) | ||||||||||||||||||||||||||||||
.queueMaxBytes(4096) | ||||||||||||||||||||||||||||||
.build | ||||||||||||||||||||||||||||||
Comment on lines
+130
to
+136
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||||||||||||||
end | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
let(:queue) { LogStash::WrappedAckedQueue.new(queue_settings) } | ||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||
before do | ||||||||||||||||||||||||||||||
read_client.set_events_metric(metric.namespace([:stats, :events])) | ||||||||||||||||||||||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,120 @@ | ||
package org.logstash.ackedqueue; | ||
|
||
import org.logstash.util.SetOnceReference; | ||
|
||
import java.io.ByteArrayOutputStream; | ||
import java.io.IOException; | ||
import java.util.zip.DataFormatException; | ||
import java.util.zip.Inflater; | ||
|
||
/** | ||
* Subclasses of {@link AbstractDeflateAwareCompressionCodec} are {@link CompressionCodec}s that are capable | ||
* of detecting and decompressing deflate-compressed events. When decoding byte sequences that are <em>NOT</em> | ||
* deflate-compressed, the given bytes are emitted verbatim. | ||
*/ | ||
abstract class AbstractDeflateAwareCompressionCodec implements CompressionCodec { | ||
|
||
static final int BAOS_SHAREABLE_THRESHOLD_BYTES = 4096; | ||
|
||
private final ThreadLocal<BufferedInflater> bufferedInflaterThreadLocal; | ||
|
||
/**
 * Creates the codec with a thread-local {@link BufferedInflater}; each thread's
 * instance is constructed the first time that thread asks for it.
 */
public AbstractDeflateAwareCompressionCodec() {
    this.bufferedInflaterThreadLocal = ThreadLocal.withInitial(() -> new BufferedInflater());
}
|
||
@Override | ||
public byte[] decode(byte[] data) { | ||
if (!isDeflate(data)) { | ||
return data; | ||
} | ||
final BufferedInflater bufferedInflater = bufferedInflaterThreadLocal.get(); | ||
try { | ||
return bufferedInflater.decode(data); | ||
} catch (IOException e) { | ||
throw new RuntimeException("IOException while decoding", e); | ||
} | ||
} | ||
|
||
static boolean isDeflate(byte[] data) { | ||
if (data.length < 2) { return false; } | ||
|
||
// parse first two bytes as big-endian short | ||
short header = (short) (((data[0] & 0xFF) << 8) | (data[1] & 0xFF)); | ||
|
||
/* | ||
* RFC-1950: ZLIB Compressed Data Format Specification version 3.3 | ||
* https://www.ietf.org/rfc/rfc1950.txt | ||
* ┏━━━━ CMF ━━━━━┳━━━━━━━━━━ FLG ━━━━━━━━━━┓ | ||
* ┠─CINFO─┬──CM──╂─FLEVEL─┬─FDICT─┬─FCHECK─┨ | ||
* ┃ 0XXX │ 1000 ┃ XX │ 0 │ XXXXX ┃ | ||
* ┗━━━━━━━┷━━━━━━┻━━━━━━━━┷━━━━━━━┷━━━━━━━━┛ | ||
* CINFO: 0XXX // always LTE 7 (7 is 32KB window) | ||
* CM: 1000 // always 8 for deflate | ||
* DFICT: 0 // always unset (no dictionary) | ||
* | ||
*/// 0XXX_1000_XX_0_XXXXX | ||
short mask = (short) 0b1000_1111_00_1_00000; // bits to keep | ||
short flip = (short) 0b0000_1000_00_0_00000; // bits to flip | ||
short goal = (short) 0b0000_0000_00_0_00000; // goal state | ||
if (((header & mask) ^ flip) != goal) { | ||
return false; | ||
} | ||
|
||
// additionally the FCHECK ensures that | ||
// the big-endian header is a multiple of 31 | ||
return header % 31 == 0; | ||
} | ||
|
||
/** | ||
* A {@link BufferedInflater} is a convenience wrapper around the complexities | ||
* of managing an {@link Inflater}, an intermediate {@code byte[]} buffer, and | ||
* a {@link ByteArrayOutputStream}. It enables internal reuse of small buffers | ||
* to reduce allocations. | ||
*/ | ||
static class BufferedInflater { | ||
private final Inflater inflater; | ||
private final byte[] intermediateBuffer; | ||
private final SetOnceReference<ByteArrayOutputStream> reusableBaosRef; | ||
|
||
/**
 * Creates a wrapper holding a fresh {@link Inflater}, a 1024-byte intermediate
 * buffer, and an initially-unset reference for the reusable output stream.
 */
public BufferedInflater() {
    this.reusableBaosRef = SetOnceReference.unset();
    this.intermediateBuffer = new byte[1024];
    this.inflater = new Inflater();
}
|
||
public byte[] decode(final byte[] data) throws IOException { | ||
final ByteArrayOutputStream baos = getBaos(data.length); | ||
try { | ||
inflater.setInput(data); | ||
|
||
do { | ||
if (inflater.needsInput()) { | ||
throw new IOException(String.format("prematurely reached end of encoded value (%s/%s)", inflater.getBytesRead(), inflater.getTotalIn())); | ||
} | ||
try { | ||
int count = inflater.inflate(intermediateBuffer); | ||
baos.write(intermediateBuffer, 0, count); | ||
} catch (DataFormatException e) { | ||
throw new IOException("Failed to decode", e); | ||
} | ||
} while (!inflater.finished()); | ||
|
||
return baos.toByteArray(); | ||
} finally { | ||
inflater.reset(); | ||
baos.reset(); | ||
} | ||
} | ||
|
||
public void release() { | ||
inflater.end(); | ||
} | ||
|
||
private ByteArrayOutputStream getBaos(final int encodedSize) { | ||
if (encodedSize <= BAOS_SHAREABLE_THRESHOLD_BYTES) { | ||
return this.reusableBaosRef.offerAndGet(() -> new ByteArrayOutputStream(BAOS_SHAREABLE_THRESHOLD_BYTES)); | ||
} | ||
Comment on lines
+114
to
+116
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. review note: if the thread never encounters a compressed payload that is small enough to safely use a shared (and therefore thread-permanent) BAOS, this is the bit that prevents us from taking on overhead of that shareable BAOS. |
||
return new ByteArrayOutputStream(encodedSize); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,60 @@ | ||||||
package org.logstash.ackedqueue; | ||||||
|
||||||
import org.apache.logging.log4j.LogManager; | ||||||
import org.apache.logging.log4j.Logger; | ||||||
|
||||||
import java.util.zip.Deflater; | ||||||
|
||||||
public interface CompressionCodec { | ||||||
Logger LOGGER = LogManager.getLogger(CompressionCodec.class); | ||||||
|
||||||
byte[] encode(byte[] data); | ||||||
byte[] decode(byte[] data); | ||||||
|
||||||
/** | ||||||
* The {@link CompressionCodec#NOOP} is a {@link CompressionCodec} that | ||||||
* does nothing when encoding and decoding. It is only meant to be activated | ||||||
* as a safety-latch in the event of compression being broken. | ||||||
*/ | ||||||
CompressionCodec NOOP = new CompressionCodec() { | ||||||
@Override | ||||||
public byte[] encode(byte[] data) { | ||||||
return data; | ||||||
} | ||||||
|
||||||
@Override | ||||||
public byte[] decode(byte[] data) { | ||||||
return data; | ||||||
} | ||||||
}; | ||||||
|
||||||
static CompressionCodec fromConfigValue(final String configValue) { | ||||||
return fromConfigValue(configValue, LOGGER); | ||||||
} | ||||||
|
||||||
static CompressionCodec fromConfigValue(final String configValue, final Logger logger) { | ||||||
return switch (configValue) { | ||||||
case "disabled" -> { | ||||||
logger.warn("compression support has been disabled"); | ||||||
yield CompressionCodec.NOOP; | ||||||
} | ||||||
case "none" -> { | ||||||
logger.info("compression support is enabled (read-only)"); | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A log entry saying "read-only" could mislead users into thinking that the PQ itself is in read-only mode.
Suggested change
|
||||||
yield DeflateAwareCompressionCodec.getInstance(); | ||||||
} | ||||||
case "speed" -> { | ||||||
logger.info("compression support is enabled (goal: speed)"); | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Goal is a very comp sci term, and provides a sense of "we'll do our best but it's no promises". I don't think it's necessary to pass this subjectivity to the users, so maybe something like:
Suggested change
Or |
||||||
yield new DeflateEnabledCompressionCodec(Deflater.BEST_SPEED); | ||||||
} | ||||||
case "balanced" -> { | ||||||
logger.info("compression support is enabled (goal: balanced)"); | ||||||
yield new DeflateEnabledCompressionCodec(Deflater.DEFAULT_COMPRESSION); | ||||||
} | ||||||
case "size" -> { | ||||||
logger.info("compression support is enabled (goal: size)"); | ||||||
yield new DeflateEnabledCompressionCodec(Deflater.BEST_COMPRESSION); | ||||||
} | ||||||
default -> throw new IllegalArgumentException(String.format("Unsupported compression setting `%s`", configValue)); | ||||||
}; | ||||||
} | ||||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
package org.logstash.ackedqueue; | ||
|
||
/** | ||
* A {@link DeflateAwareCompressionCodec} is an {@link CompressionCodec} that can decode deflate-compressed | ||
* bytes, but performs no compression when encoding. | ||
*/ | ||
class DeflateAwareCompressionCodec extends AbstractDeflateAwareCompressionCodec { | ||
private static final DeflateAwareCompressionCodec INSTANCE = new DeflateAwareCompressionCodec(); | ||
|
||
static DeflateAwareCompressionCodec getInstance() { | ||
return INSTANCE; | ||
} | ||
|
||
@Override | ||
public byte[] encode(byte[] data) { | ||
return data; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we provide some guidance here on what the users should be able to see in their metrics to decide when to turn this on?
For example, we could describe observing mmap operations across multiple consecutive API calls to hot_threads, and/or a combination of low worker utilization and high queue backpressure.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll be addressing this with #18107