Skip to content

NIOFileSystem: BufferedReader isn't really composable #3011

Open
@weissi

Description

@weissi

Expected behavior

BufferedReader is probably the best abstraction in NIOFileSystem for streaming large amounts of data. A real-world example is JSON Lines (JSONL).

I'd expect to be able to write

// Proposed API: per this issue, `BufferedReader` has no `lines` in 2.77.0.
// This snippet only shows the desired usage.
// NOTE(review): if `lines` were an AsyncSequence, it would be consumed as
// `for try await line in reader.lines` — confirm the intended shape.
try await FileSystem.shared.withFileHandle(forReadingAt: "many-gigabytes.jsonl") { file in
    var reader = file.bufferedReader(capacity: .mebibytes(4))
    for line in try await reader.lines {
        let thing = try JSONDecoder().decode(Thing.self, from: line)
        try await process(thing)
    }
}

For framing schemes more complex than plain newlines, I'd like to be able to build something composable on top of BufferedReader.

Actual behavior

Right now I use

// Reads newline-delimited JSON with BufferedReader, one `read(while:)` per line.
try await FileSystem.shared.withFileHandle(forReadingAt: "many-gigabytes.jsonl") { file in
    var reader = file.bufferedReader(capacity: .mebibytes(4))
    while true {
        // `UInt8(ascii:)` is required here: `UInt8("\n")` parses "\n" as a number,
        // yields `nil`, and the predicate would then compare every byte against nil.
        let (line, seenEOF) = try await reader.read(while: { $0 != UInt8(ascii: "\n") })
        // Handle the line *before* checking for EOF so a final line without a trailing
        // newline is not lost. Empty reads (leading blank line / end of file) are skipped
        // instead of being handed to the decoder.
        if line.readableBytes > 0 {
            let thing = try JSONDecoder().decode(Thing.self, from: line)
            try await process(thing)
        }
        guard !seenEOF else {
            break
        }
        (_, _) = try await reader.read(while: { $0 == UInt8(ascii: "\n") }) // skip new lines
    }
}

But this is

  • quite ugly
    • while true + guard !seenEOF
    • (_, _) =
  • not actually correct, because I can only easily test for a single byte (UInt8(ascii: "\n")). Supporting both \r\n and \n would make this a lot messier.

SwiftNIO version/commit hash

2.77.0


FWIW, I also have a separate helper that lets me write

// Usage of the `lines` extension on `ReadFileHandle` shown below: an
// `AsyncByteBufferLineSequence` over `file.readChunks()`, yielding each line
// with its "\n" removed.
try await FileSystem.shared.withFileHandle(forReadingAt: "many-gigabytes.jsonl") { file in
    for try await line in file.lines {
        let thing = try JSONDecoder().decode(Thing.self, from: line)
        try await process(thing)
    }
}

but it's very complex:

import NIO
import _NIOFileSystem

/// An `AsyncSequence` that re-splits a sequence of `ByteBuffer` chunks into lines.
///
/// Lines are delimited by a single `"\n"` byte. Chunk boundaries in `Base` do not
/// matter: a line spanning several chunks is reassembled into one `ByteBuffer`.
public struct AsyncByteBufferLineSequence<Base: Sendable>: AsyncSequence & Sendable
where Base: AsyncSequence, Base.Element == ByteBuffer {
    public typealias Element = ByteBuffer
    private let underlying: Base
    private let dropTerminator: Bool
    private let maximumAllowableBufferSize: Int
    private let dropLastChunkIfNoNewline: Bool
    private let dropCarriageReturn: Bool

    /// Pulls chunks from `Base` on demand and yields one line per call to `next()`.
    public struct AsyncIterator: AsyncIteratorProtocol {
        public typealias Element = ByteBuffer
        private var underlying: Base.AsyncIterator
        private let dropTerminator: Bool
        private let maximumAllowableBufferSize: Int
        private let dropLastChunkIfNoNewline: Bool
        private let dropCarriageReturn: Bool
        private var buffer = Buffer()

        /// Chunks received from `Base` that have not been fully delivered yet.
        ///
        /// `next()` maintains the invariant that no chunk except the last one
        /// contains a `"\n"`.
        struct Buffer {
            private var buffer: [ByteBuffer] = []
            /// Total readable bytes across all buffered chunks.
            internal private(set) var byteCount: Int = 0

            mutating func append(_ buffer: ByteBuffer) {
                self.buffer.append(buffer)
                self.byteCount += buffer.readableBytes
            }

            func allButLast() -> ArraySlice<ByteBuffer> {
                return self.buffer.dropLast()
            }

            /// Readable bytes in every buffered chunk except the last.
            var byteCountButLast: Int {
                return self.byteCount - (self.buffer.last?.readableBytes ?? 0)
            }

            /// Readable bytes of the last buffered chunk, or `nil` when nothing is buffered.
            var lastChunkView: ByteBufferView? {
                return self.buffer.last?.readableBytesView
            }

            /// Removes and returns all chunks but the last, followed by the first
            /// `lastLength` bytes of the last chunk, as one contiguous buffer.
            ///
            /// Bytes of the last chunk beyond `lastLength` stay buffered.
            /// - Precondition: at least one chunk is buffered, and `lastLength` does not
            ///   exceed the last chunk's readable bytes.
            mutating func concatenateEverything(upToLastChunkLengthToConsume lastLength: Int) -> ByteBuffer {
                precondition(!self.buffer.isEmpty, "concatenateEverything called with no buffered chunks")
                let lastIndex = self.buffer.index(before: self.buffer.endIndex)

                var output = ByteBuffer()
                output.reserveCapacity(lastLength + self.byteCountButLast)

                var writtenBytes = 0
                for chunk in self.buffer[..<lastIndex] {
                    writtenBytes += output.writeImmutableBuffer(chunk)
                }
                guard let lastSlice = self.buffer[lastIndex].readSlice(length: lastLength) else {
                    preconditionFailure("lastLength exceeds the readable bytes of the last chunk")
                }
                writtenBytes += output.writeImmutableBuffer(lastSlice)

                // Keep only the unconsumed remainder of the last chunk, if there is one.
                let remainder = self.buffer[lastIndex]
                self.buffer.removeAll(keepingCapacity: true)
                if remainder.readableBytes > 0 {
                    self.buffer.append(remainder)
                }

                self.byteCount -= writtenBytes
                assert(self.byteCount >= 0)
                return output
            }
        }

        internal init(
            underlying: Base.AsyncIterator,
            dropTerminator: Bool,
            maximumAllowableBufferSize: Int,
            dropLastChunkIfNoNewline: Bool,
            dropCarriageReturn: Bool = false
        ) {
            self.underlying = underlying
            self.dropTerminator = dropTerminator
            self.maximumAllowableBufferSize = maximumAllowableBufferSize
            self.dropLastChunkIfNoNewline = dropLastChunkIfNoNewline
            self.dropCarriageReturn = dropCarriageReturn
        }

        /// Removes the next line (or partial line) from `buffer` and returns it.
        ///
        /// - Parameters:
        ///   - view: The readable bytes of the last buffered chunk.
        ///   - index: Index in `view` of the `"\n"` ending the line, or `view.endIndex`
        ///     to consume the whole last chunk.
        ///   - expectNewline: `true` when `index` points at a `"\n"`, which is then
        ///     included in the consumed bytes.
        private mutating func deliverUpTo(
            view: ByteBufferView,
            index: ByteBufferView.Index,
            expectNewline: Bool
        ) -> ByteBuffer {
            let howMany = view.startIndex.distance(to: index) + (expectNewline ? 1 : 0)

            var output = self.buffer.concatenateEverything(upToLastChunkLengthToConsume: howMany)
            if expectNewline {
                // Exactly one "\n", and it is the final byte.
                assert(output.readableBytesView.last == UInt8(ascii: "\n"))
                assert(
                    output.readableBytesView.firstIndex(of: UInt8(ascii: "\n"))
                        == output.readableBytesView.index(before: output.readableBytesView.endIndex))
            } else {
                assert(output.readableBytesView.last != UInt8(ascii: "\n"))
                assert(!output.readableBytesView.contains(UInt8(ascii: "\n")))
            }
            if self.dropTerminator && expectNewline {
                output.moveWriterIndex(to: output.writerIndex - 1)
                // For "\r\n" line endings, also strip the "\r". It may have arrived in an
                // earlier chunk than the "\n"; after concatenation it is simply the last byte.
                if self.dropCarriageReturn, output.readableBytesView.last == UInt8(ascii: "\r") {
                    output.moveWriterIndex(to: output.writerIndex - 1)
                }
            }

            return output
        }

        public mutating func next() async throws -> Element? {
            while true {
                // Only the last chunk can contain a newline: every earlier chunk was
                // scanned while it was the last one and had none.
                if let view = self.buffer.lastChunkView {
                    if let newlineIndex = view.firstIndex(of: UInt8(ascii: "\n")) {
                        return self.deliverUpTo(
                            view: view,
                            index: newlineIndex,
                            expectNewline: true
                        )
                    }

                    // No newline yet: bound memory by emitting everything buffered
                    // as a partial line.
                    if self.buffer.byteCount > self.maximumAllowableBufferSize {
                        return self.deliverUpTo(
                            view: view,
                            index: view.endIndex,
                            expectNewline: false
                        )
                    }
                }

                if let nextBuffer = try await self.underlying.next() {
                    self.buffer.append(nextBuffer)
                    continue
                }

                // End of input. Check the total buffered byte count rather than only the
                // last chunk: if `Base` ended with an empty chunk, earlier chunks may
                // still hold an unterminated final line.
                guard !self.dropLastChunkIfNoNewline,
                    self.buffer.byteCount > 0,
                    let view = self.buffer.lastChunkView
                else {
                    return nil
                }
                return self.deliverUpTo(
                    view: view,
                    index: view.endIndex,
                    expectNewline: false
                )
            }
        }
    }

    /// Creates a line-splitting sequence over `underlying`.
    ///
    /// - Parameters:
    ///   - underlying: The source of byte chunks.
    ///   - dropTerminator: When `true`, the `"\n"` is removed from each yielded line.
    ///   - maximumAllowableBufferSize: Once more than this many bytes are buffered
    ///     without finding a `"\n"`, everything buffered is yielded as a partial line.
    ///     That partial line can therefore be larger than this limit.
    ///   - dropLastChunkIfNoNewline: When `true`, bytes after the final `"\n"` are
    ///     discarded at end of input instead of being yielded.
    ///   - dropCarriageReturn: When `true` (and `dropTerminator` is `true`), a `"\r"`
    ///     immediately preceding the `"\n"` is removed as well, so `"\r\n"` line
    ///     endings are handled. Defaults to `false`.
    public init(
        _ underlying: Base, dropTerminator: Bool,
        maximumAllowableBufferSize: Int,
        dropLastChunkIfNoNewline: Bool,
        dropCarriageReturn: Bool = false
    ) {
        self.underlying = underlying
        self.dropTerminator = dropTerminator
        self.maximumAllowableBufferSize = maximumAllowableBufferSize
        self.dropLastChunkIfNoNewline = dropLastChunkIfNoNewline
        self.dropCarriageReturn = dropCarriageReturn
    }

    public func makeAsyncIterator() -> AsyncIterator {
        return AsyncIterator(
            underlying: self.underlying.makeAsyncIterator(),
            dropTerminator: self.dropTerminator,
            maximumAllowableBufferSize: self.maximumAllowableBufferSize,
            dropLastChunkIfNoNewline: self.dropLastChunkIfNoNewline,
            dropCarriageReturn: self.dropCarriageReturn
        )
    }
}

extension ReadFileHandle {
    /// The file's contents split into lines, each yielded as a `ByteBuffer`.
    ///
    /// - Lines are split on `"\n"`, and that byte is removed from every yielded line.
    /// - Trailing bytes with no final newline are still yielded (`dropLastChunkIfNoNewline: false`).
    /// - Once more than 8 MiB have been buffered without a newline, the buffered bytes
    ///   are yielded as a partial line.
    public var lines: AsyncByteBufferLineSequence<FileChunks> {
        let chunks = self.readChunks()
        let eightMebibytes = 8 * 1024 * 1024
        return AsyncByteBufferLineSequence(
            chunks,
            dropTerminator: true,
            maximumAllowableBufferSize: eightMebibytes,
            dropLastChunkIfNoNewline: false
        )
    }
}

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions