From 85e6ad2629f0e7d13dd4100d79eb1daf61014cae Mon Sep 17 00:00:00 2001 From: harry lachenmayer Date: Wed, 6 Dec 2023 17:36:33 +0000 Subject: [PATCH 1/3] fix: remove overlapping operators from swift-async-algorithms --- Package.resolved | 13 +- Package.swift | 10 +- README.md | 7 - .../Combiners/Merge/AsyncMerge2Sequence.swift | 63 -- .../Combiners/Merge/AsyncMerge3Sequence.swift | 69 --- .../Combiners/Merge/AsyncMergeSequence.swift | 57 -- .../Combiners/Merge/MergeStateMachine.swift | 249 -------- Sources/Combiners/Zip/AsyncZip2Sequence.swift | 61 -- Sources/Combiners/Zip/AsyncZip3Sequence.swift | 66 --- Sources/Combiners/Zip/AsyncZipSequence.swift | 56 -- Sources/Combiners/Zip/Zip2Runtime.swift | 214 ------- Sources/Combiners/Zip/Zip2StateMachine.swift | 373 ------------ Sources/Combiners/Zip/Zip3Runtime.swift | 252 -------- Sources/Combiners/Zip/Zip3StateMachine.swift | 542 ------------------ Sources/Combiners/Zip/ZipRuntime.swift | 186 ------ Sources/Combiners/Zip/ZipStateMachine.swift | 335 ----------- Sources/Creators/AsyncLazySequence.swift | 51 -- .../Merge/AsyncMergeSequenceTests.swift | 292 ---------- .../Combiners/Zip/AsyncZipSequenceTests.swift | 415 -------------- Tests/Creators/AsyncLazySequenceTests.swift | 50 -- .../Operators/AsyncPrependSequenceTests.swift | 1 + .../Operators/AsyncSequence+AssignTests.swift | 3 +- .../AsyncSequence+FlatMapLatestTests.swift | 5 +- .../AsyncSwitchToLatestSequenceTests.swift | 9 +- 24 files changed, 30 insertions(+), 3349 deletions(-) delete mode 100644 Sources/Combiners/Merge/AsyncMerge2Sequence.swift delete mode 100644 Sources/Combiners/Merge/AsyncMerge3Sequence.swift delete mode 100644 Sources/Combiners/Merge/AsyncMergeSequence.swift delete mode 100644 Sources/Combiners/Merge/MergeStateMachine.swift delete mode 100644 Sources/Combiners/Zip/AsyncZip2Sequence.swift delete mode 100644 Sources/Combiners/Zip/AsyncZip3Sequence.swift delete mode 100644 Sources/Combiners/Zip/AsyncZipSequence.swift delete mode 100644 Sources/Combiners/Zip/Zip2Runtime.swift delete mode 100644 Sources/Combiners/Zip/Zip2StateMachine.swift delete mode 100644 Sources/Combiners/Zip/Zip3Runtime.swift delete mode 100644 Sources/Combiners/Zip/Zip3StateMachine.swift delete mode 100644 Sources/Combiners/Zip/ZipRuntime.swift delete mode 100644 Sources/Combiners/Zip/ZipStateMachine.swift delete mode 100644 Sources/Creators/AsyncLazySequence.swift delete mode 100644 Tests/Combiners/Merge/AsyncMergeSequenceTests.swift delete mode 100644 Tests/Combiners/Zip/AsyncZipSequenceTests.swift delete mode 100644 Tests/Creators/AsyncLazySequenceTests.swift diff --git a/Package.resolved b/Package.resolved index e378654..1241cb6 100644 --- a/Package.resolved +++ b/Package.resolved @@ -1,13 +1,22 @@ { "object": { "pins": [ + { + "package": "swift-async-algorithms", + "repositoryURL": "https://github.com/apple/swift-async-algorithms.git", + "state": { + "branch": null, + "revision": "da4e36f86544cdf733a40d59b3a2267e3a7bbf36", + "version": "1.0.0" + } + }, { "package": "swift-collections", "repositoryURL": "https://github.com/apple/swift-collections.git", "state": { "branch": null, - "revision": "f504716c27d2e5d4144fa4794b12129301d17729", - "version": "1.0.3" + "revision": "a902f1823a7ff3c9ab2fba0f992396b948eda307", + "version": "1.0.5" } } ] diff --git a/Package.swift b/Package.swift index 4f5d569..cb582c4 100644 --- a/Package.swift +++ b/Package.swift @@ -16,7 +16,10 @@ let package = Package( name: "AsyncExtensions", targets: ["AsyncExtensions"]), ], - dependencies: [.package(url: "https://github.com/apple/swift-collections.git", .upToNextMajor(from: "1.0.3"))], + dependencies: [ + .package(url: "https://github.com/apple/swift-async-algorithms.git", .upToNextMajor(from: "1.0.0")), + .package(url: "https://github.com/apple/swift-collections.git", .upToNextMajor(from: "1.0.3")) + ], targets: [ .target( name: "AsyncExtensions", @@ -32,7 +35,10 @@ let package = Package( ), .testTarget( name: "AsyncExtensionsTests", - dependencies: ["AsyncExtensions"], + dependencies: [ + "AsyncExtensions", + .product(name: "AsyncAlgorithms", package: "swift-async-algorithms") + ], path: "Tests"), ] ) diff --git a/README.md b/README.md index 30fe0ef..f04b3c6 100644 --- a/README.md +++ b/README.md @@ -44,12 +44,6 @@ AsyncStream) * [AsyncThrowingReplaySubject](./Sources/AsyncSubjects/AsyncThrowingReplaySubject.swift): Throwing subject with a shared output. Maintain an replays a buffered amount of values ### Combiners -* [`zip(_:_:)`](./Sources/Combiners/Zip/AsyncZip2Sequence.swift): Zips two `AsyncSequence` into an AsyncSequence of tuple of elements -* [`zip(_:_:_:)`](./Sources/Combiners/Zip/AsyncZip3Sequence.swift): Zips three `AsyncSequence` into an AsyncSequence of tuple of elements -* [`zip(_:)`](./Sources/Combiners/Zip/AsyncZipSequence.swift): Zips any async sequences into an array of elements -* [`merge(_:_:)`](./Sources/Combiners/Merge/AsyncMerge2Sequence.swift): Merges two `AsyncSequence` into an AsyncSequence of elements -* [`merge(_:_:_:)`](./Sources/Combiners/Merge/AsyncMerge3Sequence.swift): Merges three `AsyncSequence` into an AsyncSequence of elements -* [`merge(_:)`](./Sources/Combiners/Merge/AsyncMergeSequence.swift): Merges any `AsyncSequence` into an AsyncSequence of elements * [`withLatest(_:)`](./Sources/Combiners/WithLatestFrom/AsyncWithLatestFromSequence.swift): Combines elements from self with the last known element from an other `AsyncSequence` * [`withLatest(_:_:)`](./Sources/Combiners/WithLatestFrom/AsyncWithLatestFrom2Sequence.swift): Combines elements from self with the last known elements from two other async sequences @@ -58,7 +52,6 @@ AsyncStream) * [AsyncFailSequence](./Sources/Creators/AsyncFailSequence.swift): Creates an `AsyncSequence` that immediately fails * [AsyncJustSequence](./Sources/Creators/AsyncJustSequence.swift): Creates an `AsyncSequence` that emits an element an finishes * [AsyncThrowingJustSequence](./Sources/Creators/AsyncThrowingJustSequence.swift): Creates an `AsyncSequence` that emits an elements and finishes bases on a throwing closure -* [AsyncLazySequence](./Sources/Creators/AsyncLazySequence.swift): Creates an `AsyncSequence` of the elements from the base sequence * [AsyncTimerSequence](./Sources/Creators/AsyncTimerSequence.swift): Creates an `AsyncSequence` that emits a date value periodically * [AsyncStream Pipe](./Sources/Creators/AsyncStream+Pipe.swift): Creates an AsyncStream and returns a tuple standing for its inputs and outputs diff --git a/Sources/Combiners/Merge/AsyncMerge2Sequence.swift b/Sources/Combiners/Merge/AsyncMerge2Sequence.swift deleted file mode 100644 index 89a4b23..0000000 --- a/Sources/Combiners/Merge/AsyncMerge2Sequence.swift +++ /dev/null @@ -1,63 +0,0 @@ -// -// AsyncMerge2Sequence.swift -// -// -// Created by Thibault Wittemberg on 31/03/2022. -// - -/// Creates an asynchronous sequence of elements from two underlying asynchronous sequences -public func merge( - _ base1: Base1, - _ base2: Base2 -) -> AsyncMerge2Sequence { - AsyncMerge2Sequence(base1, base2) -} - -/// An asynchronous sequence of elements from two underlying asynchronous sequences -/// -/// In a `AsyncMerge2Sequence` instance, the *i*th element is the *i*th element -/// resolved in sequential order out of the two underlying asynchronous sequences. -/// Use the `merge(_:_:)` function to create an `AsyncMerge2Sequence`. -public struct AsyncMerge2Sequence: AsyncSequence -where Base1.Element == Base2.Element { - public typealias Element = Base1.Element - public typealias AsyncIterator = Iterator - - let base1: Base1 - let base2: Base2 - - public init(_ base1: Base1, _ base2: Base2) { - self.base1 = base1 - self.base2 = base2 - } - - public func makeAsyncIterator() -> Iterator { - Iterator( - base1: self.base1, - base2: self.base2 - ) - } - - public struct Iterator: AsyncIteratorProtocol { - let mergeStateMachine: MergeStateMachine - - init(base1: Base1, base2: Base2) { - self.mergeStateMachine = MergeStateMachine( - base1, - base2 - ) - } - - public mutating func next() async rethrows -> Element? { - let mergedElement = await self.mergeStateMachine.next() - switch mergedElement { - case .element(let result): - return try result._rethrowGet() - case .termination: - return nil - } - } - } -} - -extension AsyncMerge2Sequence: Sendable where Base1: Sendable, Base2: Sendable {} diff --git a/Sources/Combiners/Merge/AsyncMerge3Sequence.swift b/Sources/Combiners/Merge/AsyncMerge3Sequence.swift deleted file mode 100644 index 468b1ee..0000000 --- a/Sources/Combiners/Merge/AsyncMerge3Sequence.swift +++ /dev/null @@ -1,69 +0,0 @@ -// -// AsyncMerge3Sequence.swift -// -// -// Created by Thibault Wittemberg on 31/03/2022. -// - -/// Creates an asynchronous sequence of elements from three underlying asynchronous sequences -public func merge( - _ base1: Base1, - _ base2: Base2, - _ base3: Base3 -) -> AsyncMerge3Sequence { - AsyncMerge3Sequence(base1, base2, base3) -} - -/// An asynchronous sequence of elements from three underlying asynchronous sequences -/// -/// In a `AsyncMerge3Sequence` instance, the *i*th element is the *i*th element -/// resolved in sequential order out of the two underlying asynchronous sequences. -/// Use the `merge(_:_:_:)` function to create an `AsyncMerge3Sequence`. -public struct AsyncMerge3Sequence: AsyncSequence -where Base1.Element == Base2.Element, Base3.Element == Base2.Element { - public typealias Element = Base1.Element - public typealias AsyncIterator = Iterator - - let base1: Base1 - let base2: Base2 - let base3: Base3 - - public init(_ base1: Base1, _ base2: Base2, _ base3: Base3) { - self.base1 = base1 - self.base2 = base2 - self.base3 = base3 - } - - public func makeAsyncIterator() -> Iterator { - Iterator( - base1: self.base1, - base2: self.base2, - base3: self.base3 - ) - } - - public struct Iterator: AsyncIteratorProtocol { - let mergeStateMachine: MergeStateMachine - - init(base1: Base1, base2: Base2, base3: Base3) { - self.mergeStateMachine = MergeStateMachine( - base1, - base2, - base3 - ) - } - - public mutating func next() async rethrows -> Element? { - let mergedElement = await self.mergeStateMachine.next() - switch mergedElement { - case .element(let result): - return try result._rethrowGet() - case .termination: - return nil - } - } - } -} - -extension AsyncMerge3Sequence: Sendable where Base1: Sendable, Base2: Sendable, Base3: Sendable {} -extension AsyncMerge3Sequence.Iterator: Sendable where Base1: Sendable, Base2: Sendable, Base3: Sendable {} diff --git a/Sources/Combiners/Merge/AsyncMergeSequence.swift b/Sources/Combiners/Merge/AsyncMergeSequence.swift deleted file mode 100644 index ad85bf1..0000000 --- a/Sources/Combiners/Merge/AsyncMergeSequence.swift +++ /dev/null @@ -1,57 +0,0 @@ -// -// AsyncMergeSequence.swift -// -// -// Created by Thibault Wittemberg on 31/03/2022. -// - -/// Creates an asynchronous sequence of elements from many underlying asynchronous sequences -public func merge( - _ bases: Base... -) -> AsyncMergeSequence { - AsyncMergeSequence(bases) -} - -/// An asynchronous sequence of elements from many underlying asynchronous sequences -/// -/// In a `AsyncMergeSequence` instance, the *i*th element is the *i*th element -/// resolved in sequential order out of the two underlying asynchronous sequences. -/// Use the `merge(...)` function to create an `AsyncMergeSequence`. -public struct AsyncMergeSequence: AsyncSequence { - public typealias Element = Base.Element - public typealias AsyncIterator = Iterator - - let bases: [Base] - - public init(_ bases: [Base]) { - self.bases = bases - } - - public func makeAsyncIterator() -> Iterator { - Iterator( - bases: self.bases - ) - } - - public struct Iterator: AsyncIteratorProtocol { - let mergeStateMachine: MergeStateMachine - - init(bases: [Base]) { - self.mergeStateMachine = MergeStateMachine( - bases - ) - } - - public mutating func next() async rethrows -> Element? { - let mergedElement = await self.mergeStateMachine.next() - switch mergedElement { - case .element(let result): - return try result._rethrowGet() - case .termination: - return nil - } - } - } -} - -extension AsyncMergeSequence: Sendable where Base: Sendable {} diff --git a/Sources/Combiners/Merge/MergeStateMachine.swift b/Sources/Combiners/Merge/MergeStateMachine.swift deleted file mode 100644 index 3cbec30..0000000 --- a/Sources/Combiners/Merge/MergeStateMachine.swift +++ /dev/null @@ -1,249 +0,0 @@ -// -// MergeStateMachine.swift -// -// -// Created by Thibault Wittemberg on 08/09/2022. -// - -import DequeModule - -struct MergeStateMachine: Sendable { - enum BufferState { - case idle - case queued(Deque>) - case awaiting(UnsafeContinuation, Never>) - case closed - } - - struct State { - var buffer: BufferState - var basesToTerminate: Int - } - - struct OnNextDecision { - let continuation: UnsafeContinuation, Never> - let regulatedElement: RegulatedElement - } - - let requestNextRegulatedElements: @Sendable () -> Void - let state: ManagedCriticalState - let task: Task - - init( - _ base1: Base1, - _ base2: Base2 - ) where Base1.Element == Element, Base2.Element == Element { - self.state = ManagedCriticalState(State(buffer: .idle, basesToTerminate: 2)) - - let regulator1 = Regulator(base1, onNextRegulatedElement: { [state] in Self.onNextRegulatedElement($0, state: state) }) - let regulator2 = Regulator(base2, onNextRegulatedElement: { [state] in Self.onNextRegulatedElement($0, state: state) }) - - self.requestNextRegulatedElements = { - regulator1.requestNextRegulatedElement() - regulator2.requestNextRegulatedElement() - } - - self.task = Task { - await withTaskGroup(of: Void.self) { group in - group.addTask { - await regulator1.iterate() - } - - group.addTask { - await regulator2.iterate() - } - } - } - } - - init( - _ base1: Base1, - _ base2: Base2, - _ base3: Base3 - ) where Base1.Element == Element, Base2.Element == Element, Base3.Element == Base1.Element { - self.state = ManagedCriticalState(State(buffer: .idle, basesToTerminate: 3)) - - let regulator1 = Regulator(base1, onNextRegulatedElement: { [state] in Self.onNextRegulatedElement($0, state: state) }) - let regulator2 = Regulator(base2, onNextRegulatedElement: { [state] in Self.onNextRegulatedElement($0, state: state) }) - let regulator3 = Regulator(base3, onNextRegulatedElement: { [state] in Self.onNextRegulatedElement($0, state: state) }) - - self.requestNextRegulatedElements = { - regulator1.requestNextRegulatedElement() - regulator2.requestNextRegulatedElement() - regulator3.requestNextRegulatedElement() - } - - self.task = Task { - await withTaskGroup(of: Void.self) { group in - group.addTask { - await regulator1.iterate() - } - - group.addTask { - await regulator2.iterate() - } - - group.addTask { - await regulator3.iterate() - } - } - } - } - - init( - _ bases: [Base] - ) where Base.Element == Element { - self.state = ManagedCriticalState(State(buffer: .idle, basesToTerminate: bases.count)) - - var regulators = [Regulator]() - - for base in bases { - let regulator = Regulator(base, onNextRegulatedElement: { [state] in Self.onNextRegulatedElement($0, state: state) }) - regulators.append(regulator) - } - - let immutableRegulators = regulators - self.requestNextRegulatedElements = { - for regulator in immutableRegulators { - regulator.requestNextRegulatedElement() - } - } - - self.task = Task { - await withTaskGroup(of: Void.self) { group in - for regulators in immutableRegulators { - group.addTask { - await regulators.iterate() - } - } - } - } - } - - @Sendable - static func onNextRegulatedElement(_ element: RegulatedElement, state: ManagedCriticalState) { - let decision = state.withCriticalRegion { state -> OnNextDecision? in - switch (state.buffer, element) { - case (.idle, .element): - state.buffer = .queued([element]) - return nil - case (.queued(var elements), .element): - elements.append(element) - state.buffer = .queued(elements) - return nil - case (.awaiting(let continuation), .element(.success)): - state.buffer = .idle - return OnNextDecision(continuation: continuation, regulatedElement: element) - case (.awaiting(let continuation), .element(.failure)): - state.buffer = .closed - return OnNextDecision(continuation: continuation, regulatedElement: element) - - case (.idle, .termination): - state.basesToTerminate -= 1 - if state.basesToTerminate == 0 { - state.buffer = .closed - } else { - state.buffer = .idle - } - return nil - - case (.queued(var elements), .termination): - state.basesToTerminate -= 1 - if state.basesToTerminate == 0 { - elements.append(.termination) - state.buffer = .queued(elements) - } - return nil - - case (.awaiting(let continuation), .termination): - state.basesToTerminate -= 1 - if state.basesToTerminate == 0 { - state.buffer = .closed - return OnNextDecision(continuation: continuation, regulatedElement: .termination) - } else { - state.buffer = .awaiting(continuation) - return nil - } - - case (.closed, _): - return nil - } - } - - if let decision = decision { - decision.continuation.resume(returning: decision.regulatedElement) - } - } - - @Sendable - func unsuspendAndClearOnCancel() { - let continuation = self.state.withCriticalRegion { state -> UnsafeContinuation, Never>? in - switch state.buffer { - case .awaiting(let continuation): - state.basesToTerminate = 0 - state.buffer = .closed - return continuation - default: - state.basesToTerminate = 0 - state.buffer = .closed - return nil - } - } - - continuation?.resume(returning: .termination) - self.task.cancel() - } - - func next() async -> RegulatedElement { - await withTaskCancellationHandler { - self.unsuspendAndClearOnCancel() - } operation: { - self.requestNextRegulatedElements() - - let regulatedElement = await withUnsafeContinuation { (continuation: UnsafeContinuation, Never>) in - let decision = self.state.withCriticalRegion { state -> OnNextDecision? in - switch state.buffer { - case .queued(var elements): - guard let regulatedElement = elements.popFirst() else { - assertionFailure("The buffer cannot by empty, it should be idle in this case") - return OnNextDecision(continuation: continuation, regulatedElement: .termination) - } - switch regulatedElement { - case .termination: - state.buffer = .closed - return OnNextDecision(continuation: continuation, regulatedElement: .termination) - case .element(.success): - if elements.isEmpty { - state.buffer = .idle - } else { - state.buffer = .queued(elements) - } - return OnNextDecision(continuation: continuation, regulatedElement: regulatedElement) - case .element(.failure): - state.buffer = .closed - return OnNextDecision(continuation: continuation, regulatedElement: regulatedElement) - } - case .idle: - state.buffer = .awaiting(continuation) - return nil - case .awaiting: - assertionFailure("The next function cannot be called concurrently") - return OnNextDecision(continuation: continuation, regulatedElement: .termination) - case .closed: - return OnNextDecision(continuation: continuation, regulatedElement: .termination) - } - } - - if let decision = decision { - decision.continuation.resume(returning: decision.regulatedElement) - } - } - - if case .termination = regulatedElement, case .element(.failure) = regulatedElement { - self.task.cancel() - } - - return regulatedElement - } - } -} diff --git a/Sources/Combiners/Zip/AsyncZip2Sequence.swift b/Sources/Combiners/Zip/AsyncZip2Sequence.swift deleted file mode 100644 index d9ebe93..0000000 --- a/Sources/Combiners/Zip/AsyncZip2Sequence.swift +++ /dev/null @@ -1,61 +0,0 @@ -// -// AsyncZip2Sequence.swift -// -// -// Created by Thibault Wittemberg on 13/01/2022. -// - -/// `zip` produces an `AsyncSequence` that combines the latest elements from two sequences according to their temporality -/// and emits a tuple to the client. If any Async Sequence ends successfully or fails with an error, so to does the zipped -/// Async Sequence. -/// -/// ``` -/// let asyncSequence1 = [1, 2, 3, 4, 5].async -/// let asyncSequence2 = ["1", "2", "3", "4", "5"].async -/// -/// let zippedAsyncSequence = zip(asyncSequence1, asyncSequence2) -/// -/// for await element in zippedAsyncSequence { -/// print(element) // will print -> (1, "1") (2, "2") (3, "3") (4, "4") (5, "5") -/// } -/// ``` -/// Use the `zip(_:_:)` function to create an `AsyncZip2Sequence`. -public func zip( - _ base1: Base1, - _ base2: Base2 -) -> AsyncZip2Sequence { - AsyncZip2Sequence(base1, base2) -} - -public struct AsyncZip2Sequence: AsyncSequence -where Base1: Sendable, Base1.Element: Sendable, Base2: Sendable, Base2.Element: Sendable { - public typealias Element = (Base1.Element, Base2.Element) - public typealias AsyncIterator = Iterator - - let base1: Base1 - let base2: Base2 - - init(_ base1: Base1, _ base2: Base2) { - self.base1 = base1 - self.base2 = base2 - } - - public func makeAsyncIterator() -> AsyncIterator { - Iterator( - base1, - base2 - ) - } - - public struct Iterator: AsyncIteratorProtocol { - let runtime: Zip2Runtime - - init(_ base1: Base1, _ base2: Base2) { - self.runtime = Zip2Runtime(base1, base2) - } - - public func next() async rethrows -> Element? { - try await self.runtime.next() - } - } -} diff --git a/Sources/Combiners/Zip/AsyncZip3Sequence.swift b/Sources/Combiners/Zip/AsyncZip3Sequence.swift deleted file mode 100644 index 89d8a24..0000000 --- a/Sources/Combiners/Zip/AsyncZip3Sequence.swift +++ /dev/null @@ -1,66 +0,0 @@ -// -// AsyncZip3Sequence.swift -// -// -// Created by Thibault Wittemberg on 24/09/2022. -// - -/// `zip` produces an `AsyncSequence` that combines the latest elements from three sequences according to their temporality -/// and emits a tuple to the client. If any Async Sequence ends successfully or fails with an error, so to does the zipped -/// Async Sequence. -/// -/// ``` -/// let asyncSequence1 = [1, 2, 3, 4, 5].async -/// let asyncSequence2 = ["1", "2", "3", "4", "5"].async -/// let asyncSequence3 = ["A", "B", "C", "D", "E"].async -/// -/// let zippedAsyncSequence = zip(asyncSequence1, asyncSequence2, asyncSequence3) -/// -/// for await element in zippedAsyncSequence { -/// print(element) // will print -> (1, "1", "A") (2, "2", "B") (3, "3", "V") (4, "4", "D") (5, "5", "E") -/// } -/// ``` -/// Use the `zip(_:_:_:)` function to create an `AsyncZip3Sequence`. -public func zip( - _ base1: Base1, - _ base2: Base2, - _ base3: Base3 -) -> AsyncZip3Sequence { - AsyncZip3Sequence(base1, base2, base3) -} - -public struct AsyncZip3Sequence: AsyncSequence -where Base1: Sendable, Base1.Element: Sendable, Base2: Sendable, Base2.Element: Sendable, Base3: Sendable, Base3.Element: Sendable { - public typealias Element = (Base1.Element, Base2.Element, Base3.Element) - public typealias AsyncIterator = Iterator - - let base1: Base1 - let base2: Base2 - let base3: Base3 - - init(_ base1: Base1, _ base2: Base2, _ base3: Base3) { - self.base1 = base1 - self.base2 = base2 - self.base3 = base3 - } - - public func makeAsyncIterator() -> AsyncIterator { - Iterator( - base1, - base2, - base3 - ) - } - - public struct Iterator: AsyncIteratorProtocol { - let runtime: Zip3Runtime - - init(_ base1: Base1, _ base2: Base2, _ base3: Base3) { - self.runtime = Zip3Runtime(base1, base2, base3) - } - - public func next() async rethrows -> Element? { - try await self.runtime.next() - } - } -} diff --git a/Sources/Combiners/Zip/AsyncZipSequence.swift b/Sources/Combiners/Zip/AsyncZipSequence.swift deleted file mode 100644 index 74b0b01..0000000 --- a/Sources/Combiners/Zip/AsyncZipSequence.swift +++ /dev/null @@ -1,56 +0,0 @@ -// -// AsyncZipSequence.swift -// -// -// Created by Thibault Wittemberg on 24/09/2022. -// - -/// `zip` produces an `AsyncSequence` that combines the latest elements from sequences according to their temporality -/// and emits an array to the client. If any Async Sequence ends successfully or fails with an error, so to does the zipped -/// Async Sequence. -/// -/// ``` -/// let asyncSequence1 = [1, 2, 3, 4, 5].async -/// let asyncSequence2 = [1, 2, 3, 4, 5].async -/// let asyncSequence3 = [1, 2, 3, 4, 5].async -/// let asyncSequence4 = [1, 2, 3, 4, 5].async -/// let asyncSequence5 = [1, 2, 3, 4, 5].async -/// -/// let zippedAsyncSequence = zip(asyncSequence1, asyncSequence2, asyncSequence3, asyncSequence4, asyncSequence5) -/// -/// for await element in zippedAsyncSequence { -/// print(element) // will print -> [1, 1, 1, 1, 1] [2, 2, 2, 2, 2] [3, 3, 3, 3, 3] [4, 4, 4, 4, 4] [5, 5, 5, 5, 5] -/// } -/// ``` -/// Use the `zip(_:)` function to create an `AsyncZipSequence`. -public func zip(_ bases: Base...) -> AsyncZipSequence { - AsyncZipSequence(bases) -} - -public struct AsyncZipSequence: AsyncSequence -where Base: Sendable, Base.Element: Sendable { - public typealias Element = [Base.Element] - public typealias AsyncIterator = Iterator - - let bases: [Base] - - init(_ bases: [Base]) { - self.bases = bases - } - - public func makeAsyncIterator() -> AsyncIterator { - Iterator(bases) - } - - public struct Iterator: AsyncIteratorProtocol { - let runtime: ZipRuntime - - init(_ bases: [Base]) { - self.runtime = ZipRuntime(bases) - } - - public func next() async rethrows -> Element? { - try await self.runtime.next() - } - } -} diff --git a/Sources/Combiners/Zip/Zip2Runtime.swift b/Sources/Combiners/Zip/Zip2Runtime.swift deleted file mode 100644 index 699b8ca..0000000 --- a/Sources/Combiners/Zip/Zip2Runtime.swift +++ /dev/null @@ -1,214 +0,0 @@ -// -// Zip2Runtime.swift -// -// -// Created by Thibault Wittemberg on 24/09/2022. -// - -final class Zip2Runtime: Sendable -where Base1: Sendable, Base1.Element: Sendable, Base2: Sendable, Base2.Element: Sendable { - typealias ZipStateMachine = Zip2StateMachine - - private let stateMachine = ManagedCriticalState(ZipStateMachine()) - - init(_ base1: Base1, _ base2: Base2) { - self.stateMachine.withCriticalRegion { machine in - machine.taskIsStarted(task: Task { - await withTaskGroup(of: Void.self) { group in - group.addTask { - var base1Iterator = base1.makeAsyncIterator() - - do { - while true { - await withUnsafeContinuation { (continuation: UnsafeContinuation) in - let output = self.stateMachine.withCriticalRegion { machine in - machine.newLoopFromBase1(suspendedBase: continuation) - } - - self.handle(newLoopFromBaseOutput: output) - } - - guard let element1 = try await base1Iterator.next() else { - break - } - - let output = self.stateMachine.withCriticalRegion { machine in - machine.base1HasProducedElement(element: element1) - } - - self.handle(baseHasProducedElementOutput: output) - } - } catch { - let output = self.stateMachine.withCriticalRegion { machine in - machine.baseHasProducedFailure(error: error) - } - - self.handle(baseHasProducedFailureOutput: output) - } - - let output = self.stateMachine.withCriticalRegion { stateMachine in - stateMachine.baseIsFinished() - } - - self.handle(baseIsFinishedOutput: output) - } - - group.addTask { - var base2Iterator = base2.makeAsyncIterator() - - do { - while true { - await withUnsafeContinuation { (continuation: UnsafeContinuation) in - let output = self.stateMachine.withCriticalRegion { machine in - machine.newLoopFromBase2(suspendedBase: continuation) - } - - self.handle(newLoopFromBaseOutput: output) - } - - guard let element2 = try await base2Iterator.next() else { - break - } - - let output = self.stateMachine.withCriticalRegion { machine in - machine.base2HasProducedElement(element: element2) - } - - self.handle(baseHasProducedElementOutput: output) - } - } catch { - let output = self.stateMachine.withCriticalRegion { machine in - machine.baseHasProducedFailure(error: error) - } - - self.handle(baseHasProducedFailureOutput: output) - } - - let output = self.stateMachine.withCriticalRegion { machine in - machine.baseIsFinished() - } - - self.handle(baseIsFinishedOutput: output) - } - } - }) - } - } - - private func handle(newLoopFromBaseOutput: ZipStateMachine.NewLoopFromBaseOutput) { - switch newLoopFromBaseOutput { - case .none: - break - - case .resumeBases(let suspendedBases): - suspendedBases.forEach { $0.resume() } - - case .terminate(let task, let suspendedBase, let suspendedDemand): - suspendedBase.resume() - suspendedDemand?.resume(returning: nil) - task?.cancel() - } - } - - private func handle(baseHasProducedElementOutput: ZipStateMachine.BaseHasProducedElementOutput) { - switch baseHasProducedElementOutput { - case .none: - break - - case .resumeDemand(let suspendedDemand, let result1, let result2): - suspendedDemand?.resume(returning: (result1, result2)) - - case .terminate(let task, let suspendedBases): - suspendedBases?.forEach { $0.resume() } - task?.cancel() - } - } - - private func handle(baseHasProducedFailureOutput: ZipStateMachine.BaseHasProducedFailureOutput) { - switch baseHasProducedFailureOutput { - case .resumeDemandAndTerminate(let task, let suspendedDemand, let suspendedBases, let result1, let result2): - suspendedDemand?.resume(returning: (result1, result2)) - suspendedBases.forEach { $0.resume() } - task?.cancel() - - case .terminate(let task, let suspendedBases): - suspendedBases?.forEach { $0.resume() } - task?.cancel() - } - } - - private func handle(baseIsFinishedOutput: ZipStateMachine.BaseIsFinishedOutput) { - switch baseIsFinishedOutput { - case .terminate(let task, let suspendedBases, let suspendedDemands): - suspendedBases?.forEach { $0.resume() } - suspendedDemands?.forEach { $0?.resume(returning: nil) } - task?.cancel() - } - } - - func next() async rethrows -> (Base1.Element, Base2.Element)? { - try await withTaskCancellationHandler { - let output = self.stateMachine.withCriticalRegion { stateMachine in - stateMachine.rootTaskIsCancelled() - } - - self.handle(rootTaskIsCancelledOutput: output) - } operation: { - let results = await withUnsafeContinuation { (continuation: UnsafeContinuation<(Result, Result)?, Never>) in - let output = self.stateMachine.withCriticalRegion { stateMachine in - stateMachine.newDemandFromConsumer(suspendedDemand: continuation) - } - - self.handle(newDemandFromConsumerOutput: output) - } - - guard let results = results else { - return nil - } - - let output = self.stateMachine.withCriticalRegion { stateMachine in - stateMachine.demandIsFulfilled() - } - - self.handle(demandIsFulfilledOutput: output) - - return try (results.0._rethrowGet(), results.1._rethrowGet()) - } - } - - private func handle(rootTaskIsCancelledOutput: ZipStateMachine.RootTaskIsCancelledOutput) { - switch rootTaskIsCancelledOutput { - case .terminate(let task, let suspendedBases, let suspendedDemands): - suspendedBases?.forEach { $0.resume() } - suspendedDemands?.forEach { $0?.resume(returning: nil) } - task?.cancel() - } - } - - private func handle(newDemandFromConsumerOutput: ZipStateMachine.NewDemandFromConsumerOutput) { - switch newDemandFromConsumerOutput { - case .none: - break - - case .resumeBases(let suspendedBases): - suspendedBases.forEach { $0.resume() } - - case .terminate(let task, let suspendedBases, let suspendedDemands): - suspendedBases?.forEach { $0.resume() } - suspendedDemands?.forEach { $0?.resume(returning: nil) } - task?.cancel() - } - } - - private func handle(demandIsFulfilledOutput: ZipStateMachine.DemandIsFulfilledOutput) { - switch demandIsFulfilledOutput { - case .none: - break - - case .terminate(let task, let suspendedBases, let suspendedDemands): - suspendedBases?.forEach { $0.resume() } - suspendedDemands?.forEach { $0.resume(returning: nil) } - task?.cancel() - } - } -} diff --git a/Sources/Combiners/Zip/Zip2StateMachine.swift b/Sources/Combiners/Zip/Zip2StateMachine.swift deleted file mode 100644 index 4b3b43e..0000000 --- a/Sources/Combiners/Zip/Zip2StateMachine.swift +++ /dev/null @@ -1,373 +0,0 @@ -// -// Zip2StateMachine.swift -// -// -// Created by Thibault Wittemberg on 24/09/2022. -// - -struct Zip2StateMachine: Sendable -where Element1: Sendable, Element2: Sendable { - private enum State { - case initial - case started(task: Task) - case awaitingDemandFromConsumer( - task: Task?, - suspendedBases: [UnsafeContinuation] - ) - case awaitingBaseResults( - task: Task?, - result1: Result?, - result2: Result?, - suspendedBases: [UnsafeContinuation], - suspendedDemand: UnsafeContinuation<(Result, Result)?, Never>? - ) - case finished - } - - private var state: State = .initial - - mutating func taskIsStarted(task: Task) { - switch self.state { - case .initial: - self.state = .started(task: task) - - default: - assertionFailure("Inconsistent state, the task cannot start while the state is other than initial") - } - } - - enum NewDemandFromConsumerOutput { - case none - case resumeBases(suspendedBases: [UnsafeContinuation]) - case terminate( - task: Task?, - suspendedBases: [UnsafeContinuation]?, - suspendedDemands: [UnsafeContinuation<(Result, Result)?, Never>?]? - ) - } - - mutating func newDemandFromConsumer( - suspendedDemand: UnsafeContinuation<(Result, Result)?, Never> - ) -> NewDemandFromConsumerOutput { - switch self.state { - case .initial: - assertionFailure("Inconsistent state, the task is not started") - self.state = .finished - return .terminate(task: nil, suspendedBases: nil, suspendedDemands: [suspendedDemand]) - - case .started(let task): - self.state = .awaitingBaseResults(task: task, result1: nil, result2: nil, suspendedBases: [], suspendedDemand: suspendedDemand) - return .none - - case .awaitingDemandFromConsumer(let task, let suspendedBases): - self.state = .awaitingBaseResults(task: task, result1: nil, result2: nil, suspendedBases: [], suspendedDemand: suspendedDemand) - return .resumeBases(suspendedBases: suspendedBases) - - case .awaitingBaseResults(let task, _, _, let suspendedBases, let oldSuspendedDemand): - assertionFailure("Inconsistent state, a demand is already suspended") - self.state = .finished - return .terminate(task: task, suspendedBases: suspendedBases, suspendedDemands: [oldSuspendedDemand, suspendedDemand]) - - case .finished: - return .terminate(task: nil, suspendedBases: nil, suspendedDemands: [suspendedDemand]) - } - } - - enum NewLoopFromBaseOutput { - case none - case resumeBases(suspendedBases: [UnsafeContinuation]) - case terminate( - task: Task?, - suspendedBase: UnsafeContinuation, - suspendedDemand: UnsafeContinuation<(Result, Result)?, Never>? - ) - } - - mutating func newLoopFromBase1(suspendedBase: UnsafeContinuation) -> NewLoopFromBaseOutput { - switch self.state { - case .initial: - assertionFailure("Inconsistent state, the task is not started") - self.state = .finished - return .terminate(task: nil, suspendedBase: suspendedBase, suspendedDemand: nil) - - case .started(let task): - self.state = .awaitingDemandFromConsumer(task: task, suspendedBases: [suspendedBase]) - return .none - - case .awaitingDemandFromConsumer(let task, var suspendedBases): - assert(suspendedBases.count < 2, "There cannot be more than 2 suspended base at the same time") - suspendedBases.append(suspendedBase) - self.state = .awaitingDemandFromConsumer(task: task, suspendedBases: suspendedBases) - return .none - - case .awaitingBaseResults(let task, let result1, let result2, var suspendedBases, let suspendedDemand): - assert(suspendedBases.count < 2, "There cannot be more than 2 suspended bases at the same time") - if result1 != nil { - suspendedBases.append(suspendedBase) - self.state = .awaitingBaseResults(task: task, result1: result1, result2: result2, suspendedBases: suspendedBases, suspendedDemand: suspendedDemand) - return .none - } else { - self.state = .awaitingBaseResults(task: task, result1: result1, result2: result2, suspendedBases: suspendedBases, suspendedDemand: suspendedDemand) - return .resumeBases(suspendedBases: [suspendedBase]) - } - - case .finished: - return .terminate(task: nil, suspendedBase: suspendedBase, suspendedDemand: nil) - } - } - - mutating func newLoopFromBase2(suspendedBase: UnsafeContinuation) -> NewLoopFromBaseOutput { - switch state { - case .initial: - assertionFailure("Inconsistent state, the task is not started") - self.state = .finished - return .terminate(task: nil, suspendedBase: suspendedBase, suspendedDemand: nil) - - case .started(let task): - self.state = .awaitingDemandFromConsumer(task: task, suspendedBases: [suspendedBase]) - return .none - - case .awaitingDemandFromConsumer(let task, var suspendedBases): - assert(suspendedBases.count < 2, "There cannot be more than 2 suspended base at the same time") - suspendedBases.append(suspendedBase) - self.state = .awaitingDemandFromConsumer(task: task, suspendedBases: suspendedBases) - return .none - - case .awaitingBaseResults(let task, let result1, let result2, var suspendedBases, let suspendedDemand): - assert(suspendedBases.count < 2, "There cannot be more than 2 suspended bases at the same time") - if result2 != nil { - suspendedBases.append(suspendedBase) - self.state = .awaitingBaseResults(task: task, result1: result1, result2: result2, suspendedBases: suspendedBases, suspendedDemand: suspendedDemand) - return .none - } else { - self.state = .awaitingBaseResults(task: task, result1: result1, result2: result2, suspendedBases: suspendedBases, suspendedDemand: suspendedDemand) - return .resumeBases(suspendedBases: [suspendedBase]) - } - - case .finished: - return .terminate(task: nil, suspendedBase: suspendedBase, suspendedDemand: nil) - } - } - - enum BaseHasProducedElementOutput { - case none - case resumeDemand( - suspendedDemand: UnsafeContinuation<(Result, Result)?, Never>?, - result1: Result, - result2: Result - ) - case terminate( - task: Task?, - suspendedBases: [UnsafeContinuation]? - ) - } - - mutating func base1HasProducedElement(element: Element1) -> BaseHasProducedElementOutput { - switch self.state { - case .initial: - assertionFailure("Inconsistent state, the task is not started") - self.state = .finished - return .terminate(task: nil, suspendedBases: nil) - - case .started(let task): - assertionFailure("Inconsistent state, a base can only produce an element when the consumer is awaiting for it") - self.state = .finished - return .terminate(task: task, suspendedBases: nil) - - case .awaitingDemandFromConsumer(let task, let suspendedBases): - assertionFailure("Inconsistent state, a base can only produce an element when the consumer is awaiting for it") - self.state = .finished - return .terminate(task: task, suspendedBases: suspendedBases) - - case .awaitingBaseResults(let task, _, let result2, let suspendedBases, let suspendedDemand): - if let result2 = result2 { - self.state = .awaitingBaseResults(task: task, result1: .success(element), result2: result2, suspendedBases: suspendedBases, suspendedDemand: nil) - return .resumeDemand(suspendedDemand: suspendedDemand, result1: .success(element), result2: result2) - } else { - self.state = .awaitingBaseResults(task: task, result1: .success(element), result2: nil, suspendedBases: suspendedBases, suspendedDemand: suspendedDemand) - return .none - } - - case .finished: - return .terminate(task: nil, suspendedBases: nil) - } - } - - enum BaseHasProducedFailureOutput { - case resumeDemandAndTerminate( - task: Task?, - suspendedDemand: UnsafeContinuation<(Result, Result)?, Never>?, - suspendedBases: [UnsafeContinuation], - result1: Result, - result2: Result - ) - case terminate( - task: Task?, - suspendedBases: [UnsafeContinuation]? - ) - } - - mutating func baseHasProducedFailure(error: any Error) -> BaseHasProducedFailureOutput { - switch self.state { - case .initial: - assertionFailure("Inconsistent state, the task is not started") - self.state = .finished - return .terminate(task: nil, suspendedBases: nil) - - case .started(let task): - assertionFailure("Inconsistent state, a base can only produce an element when the consumer is awaiting for it") - self.state = .finished - return .terminate(task: task, suspendedBases: nil) - - case .awaitingDemandFromConsumer(let task, let suspendedBases): - assertionFailure("Inconsistent state, a base can only produce an element when the consumer is awaiting for it") - self.state = .finished - return .terminate(task: task, suspendedBases: suspendedBases) - - case .awaitingBaseResults(let task, _, _, let suspendedBases, let suspendedDemand): - self.state = .finished - return .resumeDemandAndTerminate( - task: task, - suspendedDemand: suspendedDemand, - suspendedBases: suspendedBases, - result1: .failure(error), - result2: .failure(error) - ) - - case .finished: - return .terminate(task: nil, suspendedBases: nil) - } - } - - mutating func base2HasProducedElement(element: Element2) -> BaseHasProducedElementOutput { - switch self.state { - case .initial: - assertionFailure("Inconsistent state, the task is not started") - self.state = .finished - return .terminate(task: nil, suspendedBases: nil) - - case .started(let task): - assertionFailure("Inconsistent state, a base can only produce an element when the consumer is awaiting for it") - self.state = .finished - return .terminate(task: task, suspendedBases: nil) - - case .awaitingDemandFromConsumer(let task, let suspendedBases): - assertionFailure("Inconsistent state, a base can only produce an element when the consumer is awaiting for it") - self.state = .finished - return .terminate(task: task, suspendedBases: suspendedBases) - - case .awaitingBaseResults(let task, let result1, _, let suspendedBases, let suspendedDemand): - if let result1 = result1 { - self.state = .awaitingBaseResults(task: task, result1: result1, result2: .success(element), suspendedBases: suspendedBases, suspendedDemand: nil) - return .resumeDemand(suspendedDemand: suspendedDemand, result1: result1, result2: .success(element)) - } else { - self.state = .awaitingBaseResults(task: task, result1: nil, result2: .success(element), suspendedBases: suspendedBases, suspendedDemand: suspendedDemand) - return .none - } - - case .finished: - return .terminate(task: nil, suspendedBases: nil) - } - } - - enum DemandIsFulfilledOutput { - case none - case terminate( - task: Task?, - suspendedBases: [UnsafeContinuation]?, - suspendedDemands: [UnsafeContinuation<(Result, Result)?, Never>]? - ) - } - - mutating func demandIsFulfilled() -> DemandIsFulfilledOutput { - switch self.state { - case .initial: - assertionFailure("Inconsistent state, the task is not started") - self.state = .finished - return .terminate(task: nil, suspendedBases: nil, suspendedDemands: nil) - - case .started(let task): - assertionFailure("Inconsistent state, results are not yet available to be acknowledged") - self.state = .finished - return .terminate(task: task, suspendedBases: nil, suspendedDemands: nil) - - case .awaitingDemandFromConsumer(let task, let suspendedBases): - assertionFailure("Inconsistent state, results are not yet available to be acknowledged") - self.state = .finished - return .terminate(task: task, suspendedBases: suspendedBases, suspendedDemands: nil) - - case .awaitingBaseResults(let task, let result1, let result2, let suspendedBases, let suspendedDemand): - assert(suspendedDemand == nil, "Inconsistent state, there cannot be a suspended demand when ackowledging the demand") - assert(result1 != nil && result2 != nil, "Inconsistent state, all results are not yet available to be acknowledged") - self.state = .awaitingDemandFromConsumer(task: task, suspendedBases: suspendedBases) - return .none - - case .finished: - return .terminate(task: nil, suspendedBases: nil, suspendedDemands: nil) - } - } - - enum RootTaskIsCancelledOutput { - case terminate( - task: Task?, - suspendedBases: [UnsafeContinuation]?, - suspendedDemands: [UnsafeContinuation<(Result, Result)?, Never>?]? - ) - } - - mutating func rootTaskIsCancelled() -> RootTaskIsCancelledOutput { - switch self.state { - case .initial: - assertionFailure("Inconsistent state, the task is not started") - self.state = .finished - return .terminate(task: nil, suspendedBases: nil, suspendedDemands: nil) - - case .started(let task): - self.state = .finished - return .terminate(task: task, suspendedBases: nil, suspendedDemands: nil) - - case .awaitingDemandFromConsumer(let task, let suspendedBases): - self.state = .finished - return .terminate(task: task, suspendedBases: suspendedBases, suspendedDemands: nil) - - case .awaitingBaseResults(let task, _, _, let suspendedBases, let suspendedDemand): - self.state = .finished - return .terminate(task: task, suspendedBases: suspendedBases, suspendedDemands: [suspendedDemand]) - - case .finished: - return .terminate(task: nil, suspendedBases: nil, suspendedDemands: nil) - } - } - - enum BaseIsFinishedOutput { - case terminate( - task: Task?, - suspendedBases: [UnsafeContinuation]?, - suspendedDemands: [UnsafeContinuation<(Result, Result)?, Never>?]? - ) - } - - mutating func baseIsFinished() -> BaseIsFinishedOutput { - switch self.state { - case .initial: - assertionFailure("Inconsistent state, the task is not started") - self.state = .finished - return .terminate(task: nil, suspendedBases: nil, suspendedDemands: nil) - - case .started(let task): - self.state = .finished - return .terminate(task: task, suspendedBases: nil, suspendedDemands: nil) - - case .awaitingDemandFromConsumer(let task, let suspendedBases): - self.state = .finished - return .terminate(task: task, suspendedBases: suspendedBases, suspendedDemands: nil) - - case .awaitingBaseResults(let task, _, _, let suspendedBases, let suspendedDemand): - self.state = .finished - return .terminate(task: task, suspendedBases: suspendedBases, suspendedDemands: [suspendedDemand]) - - case .finished: - return .terminate(task: nil, suspendedBases: nil, suspendedDemands: nil) - } - } -} diff --git a/Sources/Combiners/Zip/Zip3Runtime.swift b/Sources/Combiners/Zip/Zip3Runtime.swift deleted file mode 100644 index eeb98f9..0000000 --- a/Sources/Combiners/Zip/Zip3Runtime.swift +++ /dev/null @@ -1,252 +0,0 @@ -// -// Zip3Runtime.swift -// -// -// Created by Thibault Wittemberg on 24/09/2022. -// - -final class Zip3Runtime: Sendable -where Base1: Sendable, Base1.Element: Sendable, Base2: Sendable, Base2.Element: Sendable, Base3: Sendable, Base3.Element: Sendable { - typealias ZipStateMachine = Zip3StateMachine - - private let stateMachine = ManagedCriticalState(ZipStateMachine()) - - init(_ base1: Base1, _ base2: Base2, _ base3: Base3) { - self.stateMachine.withCriticalRegion { machine in - machine.taskIsStarted(task: Task { - await withTaskGroup(of: Void.self) { group in - group.addTask { - var base1Iterator = base1.makeAsyncIterator() - - do { - while true { - await withUnsafeContinuation { (continuation: UnsafeContinuation) in - let output = self.stateMachine.withCriticalRegion { machine in - machine.newLoopFromBase1(suspendedBase: continuation) - } - - self.handle(newLoopFromBaseOutput: output) - } - - guard let element1 = try await base1Iterator.next() else { - break - } - - let output = self.stateMachine.withCriticalRegion { machine in - machine.base1HasProducedElement(element: element1) - } - - self.handle(baseHasProducedElementOutput: output) - } - } catch { - let output = self.stateMachine.withCriticalRegion { machine in - machine.baseHasProducedFailure(error: error) - } - - self.handle(baseHasProducedFailureOutput: output) - } - - let output = self.stateMachine.withCriticalRegion { stateMachine in - stateMachine.baseIsFinished() - } - - self.handle(baseIsFinishedOutput: output) - } - - group.addTask { - var base2Iterator = base2.makeAsyncIterator() - - do { - while true { - await withUnsafeContinuation { (continuation: UnsafeContinuation) in - let output = self.stateMachine.withCriticalRegion { machine in - machine.newLoopFromBase2(suspendedBase: continuation) - } - - self.handle(newLoopFromBaseOutput: output) - } - - guard let element2 = try await base2Iterator.next() else { - break - } - - let output = self.stateMachine.withCriticalRegion { machine in - machine.base2HasProducedElement(element: element2) - } - - self.handle(baseHasProducedElementOutput: output) - } - } catch { - let output = self.stateMachine.withCriticalRegion { machine in - machine.baseHasProducedFailure(error: error) - } - - self.handle(baseHasProducedFailureOutput: output) - } - - let output = self.stateMachine.withCriticalRegion { machine in - machine.baseIsFinished() - } - - self.handle(baseIsFinishedOutput: output) - } - - group.addTask { - var base3Iterator = base3.makeAsyncIterator() - - do { - while true { - await withUnsafeContinuation { (continuation: UnsafeContinuation) in - let output = self.stateMachine.withCriticalRegion { machine in - machine.newLoopFromBase3(suspendedBase: continuation) - } - - self.handle(newLoopFromBaseOutput: output) - } - - guard let element3 = try await base3Iterator.next() else { - break - } - - let output = self.stateMachine.withCriticalRegion { machine in - machine.base3HasProducedElement(element: element3) - } - - self.handle(baseHasProducedElementOutput: output) - } - } catch { - let output = self.stateMachine.withCriticalRegion { machine in - machine.baseHasProducedFailure(error: error) - } - - self.handle(baseHasProducedFailureOutput: output) - } - - let output = self.stateMachine.withCriticalRegion { machine in - machine.baseIsFinished() - } - - self.handle(baseIsFinishedOutput: output) - } - } - }) - } - } - - private func handle(newLoopFromBaseOutput: ZipStateMachine.NewLoopFromBaseOutput) { - switch newLoopFromBaseOutput { - case .none: - break - - case .resumeBases(let suspendedBases): - suspendedBases.forEach { $0.resume() } - - case .terminate(let task, let suspendedBase, let suspendedDemand): - suspendedBase.resume() - suspendedDemand?.resume(returning: nil) - task?.cancel() - } - } - - private func handle(baseHasProducedElementOutput: ZipStateMachine.BaseHasProducedElementOutput) { - switch baseHasProducedElementOutput { - case .none: - break - - case .resumeDemand(let suspendedDemand, let result1, let result2, let result3): - suspendedDemand?.resume(returning: (result1, result2, result3)) - - case .terminate(let task, let suspendedBases): - suspendedBases?.forEach { $0.resume() } - task?.cancel() - } - } - - private func handle(baseHasProducedFailureOutput: ZipStateMachine.BaseHasProducedFailureOutput) { - switch baseHasProducedFailureOutput { - case .resumeDemandAndTerminate(let task, let suspendedDemand, let suspendedBases, let result1, let result2, let result3): - suspendedDemand?.resume(returning: (result1, result2, result3)) - suspendedBases.forEach { $0.resume() } - task?.cancel() - - case .terminate(let task, let suspendedBases): - suspendedBases?.forEach { $0.resume() } - task?.cancel() - } - } - - private func handle(baseIsFinishedOutput: ZipStateMachine.BaseIsFinishedOutput) { - switch baseIsFinishedOutput { - case .terminate(let task, let suspendedBases, let suspendedDemands): - suspendedBases?.forEach { $0.resume() } - suspendedDemands?.forEach { $0?.resume(returning: nil) } - task?.cancel() - } - } - - func next() async rethrows -> (Base1.Element, Base2.Element, Base3.Element)? { - try await withTaskCancellationHandler { - let output = self.stateMachine.withCriticalRegion { stateMachine in - stateMachine.rootTaskIsCancelled() - } - - self.handle(rootTaskIsCancelledOutput: output) - } operation: { - let results = await withUnsafeContinuation { (continuation: UnsafeContinuation<(Result, Result, Result)?, Never>) in - let output = self.stateMachine.withCriticalRegion { stateMachine in - stateMachine.newDemandFromConsumer(suspendedDemand: continuation) - } - - self.handle(newDemandFromConsumerOutput: output) - } - - guard let results = results else { - return nil - } - - let output = self.stateMachine.withCriticalRegion { stateMachine in - stateMachine.demandIsFulfilled() - } - - self.handle(demandIsFulfilledOutput: output) - - return try (results.0._rethrowGet(), results.1._rethrowGet(), results.2._rethrowGet()) - } - } - - private func handle(rootTaskIsCancelledOutput: ZipStateMachine.RootTaskIsCancelledOutput) { - switch rootTaskIsCancelledOutput { - case .terminate(let task, let suspendedBases, let suspendedDemands): - suspendedBases?.forEach { $0.resume() } - suspendedDemands?.forEach { $0?.resume(returning: nil) } - task?.cancel() - } - } - - private func handle(newDemandFromConsumerOutput: ZipStateMachine.NewDemandFromConsumerOutput) { - switch newDemandFromConsumerOutput { - case .none: - break - - case .resumeBases(let suspendedBases): - suspendedBases.forEach { $0.resume() } - - case .terminate(let task, let suspendedBases, let suspendedDemands): - suspendedBases?.forEach { $0.resume() } - suspendedDemands?.forEach { $0?.resume(returning: nil) } - task?.cancel() - } - } - - private func handle(demandIsFulfilledOutput: ZipStateMachine.DemandIsFulfilledOutput) { - switch demandIsFulfilledOutput { - case .none: - break - - case .terminate(let task, let suspendedBases, let suspendedDemands): - suspendedBases?.forEach { $0.resume() } - suspendedDemands?.forEach { $0.resume(returning: nil) } - task?.cancel() - } - } -} diff --git a/Sources/Combiners/Zip/Zip3StateMachine.swift b/Sources/Combiners/Zip/Zip3StateMachine.swift deleted file mode 100644 index 3d3ed82..0000000 --- a/Sources/Combiners/Zip/Zip3StateMachine.swift +++ /dev/null @@ -1,542 +0,0 @@ -// -// Zip3StateMachine.swift -// -// -// Created by Thibault Wittemberg on 24/09/2022. -// - -struct Zip3StateMachine: Sendable -where Element1: Sendable, Element2: Sendable, Element3: Sendable { - private enum State { - case initial - case started(task: Task) - case awaitingDemandFromConsumer( - task: Task?, - suspendedBases: [UnsafeContinuation] - ) - case awaitingBaseResults( - task: Task?, - result1: Result?, - result2: Result?, - result3: Result?, - suspendedBases: [UnsafeContinuation], - suspendedDemand: UnsafeContinuation<(Result, Result, Result)?, Never>? - ) - case finished - } - - private var state: State = .initial - - mutating func taskIsStarted(task: Task) { - switch self.state { - case .initial: - self.state = .started(task: task) - - default: - assertionFailure("Inconsistent state, the task cannot start while the state is other than initial") - } - } - - enum NewDemandFromConsumerOutput { - case none - case resumeBases(suspendedBases: [UnsafeContinuation]) - case terminate( - task: Task?, - suspendedBases: [UnsafeContinuation]?, - suspendedDemands: [UnsafeContinuation<(Result, Result, Result)?, Never>?]? - ) - } - - mutating func newDemandFromConsumer( - suspendedDemand: UnsafeContinuation<(Result, Result, Result)?, Never> - ) -> NewDemandFromConsumerOutput { - switch self.state { - case .initial: - assertionFailure("Inconsistent state, the task is not started") - self.state = .finished - return .terminate(task: nil, suspendedBases: nil, suspendedDemands: [suspendedDemand]) - - case .started(let task): - self.state = .awaitingBaseResults( - task: task, - result1: nil, - result2: nil, - result3: nil, - suspendedBases: [], - suspendedDemand: suspendedDemand - ) - return .none - - case .awaitingDemandFromConsumer(let task, let suspendedBases): - self.state = .awaitingBaseResults( - task: task, - result1: nil, - result2: nil, - result3: nil, - suspendedBases: [], - suspendedDemand: suspendedDemand - ) - return .resumeBases(suspendedBases: suspendedBases) - - case .awaitingBaseResults(let task, _, _, _, let suspendedBases, let oldSuspendedDemand): - assertionFailure("Inconsistent state, a demand is already suspended") - self.state = .finished - return .terminate(task: task, suspendedBases: suspendedBases, suspendedDemands: [oldSuspendedDemand, suspendedDemand]) - - case .finished: - return .terminate(task: nil, suspendedBases: nil, suspendedDemands: [suspendedDemand]) - } - } - - enum NewLoopFromBaseOutput { - case none - case resumeBases(suspendedBases: [UnsafeContinuation]) - case terminate( - task: Task?, - suspendedBase: UnsafeContinuation, - suspendedDemand: UnsafeContinuation<(Result, Result, Result)?, Never>? - ) - } - - mutating func newLoopFromBase1(suspendedBase: UnsafeContinuation) -> NewLoopFromBaseOutput { - switch self.state { - case .initial: - assertionFailure("Inconsistent state, the task is not started") - self.state = .finished - return .terminate(task: nil, suspendedBase: suspendedBase, suspendedDemand: nil) - - case .started(let task): - self.state = .awaitingDemandFromConsumer(task: task, suspendedBases: [suspendedBase]) - return .none - - case .awaitingDemandFromConsumer(let task, var suspendedBases): - assert(suspendedBases.count < 3, "There cannot be more than 3 suspended base at the same time") - suspendedBases.append(suspendedBase) - self.state = .awaitingDemandFromConsumer(task: task, suspendedBases: suspendedBases) - return .none - - case .awaitingBaseResults(let task, let result1, let result2, let result3, var suspendedBases, let suspendedDemand): - assert(suspendedBases.count < 3, "There cannot be more than 3 suspended bases at the same time") - if result1 != nil { - suspendedBases.append(suspendedBase) - self.state = .awaitingBaseResults( - task: task, - result1: result1, - result2: result2, - result3: result3, - suspendedBases: suspendedBases, - suspendedDemand: suspendedDemand - ) - return .none - } else { - self.state = .awaitingBaseResults( - task: task, - result1: result1, - result2: result2, - result3: result3, - suspendedBases: suspendedBases, - suspendedDemand: suspendedDemand - ) - return .resumeBases(suspendedBases: [suspendedBase]) - } - - case .finished: - return .terminate(task: nil, suspendedBase: suspendedBase, suspendedDemand: nil) - } - } - - mutating func newLoopFromBase2(suspendedBase: UnsafeContinuation) -> NewLoopFromBaseOutput { - switch state { - case .initial: - assertionFailure("Inconsistent state, the task is not started") - self.state = .finished - return .terminate(task: nil, suspendedBase: suspendedBase, suspendedDemand: nil) - - case .started(let task): - self.state = .awaitingDemandFromConsumer(task: task, suspendedBases: [suspendedBase]) - return .none - - case .awaitingDemandFromConsumer(let task, var suspendedBases): - assert(suspendedBases.count < 3, "There cannot be more than 3 suspended base at the same time") - suspendedBases.append(suspendedBase) - self.state = .awaitingDemandFromConsumer(task: task, suspendedBases: suspendedBases) - return .none - - case .awaitingBaseResults(let task, let result1, let result2, let result3, var suspendedBases, let suspendedDemand): - assert(suspendedBases.count < 3, "There cannot be more than 3 suspended bases at the same time") - if result2 != nil { - suspendedBases.append(suspendedBase) - self.state = .awaitingBaseResults( - task: task, - result1: result1, - result2: result2, - result3: result3, - suspendedBases: suspendedBases, - suspendedDemand: suspendedDemand - ) - return .none - } else { - self.state = .awaitingBaseResults( - task: task, - result1: result1, - result2: result2, - result3: result3, - suspendedBases: suspendedBases, - suspendedDemand: suspendedDemand - ) - return .resumeBases(suspendedBases: [suspendedBase]) - } - - case .finished: - return .terminate(task: nil, suspendedBase: suspendedBase, suspendedDemand: nil) - } - } - - mutating func newLoopFromBase3(suspendedBase: UnsafeContinuation) -> NewLoopFromBaseOutput { - switch state { - case .initial: - assertionFailure("Inconsistent state, the task is not started") - self.state = .finished - return .terminate(task: nil, suspendedBase: suspendedBase, suspendedDemand: nil) - - case .started(let task): - self.state = .awaitingDemandFromConsumer(task: task, suspendedBases: [suspendedBase]) - return .none - - case .awaitingDemandFromConsumer(let task, var suspendedBases): - assert(suspendedBases.count < 3, "There cannot be more than 3 suspended base at the same time") - suspendedBases.append(suspendedBase) - self.state = .awaitingDemandFromConsumer(task: task, suspendedBases: suspendedBases) - return .none - - case .awaitingBaseResults(let task, let result1, let result2, let result3, var suspendedBases, let suspendedDemand): - assert(suspendedBases.count < 3, "There cannot be more than 3 suspended bases at the same time") - if result3 != nil { - suspendedBases.append(suspendedBase) - self.state = .awaitingBaseResults( - task: task, - result1: result1, - result2: result2, - result3: result3, - suspendedBases: suspendedBases, - suspendedDemand: suspendedDemand - ) - return .none - } else { - self.state = .awaitingBaseResults( - task: task, - result1: result1, - result2: result2, - result3: result3, - suspendedBases: suspendedBases, - suspendedDemand: suspendedDemand - ) - return .resumeBases(suspendedBases: [suspendedBase]) - } - - case .finished: - return .terminate(task: nil, suspendedBase: suspendedBase, suspendedDemand: nil) - } - } - - enum BaseHasProducedElementOutput { - case none - case resumeDemand( - suspendedDemand: UnsafeContinuation<(Result, Result, Result)?, Never>?, - result1: Result, - result2: Result, - result3: Result - ) - case terminate( - task: Task?, - suspendedBases: [UnsafeContinuation]? - ) - } - - mutating func base1HasProducedElement(element: Element1) -> BaseHasProducedElementOutput { - switch self.state { - case .initial: - assertionFailure("Inconsistent state, the task is not started") - self.state = .finished - return .terminate(task: nil, suspendedBases: nil) - - case .started(let task): - assertionFailure("Inconsistent state, a base can only produce an element when the consumer is awaiting for it") - self.state = .finished - return .terminate(task: task, suspendedBases: nil) - - case .awaitingDemandFromConsumer(let task, let suspendedBases): - assertionFailure("Inconsistent state, a base can only produce an element when the consumer is awaiting for it") - self.state = .finished - return .terminate(task: task, suspendedBases: suspendedBases) - - case .awaitingBaseResults(let task, _, let result2, let result3, let suspendedBases, let suspendedDemand): - if let result2 = result2, let result3 = result3 { - self.state = .awaitingBaseResults( - task: task, - result1: .success(element), - result2: result2, - result3: result3, - suspendedBases: suspendedBases, - suspendedDemand: nil - ) - return .resumeDemand(suspendedDemand: suspendedDemand, result1: .success(element), result2: result2, result3: result3) - } else { - self.state = .awaitingBaseResults( - task: task, - result1: .success(element), - result2: result2, - result3: result3, - suspendedBases: suspendedBases, - suspendedDemand: suspendedDemand - ) - return .none - } - - case .finished: - return .terminate(task: nil, suspendedBases: nil) - } - } - - enum BaseHasProducedFailureOutput { - case resumeDemandAndTerminate( - task: Task?, - suspendedDemand: UnsafeContinuation<(Result, Result, Result)?, Never>?, - suspendedBases: [UnsafeContinuation], - result1: Result, - result2: Result, - result3: Result - ) - case terminate( - task: Task?, - suspendedBases: [UnsafeContinuation]? - ) - } - - mutating func base2HasProducedElement(element: Element2) -> BaseHasProducedElementOutput { - switch self.state { - case .initial: - assertionFailure("Inconsistent state, the task is not started") - self.state = .finished - return .terminate(task: nil, suspendedBases: nil) - - case .started(let task): - assertionFailure("Inconsistent state, a base can only produce an element when the consumer is awaiting for it") - self.state = .finished - return .terminate(task: task, suspendedBases: nil) - - case .awaitingDemandFromConsumer(let task, let suspendedBases): - assertionFailure("Inconsistent state, a base can only produce an element when the consumer is awaiting for it") - self.state = .finished - return .terminate(task: task, suspendedBases: suspendedBases) - - case .awaitingBaseResults(let task, let result1, _, let result3, let suspendedBases, let suspendedDemand): - if let result1 = result1, let result3 = result3 { - self.state = .awaitingBaseResults( - task: task, - result1: result1, - result2: .success(element), - result3: result3, - suspendedBases: suspendedBases, - suspendedDemand: nil - ) - return .resumeDemand(suspendedDemand: suspendedDemand, result1: result1, result2: .success(element), result3: result3) - } else { - self.state = .awaitingBaseResults( - task: task, - result1: result1, - result2: .success(element), - result3: result3, - suspendedBases: suspendedBases, - suspendedDemand: suspendedDemand - ) - return .none - } - - case .finished: - return .terminate(task: nil, suspendedBases: nil) - } - } - - mutating func base3HasProducedElement(element: Element3) -> BaseHasProducedElementOutput { - switch self.state { - case .initial: - assertionFailure("Inconsistent state, the task is not started") - self.state = .finished - return .terminate(task: nil, suspendedBases: nil) - - case .started(let task): - assertionFailure("Inconsistent state, a base can only produce an element when the consumer is awaiting for it") - self.state = .finished - return .terminate(task: task, suspendedBases: nil) - - case .awaitingDemandFromConsumer(let task, let suspendedBases): - assertionFailure("Inconsistent state, a base can only produce an element when the consumer is awaiting for it") - self.state = .finished - return .terminate(task: task, suspendedBases: suspendedBases) - - case .awaitingBaseResults(let task, let result1, let result2, _, let suspendedBases, let suspendedDemand): - if let result1 = result1, let result2 = result2 { - self.state = .awaitingBaseResults( - task: task, - result1: result1, - result2: result2, - result3: .success(element), - suspendedBases: suspendedBases, - suspendedDemand: nil - ) - return .resumeDemand(suspendedDemand: suspendedDemand, result1: result1, result2: result2, result3: .success(element)) - } else { - self.state = .awaitingBaseResults( - task: task, - result1: result1, - result2: result2, - result3: .success(element), - suspendedBases: suspendedBases, - suspendedDemand: suspendedDemand - ) - return .none - } - - case .finished: - return .terminate(task: nil, suspendedBases: nil) - } - } - - mutating func baseHasProducedFailure(error: Error) -> BaseHasProducedFailureOutput { - switch self.state { - case .initial: - assertionFailure("Inconsistent state, the task is not started") - self.state = .finished - return .terminate(task: nil, suspendedBases: nil) - - case .started(let task): - assertionFailure("Inconsistent state, a base can only produce an element when the consumer is awaiting for it") - self.state = .finished - return .terminate(task: task, suspendedBases: nil) - - case .awaitingDemandFromConsumer(let task, let suspendedBases): - assertionFailure("Inconsistent state, a base can only produce an element when the consumer is awaiting for it") - self.state = .finished - return .terminate(task: task, suspendedBases: suspendedBases) - - case .awaitingBaseResults(let task, _, _, _, let suspendedBases, let suspendedDemand): - self.state = .finished - return .resumeDemandAndTerminate( - task: task, - suspendedDemand: suspendedDemand, - suspendedBases: suspendedBases, - result1: .failure(error), - result2: .failure(error), - result3: .failure(error) - ) - - case .finished: - return .terminate(task: nil, suspendedBases: nil) - } - } - - enum DemandIsFulfilledOutput { - case none - case terminate( - task: Task?, - suspendedBases: [UnsafeContinuation]?, - suspendedDemands: [UnsafeContinuation<(Result, Result)?, Never>]? - ) - } - - mutating func demandIsFulfilled() -> DemandIsFulfilledOutput { - switch self.state { - case .initial: - assertionFailure("Inconsistent state, the task is not started") - self.state = .finished - return .terminate(task: nil, suspendedBases: nil, suspendedDemands: nil) - - case .started(let task): - assertionFailure("Inconsistent state, results are not yet available to be acknowledged") - self.state = .finished - return .terminate(task: task, suspendedBases: nil, suspendedDemands: nil) - - case .awaitingDemandFromConsumer(let task, let suspendedBases): - assertionFailure("Inconsistent state, results are not yet available to be acknowledged") - self.state = .finished - return .terminate(task: task, suspendedBases: suspendedBases, suspendedDemands: nil) - - case .awaitingBaseResults(let task, let result1, let result2, let result3, let suspendedBases, let suspendedDemand): - assert(suspendedDemand == nil, "Inconsistent state, there cannot be a suspended demand when ackowledging the demand") - assert( - result1 != nil && result2 != nil && result3 != nil, - "Inconsistent state, all results are not yet available to be acknowledged" - ) - self.state = .awaitingDemandFromConsumer(task: task, suspendedBases: suspendedBases) - return .none - - case .finished: - return .terminate(task: nil, suspendedBases: nil, suspendedDemands: nil) - } - } - - enum RootTaskIsCancelledOutput { - case terminate( - task: Task?, - suspendedBases: [UnsafeContinuation]?, - suspendedDemands: [UnsafeContinuation<(Result, Result, Result)?, Never>?]? - ) - } - - mutating func rootTaskIsCancelled() -> RootTaskIsCancelledOutput { - switch self.state { - case .initial: - assertionFailure("Inconsistent state, the task is not started") - self.state = .finished - return .terminate(task: nil, suspendedBases: nil, suspendedDemands: nil) - - case .started(let task): - self.state = .finished - return .terminate(task: task, suspendedBases: nil, suspendedDemands: nil) - - case .awaitingDemandFromConsumer(let task, let suspendedBases): - self.state = .finished - return .terminate(task: task, suspendedBases: suspendedBases, suspendedDemands: nil) - - case .awaitingBaseResults(let task, _, _, _, let suspendedBases, let suspendedDemand): - self.state = .finished - return .terminate(task: task, suspendedBases: suspendedBases, suspendedDemands: [suspendedDemand]) - - case .finished: - return .terminate(task: nil, suspendedBases: nil, suspendedDemands: nil) - } - } - - enum BaseIsFinishedOutput { - case terminate( - task: Task?, - suspendedBases: [UnsafeContinuation]?, - suspendedDemands: [UnsafeContinuation<(Result, Result, Result)?, Never>?]? - ) - } - - mutating func baseIsFinished() -> BaseIsFinishedOutput { - switch self.state { - case .initial: - assertionFailure("Inconsistent state, the task is not started") - self.state = .finished - return .terminate(task: nil, suspendedBases: nil, suspendedDemands: nil) - - case .started(let task): - self.state = .finished - return .terminate(task: task, suspendedBases: nil, suspendedDemands: nil) - - case .awaitingDemandFromConsumer(let task, let suspendedBases): - self.state = .finished - return .terminate(task: task, suspendedBases: suspendedBases, suspendedDemands: nil) - - case .awaitingBaseResults(let task, _, _, _, let suspendedBases, let suspendedDemand): - self.state = .finished - return .terminate(task: task, suspendedBases: suspendedBases, suspendedDemands: [suspendedDemand]) - - case .finished: - return .terminate(task: nil, suspendedBases: nil, suspendedDemands: nil) - } - } -} diff --git a/Sources/Combiners/Zip/ZipRuntime.swift b/Sources/Combiners/Zip/ZipRuntime.swift deleted file mode 100644 index be04f9d..0000000 --- a/Sources/Combiners/Zip/ZipRuntime.swift +++ /dev/null @@ -1,186 +0,0 @@ -// -// ZipRuntime.swift -// -// -// Created by Thibault Wittemberg on 24/09/2022. -// - -final class ZipRuntime: Sendable -where Base: Sendable, Base.Element: Sendable { - typealias StateMachine = ZipStateMachine - - private let stateMachine: ManagedCriticalState - private let indexes = ManagedCriticalState(0) - - init(_ bases: [Base]) { - self.stateMachine = ManagedCriticalState(StateMachine(numberOfBases: bases.count)) - - self.stateMachine.withCriticalRegion { machine in - machine.taskIsStarted(task: Task { - await withTaskGroup(of: Void.self) { group in - for base in bases { - let index = self.indexes.withCriticalRegion { indexes -> Int in - defer { indexes += 1 } - return indexes - } - - group.addTask { - var baseIterator = base.makeAsyncIterator() - - do { - while true { - await withUnsafeContinuation { (continuation: UnsafeContinuation) in - let output = self.stateMachine.withCriticalRegion { machine in - machine.newLoopFromBase(index: index, suspendedBase: continuation) - } - - self.handle(newLoopFromBaseOutput: output) - } - - guard let element = try await baseIterator.next() else { - break - } - - let output = self.stateMachine.withCriticalRegion { machine in - machine.baseHasProducedElement(index: index, element: element) - } - - self.handle(baseHasProducedElementOutput: output) - } - } catch { - let output = self.stateMachine.withCriticalRegion { machine in - machine.baseHasProducedFailure(error: error) - } - - self.handle(baseHasProducedFailureOutput: output) - } - - let output = self.stateMachine.withCriticalRegion { stateMachine in - stateMachine.baseIsFinished() - } - - self.handle(baseIsFinishedOutput: output) - } - } - } - }) - } - } - - private func handle(newLoopFromBaseOutput: StateMachine.NewLoopFromBaseOutput) { - switch newLoopFromBaseOutput { - case .none: - break - - case .resumeBases(let suspendedBases): - suspendedBases.forEach { $0.resume() } - - case .terminate(let task, let suspendedBase, let suspendedDemand): - suspendedBase.resume() - suspendedDemand?.resume(returning: nil) - task?.cancel() - } - } - - private func handle(baseHasProducedElementOutput: StateMachine.BaseHasProducedElementOutput) { - switch baseHasProducedElementOutput { - case .none: - break - - case .resumeDemand(let suspendedDemand, let results): - suspendedDemand?.resume(returning: results) - - case .terminate(let task, let suspendedBases): - suspendedBases?.forEach { $0.resume() } - task?.cancel() - } - } - - private func handle(baseHasProducedFailureOutput: StateMachine.BaseHasProducedFailureOutput) { - switch baseHasProducedFailureOutput { - case .resumeDemandAndTerminate(let task, let suspendedDemand, let suspendedBases, let results): - suspendedDemand?.resume(returning: results) - suspendedBases.forEach { $0.resume() } - task?.cancel() - - case .terminate(let task, let suspendedBases): - suspendedBases?.forEach { $0.resume() } - task?.cancel() - } - } - - private func handle(baseIsFinishedOutput: StateMachine.BaseIsFinishedOutput) { - switch baseIsFinishedOutput { - case .terminate(let task, let suspendedBases, let suspendedDemands): - suspendedBases?.forEach { $0.resume() } - suspendedDemands?.forEach { $0?.resume(returning: nil) } - task?.cancel() - } - } - - func next() async rethrows -> [Base.Element]? { - try await withTaskCancellationHandler { - let output = self.stateMachine.withCriticalRegion { stateMachine in - stateMachine.rootTaskIsCancelled() - } - - self.handle(rootTaskIsCancelledOutput: output) - } operation: { - let results = await withUnsafeContinuation { (continuation: UnsafeContinuation<[Int: Result]?, Never>) in - let output = self.stateMachine.withCriticalRegion { stateMachine in - stateMachine.newDemandFromConsumer(suspendedDemand: continuation) - } - - self.handle(newDemandFromConsumerOutput: output) - } - - guard let results = results else { - return nil - } - - let output = self.stateMachine.withCriticalRegion { stateMachine in - stateMachine.demandIsFulfilled() - } - - self.handle(demandIsFulfilledOutput: output) - - return try results.sorted { $0.key < $1.key }.map { try $0.value._rethrowGet() } - } - } - - private func handle(rootTaskIsCancelledOutput: StateMachine.RootTaskIsCancelledOutput) { - switch rootTaskIsCancelledOutput { - case .terminate(let task, let suspendedBases, let suspendedDemands): - suspendedBases?.forEach { $0.resume() } - suspendedDemands?.forEach { $0?.resume(returning: nil) } - task?.cancel() - } - } - - private func handle(newDemandFromConsumerOutput: StateMachine.NewDemandFromConsumerOutput) { - switch newDemandFromConsumerOutput { - case .none: - break - - case .resumeBases(let suspendedBases): - suspendedBases.forEach { $0.resume() } - - case .terminate(let task, let suspendedBases, let suspendedDemands): - suspendedBases?.forEach { $0.resume() } - suspendedDemands?.forEach { $0?.resume(returning: nil) } - task?.cancel() - } - } - - private func handle(demandIsFulfilledOutput: StateMachine.DemandIsFulfilledOutput) { - switch demandIsFulfilledOutput { - case .none: - break - - case .terminate(let task, let suspendedBases, let suspendedDemands): - suspendedBases?.forEach { $0.resume() } - suspendedDemands?.forEach { $0.resume(returning: nil) } - task?.cancel() - } - } -} diff --git a/Sources/Combiners/Zip/ZipStateMachine.swift b/Sources/Combiners/Zip/ZipStateMachine.swift deleted file mode 100644 index 41e4461..0000000 --- a/Sources/Combiners/Zip/ZipStateMachine.swift +++ /dev/null @@ -1,335 +0,0 @@ -// -// ZipStateMachine.swift -// -// -// Created by Thibault Wittemberg on 24/09/2022. -// - -struct ZipStateMachine: Sendable -where Element: Sendable { - private enum State { - case initial - case started(task: Task) - case awaitingDemandFromConsumer( - task: Task?, - suspendedBases: [UnsafeContinuation] - ) - case awaitingBaseResults( - task: Task?, - results: [Int: Result]?, - suspendedBases: [UnsafeContinuation], - suspendedDemand: UnsafeContinuation<[Int: Result]?, Never>? - ) - case finished - } - - private var state: State = .initial - - let numberOfBases: Int - - init(numberOfBases: Int) { - self.numberOfBases = numberOfBases - } - - mutating func taskIsStarted(task: Task) { - switch self.state { - case .initial: - self.state = .started(task: task) - - default: - assertionFailure("Inconsistent state, the task cannot start while the state is other than initial") - } - } - - enum NewDemandFromConsumerOutput { - case none - case resumeBases(suspendedBases: [UnsafeContinuation]) - case terminate( - task: Task?, - suspendedBases: [UnsafeContinuation]?, - suspendedDemands: [UnsafeContinuation<[Int: Result]?, Never>?]? - ) - } - - mutating func newDemandFromConsumer( - suspendedDemand: UnsafeContinuation<[Int: Result]?, Never> - ) -> NewDemandFromConsumerOutput { - switch self.state { - case .initial: - assertionFailure("Inconsistent state, the task is not started") - self.state = .finished - return .terminate(task: nil, suspendedBases: nil, suspendedDemands: [suspendedDemand]) - - case .started(let task): - self.state = .awaitingBaseResults(task: task, results: nil, suspendedBases: [], suspendedDemand: suspendedDemand) - return .none - - case .awaitingDemandFromConsumer(let task, let suspendedBases): - self.state = .awaitingBaseResults(task: task, results: nil, suspendedBases: [], suspendedDemand: suspendedDemand) - return .resumeBases(suspendedBases: suspendedBases) - - case .awaitingBaseResults(let task, _, let suspendedBases, let oldSuspendedDemand): - assertionFailure("Inconsistent state, a demand is already suspended") - self.state = .finished - return .terminate(task: task, suspendedBases: suspendedBases, suspendedDemands: [oldSuspendedDemand, suspendedDemand]) - - case .finished: - return .terminate(task: nil, suspendedBases: nil, suspendedDemands: [suspendedDemand]) - } - } - - enum NewLoopFromBaseOutput { - case none - case resumeBases(suspendedBases: [UnsafeContinuation]) - case terminate( - task: Task?, - suspendedBase: UnsafeContinuation, - suspendedDemand: UnsafeContinuation<[Int: Result]?, Never>? - ) - } - - mutating func newLoopFromBase(index: Int, suspendedBase: UnsafeContinuation) -> NewLoopFromBaseOutput { - switch self.state { - case .initial: - assertionFailure("Inconsistent state, the task is not started") - self.state = .finished - return .terminate(task: nil, suspendedBase: suspendedBase, suspendedDemand: nil) - - case .started(let task): - self.state = .awaitingDemandFromConsumer(task: task, suspendedBases: [suspendedBase]) - return .none - - case .awaitingDemandFromConsumer(let task, var suspendedBases): - assert( - suspendedBases.count < self.numberOfBases, - "There cannot be more than \(self.numberOfBases) suspended base at the same time" - ) - suspendedBases.append(suspendedBase) - self.state = .awaitingDemandFromConsumer(task: task, suspendedBases: suspendedBases) - return .none - - case .awaitingBaseResults(let task, let results, var suspendedBases, let suspendedDemand): - assert( - suspendedBases.count < self.numberOfBases, - "There cannot be more than \(self.numberOfBases) suspended base at the same time" - ) - if results?[index] != nil { - suspendedBases.append(suspendedBase) - self.state = .awaitingBaseResults( - task: task, - results: results, - suspendedBases: suspendedBases, - suspendedDemand: suspendedDemand - ) - return .none - } else { - self.state = .awaitingBaseResults( - task: task, - results: results, - suspendedBases: suspendedBases, - suspendedDemand: suspendedDemand - ) - return .resumeBases(suspendedBases: [suspendedBase]) - } - - case .finished: - return .terminate(task: nil, suspendedBase: suspendedBase, suspendedDemand: nil) - } - } - - enum BaseHasProducedElementOutput { - case none - case resumeDemand( - suspendedDemand: UnsafeContinuation<[Int: Result]?, Never>?, - results: [Int: Result] - ) - case terminate( - task: Task?, - suspendedBases: [UnsafeContinuation]? - ) - } - - mutating func baseHasProducedElement(index: Int, element: Element) -> BaseHasProducedElementOutput { - switch self.state { - case .initial: - assertionFailure("Inconsistent state, the task is not started") - self.state = .finished - return .terminate(task: nil, suspendedBases: nil) - - case .started(let task): - assertionFailure("Inconsistent state, a base can only produce an element when the consumer is awaiting for it") - self.state = .finished - return .terminate(task: task, suspendedBases: nil) - - case .awaitingDemandFromConsumer(let task, let suspendedBases): - assertionFailure("Inconsistent state, a base can only produce an element when the consumer is awaiting for it") - self.state = .finished - return .terminate(task: task, suspendedBases: suspendedBases) - - case .awaitingBaseResults(let task, let results, let suspendedBases, let suspendedDemand): - assert(results?[index] == nil, "Inconsistent state, a base can only produce an element when the previous one has been consumed") - var mutableResults: [Int: Result] - if let results = results { - mutableResults = results - } else { - mutableResults = [:] - } - mutableResults[index] = .success(element) - if mutableResults.count == self.numberOfBases { - self.state = .awaitingBaseResults(task: task, results: mutableResults, suspendedBases: suspendedBases, suspendedDemand: nil) - return .resumeDemand(suspendedDemand: suspendedDemand, results: mutableResults) - } else { - self.state = .awaitingBaseResults(task: task, results: mutableResults, suspendedBases: suspendedBases, suspendedDemand: suspendedDemand) - return .none - } - - case .finished: - return .terminate(task: nil, suspendedBases: nil) - } - } - - enum BaseHasProducedFailureOutput { - case resumeDemandAndTerminate( - task: Task?, - suspendedDemand: UnsafeContinuation<[Int: Result]?, Never>?, - suspendedBases: [UnsafeContinuation], - results: [Int: Result] - ) - case terminate( - task: Task?, - suspendedBases: [UnsafeContinuation]? - ) - } - - mutating func baseHasProducedFailure(error: any Error) -> BaseHasProducedFailureOutput { - switch self.state { - case .initial: - assertionFailure("Inconsistent state, the task is not started") - self.state = .finished - return .terminate(task: nil, suspendedBases: nil) - - case .started(let task): - assertionFailure("Inconsistent state, a base can only produce an element when the consumer is awaiting for it") - self.state = .finished - return .terminate(task: task, suspendedBases: nil) - - case .awaitingDemandFromConsumer(let task, let suspendedBases): - assertionFailure("Inconsistent state, a base can only produce an element when the consumer is awaiting for it") - self.state = .finished - return .terminate(task: task, suspendedBases: suspendedBases) - - case .awaitingBaseResults(let task, _, let suspendedBases, let suspendedDemand): - self.state = .finished - return .resumeDemandAndTerminate( - task: task, - suspendedDemand: suspendedDemand, - suspendedBases: suspendedBases, - results: [0: .failure(error)] - ) - - case .finished: - return .terminate(task: nil, suspendedBases: nil) - } - } - - enum DemandIsFulfilledOutput { - case none - case terminate( - task: Task?, - suspendedBases: [UnsafeContinuation]?, - suspendedDemands: [UnsafeContinuation<[Int: Result]?, Never>]? - ) - } - - mutating func demandIsFulfilled() -> DemandIsFulfilledOutput { - switch self.state { - case .initial: - assertionFailure("Inconsistent state, the task is not started") - self.state = .finished - return .terminate(task: nil, suspendedBases: nil, suspendedDemands: nil) - - case .started(let task): - assertionFailure("Inconsistent state, results are not yet available to be acknowledged") - self.state = .finished - return .terminate(task: task, suspendedBases: nil, suspendedDemands: nil) - - case .awaitingDemandFromConsumer(let task, let suspendedBases): - assertionFailure("Inconsistent state, results are not yet available to be acknowledged") - self.state = .finished - return .terminate(task: task, suspendedBases: suspendedBases, suspendedDemands: nil) - - case .awaitingBaseResults(let task, let results, let suspendedBases, let suspendedDemand): - assert(suspendedDemand == nil, "Inconsistent state, there cannot be a suspended demand when ackowledging the demand") - assert(results?.count == self.numberOfBases, "Inconsistent state, all results are not yet available to be acknowledged") - self.state = .awaitingDemandFromConsumer(task: task, suspendedBases: suspendedBases) - return .none - - case .finished: - return .terminate(task: nil, suspendedBases: nil, suspendedDemands: nil) - } - } - - enum RootTaskIsCancelledOutput { - case terminate( - task: Task?, - suspendedBases: [UnsafeContinuation]?, - suspendedDemands: [UnsafeContinuation<[Int: Result]?, Never>?]? - ) - } - - mutating func rootTaskIsCancelled() -> RootTaskIsCancelledOutput { - switch self.state { - case .initial: - assertionFailure("Inconsistent state, the task is not started") - self.state = .finished - return .terminate(task: nil, suspendedBases: nil, suspendedDemands: nil) - - case .started(let task): - self.state = .finished - return .terminate(task: task, suspendedBases: nil, suspendedDemands: nil) - - case .awaitingDemandFromConsumer(let task, let suspendedBases): - self.state = .finished - return .terminate(task: task, suspendedBases: suspendedBases, suspendedDemands: nil) - - case .awaitingBaseResults(let task, _, let suspendedBases, let suspendedDemand): - self.state = .finished - return .terminate(task: task, suspendedBases: suspendedBases, suspendedDemands: [suspendedDemand]) - - case .finished: - return .terminate(task: nil, suspendedBases: nil, suspendedDemands: nil) - } - } - - enum BaseIsFinishedOutput { - case terminate( - task: Task?, - suspendedBases: [UnsafeContinuation]?, - suspendedDemands: [UnsafeContinuation<[Int: Result]?, Never>?]? - ) - } - - mutating func baseIsFinished() -> BaseIsFinishedOutput { - switch self.state { - case .initial: - assertionFailure("Inconsistent state, the task is not started") - self.state = .finished - return .terminate(task: nil, suspendedBases: nil, suspendedDemands: nil) - - case .started(let task): - self.state = .finished - return .terminate(task: task, suspendedBases: nil, suspendedDemands: nil) - - case .awaitingDemandFromConsumer(let task, let suspendedBases): - self.state = .finished - return .terminate(task: task, suspendedBases: suspendedBases, suspendedDemands: nil) - - case .awaitingBaseResults(let task, _, let suspendedBases, let suspendedDemand): - self.state = .finished - return .terminate(task: task, suspendedBases: suspendedBases, suspendedDemands: [suspendedDemand]) - - case .finished: - return .terminate(task: nil, suspendedBases: nil, suspendedDemands: nil) - } - } -} diff --git a/Sources/Creators/AsyncLazySequence.swift b/Sources/Creators/AsyncLazySequence.swift deleted file mode 100644 index b68d998..0000000 --- a/Sources/Creators/AsyncLazySequence.swift +++ /dev/null @@ -1,51 +0,0 @@ -// -// AsyncLazySequence.swift -// -// -// Created by Thibault Wittemberg on 01/01/2022. -// - -public extension Sequence { - /// Creates an AsyncSequence of the sequence elements. - /// - Returns: The AsyncSequence that outputs the elements from the sequence. - var async: AsyncLazySequence { - AsyncLazySequence(self) - } -} - -/// `AsyncLazySequence` is an AsyncSequence that outputs elements from a traditional Sequence. -/// If the parent task is cancelled while iterating then the iteration finishes. -/// -/// ``` -/// let fromSequence = AsyncLazySequence([1, 2, 3, 4, 5]) -/// -/// for await element in fromSequence { -/// print(element) // will print 1 2 3 4 5 -/// } -/// ``` -public struct AsyncLazySequence: AsyncSequence { - public typealias Element = Base.Element - public typealias AsyncIterator = Iterator - - private var base: Base - - public init(_ base: Base) { - self.base = base - } - - public func makeAsyncIterator() -> AsyncIterator { - Iterator(base: self.base.makeIterator()) - } - - public struct Iterator: AsyncIteratorProtocol { - var base: Base.Iterator - - public mutating func next() async -> Base.Element? { - guard !Task.isCancelled else { return nil } - return self.base.next() - } - } -} - -extension AsyncLazySequence: Sendable where Base: Sendable {} -extension AsyncLazySequence.Iterator: Sendable where Base.Iterator: Sendable {} diff --git a/Tests/Combiners/Merge/AsyncMergeSequenceTests.swift b/Tests/Combiners/Merge/AsyncMergeSequenceTests.swift deleted file mode 100644 index bebc5ef..0000000 --- a/Tests/Combiners/Merge/AsyncMergeSequenceTests.swift +++ /dev/null @@ -1,292 +0,0 @@ -// -// AsyncMergeSequenceTests.swift -// -// -// Created by Thibault Wittemberg on 01/01/2022. -// - -import AsyncExtensions -import XCTest - -private struct TimedAsyncSequence: AsyncSequence, AsyncIteratorProtocol { - typealias Element = Element - typealias AsyncIterator = TimedAsyncSequence - - private let intervalInMills: [UInt64] - private var iterator: Array.Iterator - private var index = 0 - private let indexOfError: Int? - - init(intervalInMills: [UInt64], sequence: [Element], indexOfError: Int? = nil) { - self.intervalInMills = intervalInMills - self.iterator = sequence.makeIterator() - self.indexOfError = indexOfError - } - - mutating func next() async throws -> Element? { - - if let indexOfError = self.indexOfError, self.index == indexOfError { - throw MockError(code: 1) - } - - if self.index < self.intervalInMills.count { - try await Task.sleep(nanoseconds: self.intervalInMills[index] * 1_000_000) - self.index += 1 - } - return self.iterator.next() - } - - func makeAsyncIterator() -> AsyncIterator { - self - } -} - -final class AsyncMergeSequenceTests: XCTestCase { - func testMerge_merges_sequences_according_to_the_timeline_using_asyncSequences() async throws { - // -- 0 ------------------------------- 1000 ----------------------------- 2000 - - // --------------- 500 --------------------------------- 1500 ------------------- - // -- a ----------- d ------------------ b --------------- e --------------- c -- - // - // output should be: a, d, b, e, c - let expectedElements = ["a", "d", "b", "e", "c"] - - let asyncSequence1 = TimedAsyncSequence(intervalInMills: [0, 1000, 1000], sequence: ["a", "b", "c"]) - let asyncSequence2 = TimedAsyncSequence(intervalInMills: [500, 1000], sequence: ["d", "e"]) - - let sut = merge(asyncSequence1, asyncSequence2) - - var receivedElements = [String]() - var iterator = sut.makeAsyncIterator() - while let element = try await iterator.next() { - try await Task.sleep(nanoseconds: 110_000_000) - receivedElements.append(element) - } - - XCTAssertEqual(receivedElements, expectedElements) - - let pastEnd = try await iterator.next() - XCTAssertNil(pastEnd) - } - - func testMerge_merges_four_sequences() async { - let asyncSequence1 = [1, 2, 3, 4, 5] - let asyncSequence2 = [10, 20, 30, 40, 50] - let asyncSequence3 = [100, 200, 300, 400, 500] - let asyncSequence4 = [1000, 2000, 3000, 4000, 5000] - - let expectedElements = asyncSequence1 + asyncSequence2 + asyncSequence3 + asyncSequence4 - - - let sut = merge(asyncSequence1.async, asyncSequence2.async, asyncSequence3.async, asyncSequence4.async) - - var receivedElements = [Int]() - var iterator = sut.makeAsyncIterator() - while let element = await iterator.next() { - receivedElements.append(element) - } - - XCTAssertEqual(receivedElements.sorted(), expectedElements) - - let pastEnd = await iterator.next() - XCTAssertNil(pastEnd) - } - - func testMerge_merges_sequences_according_to_the_timeline_using_streams() { - let canSend2Expectation = expectation(description: "2 can be sent") - let canSend3Expectation = expectation(description: "3 can be sent") - let canSend4Expectation = expectation(description: "4 can be sent") - let canSend5Expectation = expectation(description: "5 can be sent") - let canSend6Expectation = expectation(description: "6 can be sent") - let canSendFinishExpectation = expectation(description: "finish can be sent") - - let mergedSequenceIsFinisedExpectation = expectation(description: "The merged sequence is finished") - - let stream1 = AsyncCurrentValueSubject(1) - let stream2 = AsyncPassthroughSubject() - let stream3 = AsyncPassthroughSubject() - - let sut = merge(stream1, stream2, stream3) - - Task { - var receivedElements = [Int]() - - for await element in sut { - receivedElements.append(element) - if element == 1 { - canSend2Expectation.fulfill() - } - if element == 2 { - canSend3Expectation.fulfill() - } - if element == 3 { - canSend4Expectation.fulfill() - } - if element == 4 { - canSend5Expectation.fulfill() - } - if element == 5 { - canSend6Expectation.fulfill() - } - - if element == 6 { - canSendFinishExpectation.fulfill() - } - } - XCTAssertEqual(receivedElements, [1, 2, 3, 4, 5, 6]) - mergedSequenceIsFinisedExpectation.fulfill() - } - - wait(for: [canSend2Expectation], timeout: 1) - - stream2.send(2) - wait(for: [canSend3Expectation], timeout: 1) - - stream3.send(3) - wait(for: [canSend4Expectation], timeout: 1) - - stream3.send(4) - wait(for: [canSend5Expectation], timeout: 1) - - stream2.send(5) - wait(for: [canSend6Expectation], timeout: 1) - - stream1.send(6) - - wait(for: [canSendFinishExpectation], timeout: 1) - - stream1.send(Termination.finished) - stream2.send(Termination.finished) - stream3.send(Termination.finished) - - wait(for: [mergedSequenceIsFinisedExpectation], timeout: 1) - } - - func testMerge_returns_empty_sequence_when_all_sequences_are_empty() async { - var receivedResult = [Int]() - - let asyncSequence1 = AsyncEmptySequence() - let asyncSequence2 = AsyncEmptySequence() - let asyncSequence3 = AsyncEmptySequence() - - let sut = merge(asyncSequence1, asyncSequence2, asyncSequence3) - - for await element in sut { - receivedResult.append(element) - } - - XCTAssertTrue(receivedResult.isEmpty) - } - - func testMerge_returns_original_sequence_when_one_sequence_is_empty() async { - let expectedResult = [1, 2, 3] - var receivedResult = [Int]() - - let asyncSequence1 = expectedResult.async - let asyncSequence2 = AsyncEmptySequence() - - let sut = merge(asyncSequence1, asyncSequence2) - - for await element in sut { - receivedResult.append(element) - } - - XCTAssertEqual(receivedResult, expectedResult) - } - - func testMerge_propagates_error() { - let canSend2Expectation = expectation(description: "2 can be sent") - let canSend3Expectation = expectation(description: "3 can be sent") - let mergedSequenceIsFinishedExpectation = expectation(description: "The merged sequence is finished") - - let stream1 = AsyncThrowingCurrentValueSubject(1) - let stream2 = AsyncPassthroughSubject() - - let sut = merge(stream1, stream2) - - Task { - var receivedElements = [Int]() - do { - for try await element in sut { - receivedElements.append(element) - if element == 1 { - canSend2Expectation.fulfill() - } - if element == 2 { - canSend3Expectation.fulfill() - } - } - } catch { - XCTAssertEqual(receivedElements, [1, 2]) - mergedSequenceIsFinishedExpectation.fulfill() - } - } - - wait(for: [canSend2Expectation], timeout: 1) - - stream2.send(2) - wait(for: [canSend3Expectation], timeout: 1) - - stream1.send(.failure(MockError(code: 1))) - - wait(for: [mergedSequenceIsFinishedExpectation], timeout: 1) - } - - func testMerge_finishes_when_task_is_cancelled() { - let canCancelExpectation = expectation(description: "The first element has been emitted") - let hasCancelExceptation = expectation(description: "The task has been cancelled") - let taskHasFinishedExpectation = expectation(description: "The task has finished") - - let asyncSequence1 = TimedAsyncSequence(intervalInMills: [100, 100, 100], sequence: [1, 2, 3]) - let asyncSequence2 = TimedAsyncSequence(intervalInMills: [50, 100, 100, 100], sequence: [6, 7, 8, 9]) - let asyncSequence3 = TimedAsyncSequence(intervalInMills: [1, 399], sequence: [10, 11]) - - let sut = merge(asyncSequence1, asyncSequence2, asyncSequence3) - - let task = Task { - var firstElement: Int? - for try await element in sut { - firstElement = element - canCancelExpectation.fulfill() - wait(for: [hasCancelExceptation], timeout: 5) - } - XCTAssertEqual(firstElement, 10) - taskHasFinishedExpectation.fulfill() - } - - wait(for: [canCancelExpectation], timeout: 5) // one element has been emitted, we can cancel the task - - task.cancel() - - hasCancelExceptation.fulfill() // we can release the lock in the for loop - - wait(for: [taskHasFinishedExpectation], timeout: 5) // task has been cancelled and has finished - } - - func testMerge_finishes_when_task_is_cancelled_while_waiting_for_an_element() { - let firstElementHasBeenReceivedExpectation = expectation(description: "The first elemenet has been received") - let canIterateExpectation = expectation(description: "We can iterate") - let hasCancelExceptation = expectation(description: "The iteration is cancelled") - - let asyncSequence1 = AsyncCurrentValueSubject(1) - let asyncSequence2 = AsyncPassthroughSubject() - - let sut = merge(asyncSequence1, asyncSequence2) - - let task = Task { - var iterator = sut.makeAsyncIterator() - canIterateExpectation.fulfill() - while let _ = await iterator.next() { - firstElementHasBeenReceivedExpectation.fulfill() - } - hasCancelExceptation.fulfill() - } - - wait(for: [canIterateExpectation], timeout: 1) - - wait(for: [firstElementHasBeenReceivedExpectation], timeout: 1) - - task.cancel() - - wait(for: [hasCancelExceptation], timeout: 1) - } -} diff --git a/Tests/Combiners/Zip/AsyncZipSequenceTests.swift b/Tests/Combiners/Zip/AsyncZipSequenceTests.swift deleted file mode 100644 index 701bd48..0000000 --- a/Tests/Combiners/Zip/AsyncZipSequenceTests.swift +++ /dev/null @@ -1,415 +0,0 @@ -// -// AsyncZipSequenceTests.swift -// -// -// Created by Thibault Wittemberg on 14/01/2022. -// - -@testable import AsyncExtensions -import XCTest - -private struct TimedAsyncSequence: AsyncSequence, AsyncIteratorProtocol { - typealias Element = Element - typealias AsyncIterator = TimedAsyncSequence - - private let intervalInMills: UInt64 - private var iterator: Array.Iterator - - init(intervalInMills: UInt64, sequence: [Element]) { - self.intervalInMills = intervalInMills - self.iterator = sequence.makeIterator() - } - - mutating func next() async -> Element? { - try? await Task.sleep(nanoseconds: self.intervalInMills * 1_000_000) - return self.iterator.next() - } - - func makeAsyncIterator() -> AsyncIterator { - self - } -} - -final class AsyncZipSequenceTests: XCTestCase { - func testZip2_respects_chronology_and_ends_when_first_sequence_ends() async { - let asyncSeq1 = TimedAsyncSequence(intervalInMills: 50, sequence: [1, 2, 3, 4]) - let asyncSeq2 = TimedAsyncSequence(intervalInMills: 10, sequence: ["6", "7", "8", "9", "10"]) - - let sut = zip(asyncSeq1, asyncSeq2) - - var receivedElements = [(Int, String)]() - - for await element in sut { - receivedElements.append(element) - } - - XCTAssertEqual(receivedElements.map { $0.0 }, [1, 2, 3, 4]) - XCTAssertEqual(receivedElements.map { $0.1 }, ["6", "7", "8", "9"]) - } - - func testZip2_respects_chronology_and_ends_when_second_sequence_ends() async { - let asyncSeq1 = TimedAsyncSequence(intervalInMills: 50, sequence: [1, 2, 3, 4, 5]) - let asyncSeq2 = TimedAsyncSequence(intervalInMills: 10, sequence: ["6", "7", "8"]) - - let sut = zip(asyncSeq1, asyncSeq2) - - var receivedElements = [(Int, String)]() - - for await element in sut { - receivedElements.append(element) - } - - XCTAssertEqual(receivedElements.map { $0.0 }, [1, 2, 3]) - XCTAssertEqual(receivedElements.map { $0.1 }, ["6", "7", "8"]) - } - - func testZip2_respects_returns_nil_pastEnd() async { - let asyncSeq1 = AsyncLazySequence([1, 2, 3]) - let asyncSeq2 = AsyncLazySequence(["1", "2", "3"]) - - let sut = zip(asyncSeq1, asyncSeq2) - - var receivedElements = [(Int, String)]() - let iterator = sut.makeAsyncIterator() - - while let element = await iterator.next() { - receivedElements.append(element) - } - - XCTAssertEqual(receivedElements.map { $0.0 }, [1, 2, 3]) - XCTAssertEqual(receivedElements.map { $0.1 }, ["1", "2", "3"]) - - let pastEnd = await iterator.next() - XCTAssertNil(pastEnd) - } - - func testZip2_propagates_error_when_first_fails() async throws { - let mockError = MockError(code: Int.random(in: 0...100)) - - let asyncSeq1 = AsyncFailSequence( mockError) - let asyncSeq2 = TimedAsyncSequence(intervalInMills: 50, sequence: [1, 2, 3, 4]) - - let sut = zip(asyncSeq1, asyncSeq2) - let iterator = sut.makeAsyncIterator() - - do { - while let element = try await iterator.next() { - print(element) - } - XCTFail("The zipped sequence should fail") - } catch { - XCTAssertEqual(error as? MockError, mockError) - } - - let pastFail = try await iterator.next() - XCTAssertNil(pastFail) - } - - func testZip2_propagates_error_when_second_fails() async throws { - let mockError = MockError(code: Int.random(in: 0...100)) - - let asyncSeq1 = TimedAsyncSequence(intervalInMills: 50, sequence: [1, 2, 3, 4]) - let asyncSeq2 = AsyncFailSequence( mockError) - - let sut = zip(asyncSeq1, asyncSeq2) - let iterator = sut.makeAsyncIterator() - - do { - while let _ = try await iterator.next() {} - XCTFail("The zipped sequence should fail") - } catch { - XCTAssertEqual(error as? MockError, mockError) - } - - let pastFail = try await iterator.next() - XCTAssertNil(pastFail) - } - - func testZip2_finishes_when_task_is_cancelled() { - let canCancelExpectation = expectation(description: "The first element has been emitted") - let hasCancelExceptation = expectation(description: "The task has been cancelled") - let taskHasFinishedExpectation = expectation(description: "The task has finished") - - let asyncSeq1 = AsyncLazySequence([1, 2, 3]) - let asyncSeq2 = AsyncLazySequence(["1", "2", "3"]) - - let sut = zip(asyncSeq1, asyncSeq2) - - let task = Task { - var firstElement: (Int, String)? - for try await element in sut { - firstElement = element - canCancelExpectation.fulfill() - wait(for: [hasCancelExceptation], timeout: 5) - } - XCTAssertEqual(firstElement!.0, 1) - XCTAssertEqual(firstElement!.1, "1") - taskHasFinishedExpectation.fulfill() - } - - wait(for: [canCancelExpectation], timeout: 5) // one element has been emitted, we can cancel the task - - task.cancel() - - hasCancelExceptation.fulfill() // we can release the lock in the for loop - - wait(for: [taskHasFinishedExpectation], timeout: 5) // task has been cancelled and has finished - } -} - -extension AsyncZipSequenceTests { - func testZip3_respects_chronology_and_ends_when_first_sequence_ends() async throws { - let asyncSeq1 = TimedAsyncSequence(intervalInMills: 50, sequence: [1, 2, 3]) - let asyncSeq2 = TimedAsyncSequence(intervalInMills: 10, sequence: ["6", "7", "8", "9", "10"]) - let asyncSeq3 = TimedAsyncSequence(intervalInMills: 30, sequence: [true, false, true, false, true]) - - let sut = zip(asyncSeq1, asyncSeq2, asyncSeq3) - - var receivedElements = [(Int, String, Bool)]() - - for try await element in sut { - receivedElements.append(element) - } - - XCTAssertEqual(receivedElements.map { $0.0 }, [1, 2, 3]) - XCTAssertEqual(receivedElements.map { $0.1 }, ["6", "7", "8"]) - XCTAssertEqual(receivedElements.map { $0.2 }, [true, false, true]) - } - - func testZip3_respects_chronology_and_ends_when_second_sequence_ends() async throws { - let asyncSeq1 = TimedAsyncSequence(intervalInMills: 50, sequence: [1, 2, 3, 4, 5]) - let asyncSeq2 = TimedAsyncSequence(intervalInMills: 10, sequence: ["6", "7", "8"]) - let asyncSeq3 = TimedAsyncSequence(intervalInMills: 30, sequence: [true, false, true, false, true]) - - let sut = zip(asyncSeq1, asyncSeq2, asyncSeq3) - - var receivedElements = [(Int, String, Bool)]() - - for try await element in sut { - receivedElements.append(element) - } - - XCTAssertEqual(receivedElements.map { $0.0 }, [1, 2, 3]) - XCTAssertEqual(receivedElements.map { $0.1 }, ["6", "7", "8"]) - XCTAssertEqual(receivedElements.map { $0.2 }, [true, false, true]) - } - - func testZip3_respects_chronology_and_ends_when_third_sequence_ends() async throws { - let asyncSeq1 = TimedAsyncSequence(intervalInMills: 50, sequence: [1, 2, 3, 4, 5]) - let asyncSeq2 = TimedAsyncSequence(intervalInMills: 10, sequence: ["6", "7", "8", "9", "10"]) - let asyncSeq3 = TimedAsyncSequence(intervalInMills: 30, sequence: [true, false, true]) - - let sut = zip(asyncSeq1, asyncSeq2, asyncSeq3) - - var receivedElements = [(Int, String, Bool)]() - - for try await element in sut { - receivedElements.append(element) - } - - XCTAssertEqual(receivedElements.map { $0.0 }, [1, 2, 3]) - XCTAssertEqual(receivedElements.map { $0.1 }, ["6", "7", "8"]) - XCTAssertEqual(receivedElements.map { $0.2 }, [true, false, true]) - } - - func testZip3_respects_returns_nil_pastEnd() async { - let asyncSeq1 = AsyncLazySequence([1, 2, 3]) - let asyncSeq2 = AsyncLazySequence(["1", "2", "3"]) - let asyncSeq3 = AsyncLazySequence([true, false, true]) - - let sut = zip(asyncSeq1, asyncSeq2, asyncSeq3) - - var receivedElements = [(Int, String, Bool)]() - let iterator = sut.makeAsyncIterator() - - while let element = await iterator.next() { - receivedElements.append(element) - } - - XCTAssertEqual(receivedElements.map { $0.0 }, [1, 2, 3]) - XCTAssertEqual(receivedElements.map { $0.1 }, ["1", "2", "3"]) - XCTAssertEqual(receivedElements.map { $0.2 }, [true, false, true]) - - let pastEnd = await iterator.next() - XCTAssertNil(pastEnd) - } - - func testZip3_propagates_error_when_first_fails() async throws { - let mockError = MockError(code: Int.random(in: 0...100)) - - let asyncSeq1 = AsyncFailSequence( mockError) - let asyncSeq2 = TimedAsyncSequence(intervalInMills: 50, sequence: [1, 2, 3, 4]) - let asyncSeq3 = TimedAsyncSequence(intervalInMills: 5, sequence: ["1", "2", "3", "4"]) - - let sut = zip(asyncSeq1, asyncSeq2, asyncSeq3) - let iterator = sut.makeAsyncIterator() - - do { - while let _ = try await iterator.next() {} - XCTFail("The zipped sequence should fail") - } catch { - XCTAssertEqual(error as? MockError, mockError) - } - - let pastFail = try await iterator.next() - XCTAssertNil(pastFail) - } - - func testZip3_propagates_error_when_second_fails() async throws { - let mockError = MockError(code: Int.random(in: 0...100)) - - let asyncSeq1 = TimedAsyncSequence(intervalInMills: 50, sequence: [1, 2, 3, 4]) - let asyncSeq2 = AsyncFailSequence( mockError) - let asyncSeq3 = TimedAsyncSequence(intervalInMills: 5, sequence: ["1", "2", "3", "4"]) - - let sut = zip(asyncSeq1, asyncSeq2, asyncSeq3) - let iterator = sut.makeAsyncIterator() - - do { - while let _ = try await iterator.next() {} - XCTFail("The zipped sequence should fail") - } catch { - XCTAssertEqual(error as? MockError, mockError) - } - - let pastFail = try await iterator.next() - XCTAssertNil(pastFail) - } - - func testZip3_propagates_error_when_third_fails() async throws { - let mockError = MockError(code: Int.random(in: 0...100)) - - let asyncSeq1 = TimedAsyncSequence(intervalInMills: 50, sequence: [1, 2, 3, 4]) - let asyncSeq2 = TimedAsyncSequence(intervalInMills: 5, sequence: ["1", "2", "3", "4"]) - let asyncSeq3 = AsyncFailSequence( mockError) - - let sut = zip(asyncSeq1, asyncSeq2, asyncSeq3) - let iterator = sut.makeAsyncIterator() - - do { - while let _ = try await iterator.next() {} - XCTFail("The zipped sequence should fail") - } catch { - XCTAssertEqual(error as? MockError, mockError) - } - - let pastFail = try await iterator.next() - XCTAssertNil(pastFail) - } - - func testZip3_finishes_when_task_is_cancelled() { - let canCancelExpectation = expectation(description: "The first element has been emitted") - let hasCancelExceptation = expectation(description: "The task has been cancelled") - let taskHasFinishedExpectation = expectation(description: "The task has finished") - - let asyncSeq1 = AsyncLazySequence([1, 2, 3]) - let asyncSeq2 = AsyncLazySequence(["1", "2", "3"]) - let asyncSeq3 = AsyncLazySequence([true, false, true]) - - let sut = zip(asyncSeq1, asyncSeq2, asyncSeq3) - - let task = Task { - var firstElement: (Int, String, Bool)? - for try await element in sut { - firstElement = element - canCancelExpectation.fulfill() - wait(for: [hasCancelExceptation], timeout: 5) - } - XCTAssertEqual(firstElement!.0, 1) // the AsyncSequence is cancelled having only emitted the first element - XCTAssertEqual(firstElement!.1, "1") - XCTAssertEqual(firstElement!.2, true) - taskHasFinishedExpectation.fulfill() - } - - wait(for: [canCancelExpectation], timeout: 5) // one element has been emitted, we can cancel the task - - task.cancel() - - hasCancelExceptation.fulfill() // we can release the lock in the for loop - - wait(for: [taskHasFinishedExpectation], timeout: 5) // task has been cancelled and has finished - } -} - -extension AsyncZipSequenceTests { - func testZip_respects_chronology_and_ends_when_any_sequence_ends() async { - let asyncSeq1 = TimedAsyncSequence(intervalInMills: 50, sequence: [1, 2, 3, 4, 5]) - let asyncSeq2 = TimedAsyncSequence(intervalInMills: 10, sequence: [1, 2, 3]) - let asyncSeq3 = TimedAsyncSequence(intervalInMills: 30, sequence: [1, 2, 3, 4, 5]) - let asyncSeq4 = TimedAsyncSequence(intervalInMills: 5, sequence: [1, 2, 3]) - let asyncSeq5 = TimedAsyncSequence(intervalInMills: 20, sequence: [1, 2, 3, 4, 5]) - - let sut = zip(asyncSeq1, asyncSeq2, asyncSeq3, asyncSeq4, asyncSeq5) - - var receivedElements = [[Int]]() - - let iterator = sut.makeAsyncIterator() - while let element = await iterator.next() { - receivedElements.append(element) - } - - XCTAssertEqual(receivedElements.count, 3) - XCTAssertEqual(receivedElements[0], [1, 1, 1, 1, 1]) - XCTAssertEqual(receivedElements[1], [2, 2, 2, 2, 2]) - XCTAssertEqual(receivedElements[2], [3, 3, 3, 3, 3]) - - let pastEnd = await iterator.next() - XCTAssertNil(pastEnd) - } - - func testZip_propagates_error() async throws { - let mockError = MockError(code: Int.random(in: 0...100)) - - let asyncSeq1 = TimedAsyncSequence(intervalInMills: 5, sequence: [1, 2, 3, 4]).eraseToAnyAsyncSequence() - let asyncSeq2 = TimedAsyncSequence(intervalInMills: 10, sequence: [1, 2, 3, 4]).eraseToAnyAsyncSequence() - let asyncSeq3 = AsyncFailSequence( mockError).eraseToAnyAsyncSequence() - let asyncSeq4 = TimedAsyncSequence(intervalInMills: 20, sequence: [1, 2, 3, 4]).eraseToAnyAsyncSequence() - let asyncSeq5 = TimedAsyncSequence(intervalInMills: 15, sequence: [1, 2, 3, 4]).eraseToAnyAsyncSequence() - - let sut = zip(asyncSeq1, asyncSeq2, asyncSeq3, asyncSeq4, asyncSeq5) - let iterator = sut.makeAsyncIterator() - - do { - while let _ = try await iterator.next() {} - XCTFail("The zipped sequence should fail") - } catch { - XCTAssertEqual(error as? MockError, mockError) - } - - let pastFail = try await iterator.next() - XCTAssertNil(pastFail) - } - - func testZip_finishes_when_task_is_cancelled() { - let canCancelExpectation = expectation(description: "The first element has been emitted") - let hasCancelExceptation = expectation(description: "The task has been cancelled") - let taskHasFinishedExpectation = expectation(description: "The task has finished") - - let asyncSeq1 = AsyncLazySequence([1, 2, 3]) - let asyncSeq2 = AsyncLazySequence([1, 2, 3]) - let asyncSeq3 = AsyncLazySequence([1, 2, 3]) - let asyncSeq4 = AsyncLazySequence([1, 2, 3]) - let asyncSeq5 = AsyncLazySequence([1, 2, 3]) - - let sut = zip(asyncSeq1, asyncSeq2, asyncSeq3, asyncSeq4, asyncSeq5) - - let task = Task { - var firstElement: [Int]? - for await element in sut { - firstElement = element - canCancelExpectation.fulfill() - wait(for: [hasCancelExceptation], timeout: 5) - } - XCTAssertEqual(firstElement!, [1, 1, 1, 1, 1]) - taskHasFinishedExpectation.fulfill() - } - - wait(for: [canCancelExpectation], timeout: 5) // one element has been emitted, we can cancel the task - - task.cancel() - - hasCancelExceptation.fulfill() // we can release the lock in the for loop - - wait(for: [taskHasFinishedExpectation], timeout: 5) // task has been cancelled and has finished - } -} diff --git a/Tests/Creators/AsyncLazySequenceTests.swift b/Tests/Creators/AsyncLazySequenceTests.swift deleted file mode 100644 index f164fa9..0000000 --- a/Tests/Creators/AsyncLazySequenceTests.swift +++ /dev/null @@ -1,50 +0,0 @@ -// -// AsyncLazySequenceTests.swift -// -// -// Created by Thibault Wittemberg on 02/01/2022. -// - -import AsyncExtensions -import XCTest - -final class AsyncLazySequenceTests: XCTestCase { - func test_AsyncLazySequence_returns_original_sequence() async { - var receivedResult = [Int]() - - let sequence = [1, 2, 3, 4, 5] - - let sut = AsyncLazySequence(sequence) - - for await element in sut { - receivedResult.append(element) - } - - XCTAssertEqual(receivedResult, sequence) - } - - func test_AsyncLazySequence_returns_an_asyncSequence_that_finishes_when_task_is_cancelled() { - let canCancelExpectation = expectation(description: "The first element has been emitted") - let hasCancelExceptation = expectation(description: "The task has been cancelled") - - let sequence = (0...1_000_000) - - let sut = AsyncLazySequence(sequence) - - let task = Task { - var firstElement: Int? - for await element in sut { - firstElement = element - canCancelExpectation.fulfill() - wait(for: [hasCancelExceptation], timeout: 5) - } - XCTAssertEqual(firstElement!, 0) // the AsyncSequence is cancelled having only emitted the first element - } - - wait(for: [canCancelExpectation], timeout: 5) // one element has been emitted, we can cancel the task - - task.cancel() - - hasCancelExceptation.fulfill() // we can release the lock in the for loop - } -} diff --git a/Tests/Operators/AsyncPrependSequenceTests.swift b/Tests/Operators/AsyncPrependSequenceTests.swift index e520e06..adfd84c 100644 --- a/Tests/Operators/AsyncPrependSequenceTests.swift +++ b/Tests/Operators/AsyncPrependSequenceTests.swift @@ -5,6 +5,7 @@ // Created by Thibault Wittemberg on 01/01/2022. // +import AsyncAlgorithms import AsyncExtensions import XCTest diff --git a/Tests/Operators/AsyncSequence+AssignTests.swift b/Tests/Operators/AsyncSequence+AssignTests.swift index d738d14..03688f2 100644 --- a/Tests/Operators/AsyncSequence+AssignTests.swift +++ b/Tests/Operators/AsyncSequence+AssignTests.swift @@ -5,6 +5,7 @@ // Created by Thibault Wittemberg on 02/02/2022. // +import AsyncAlgorithms import AsyncExtensions import XCTest @@ -21,7 +22,7 @@ private class Root { final class AsyncSequence_AssignTests: XCTestCase { func testAssign_sets_elements_on_the_root() async throws { let root = Root() - let sut = AsyncLazySequence(["1", "2", "3"]) + let sut = ["1", "2", "3"].async try await sut.assign(to: \.property, on: root) XCTAssertEqual(root.successiveValues, ["1", "2", "3"]) } diff --git a/Tests/Operators/AsyncSequence+FlatMapLatestTests.swift b/Tests/Operators/AsyncSequence+FlatMapLatestTests.swift index a968baf..f8dccb2 100644 --- a/Tests/Operators/AsyncSequence+FlatMapLatestTests.swift +++ b/Tests/Operators/AsyncSequence+FlatMapLatestTests.swift @@ -5,6 +5,7 @@ // Created by Thibault Wittemberg on 10/01/2022. // +import AsyncAlgorithms @testable import AsyncExtensions import XCTest @@ -166,10 +167,10 @@ final class AsyncSequence_FlatMapLatestTests: XCTestCase { func testFlatMapLatest_propagates_errors() async { let expectedError = MockError(code: Int.random(in: 0...100)) - let sut = AsyncLazySequence([1, 2]) + let sut = [1, 2].async .flatMapLatest { element -> AnyAsyncSequence in if element == 1 { - return AsyncLazySequence([1]).eraseToAnyAsyncSequence() + return [1].async.eraseToAnyAsyncSequence() } return AsyncFailSequence(expectedError).eraseToAnyAsyncSequence() diff --git a/Tests/Operators/AsyncSwitchToLatestSequenceTests.swift b/Tests/Operators/AsyncSwitchToLatestSequenceTests.swift index bda619d..1be42eb 100644 --- a/Tests/Operators/AsyncSwitchToLatestSequenceTests.swift +++ b/Tests/Operators/AsyncSwitchToLatestSequenceTests.swift @@ -5,6 +5,7 @@ // Created by Thibault Wittemberg on 04/01/2022. // +import AsyncAlgorithms @testable import AsyncExtensions import XCTest @@ -102,10 +103,10 @@ final class AsyncSwitchToLatestSequenceTests: XCTestCase { func testSwitchToLatest_propagates_errors_when_base_sequence_fails() async { let sequences = [ - AsyncLazySequence([1, 2, 3]).eraseToAnyAsyncSequence(), - AsyncLazySequence([4, 5, 6]).eraseToAnyAsyncSequence(), - AsyncLazySequence([7, 8, 9]).eraseToAnyAsyncSequence(), // should fail here - AsyncLazySequence([10, 11, 12]).eraseToAnyAsyncSequence(), + [1, 2, 3].async.eraseToAnyAsyncSequence(), + [4, 5, 6].async.eraseToAnyAsyncSequence(), + [7, 8, 9].async.eraseToAnyAsyncSequence(), // should fail here + [10, 11, 12].async.eraseToAnyAsyncSequence(), ] let sourceSequence = LongAsyncSequence(elements: sequences, interval: .milliseconds(100), failAt: 2) From c9e1a62c5b7ca44b562116d9fd843d548b741792 Mon Sep 17 00:00:00 2001 From: harry lachenmayer Date: Wed, 6 Dec 2023 17:45:02 +0000 Subject: [PATCH 2/3] docs: remove sentence about overlaps in README --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index f04b3c6..52a13b7 100644 --- a/README.md +++ b/README.md @@ -9,7 +9,7 @@ **AsyncExtensions** provides a collection of operators that intends to ease the creation and combination of `AsyncSequences`. -**AsyncExtensions** can be seen as a companion to Apple [swift-async-algorithms](https://github.com/apple/swift-async-algorithms). For now there is an overlap between both libraries, but when **swift-async-algorithms** becomes stable the overlapping operators while be deprecated in **AsyncExtensions**. Nevertheless **AsyncExtensions** will continue to provide the operators that the community needs and are not provided by Apple. +**AsyncExtensions** can be seen as a companion to Apple [swift-async-algorithms](https://github.com/apple/swift-async-algorithms), which provides operators that the community needs and are not provided by Apple. ## Adding AsyncExtensions as a Dependency From ce1bf5dfeef6cd767e4e5ca343df5501cb461b2d Mon Sep 17 00:00:00 2001 From: harry lachenmayer Date: Wed, 6 Dec 2023 17:51:36 +0000 Subject: [PATCH 3/3] re-add variadic merge --- README.md | 1 + .../Combiners/Merge/AsyncMergeSequence.swift | 57 ++++ .../Combiners/Merge/MergeStateMachine.swift | 249 ++++++++++++++++++ 3 files changed, 307 insertions(+) create mode 100644 Sources/Combiners/Merge/AsyncMergeSequence.swift create mode 100644 Sources/Combiners/Merge/MergeStateMachine.swift diff --git a/README.md b/README.md index 52a13b7..71a93d0 100644 --- a/README.md +++ b/README.md @@ -44,6 +44,7 @@ AsyncStream) * [AsyncThrowingReplaySubject](./Sources/AsyncSubjects/AsyncThrowingReplaySubject.swift): Throwing subject with a shared output. Maintain an replays a buffered amount of values ### Combiners +* [`merge(_:)`](./Sources/Combiners/Merge/AsyncMergeSequence.swift): Merges any `AsyncSequence` into an AsyncSequence of elements * [`withLatest(_:)`](./Sources/Combiners/WithLatestFrom/AsyncWithLatestFromSequence.swift): Combines elements from self with the last known element from an other `AsyncSequence` * [`withLatest(_:_:)`](./Sources/Combiners/WithLatestFrom/AsyncWithLatestFrom2Sequence.swift): Combines elements from self with the last known elements from two other async sequences diff --git a/Sources/Combiners/Merge/AsyncMergeSequence.swift b/Sources/Combiners/Merge/AsyncMergeSequence.swift new file mode 100644 index 0000000..ad85bf1 --- /dev/null +++ b/Sources/Combiners/Merge/AsyncMergeSequence.swift @@ -0,0 +1,57 @@ +// +// AsyncMergeSequence.swift +// +// +// Created by Thibault Wittemberg on 31/03/2022. +// + +/// Creates an asynchronous sequence of elements from many underlying asynchronous sequences +public func merge( + _ bases: Base... +) -> AsyncMergeSequence { + AsyncMergeSequence(bases) +} + +/// An asynchronous sequence of elements from many underlying asynchronous sequences +/// +/// In a `AsyncMergeSequence` instance, the *i*th element is the *i*th element +/// resolved in sequential order out of the two underlying asynchronous sequences. +/// Use the `merge(...)` function to create an `AsyncMergeSequence`. +public struct AsyncMergeSequence: AsyncSequence { + public typealias Element = Base.Element + public typealias AsyncIterator = Iterator + + let bases: [Base] + + public init(_ bases: [Base]) { + self.bases = bases + } + + public func makeAsyncIterator() -> Iterator { + Iterator( + bases: self.bases + ) + } + + public struct Iterator: AsyncIteratorProtocol { + let mergeStateMachine: MergeStateMachine + + init(bases: [Base]) { + self.mergeStateMachine = MergeStateMachine( + bases + ) + } + + public mutating func next() async rethrows -> Element? { + let mergedElement = await self.mergeStateMachine.next() + switch mergedElement { + case .element(let result): + return try result._rethrowGet() + case .termination: + return nil + } + } + } +} + +extension AsyncMergeSequence: Sendable where Base: Sendable {} diff --git a/Sources/Combiners/Merge/MergeStateMachine.swift b/Sources/Combiners/Merge/MergeStateMachine.swift new file mode 100644 index 0000000..3cbec30 --- /dev/null +++ b/Sources/Combiners/Merge/MergeStateMachine.swift @@ -0,0 +1,249 @@ +// +// MergeStateMachine.swift +// +// +// Created by Thibault Wittemberg on 08/09/2022. +// + +import DequeModule + +struct MergeStateMachine: Sendable { + enum BufferState { + case idle + case queued(Deque>) + case awaiting(UnsafeContinuation, Never>) + case closed + } + + struct State { + var buffer: BufferState + var basesToTerminate: Int + } + + struct OnNextDecision { + let continuation: UnsafeContinuation, Never> + let regulatedElement: RegulatedElement + } + + let requestNextRegulatedElements: @Sendable () -> Void + let state: ManagedCriticalState + let task: Task + + init( + _ base1: Base1, + _ base2: Base2 + ) where Base1.Element == Element, Base2.Element == Element { + self.state = ManagedCriticalState(State(buffer: .idle, basesToTerminate: 2)) + + let regulator1 = Regulator(base1, onNextRegulatedElement: { [state] in Self.onNextRegulatedElement($0, state: state) }) + let regulator2 = Regulator(base2, onNextRegulatedElement: { [state] in Self.onNextRegulatedElement($0, state: state) }) + + self.requestNextRegulatedElements = { + regulator1.requestNextRegulatedElement() + regulator2.requestNextRegulatedElement() + } + + self.task = Task { + await withTaskGroup(of: Void.self) { group in + group.addTask { + await regulator1.iterate() + } + + group.addTask { + await regulator2.iterate() + } + } + } + } + + init( + _ base1: Base1, + _ base2: Base2, + _ base3: Base3 + ) where Base1.Element == Element, Base2.Element == Element, Base3.Element == Base1.Element { + self.state = ManagedCriticalState(State(buffer: .idle, basesToTerminate: 3)) + + let regulator1 = Regulator(base1, onNextRegulatedElement: { [state] in Self.onNextRegulatedElement($0, state: state) }) + let regulator2 = Regulator(base2, onNextRegulatedElement: { [state] in Self.onNextRegulatedElement($0, state: state) }) + let regulator3 = Regulator(base3, onNextRegulatedElement: { [state] in Self.onNextRegulatedElement($0, state: state) }) + + self.requestNextRegulatedElements = { + regulator1.requestNextRegulatedElement() + regulator2.requestNextRegulatedElement() + regulator3.requestNextRegulatedElement() + } + + self.task = Task { + await withTaskGroup(of: Void.self) { group in + group.addTask { + await regulator1.iterate() + } + + group.addTask { + await regulator2.iterate() + } + + group.addTask { + await regulator3.iterate() + } + } + } + } + + init( + _ bases: [Base] + ) where Base.Element == Element { + self.state = ManagedCriticalState(State(buffer: .idle, basesToTerminate: bases.count)) + + var regulators = [Regulator]() + + for base in bases { + let regulator = Regulator(base, onNextRegulatedElement: { [state] in Self.onNextRegulatedElement($0, state: state) }) + regulators.append(regulator) + } + + let immutableRegulators = regulators + self.requestNextRegulatedElements = { + for regulator in immutableRegulators { + regulator.requestNextRegulatedElement() + } + } + + self.task = Task { + await withTaskGroup(of: Void.self) { group in + for regulators in immutableRegulators { + group.addTask { + await regulators.iterate() + } + } + } + } + } + + @Sendable + static func onNextRegulatedElement(_ element: RegulatedElement, state: ManagedCriticalState) { + let decision = state.withCriticalRegion { state -> OnNextDecision? in + switch (state.buffer, element) { + case (.idle, .element): + state.buffer = .queued([element]) + return nil + case (.queued(var elements), .element): + elements.append(element) + state.buffer = .queued(elements) + return nil + case (.awaiting(let continuation), .element(.success)): + state.buffer = .idle + return OnNextDecision(continuation: continuation, regulatedElement: element) + case (.awaiting(let continuation), .element(.failure)): + state.buffer = .closed + return OnNextDecision(continuation: continuation, regulatedElement: element) + + case (.idle, .termination): + state.basesToTerminate -= 1 + if state.basesToTerminate == 0 { + state.buffer = .closed + } else { + state.buffer = .idle + } + return nil + + case (.queued(var elements), .termination): + state.basesToTerminate -= 1 + if state.basesToTerminate == 0 { + elements.append(.termination) + state.buffer = .queued(elements) + } + return nil + + case (.awaiting(let continuation), .termination): + state.basesToTerminate -= 1 + if state.basesToTerminate == 0 { + state.buffer = .closed + return OnNextDecision(continuation: continuation, regulatedElement: .termination) + } else { + state.buffer = .awaiting(continuation) + return nil + } + + case (.closed, _): + return nil + } + } + + if let decision = decision { + decision.continuation.resume(returning: decision.regulatedElement) + } + } + + @Sendable + func unsuspendAndClearOnCancel() { + let continuation = self.state.withCriticalRegion { state -> UnsafeContinuation, Never>? in + switch state.buffer { + case .awaiting(let continuation): + state.basesToTerminate = 0 + state.buffer = .closed + return continuation + default: + state.basesToTerminate = 0 + state.buffer = .closed + return nil + } + } + + continuation?.resume(returning: .termination) + self.task.cancel() + } + + func next() async -> RegulatedElement { + await withTaskCancellationHandler { + self.unsuspendAndClearOnCancel() + } operation: { + self.requestNextRegulatedElements() + + let regulatedElement = await withUnsafeContinuation { (continuation: UnsafeContinuation, Never>) in + let decision = self.state.withCriticalRegion { state -> OnNextDecision? in + switch state.buffer { + case .queued(var elements): + guard let regulatedElement = elements.popFirst() else { + assertionFailure("The buffer cannot by empty, it should be idle in this case") + return OnNextDecision(continuation: continuation, regulatedElement: .termination) + } + switch regulatedElement { + case .termination: + state.buffer = .closed + return OnNextDecision(continuation: continuation, regulatedElement: .termination) + case .element(.success): + if elements.isEmpty { + state.buffer = .idle + } else { + state.buffer = .queued(elements) + } + return OnNextDecision(continuation: continuation, regulatedElement: regulatedElement) + case .element(.failure): + state.buffer = .closed + return OnNextDecision(continuation: continuation, regulatedElement: regulatedElement) + } + case .idle: + state.buffer = .awaiting(continuation) + return nil + case .awaiting: + assertionFailure("The next function cannot be called concurrently") + return OnNextDecision(continuation: continuation, regulatedElement: .termination) + case .closed: + return OnNextDecision(continuation: continuation, regulatedElement: .termination) + } + } + + if let decision = decision { + decision.continuation.resume(returning: decision.regulatedElement) + } + } + + if case .termination = regulatedElement, case .element(.failure) = regulatedElement { + self.task.cancel() + } + + return regulatedElement + } + } +}