Skip to content

Commit 311f156

Browse files
author
Brennan Stehling
committed
revised to support AsyncChannel and AsyncThrowingChannel as the primary types
1 parent 0cab4e9 commit 311f156

File tree

5 files changed

+180
-72
lines changed

5 files changed

+180
-72
lines changed

Sources/AsyncChannelKit/AsyncChannel.swift

Lines changed: 24 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,71 +1,63 @@
11
import Foundation
22

3-
public actor AsyncChannel<Element: Sendable> {
4-
public enum Failure: Error {
3+
public actor AsyncChannel<Element: Sendable>: AsyncSequence {
4+
public struct Iterator: AsyncIteratorProtocol, Sendable {
5+
private let channel: AsyncChannel<Element>
6+
7+
public init(_ channel: AsyncChannel<Element>) {
8+
self.channel = channel
9+
}
10+
11+
public mutating func next() async -> Element? {
12+
await channel.next()
13+
}
14+
}
15+
16+
public enum InternalFailure: Error {
517
case cannotSendAfterTerminated
618
}
7-
public typealias ChannelContinuation = CheckedContinuation<Element?, Error>
19+
public typealias ChannelContinuation = CheckedContinuation<Element?, Never>
820

921
private var continuations: [ChannelContinuation] = []
1022
private var elements: [Element] = []
1123
private var terminated: Bool = false
12-
private var error: Error? = nil
1324

1425
private var hasNext: Bool {
1526
!continuations.isEmpty && !elements.isEmpty
1627
}
1728

18-
private var canFail: Bool {
19-
error != nil && !continuations.isEmpty
20-
}
21-
2229
private var canTerminate: Bool {
2330
terminated && elements.isEmpty && !continuations.isEmpty
2431
}
2532

2633
public init() {
2734
}
2835

29-
public func next() async throws -> Element? {
30-
try await withCheckedThrowingContinuation { (continuation: ChannelContinuation) in
36+
public nonisolated func makeAsyncIterator() -> Iterator {
37+
Iterator(self)
38+
}
39+
40+
public func next() async -> Element? {
41+
await withCheckedContinuation { (continuation: ChannelContinuation) in
3142
continuations.append(continuation)
3243
processNext()
3344
}
3445
}
3546

36-
public func send(element: Element) throws {
47+
public func send(_ element: Element) throws {
3748
guard !terminated else {
38-
throw Failure.cannotSendAfterTerminated
49+
throw InternalFailure.cannotSendAfterTerminated
3950
}
4051
elements.append(element)
4152
processNext()
4253
}
4354

44-
public func send(error: Error) throws {
45-
guard !terminated else {
46-
throw Failure.cannotSendAfterTerminated
47-
}
48-
self.error = error
49-
processNext()
50-
}
51-
52-
public func terminate() {
55+
public func finish() {
5356
terminated = true
5457
processNext()
5558
}
5659

5760
private func processNext() {
58-
if canFail {
59-
let contination = continuations.removeFirst()
60-
assert(continuations.isEmpty)
61-
assert(elements.isEmpty)
62-
assert(error != nil)
63-
if let error = error {
64-
contination.resume(throwing: error)
65-
return
66-
}
67-
}
68-
6961
if canTerminate {
7062
let contination = continuations.removeFirst()
7163
assert(continuations.isEmpty)

Sources/AsyncChannelKit/AsyncChannelIterator.swift

Lines changed: 0 additions & 13 deletions
This file was deleted.

Sources/AsyncChannelKit/AsyncChannelSequence.swift

Lines changed: 0 additions & 15 deletions
This file was deleted.
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
import Foundation
2+
3+
public actor AsyncThrowingChannel<Element: Sendable, Failure: Error>: AsyncSequence {
4+
public struct Iterator: AsyncIteratorProtocol, Sendable {
5+
private let channel: AsyncThrowingChannel<Element, Failure>
6+
7+
public init(_ channel: AsyncThrowingChannel<Element, Failure>) {
8+
self.channel = channel
9+
}
10+
11+
public mutating func next() async throws -> Element? {
12+
try await channel.next()
13+
}
14+
}
15+
16+
public enum InternalFailure: Error {
17+
case cannotSendAfterTerminated
18+
}
19+
public typealias ChannelContinuation = CheckedContinuation<Element?, Error>
20+
21+
private var continuations: [ChannelContinuation] = []
22+
private var elements: [Element] = []
23+
private var terminated: Bool = false
24+
private var error: Error? = nil
25+
26+
private var hasNext: Bool {
27+
!continuations.isEmpty && !elements.isEmpty
28+
}
29+
30+
private var canFail: Bool {
31+
error != nil && !continuations.isEmpty
32+
}
33+
34+
private var canTerminate: Bool {
35+
terminated && elements.isEmpty && !continuations.isEmpty
36+
}
37+
38+
public init() {
39+
}
40+
41+
public nonisolated func makeAsyncIterator() -> Iterator {
42+
Iterator(self)
43+
}
44+
45+
public func next() async throws -> Element? {
46+
try await withCheckedThrowingContinuation { (continuation: ChannelContinuation) in
47+
continuations.append(continuation)
48+
processNext()
49+
}
50+
}
51+
52+
public func send(_ element: Element) throws {
53+
guard !terminated else {
54+
throw InternalFailure.cannotSendAfterTerminated
55+
}
56+
elements.append(element)
57+
processNext()
58+
}
59+
60+
61+
public func fail(_ error: Error) where Failure == Error {
62+
self.error = error
63+
processNext()
64+
}
65+
66+
public func finish() {
67+
terminated = true
68+
processNext()
69+
}
70+
71+
private func processNext() {
72+
if canFail {
73+
let contination = continuations.removeFirst()
74+
assert(continuations.isEmpty)
75+
assert(elements.isEmpty)
76+
assert(error != nil)
77+
if let error = error {
78+
contination.resume(throwing: error)
79+
return
80+
}
81+
}
82+
83+
if canTerminate {
84+
let contination = continuations.removeFirst()
85+
assert(continuations.isEmpty)
86+
assert(elements.isEmpty)
87+
contination.resume(returning: nil)
88+
return
89+
}
90+
91+
guard hasNext else {
92+
return
93+
}
94+
95+
assert(!continuations.isEmpty)
96+
assert(!elements.isEmpty)
97+
98+
let contination = continuations.removeFirst()
99+
let element = elements.removeFirst()
100+
101+
contination.resume(returning: element)
102+
}
103+
}

Tests/AsyncChannelKitTests/AsyncChannelKitTests.swift

Lines changed: 53 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ final class AsyncChannelKitTests: XCTestCase {
1717
func testNumberSequence() async throws {
1818
let input = [1, 2, 3, 4, 5]
1919
let channel = AsyncChannel<Int>()
20-
let sequence = AsyncChannelSequence(channel: channel)
2120

2221
// load all numbers into the channel with delays
2322
Task {
@@ -27,7 +26,7 @@ final class AsyncChannelKitTests: XCTestCase {
2726
var output: [Int] = []
2827

2928
print("-- before --")
30-
for try await element in sequence {
29+
for await element in channel {
3130
print(element)
3231
output.append(element)
3332
}
@@ -39,7 +38,6 @@ final class AsyncChannelKitTests: XCTestCase {
3938
func testStringSequence() async throws {
4039
let input = ["one", "two", "three", "four", "five"]
4140
let channel = AsyncChannel<String>()
42-
let sequence = AsyncChannelSequence(channel: channel)
4341

4442
// load all strings into the channel with delays
4543
Task {
@@ -49,7 +47,7 @@ final class AsyncChannelKitTests: XCTestCase {
4947
var output: [String] = []
5048

5149
print("-- before --")
52-
for try await element in sequence {
50+
for await element in channel {
5351
print(element)
5452
output.append(element)
5553
}
@@ -58,11 +56,42 @@ final class AsyncChannelKitTests: XCTestCase {
5856
XCTAssertEqual(input, output)
5957
}
6058

59+
func testSucceedingSequence() async throws {
60+
let input = [3, 7, 14, 21]
61+
let channel = AsyncThrowingChannel<Int, Error>()
62+
63+
// load all numbers into the channel with delays
64+
Task {
65+
try await send(elements: input, channel: channel, sleepSeconds: sleepSeconds) { element in
66+
if element == 13 {
67+
throw Failure.unluckyNumber
68+
} else {
69+
return element
70+
}
71+
}
72+
}
73+
74+
var output: [Int] = []
75+
var thrown: Error? = nil
76+
77+
print("-- before --")
78+
do {
79+
for try await element in channel {
80+
print(element)
81+
output.append(element)
82+
}
83+
} catch {
84+
thrown = error
85+
}
86+
print("-- after --")
87+
88+
XCTAssertNil(thrown)
89+
XCTAssertEqual(input, output)
90+
}
6191

6292
func testFailingSequence() async throws {
6393
let input = [3, 7, 13, 21]
64-
let channel = AsyncChannel<Int>()
65-
let sequence = AsyncChannelSequence(channel: channel)
94+
let channel = AsyncThrowingChannel<Int, Error>()
6695

6796
// load all numbers into the channel with delays
6897
Task {
@@ -80,7 +109,7 @@ final class AsyncChannelKitTests: XCTestCase {
80109

81110
print("-- before --")
82111
do {
83-
for try await element in sequence {
112+
for try await element in channel {
84113
print(element)
85114
output.append(element)
86115
}
@@ -94,25 +123,37 @@ final class AsyncChannelKitTests: XCTestCase {
94123
XCTAssertEqual(expected, output)
95124
}
96125

97-
private func send<Element>(elements: [Element], channel: AsyncChannel<Element>, sleepSeconds: Double = 0.1, processor: ((Element) throws -> Element)? = nil) async throws {
126+
private func send<Element>(elements: [Element], channel: AsyncChannel<Element>, sleepSeconds: Double = 0.1) async throws {
127+
var index = 0
128+
while index < elements.count {
129+
try await Task.sleep(seconds: sleepSeconds)
130+
let element = elements[index]
131+
try await channel.send(element)
132+
133+
index += 1
134+
}
135+
await channel.finish()
136+
}
137+
138+
private func send<Element>(elements: [Element], channel: AsyncThrowingChannel<Element, Error>, sleepSeconds: Double = 0.1, processor: ((Element) throws -> Element)? = nil) async throws {
98139
var index = 0
99140
while index < elements.count {
100141
try await Task.sleep(seconds: sleepSeconds)
101142
let element = elements[index]
102143
if let processor = processor {
103144
do {
104145
let processed = try processor(element)
105-
try await channel.send(element: processed)
146+
try await channel.send(processed)
106147
} catch {
107148
print("throwing \(error)")
108-
try await channel.send(error: error)
149+
await channel.fail(error)
109150
}
110151
} else {
111-
try await channel.send(element: element)
152+
try await channel.send(element)
112153
}
113154

114155
index += 1
115156
}
116-
await channel.terminate()
157+
await channel.finish()
117158
}
118159
}

0 commit comments

Comments
 (0)