Skip to content

Commit 60edf0a

Browse files
neakorrudro
authored andcommitted
Signal concurrency limiting semaphore when task errors (#31)
This fixes the issue where if a task throws an error during execution, the concurrency limiting semaphore is never signaled. This can hault the entire sequence since no new tasks can continue execution.
1 parent c773c32 commit 60edf0a

File tree

3 files changed

+114
-50
lines changed

3 files changed

+114
-50
lines changed

Concurrency.xcodeproj/project.pbxproj

Lines changed: 50 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -110,66 +110,76 @@
110110
);
111111
};
112112
"OBJ_10" = {
113-
isa = "PBXFileReference";
114-
path = "AtomicInt.swift";
113+
isa = "PBXGroup";
114+
children = (
115+
"OBJ_11",
116+
"OBJ_12"
117+
);
118+
name = "include";
119+
path = "include";
115120
sourceTree = "<group>";
116121
};
117122
"OBJ_11" = {
118123
isa = "PBXFileReference";
119-
path = "AtomicReference.swift";
124+
path = "AtomicBridges.h";
120125
sourceTree = "<group>";
121126
};
122127
"OBJ_12" = {
123128
isa = "PBXFileReference";
124-
path = "AutoReleasingSemaphore.swift";
129+
name = "module.modulemap";
130+
path = "/Users/yiw/Uber/GitHub/swift-concurrency/Concurrency.xcodeproj/GeneratedModuleMap/ObjCBridges/module.modulemap";
125131
sourceTree = "<group>";
126132
};
127133
"OBJ_13" = {
128-
isa = "PBXFileReference";
129-
path = "CountDownLatch.swift";
130-
sourceTree = "<group>";
131-
};
132-
"OBJ_14" = {
133134
isa = "PBXGroup";
134135
children = (
136+
"OBJ_14",
135137
"OBJ_15",
136138
"OBJ_16",
137139
"OBJ_17",
138-
"OBJ_18"
140+
"OBJ_18",
141+
"OBJ_19"
139142
);
140-
name = "Executor";
141-
path = "Executor";
143+
name = "Concurrency";
144+
path = "Sources/Concurrency";
145+
sourceTree = "SOURCE_ROOT";
146+
};
147+
"OBJ_14" = {
148+
isa = "PBXFileReference";
149+
path = "AtomicBool.swift";
142150
sourceTree = "<group>";
143151
};
144152
"OBJ_15" = {
145153
isa = "PBXFileReference";
146-
path = "ConcurrentSequenceExecutor.swift";
154+
path = "AtomicInt.swift";
147155
sourceTree = "<group>";
148156
};
149157
"OBJ_16" = {
150158
isa = "PBXFileReference";
151-
path = "ImmediateSerialSequenceExecutor.swift";
159+
path = "AtomicReference.swift";
152160
sourceTree = "<group>";
153161
};
154162
"OBJ_17" = {
155163
isa = "PBXFileReference";
156-
path = "SequenceExecutor.swift";
164+
path = "AutoReleasingSemaphore.swift";
157165
sourceTree = "<group>";
158166
};
159167
"OBJ_18" = {
160168
isa = "PBXFileReference";
161-
path = "Task.swift";
169+
path = "CountDownLatch.swift";
162170
sourceTree = "<group>";
163171
};
164172
"OBJ_19" = {
165173
isa = "PBXGroup";
166174
children = (
167175
"OBJ_20",
168-
"OBJ_21"
176+
"OBJ_21",
177+
"OBJ_22",
178+
"OBJ_23"
169179
);
170-
name = "ObjCBridges";
171-
path = "Sources/ObjCBridges";
172-
sourceTree = "SOURCE_ROOT";
180+
name = "Executor";
181+
path = "Executor";
182+
sourceTree = "<group>";
173183
};
174184
"OBJ_2" = {
175185
isa = "XCConfigurationList";
@@ -182,28 +192,22 @@
182192
};
183193
"OBJ_20" = {
184194
isa = "PBXFileReference";
185-
path = "AtomicBridges.m";
195+
path = "ConcurrentSequenceExecutor.swift";
186196
sourceTree = "<group>";
187197
};
188198
"OBJ_21" = {
189-
isa = "PBXGroup";
190-
children = (
191-
"OBJ_22",
192-
"OBJ_23"
193-
);
194-
name = "include";
195-
path = "include";
199+
isa = "PBXFileReference";
200+
path = "ImmediateSerialSequenceExecutor.swift";
196201
sourceTree = "<group>";
197202
};
198203
"OBJ_22" = {
199204
isa = "PBXFileReference";
200-
path = "AtomicBridges.h";
205+
path = "SequenceExecutor.swift";
201206
sourceTree = "<group>";
202207
};
203208
"OBJ_23" = {
204209
isa = "PBXFileReference";
205-
name = "module.modulemap";
206-
path = "/Users/yiw/Uber/GitHub/swift-concurrency/Concurrency.xcodeproj/GeneratedModuleMap/ObjCBridges/module.modulemap";
210+
path = "Task.swift";
207211
sourceTree = "<group>";
208212
};
209213
"OBJ_24" = {
@@ -457,35 +461,35 @@
457461
};
458462
"OBJ_42" = {
459463
isa = "PBXBuildFile";
460-
fileRef = "OBJ_9";
464+
fileRef = "OBJ_14";
461465
};
462466
"OBJ_43" = {
463467
isa = "PBXBuildFile";
464-
fileRef = "OBJ_10";
468+
fileRef = "OBJ_15";
465469
};
466470
"OBJ_44" = {
467471
isa = "PBXBuildFile";
468-
fileRef = "OBJ_11";
472+
fileRef = "OBJ_16";
469473
};
470474
"OBJ_45" = {
471475
isa = "PBXBuildFile";
472-
fileRef = "OBJ_12";
476+
fileRef = "OBJ_17";
473477
};
474478
"OBJ_46" = {
475479
isa = "PBXBuildFile";
476-
fileRef = "OBJ_13";
480+
fileRef = "OBJ_18";
477481
};
478482
"OBJ_47" = {
479483
isa = "PBXBuildFile";
480-
fileRef = "OBJ_15";
484+
fileRef = "OBJ_20";
481485
};
482486
"OBJ_48" = {
483487
isa = "PBXBuildFile";
484-
fileRef = "OBJ_16";
488+
fileRef = "OBJ_21";
485489
};
486490
"OBJ_49" = {
487491
isa = "PBXBuildFile";
488-
fileRef = "OBJ_17";
492+
fileRef = "OBJ_22";
489493
};
490494
"OBJ_5" = {
491495
isa = "PBXGroup";
@@ -500,7 +504,7 @@
500504
};
501505
"OBJ_50" = {
502506
isa = "PBXBuildFile";
503-
fileRef = "OBJ_18";
507+
fileRef = "OBJ_23";
504508
};
505509
"OBJ_51" = {
506510
isa = "PBXFrameworksBuildPhase";
@@ -693,7 +697,7 @@
693697
isa = "PBXGroup";
694698
children = (
695699
"OBJ_8",
696-
"OBJ_19"
700+
"OBJ_13"
697701
);
698702
name = "Sources";
699703
path = "";
@@ -753,14 +757,10 @@
753757
isa = "PBXGroup";
754758
children = (
755759
"OBJ_9",
756-
"OBJ_10",
757-
"OBJ_11",
758-
"OBJ_12",
759-
"OBJ_13",
760-
"OBJ_14"
760+
"OBJ_10"
761761
);
762-
name = "Concurrency";
763-
path = "Sources/Concurrency";
762+
name = "ObjCBridges";
763+
path = "Sources/ObjCBridges";
764764
sourceTree = "SOURCE_ROOT";
765765
};
766766
"OBJ_80" = {
@@ -864,7 +864,7 @@
864864
};
865865
"OBJ_86" = {
866866
isa = "PBXBuildFile";
867-
fileRef = "OBJ_20";
867+
fileRef = "OBJ_9";
868868
};
869869
"OBJ_87" = {
870870
isa = "PBXFrameworksBuildPhase";
@@ -873,7 +873,7 @@
873873
};
874874
"OBJ_9" = {
875875
isa = "PBXFileReference";
876-
path = "AtomicBool.swift";
876+
path = "AtomicBridges.m";
877877
sourceTree = "<group>";
878878
};
879879
};

Sources/Concurrency/Executor/ConcurrentSequenceExecutor.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ public class ConcurrentSequenceExecutor: SequenceExecutor {
9393
sequenceHandle.sequenceDidComplete(with: result)
9494
}
9595
} catch {
96+
self.taskSemaphore?.signal()
9697
sequenceHandle.sequenceDidError(with: error)
9798
}
9899
}

Tests/ConcurrencyTests/Executor/ConcurrentSequenceExecutorTests.swift

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,69 @@ class ConcurrentSequenceExecutorTests: XCTestCase {
173173
}
174174
}
175175
}
176+
177+
func test_executeSequence_limitMaxConcurrentTasks_withSuccessTasks_verifyCompletion() {
178+
let executor = ConcurrentSequenceExecutor(name: "test_executeSequence_withNonTerminatingSequence_withTimeout_verifyAwaitTimeout", maxConcurrentTasks: 1)
179+
180+
let taskCount = AtomicInt(initialValue: 0)
181+
let sequencedTask = MockSelfRepeatingTask(id: 123) {
182+
return 0
183+
}
184+
185+
let handle = executor.executeSequence(from: sequencedTask) { _, _ -> SequenceExecution<Int> in
186+
let nextTask = MockSelfRepeatingTask(id: 123) {
187+
return 0
188+
}
189+
let newCount = taskCount.incrementAndGet()
190+
if newCount > 10 {
191+
return .endOfSequence(32838)
192+
} else {
193+
return .continueSequence(nextTask)
194+
}
195+
}
196+
197+
do {
198+
_ = try handle.await(withTimeout: 0.5)
199+
} catch {
200+
XCTFail()
201+
}
202+
}
203+
204+
func test_executeSequence_limitMaxConcurrentTasks_withErrorTasks_verifyCompletion() {
205+
let executor = ConcurrentSequenceExecutor(name: "test_executeSequence_withNonTerminatingSequence_withTimeout_verifyAwaitTimeout", maxConcurrentTasks: 1)
206+
207+
let taskCount = AtomicInt(initialValue: 0)
208+
let sequencedTask = MockSelfRepeatingTask(id: 123) {
209+
return 0
210+
}
211+
212+
let handle = executor.executeSequence(from: sequencedTask) { _, _ -> SequenceExecution<Int> in
213+
let newCount = taskCount.incrementAndGet()
214+
if newCount > 10 {
215+
let errorTask = MockSelfRepeatingTask(id: 123) {
216+
throw MockError.messagedError("ghasvhfjhbafjkh")
217+
}
218+
return .continueSequence(errorTask)
219+
} else {
220+
let nextTask = MockSelfRepeatingTask(id: 123) {
221+
return 0
222+
}
223+
return .continueSequence(nextTask)
224+
}
225+
}
226+
227+
do {
228+
_ = try handle.await(withTimeout: nil)
229+
XCTFail()
230+
} catch {
231+
switch error {
232+
case MockError.messagedError(let message):
233+
XCTAssertEqual(message, "ghasvhfjhbafjkh")
234+
default:
235+
XCTFail()
236+
}
237+
}
238+
}
176239
}
177240

178241
class MockSelfRepeatingTask: AbstractTask<Int> {

0 commit comments

Comments
 (0)