From db1393094244804593649367f0ea0a7d7b48f05c Mon Sep 17 00:00:00 2001 From: Rick Clephas Date: Tue, 4 Mar 2025 23:06:29 +0100 Subject: [PATCH 1/4] Remove single value buffer from AsyncSequence implementation --- KMPNativeCoroutinesAsync/AsyncSequence.swift | 16 ++++++++++------ sample/Async/AsyncSequenceIntegrationTests.swift | 3 +-- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/KMPNativeCoroutinesAsync/AsyncSequence.swift b/KMPNativeCoroutinesAsync/AsyncSequence.swift index fe31ee8a..cf2ab690 100644 --- a/KMPNativeCoroutinesAsync/AsyncSequence.swift +++ b/KMPNativeCoroutinesAsync/AsyncSequence.swift @@ -26,7 +26,8 @@ public struct NativeFlowAsyncSequence: AsyncSequen private let semaphore = DispatchSemaphore(value: 1) private var nativeCancellable: NativeCancellable? - private var item: (Output, () -> Unit)? = nil + private var item: Output? = nil + private var next: (() -> Unit)? = nil private var result: Failure?? = Optional.none private var cancellationError: Failure? = nil private var continuation: UnsafeContinuation? = nil @@ -38,11 +39,11 @@ public struct NativeFlowAsyncSequence: AsyncSequen if let continuation = self.continuation { continuation.resume(returning: item) self.continuation = nil - return next() } else { - self.item = (item, next) - return unit + self.item = item } + self.next = next + return unit }, { error, unit in self.semaphore.wait() defer { self.semaphore.signal() } @@ -75,9 +76,8 @@ public struct NativeFlowAsyncSequence: AsyncSequen try await withUnsafeThrowingContinuation { continuation in self.semaphore.wait() defer { self.semaphore.signal() } - if let (item, next) = self.item { + if let item = self.item { continuation.resume(returning: item) - _ = next() self.item = nil } else if let result = self.result { if let error = result { @@ -92,6 +92,10 @@ public struct NativeFlowAsyncSequence: AsyncSequen fatalError("Concurrent calls to next aren't supported") } self.continuation = continuation + if let next = self.next { + _ = next() + self.next = nil + } } } } onCancel: { diff --git a/sample/Async/AsyncSequenceIntegrationTests.swift b/sample/Async/AsyncSequenceIntegrationTests.swift index 04177f40..fe6dade1 100644 --- a/sample/Async/AsyncSequenceIntegrationTests.swift +++ b/sample/Async/AsyncSequenceIntegrationTests.swift @@ -41,8 +41,7 @@ class AsyncSequenceIntegrationTests: XCTestCase { var receivedValueCount: Int32 = 0 for try await _ in sequence { let emittedCount = integrationTests.emittedCount - // Note the AsyncSequence buffers at most a single item - XCTAssert(emittedCount == receivedValueCount || emittedCount == receivedValueCount + 1, "Back pressure isn't applied") + XCTAssert(emittedCount == receivedValueCount, "Back pressure isn't applied") delay(0.2) receivedValueCount += 1 } From 7e366e6aa5acb0522e7c9cd3d82cfba78e298943 Mon Sep 17 00:00:00 2001 From: Rick Clephas Date: Wed, 5 Mar 2025 09:02:18 +0100 Subject: [PATCH 2/4] Simplify AsyncSequence implementation by using a single state --- KMPNativeCoroutinesAsync/AsyncSequence.swift | 133 +++++++++++-------- 1 file changed, 81 insertions(+), 52 deletions(-) diff --git a/KMPNativeCoroutinesAsync/AsyncSequence.swift b/KMPNativeCoroutinesAsync/AsyncSequence.swift index cf2ab690..35335a21 100644 --- a/KMPNativeCoroutinesAsync/AsyncSequence.swift +++ b/KMPNativeCoroutinesAsync/AsyncSequence.swift @@ -24,51 +24,83 @@ public struct NativeFlowAsyncSequence: AsyncSequen public class Iterator: AsyncIteratorProtocol, @unchecked Sendable { + private enum State { + case new(NativeFlow) + case producing(UnsafeContinuation) + case consuming(() -> Unit) + case completed(Failure?) + case cancelled(Failure) + } + private let semaphore = DispatchSemaphore(value: 1) + private var state: State private var nativeCancellable: NativeCancellable? - private var item: Output? = nil - private var next: (() -> Unit)? = nil - private var result: Failure?? = Optional.none - private var cancellationError: Failure? = nil - private var continuation: UnsafeContinuation? = nil - init(nativeFlow: NativeFlow) { - nativeCancellable = nativeFlow({ item, next, unit in - self.semaphore.wait() - defer { self.semaphore.signal() } - if let continuation = self.continuation { - continuation.resume(returning: item) - self.continuation = nil + init(nativeFlow: @escaping NativeFlow) { + state = .new(nativeFlow) + } + + private func onItem(item: Output, next: @escaping () -> Unit, unit: Unit) -> Unit { + semaphore.wait() + defer { semaphore.signal() } + switch state { + case .new: + fatalError("onItem can't be called while in state new") + case .producing(let continuation): + continuation.resume(returning: item) + state = .consuming(next) + return unit + case .consuming: + fatalError("onItem can't be called while in state consuming") + case .completed: + fatalError("onItem can't be called while in state completed") + case .cancelled: + fatalError("onItem can't be called while in state cancelled") + } + } + + private func onComplete(error: Failure?, unit: Unit) -> Unit { + semaphore.wait() + defer { semaphore.signal() } + switch state { + case .new: + fatalError("onComplete can't be called while in state new") + case .producing(let continuation): + if let error { + continuation.resume(throwing: error) } else { - self.item = item + continuation.resume(returning: nil) } - self.next = next + state = .completed(error) return unit - }, { error, unit in - self.semaphore.wait() - defer { self.semaphore.signal() } - self.result = Optional.some(error) - if let continuation = self.continuation { - if let error = error { - continuation.resume(throwing: error) - } else { - continuation.resume(returning: nil) - } - self.continuation = nil - } - self.nativeCancellable = nil + case .consuming: + state = .completed(error) return unit - }, { cancellationError, unit in - self.semaphore.wait() - defer { self.semaphore.signal() } - self.cancellationError = cancellationError - if let continuation = self.continuation { - continuation.resume(returning: nil) - self.continuation = nil - } - self.nativeCancellable = nil + case .completed: + return unit + case .cancelled: + return unit + } + } + + private func onCancelled(error: Failure, unit: Unit) -> Unit { + semaphore.wait() + defer { semaphore.signal() } + switch state { + case .new: + fatalError("onCancelled can't be called while in state new") + case .producing(let continuation): + continuation.resume(returning: nil) + state = .cancelled(error) return unit - }) + case .consuming: + state = .cancelled(error) + return unit + case .completed: + return unit + case .cancelled: + return unit + } } public func next() async throws -> Output? { @@ -76,26 +108,23 @@ public struct NativeFlowAsyncSequence: AsyncSequen try await withUnsafeThrowingContinuation { continuation in self.semaphore.wait() defer { self.semaphore.signal() } - if let item = self.item { - continuation.resume(returning: item) - self.item = nil - } else if let result = self.result { - if let error = result { + switch state { + case .new(let nativeFlow): + nativeCancellable = nativeFlow(onItem, onComplete, onCancelled) + state = .producing(continuation) + case .producing: + fatalError("Concurrent calls to next aren't supported") + case .consuming(let next): + _ = next() + state = .producing(continuation) + case .completed(let error): + if let error { continuation.resume(throwing: error) } else { continuation.resume(returning: nil) } - } else if self.cancellationError != nil { + case .cancelled: continuation.resume(throwing: CancellationError()) - } else { - guard self.continuation == nil else { - fatalError("Concurrent calls to next aren't supported") - } - self.continuation = continuation - if let next = self.next { - _ = next() - self.next = nil - } } } } onCancel: { From 000a34e7655df79d4a46304d34149c1d26cccd17 Mon Sep 17 00:00:00 2001 From: Rick Clephas Date: Wed, 5 Mar 2025 20:53:14 +0100 Subject: [PATCH 3/4] Handle AsyncSequence cancellation consistently --- KMPNativeCoroutinesAsync/AsyncSequence.swift | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/KMPNativeCoroutinesAsync/AsyncSequence.swift b/KMPNativeCoroutinesAsync/AsyncSequence.swift index 35335a21..1e658ca3 100644 --- a/KMPNativeCoroutinesAsync/AsyncSequence.swift +++ b/KMPNativeCoroutinesAsync/AsyncSequence.swift @@ -29,7 +29,7 @@ public struct NativeFlowAsyncSequence: AsyncSequen case producing(UnsafeContinuation) case consuming(() -> Unit) case completed(Failure?) - case cancelled(Failure) + case cancelled } private let semaphore = DispatchSemaphore(value: 1) @@ -90,11 +90,11 @@ public struct NativeFlowAsyncSequence: AsyncSequen case .new: fatalError("onCancelled can't be called while in state new") case .producing(let continuation): - continuation.resume(returning: nil) - state = .cancelled(error) + continuation.resume(throwing: CancellationError()) + state = .cancelled return unit case .consuming: - state = .cancelled(error) + state = .cancelled return unit case .completed: return unit @@ -128,8 +128,14 @@ public struct NativeFlowAsyncSequence: AsyncSequen } } } onCancel: { + self.semaphore.wait() + if case .new = state { + state = .cancelled + } + let nativeCancellable = self.nativeCancellable + self.nativeCancellable = nil + self.semaphore.signal() _ = nativeCancellable?() - nativeCancellable = nil } } } From 415893e4280ae41920151b32c5cf36a0eadf455a Mon Sep 17 00:00:00 2001 From: Rick Clephas Date: Wed, 5 Mar 2025 20:53:54 +0100 Subject: [PATCH 4/4] Fix AsyncSequence tests to properly simulate async operations --- KMPNativeCoroutinesAsyncTests/AsyncSequenceTests.swift | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/KMPNativeCoroutinesAsyncTests/AsyncSequenceTests.swift b/KMPNativeCoroutinesAsyncTests/AsyncSequenceTests.swift index 320d3f43..1e25fe42 100644 --- a/KMPNativeCoroutinesAsyncTests/AsyncSequenceTests.swift +++ b/KMPNativeCoroutinesAsyncTests/AsyncSequenceTests.swift @@ -13,7 +13,7 @@ class AsyncSequenceTests: XCTestCase { private class TestValue { } - func testCancellableInvoked() async { + func testCancellableInvoked() async throws { var cancelCount = 0 let nativeFlow: NativeFlow = { _, _, cancelCallback in return { @@ -25,6 +25,7 @@ class AsyncSequenceTests: XCTestCase { for try await _ in asyncSequence(for: nativeFlow) { } } XCTAssertEqual(cancelCount, 0, "Cancellable shouldn't be invoked yet") + try await Task.sleep(nanoseconds: 10_000_000) // Gives the sequence a moment to start handle.cancel() let result = await handle.result XCTAssertEqual(cancelCount, 1, "Cancellable should be invoked once") @@ -65,8 +66,10 @@ class AsyncSequenceTests: XCTestCase { func testCompletionWithError() async { let sendError = NSError(domain: "Test", code: 0) let nativeFlow: NativeFlow = { _, completionCallback, _ in - completionCallback(sendError, ()) - return { } + let handle = Task { + completionCallback(sendError, ()) + } + return { handle.cancel() } } var valueCount = 0 do {