Skip to content

Commit 30c3ce0

Browse files
compressable: add Zstd compression support (#4657)
**Which issue(s) this PR fixes**: Fixes #4162 **What this PR does / why we need it**: Adds new compression method support to handle messages **Docs Changes**: TODO **Release Note**: N/A --------- Signed-off-by: Athish Pranav D <athishanna@gmail.com> Co-authored-by: Daijiro Fukuda <fukuda@clear-code.com>
1 parent 29189a1 commit 30c3ce0

18 files changed

+607
-103
lines changed

fluentd.gemspec

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ Gem::Specification.new do |gem|
3333
gem.add_runtime_dependency("tzinfo-data", ["~> 1.0"])
3434
gem.add_runtime_dependency("strptime", [">= 0.2.4", "< 1.0.0"])
3535
gem.add_runtime_dependency("webrick", ["~> 1.4"])
36+
gem.add_runtime_dependency("zstd-ruby", ["~> 1.5"])
3637

3738
# gems that aren't default gems as of Ruby 3.4
3839
gem.add_runtime_dependency("base64", ["~> 0.2"])

lib/fluent/event.rb

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,9 @@ def to_msgpack_stream(time_int: false, packer: nil)
6262
out.full_pack
6363
end
6464

65-
def to_compressed_msgpack_stream(time_int: false, packer: nil)
65+
def to_compressed_msgpack_stream(time_int: false, packer: nil, type: :gzip)
6666
packed = to_msgpack_stream(time_int: time_int, packer: packer)
67-
compress(packed)
67+
compress(packed, type: type)
6868
end
6969

7070
def to_msgpack_stream_forced_integer(packer: nil)
@@ -268,10 +268,11 @@ def to_msgpack_stream(time_int: false, packer: nil)
268268
end
269269

270270
class CompressedMessagePackEventStream < MessagePackEventStream
271-
def initialize(data, cached_unpacker = nil, size = 0, unpacked_times: nil, unpacked_records: nil)
272-
super
271+
def initialize(data, cached_unpacker = nil, size = 0, unpacked_times: nil, unpacked_records: nil, compress: :gzip)
272+
super(data, cached_unpacker, size, unpacked_times: unpacked_times, unpacked_records: unpacked_records)
273273
@decompressed_data = nil
274274
@compressed_data = data
275+
@type = compress
275276
end
276277

277278
def empty?
@@ -303,7 +304,7 @@ def to_compressed_msgpack_stream(time_int: false, packer: nil)
303304

304305
def ensure_decompressed!
305306
return if @decompressed_data
306-
@data = @decompressed_data = decompress(@data)
307+
@data = @decompressed_data = decompress(@data, type: @type)
307308
end
308309
end
309310

lib/fluent/plugin/buffer.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ class BufferChunkOverflowError < BufferError; end # A record size is larger than
6464
config_param :queued_chunks_limit_size, :integer, default: nil
6565

6666
desc 'Compress buffered data.'
67-
config_param :compress, :enum, list: [:text, :gzip], default: :text
67+
config_param :compress, :enum, list: [:text, :gzip, :zstd], default: :text
6868

6969
desc 'If true, chunks are thrown away when unrecoverable error happens'
7070
config_param :disable_chunk_backup, :bool, default: false

lib/fluent/plugin/buffer/chunk.rb

Lines changed: 64 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,11 @@ def initialize(metadata, compress: :text)
5959
@size = 0
6060
@created_at = Fluent::Clock.real_now
6161
@modified_at = Fluent::Clock.real_now
62-
63-
extend Decompressable if compress == :gzip
62+
if compress == :gzip
63+
extend GzipDecompressable
64+
elsif compress == :zstd
65+
extend ZstdDecompressable
66+
end
6467
end
6568

6669
attr_reader :unique_id, :metadata, :state
@@ -85,7 +88,7 @@ def modified_at
8588

8689
# data is array of formatted record string
8790
def append(data, **kwargs)
88-
raise ArgumentError, '`compress: gzip` can be used for Compressable module' if kwargs[:compress] == :gzip
91+
raise ArgumentError, "`compress: #{kwargs[:compress]}` can be used for Compressable module" if kwargs[:compress] == :gzip || kwargs[:compress] == :zstd
8992
begin
9093
adding = data.join.force_encoding(Encoding::ASCII_8BIT)
9194
rescue
@@ -172,23 +175,23 @@ def purge
172175
end
173176

174177
def read(**kwargs)
175-
raise ArgumentError, '`compressed: gzip` can be used for Compressable module' if kwargs[:compressed] == :gzip
178+
raise ArgumentError, "`compressed: #{kwargs[:compressed]}` can be used for Compressable module" if kwargs[:compressed] == :gzip || kwargs[:compressed] == :zstd
176179
raise NotImplementedError, "Implement this method in child class"
177180
end
178181

179182
def open(**kwargs, &block)
180-
raise ArgumentError, '`compressed: gzip` can be used for Compressable module' if kwargs[:compressed] == :gzip
183+
raise ArgumentError, "`compressed: #{kwargs[:compressed]}` can be used for Compressable module" if kwargs[:compressed] == :gzip || kwargs[:compressed] == :zstd
181184
raise NotImplementedError, "Implement this method in child class"
182185
end
183186

184187
def write_to(io, **kwargs)
185-
raise ArgumentError, '`compressed: gzip` can be used for Compressable module' if kwargs[:compressed] == :gzip
188+
raise ArgumentError, "`compressed: #{kwargs[:compressed]}` can be used for Compressable module" if kwargs[:compressed] == :gzip || kwargs[:compressed] == :zstd
186189
open do |i|
187190
IO.copy_stream(i, io)
188191
end
189192
end
190193

191-
module Decompressable
194+
module GzipDecompressable
192195
include Fluent::Plugin::Compressable
193196

194197
def append(data, **kwargs)
@@ -241,6 +244,60 @@ def write_to(io, **kwargs)
241244
end
242245
end
243246
end
247+
248+
# Mixin for chunks whose stored payload is zstd-compressed.
#
# Chunk#initialize extends the chunk with this module when it is built with
# `compress: :zstd`. Each method accepts the same keyword the base chunk
# rejects (`compress: :zstd` / `compressed: :zstd`) and uses it to decide
# whether the caller wants the raw compressed bytes or the decompressed data.
module ZstdDecompressable
  include Fluent::Plugin::Compressable

  # Appends records to the chunk.
  #
  # @param data [Array<String>] formatted record strings
  # @param kwargs [Hash] with `compress: :zstd`, every record is written
  #   through a single Zstd::StreamWriter and the compressed result is
  #   concatenated to the chunk; otherwise the call is forwarded unchanged.
  def append(data, **kwargs)
    return super unless kwargs[:compress] == :zstd

    io = StringIO.new
    stream = Zstd::StreamWriter.new(io)
    data.each { |d| stream.write(d) }
    stream.finish
    concat(io.string, data.size)
  end

  # Yields an IO positioned at the start of the chunk content.
  #
  # With `compressed: :zstd` the underlying (compressed) IO is yielded as-is.
  # Otherwise the content is decompressed into a scratch IO first: a StringIO
  # when the chunk IO is a StringIO, a binary-mode Tempfile otherwise.
  #
  # The scratch Tempfile is only valid inside the block; it is closed and
  # unlinked as soon as the block returns or raises, so it no longer lingers
  # until garbage collection.
  #
  # @return [Object] whatever the parent `open` returns for the block
  def open(**kwargs, &block)
    return super if kwargs[:compressed] == :zstd

    super(**kwargs) do |chunk_io|
      output_io = chunk_io.is_a?(StringIO) ? StringIO.new : Tempfile.new('decompressed-data')
      begin
        output_io.binmode if output_io.is_a?(Tempfile)
        decompress(input_io: chunk_io, output_io: output_io, type: :zstd)
        output_io.seek(0, IO::SEEK_SET)
        yield output_io
      ensure
        # Release the file descriptor and remove the temp file deterministically.
        output_io.close! if output_io.is_a?(Tempfile)
      end
    end
  end

  # Reads the whole chunk.
  #
  # @return [String] the stored bytes when `compressed: :zstd` is given,
  #   otherwise the decompressed content
  def read(**kwargs)
    if kwargs[:compressed] == :zstd
      super
    else
      decompress(super, type: :zstd)
    end
  end

  # Copies the chunk content into `io`.
  #
  # The chunk is always opened in compressed form; it is then either copied
  # verbatim (`compressed: :zstd`) or decompressed straight into `io`, which
  # avoids the intermediate scratch IO that `open` would create.
  def write_to(io, **kwargs)
    open(compressed: :zstd) do |chunk_io|
      if kwargs[:compressed] == :zstd
        IO.copy_stream(chunk_io, io)
      else
        decompress(input_io: chunk_io, output_io: io, type: :zstd)
      end
    end
  end
end
244301
end
245302
end
246303
end

lib/fluent/plugin/compressable.rb

Lines changed: 68 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -16,81 +16,127 @@
1616

1717
require 'stringio'
1818
require 'zlib'
19+
require 'zstd-ruby'
1920

2021
module Fluent
2122
module Plugin
2223
module Compressable
23-
def compress(data, **kwargs)
24+
def compress(data, type: :gzip, **kwargs)
2425
output_io = kwargs[:output_io]
2526
io = output_io || StringIO.new
26-
Zlib::GzipWriter.wrap(io) do |gz|
27-
gz.write data
27+
if type == :gzip
28+
writer = Zlib::GzipWriter.new(io)
29+
elsif type == :zstd
30+
writer = Zstd::StreamWriter.new(io)
31+
else
32+
raise ArgumentError, "Unknown compression type: #{type}"
2833
end
29-
34+
writer.write(data)
35+
writer.finish
3036
output_io || io.string
3137
end
3238

3339
# compressed_data is String like `compress(data1) + compress(data2) + ... + compress(dataN)`
3440
# https://www.ruby-forum.com/topic/971591#979503
35-
def decompress(compressed_data = nil, output_io: nil, input_io: nil)
41+
def decompress(compressed_data = nil, output_io: nil, input_io: nil, type: :gzip)
3642
case
3743
when input_io && output_io
38-
io_decompress(input_io, output_io)
44+
io_decompress(input_io, output_io, type)
3945
when input_io
4046
output_io = StringIO.new
41-
io = io_decompress(input_io, output_io)
47+
io = io_decompress(input_io, output_io, type)
4248
io.string
4349
when compressed_data.nil? || compressed_data.empty?
4450
# check compressed_data(String) is 0 length
4551
compressed_data
4652
when output_io
4753
# execute after checking compressed_data is empty or not
4854
io = StringIO.new(compressed_data)
49-
io_decompress(io, output_io)
55+
io_decompress(io, output_io, type)
5056
else
51-
string_decompress(compressed_data)
57+
string_decompress(compressed_data, type)
5258
end
5359
end
5460

5561
private
5662

57-
def string_decompress(compressed_data)
63+
def string_decompress_gzip(compressed_data)
5864
io = StringIO.new(compressed_data)
59-
6065
out = ''
6166
loop do
62-
gz = Zlib::GzipReader.new(io)
63-
out << gz.read
64-
unused = gz.unused
65-
gz.finish
66-
67+
reader = Zlib::GzipReader.new(io)
68+
out << reader.read
69+
unused = reader.unused
70+
reader.finish
6771
unless unused.nil?
6872
adjust = unused.length
6973
io.pos -= adjust
7074
end
7175
break if io.eof?
7276
end
77+
out
78+
end
7379

80+
# Decompresses a String holding one or more concatenated zstd frames.
#
# A single Zstd::StreamReader is used for the whole input. The input is fed
# to it in 1024-byte slices, so the reader must outlive the loop: creating a
# fresh reader per slice would throw away the decoder state and break any
# frame whose compressed form is longer than one slice.
#
# @param compressed_data [String] zstd-compressed bytes
# @return [String] the decompressed content; empty when the input is empty
def string_decompress_zstd(compressed_data)
  io = StringIO.new(compressed_data)
  reader = Zstd::StreamReader.new(io)
  out = ''
  # Zstd::StreamReader#read requires an explicit size, so read in slices
  # until the underlying IO is exhausted.
  out << reader.read(1024) until io.eof?
  out
end
7692

77-
def io_decompress(input, output)
93+
# Routes String decompression to the codec-specific implementation.
#
# @param compressed_data [String] compressed bytes
# @param type [Symbol] :gzip (default) or :zstd
# @return [String] whatever the selected implementation returns
# @raise [ArgumentError] when type is neither :gzip nor :zstd
def string_decompress(compressed_data, type = :gzip)
  case type
  when :gzip then string_decompress_gzip(compressed_data)
  when :zstd then string_decompress_zstd(compressed_data)
  else raise ArgumentError, "Unknown compression type: #{type}"
  end
end
102+
103+
def io_decompress_gzip(input, output)
78104
loop do
79-
gz = Zlib::GzipReader.new(input)
80-
v = gz.read
105+
reader = Zlib::GzipReader.new(input)
106+
v = reader.read
81107
output.write(v)
82-
unused = gz.unused
83-
gz.finish
84-
108+
unused = reader.unused
109+
reader.finish
85110
unless unused.nil?
86111
adjust = unused.length
87112
input.pos -= adjust
88113
end
89114
break if input.eof?
90115
end
116+
output
117+
end
91118

119+
# Decompresses zstd data from one IO into another.
#
# A single Zstd::StreamReader is used for the whole input. The input is fed
# to it in 1024-byte slices, so the reader must outlive the loop: creating a
# fresh reader per slice would throw away the decoder state and break any
# frame whose compressed form is longer than one slice.
#
# @param input [IO, StringIO] source of zstd-compressed bytes, read until eof
# @param output [IO, StringIO] receives the decompressed bytes
# @return [IO, StringIO] the same `output` object; nothing is written when
#   `input` is already at eof
def io_decompress_zstd(input, output)
  reader = Zstd::StreamReader.new(input)
  # Zstd::StreamReader#read requires an explicit size, so read in slices
  # until the input is exhausted.
  output.write(reader.read(1024)) until input.eof?
  output
end
130+
131+
# Routes IO-to-IO decompression to the codec-specific implementation.
#
# @param input [IO, StringIO] source of compressed bytes
# @param output [IO, StringIO] destination for decompressed bytes
# @param type [Symbol] :gzip (default) or :zstd
# @return [Object] whatever the selected implementation returns
# @raise [ArgumentError] when type is neither :gzip nor :zstd
def io_decompress(input, output, type = :gzip)
  case type
  when :gzip then io_decompress_gzip(input, output)
  when :zstd then io_decompress_zstd(input, output)
  else raise ArgumentError, "Unknown compression type: #{type}"
  end
end
94140
end
95141
end
96142
end

lib/fluent/plugin/in_forward.rb

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -307,10 +307,14 @@ def on_message(msg, chunk_size, conn)
307307
case entries
308308
when String
309309
# PackedForward
310-
option = msg[2]
311-
size = (option && option['size']) || 0
312-
es_class = (option && option['compressed'] == 'gzip') ? Fluent::CompressedMessagePackEventStream : Fluent::MessagePackEventStream
313-
es = es_class.new(entries, nil, size.to_i)
310+
option = msg[2] || {}
311+
size = option['size'] || 0
312+
313+
if option['compressed'] && option['compressed'] != 'text'
314+
es = Fluent::CompressedMessagePackEventStream.new(entries, nil, size.to_i, compress: option['compressed'].to_sym)
315+
else
316+
es = Fluent::MessagePackEventStream.new(entries, nil, size.to_i)
317+
end
314318
es = check_and_skip_invalid_event(tag, es, conn.remote_host) if @skip_invalid_event
315319
if @enable_field_injection
316320
es = add_source_info(es, conn)

0 commit comments

Comments
 (0)