Open
Description
Expected behavior
`BufferedReader` is probably the best abstraction in NIOFileSystem for streaming large amounts of data. A real-world example is JSON Lines (JSONL).
I'd expect to be able to write
// Proposed API: `lines` on `BufferedReader` would be an AsyncSequence, so each
// line is read and awaited on demand instead of collecting every line of a
// multi-gigabyte file up front.
try await FileSystem.shared.withFileHandle(forReadingAt: "many-gigabytes.jsonl") { file in
    var reader = file.bufferedReader(capacity: .mebibytes(4))
    // One decoder is enough for every line; no need to create one per iteration.
    let decoder = JSONDecoder()
    for try await line in reader.lines {
        let thing = try decoder.decode(Thing.self, from: line)
        try await process(thing)
    }
}
For delimiter schemes more complex than a single newline, I'd like to be able to build something composable on top of `BufferedReader`.
Actual behavior
Right now I use
// Current workaround: split the file into "\n"-separated lines by hand with
// `BufferedReader.read(while:)`.
try await FileSystem.shared.withFileHandle(forReadingAt: "many-gigabytes.jsonl") { file in
    var reader = file.bufferedReader(capacity: .mebibytes(4))
    let decoder = JSONDecoder()
    // `UInt8(ascii:)` is required: `UInt8("\n")` is the failable String-parsing
    // initializer, evaluates to `nil`, and makes `$0 != UInt8("\n")` always true.
    let newline = UInt8(ascii: "\n")
    while true {
        // Read up to (not including) the next newline.
        let (line, reachedEOF) = try await reader.read(while: { $0 != newline })
        // Process the line *before* checking EOF so a final line without a
        // trailing newline is not lost. Empty reads (blank line at the start of
        // the file, or the empty read at EOF) are skipped.
        // A "\r" left by "\r\n" endings is trailing JSON whitespace and decodes fine.
        if line.readableBytes > 0 {
            let thing = try decoder.decode(Thing.self, from: line)
            try await process(thing)
        }
        if reachedEOF {
            break
        }
        // Consume the newline(s) that stopped the read (also skips blank lines).
        _ = try await reader.read(while: { $0 == newline })
    }
}
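But this is
- quite ugly (`while true` + `guard !seenEOF` + `(_, _) =`)
- not actually correct, because I can only easily test for a single byte (`UInt8("\n")`; note that this spelling actually calls the failable `String`-parsing initializer and evaluates to `nil`, so the byte must be written `UInt8(ascii: "\n")`). In addition, `guard !seenEOF` skips a final line that has no trailing newline. If I wanted to support both `\r\n` and `\n`, this would get a lot messier.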
SwiftNIO version/commit hash
2.77.0
FWIW, I also have a helper that lets me write:
// Usage of the `ReadFileHandle.lines` extension: decode and process the file
// one JSON line at a time.
try await FileSystem.shared.withFileHandle(forReadingAt: "many-gigabytes.jsonl") { handle in
    for try await jsonLine in handle.lines {
        try await process(JSONDecoder().decode(Thing.self, from: jsonLine))
    }
}
but it's quite complex:
import NIO
import _NIOFileSystem
/// An `AsyncSequence` that re-chunks a stream of `ByteBuffer`s into lines
/// terminated by `"\n"` (0x0A).
///
/// - Each element is one line. With `dropTerminator`, the trailing `"\n"` is
///   removed; if `dropCarriageReturn` is also set, a `"\r"` directly before that
///   `"\n"` is removed too, so `"\r\n"` and `"\n"` input yield the same lines.
/// - If more than `maximumAllowableBufferSize` bytes are buffered without a
///   newline, everything buffered so far is emitted as one element without a
///   terminator. That element can exceed the limit by up to one incoming chunk,
///   and a `"\r\n"` split by such a flush is not recognised as a pair.
/// - Bytes after the final newline are emitted as a last element unless
///   `dropLastChunkIfNoNewline` is `true`.
public struct AsyncByteBufferLineSequence<Base: Sendable>: AsyncSequence & Sendable
where Base: AsyncSequence, Base.Element == ByteBuffer {
    public typealias Element = ByteBuffer

    private let underlying: Base
    private let dropTerminator: Bool
    private let maximumAllowableBufferSize: Int
    private let dropLastChunkIfNoNewline: Bool
    private let dropCarriageReturn: Bool

    public struct AsyncIterator: AsyncIteratorProtocol {
        public typealias Element = ByteBuffer

        private var underlying: Base.AsyncIterator
        private let dropTerminator: Bool
        private let maximumAllowableBufferSize: Int
        private let dropLastChunkIfNoNewline: Bool
        private let dropCarriageReturn: Bool
        private var buffer = Buffer()
        /// Set once `underlying` has returned `nil`, so it is never polled again.
        private var finished = false

        /// Chunks received but not yet emitted.
        ///
        /// Invariants kept by `next()`: no stored chunk is empty, and only the last
        /// chunk can contain a `"\n"` (each chunk is searched before another one is
        /// appended after it).
        struct Buffer {
            private var buffer: [ByteBuffer] = []
            /// Total readable bytes across all stored chunks.
            internal private(set) var byteCount: Int = 0

            mutating func append(_ buffer: ByteBuffer) {
                // An empty chunk carries no data; skipping it keeps `lastChunkView`
                // pointing at real bytes instead of an empty trailing chunk.
                guard buffer.readableBytes > 0 else { return }
                self.buffer.append(buffer)
                self.byteCount += buffer.readableBytes
            }

            func allButLast() -> ArraySlice<ByteBuffer> {
                return self.buffer.dropLast()
            }

            /// Readable bytes in every chunk except the last one.
            var byteCountButLast: Int {
                return self.byteCount - (self.buffer.last?.readableBytes ?? 0)
            }

            /// Readable bytes of the most recently appended chunk, or `nil` when empty.
            var lastChunkView: ByteBufferView? {
                return self.buffer.last?.readableBytesView
            }

            /// Removes and returns every chunk but the last, followed by the first
            /// `lastLength` readable bytes of the last chunk, as one contiguous buffer.
            /// Any unread remainder of the last chunk stays buffered as the only chunk.
            ///
            /// - Precondition: the buffer is non-empty and `lastLength` does not exceed
            ///   the last chunk's readable bytes (the forced `readSlice` traps otherwise).
            mutating func concatenateEverything(upToLastChunkLengthToConsume lastLength: Int) -> ByteBuffer {
                var output = ByteBuffer()
                output.reserveCapacity(lastLength + self.byteCountButLast)
                var writtenBytes = 0
                for chunk in self.buffer.dropLast() {
                    writtenBytes += output.writeImmutableBuffer(chunk)
                }
                // `readSlice` advances the last chunk's reader index in place.
                writtenBytes += output.writeImmutableBuffer(
                    self.buffer[self.buffer.endIndex - 1].readSlice(length: lastLength)!
                )
                if self.buffer.last!.readableBytes > 0 {
                    // Keep only the unread remainder: move it to the front, drop the rest.
                    if self.buffer.count > 1 {
                        self.buffer.swapAt(0, self.buffer.endIndex - 1)
                    }
                    self.buffer.removeLast(self.buffer.count - 1)
                } else {
                    self.buffer = []
                }
                self.byteCount -= writtenBytes
                assert(self.byteCount >= 0)
                return output
            }
        }

        internal init(
            underlying: Base.AsyncIterator,
            dropTerminator: Bool,
            maximumAllowableBufferSize: Int,
            dropLastChunkIfNoNewline: Bool,
            dropCarriageReturn: Bool = false
        ) {
            self.underlying = underlying
            self.dropTerminator = dropTerminator
            self.maximumAllowableBufferSize = maximumAllowableBufferSize
            self.dropLastChunkIfNoNewline = dropLastChunkIfNoNewline
            self.dropCarriageReturn = dropCarriageReturn
        }

        /// Emits everything buffered before `index` in the last chunk, plus the
        /// `"\n"` at `index` when `expectNewline` is `true`, then applies the
        /// terminator-dropping options.
        private mutating func deliverUpTo(
            view: ByteBufferView,
            index: ByteBufferView.Index,
            expectNewline: Bool
        ) -> ByteBuffer {
            let howMany = view.startIndex.distance(to: index) + (expectNewline ? 1 : 0)
            var output = self.buffer.concatenateEverything(upToLastChunkLengthToConsume: howMany)
            if expectNewline {
                // Exactly one newline, and it is the final byte.
                assert(output.readableBytesView.last == UInt8(ascii: "\n"))
                assert(
                    output.readableBytesView.firstIndex(of: UInt8(ascii: "\n"))
                        == output.readableBytesView.index(before: output.readableBytesView.endIndex))
            } else {
                assert(output.readableBytesView.last != UInt8(ascii: "\n"))
                assert(!output.readableBytesView.contains(UInt8(ascii: "\n")))
            }
            if self.dropTerminator && expectNewline {
                output.moveWriterIndex(to: output.writerIndex - 1)
                // Also strip the "\r" of a "\r\n" terminator when requested.
                if self.dropCarriageReturn && output.readableBytesView.last == UInt8(ascii: "\r") {
                    output.moveWriterIndex(to: output.writerIndex - 1)
                }
            }
            return output
        }

        public mutating func next() async throws -> Element? {
            while true {
                if let view = self.buffer.lastChunkView {
                    // Earlier chunks are known to be newline-free, so only search the last.
                    if let newlineIndex = view.firstIndex(of: UInt8(ascii: "\n")) {
                        return self.deliverUpTo(
                            view: view,
                            index: newlineIndex,
                            expectNewline: true
                        )
                    }
                    // No newline and too much buffered: flush what we have.
                    if self.buffer.byteCount > self.maximumAllowableBufferSize {
                        return self.deliverUpTo(
                            view: view,
                            index: view.endIndex,
                            expectNewline: false
                        )
                    }
                }
                // Don't poll the underlying iterator again once it has ended.
                if !self.finished, let nextBuffer = try await self.underlying.next() {
                    self.buffer.append(nextBuffer)
                    continue
                }
                self.finished = true
                // End of input: emit the buffered bytes that had no trailing newline.
                // `byteCount` (not the last chunk alone) decides whether anything is left.
                if !self.dropLastChunkIfNoNewline, self.buffer.byteCount > 0,
                    let view = self.buffer.lastChunkView
                {
                    return self.deliverUpTo(
                        view: view,
                        index: view.endIndex,
                        expectNewline: false
                    )
                }
                // Release any tail we chose to drop; later calls keep returning nil.
                self.buffer = Buffer()
                return nil
            }
        }
    }

    /// Creates a line sequence over `underlying`.
    ///
    /// - Parameters:
    ///   - underlying: The source of byte chunks.
    ///   - dropTerminator: Remove the trailing `"\n"` from each newline-terminated line.
    ///   - maximumAllowableBufferSize: Once more than this many bytes are buffered
    ///     without a newline, they are emitted as one element.
    ///   - dropLastChunkIfNoNewline: Discard bytes after the final newline instead of
    ///     emitting them.
    ///   - dropCarriageReturn: When `dropTerminator` is `true`, also remove a `"\r"`
    ///     immediately preceding the `"\n"`. Defaults to `false`.
    public init(
        _ underlying: Base,
        dropTerminator: Bool,
        maximumAllowableBufferSize: Int,
        dropLastChunkIfNoNewline: Bool,
        dropCarriageReturn: Bool = false
    ) {
        self.underlying = underlying
        self.dropTerminator = dropTerminator
        self.maximumAllowableBufferSize = maximumAllowableBufferSize
        self.dropLastChunkIfNoNewline = dropLastChunkIfNoNewline
        self.dropCarriageReturn = dropCarriageReturn
    }

    public func makeAsyncIterator() -> AsyncIterator {
        return AsyncIterator(
            underlying: self.underlying.makeAsyncIterator(),
            dropTerminator: self.dropTerminator,
            maximumAllowableBufferSize: self.maximumAllowableBufferSize,
            dropLastChunkIfNoNewline: self.dropLastChunkIfNoNewline,
            dropCarriageReturn: self.dropCarriageReturn
        )
    }
}
extension ReadFileHandle {
    /// The chunks from `readChunks()` (default arguments), re-split into lines on `"\n"`.
    ///
    /// The `"\n"` terminator is removed from each line, and trailing bytes without
    /// a final newline are still produced as a last line. If more than 8 MiB
    /// accumulate without a newline, they are emitted as a single element.
    public var lines: AsyncByteBufferLineSequence<FileChunks> {
        let maximumBufferedBytes = 8 << 20  // 8 MiB
        let chunks = readChunks()
        return AsyncByteBufferLineSequence(
            chunks,
            dropTerminator: true,
            maximumAllowableBufferSize: maximumBufferedBytes,
            dropLastChunkIfNoNewline: false
        )
    }
}
Metadata
Metadata
Assignees
Labels
No labels