@@ -86,7 +86,7 @@ extension ByteBuffer {
86
86
/// - Returns: `ByteBuffer` containing compressed data
87
87
public mutating func decompressStream(
88
88
with decompressor: NIODecompressor ,
89
- process: ( ByteBuffer ) -> ( )
89
+ process: ( ByteBuffer ) throws -> ( )
90
90
) throws {
91
91
guard var window = decompressor. window else {
92
92
preconditionFailure ( " decompressString(with:flush:process requires your compressor has a window buffer " )
@@ -95,14 +95,48 @@ extension ByteBuffer {
95
95
do {
96
96
try decompressStream ( to: & window, with: decompressor)
97
97
} catch let error as CompressNIOError where error == . bufferOverflow {
98
- process ( window)
98
+ try process ( window)
99
99
window. moveReaderIndex ( to: 0 )
100
100
window. moveWriterIndex ( to: 0 )
101
101
}
102
102
}
103
103
104
104
if window. readableBytes > 0 {
105
- process ( window)
105
+ try process ( window)
106
+ }
107
+ }
108
+
109
+ /// A version of decompressStream which you provide a fixed sized window buffer to and a process closure.
110
+ ///
111
+ /// When the window buffer is full the process closure is called. If there is any unprocessed data left
112
+ /// at the end of the compress the process closure is called with this.
113
+ ///
114
+ /// Before calling this you need to provide a working window `ByteBuffer` to the decompressor by
115
+ /// setting `NIODecompressor.window`.
116
+ ///
117
+ /// - Parameters:
118
+ /// - compressor: Algorithm to use when decompressing
119
+ /// - process: Closure to be called when window buffer fills up or decompress has finished
120
+ /// - Returns: `ByteBuffer` containing compressed data
121
+ public mutating func decompressStream(
122
+ with decompressor: NIODecompressor ,
123
+ process: ( ByteBuffer ) async throws -> ( )
124
+ ) async throws {
125
+ guard var window = decompressor. window else {
126
+ preconditionFailure ( " decompressString(with:flush:process requires your compressor has a window buffer " )
127
+ }
128
+ while self . readableBytes > 0 {
129
+ do {
130
+ try decompressStream ( to: & window, with: decompressor)
131
+ } catch let error as CompressNIOError where error == . bufferOverflow {
132
+ try await process ( window)
133
+ window. moveReaderIndex ( to: 0 )
134
+ window. moveWriterIndex ( to: 0 )
135
+ }
136
+ }
137
+
138
+ if window. readableBytes > 0 {
139
+ try await process ( window)
106
140
}
107
141
}
108
142
@@ -215,14 +249,77 @@ extension ByteBuffer {
215
249
public mutating func compressStream(
216
250
with compressor: NIOCompressor ,
217
251
flush: CompressNIOFlush ,
218
- process: ( ByteBuffer ) -> ( )
252
+ process: ( ByteBuffer ) throws -> ( )
219
253
) throws {
220
254
guard var window = compressor. window else { preconditionFailure ( " compressString(with:flush:process requires your compressor has a window buffer " ) }
221
255
while self . readableBytes > 0 {
222
256
do {
223
257
try compressStream ( to: & window, with: compressor, flush: . no)
224
258
} catch let error as CompressNIOError where error == . bufferOverflow {
225
- process ( window)
259
+ try process ( window)
260
+ window. moveReaderIndex ( to: 0 )
261
+ window. moveWriterIndex ( to: 0 )
262
+ }
263
+ }
264
+
265
+ if flush == . sync {
266
+ while true {
267
+ do {
268
+ try compressStream ( to: & window, with: compressor, flush: . sync)
269
+ break
270
+ } catch let error as CompressNIOError where error == . bufferOverflow {
271
+ try process ( window)
272
+ window. moveReaderIndex ( to: 0 )
273
+ window. moveWriterIndex ( to: 0 )
274
+ }
275
+ }
276
+ } else if flush == . finish {
277
+ while true {
278
+ do {
279
+ try compressStream ( to: & window, with: compressor, flush: . finish)
280
+ break
281
+ } catch let error as CompressNIOError where error == . bufferOverflow {
282
+ try process ( window)
283
+ window. moveReaderIndex ( to: 0 )
284
+ window. moveWriterIndex ( to: 0 )
285
+ }
286
+ }
287
+ }
288
+
289
+ if flush == . finish {
290
+ if window. readableBytes > 0 {
291
+ try process ( window)
292
+ window. moveReaderIndex ( to: 0 )
293
+ window. moveWriterIndex ( to: 0 )
294
+ }
295
+ }
296
+ compressor. window = window
297
+ }
298
+
299
+ /// A version of compressStream which you provide a fixed sized window buffer to and a process closure.
300
+ ///
301
+ /// When the window buffer is full the process closure is called. If there is any unprocessed data left
302
+ /// at the end of the compress the process closure is called with this.
303
+ ///
304
+ /// Before calling this you need to provide a working window `ByteBuffer` to the compressor by setting
305
+ /// `NIOCompressor.window`.
306
+ ///
307
+ /// - Parameters:
308
+ /// - compressor: Algorithm to use when compressing
309
+ /// - flush: how compressor should flush output data.
310
+ /// - process: Closure to be called when window buffer fills up or compress has finished
311
+ /// - Returns: `ByteBuffer` containing compressed data
312
+ public mutating func compressStream(
313
+ with compressor: NIOCompressor ,
314
+ flush: CompressNIOFlush ,
315
+ process: ( ByteBuffer ) async throws -> ( )
316
+ ) async throws {
317
+ guard var window = compressor. window else { preconditionFailure ( " compressString(with:flush:process requires your compressor has a window buffer " ) }
318
+ while self . readableBytes > 0 {
319
+ do {
320
+ try compressStream ( to: & window, with: compressor, flush: . no)
321
+ } catch let error as CompressNIOError where error == . bufferOverflow {
322
+ try await process ( window)
226
323
window. moveReaderIndex ( to: 0 )
227
324
window. moveWriterIndex ( to: 0 )
228
325
}
@@ -234,7 +331,7 @@ extension ByteBuffer {
234
331
try compressStream ( to: & window, with: compressor, flush: . sync)
235
332
break
236
333
} catch let error as CompressNIOError where error == . bufferOverflow {
237
- process ( window)
334
+ try await process ( window)
238
335
window. moveReaderIndex ( to: 0 )
239
336
window. moveWriterIndex ( to: 0 )
240
337
}
@@ -245,7 +342,7 @@ extension ByteBuffer {
245
342
try compressStream ( to: & window, with: compressor, flush: . finish)
246
343
break
247
344
} catch let error as CompressNIOError where error == . bufferOverflow {
248
- process ( window)
345
+ try await process ( window)
249
346
window. moveReaderIndex ( to: 0 )
250
347
window. moveWriterIndex ( to: 0 )
251
348
}
@@ -254,7 +351,7 @@ extension ByteBuffer {
254
351
255
352
if flush == . finish {
256
353
if window. readableBytes > 0 {
257
- process ( window)
354
+ try await process ( window)
258
355
window. moveReaderIndex ( to: 0 )
259
356
window. moveWriterIndex ( to: 0 )
260
357
}
0 commit comments