Skip to content

Commit 8040f9a

Browse files
committed
pythongh-109934: notify cancelled futures on thread pool shutdown
When `ThreadPoolExecutor` shuts down it cancels any pending futures, however at present it doesn't notify waiters. Thus their state stays as `CANCELLED` instead of `CANCELLED_AND_NOTIFIED` and any waiters are not awakened. Call `set_running_or_notify_cancel` on the cancelled futures to fix this.
1 parent a66bae8 commit 8040f9a

File tree

3 files changed

+53
-0
lines changed

3 files changed

+53
-0
lines changed

Lib/concurrent/futures/thread.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,7 @@ def shutdown(self, wait=True, *, cancel_futures=False):
264264
break
265265
if work_item is not None:
266266
work_item.future.cancel()
267+
work_item.future.set_running_or_notify_cancel()
267268

268269
# Send a wake-up to prevent threads calling
269270
# _work_queue.get(block=True) from permanently blocking.

Lib/test/test_concurrent_futures/test_thread_pool.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,56 @@ def log_n_wait(ident):
112112
# ident='third' is cancelled because it remained in the collection of futures
113113
self.assertListEqual(log, ["ident='first' started", "ident='first' stopped"])
114114

115+
def test_shutdown_cancels_pending_futures(self):
116+
def waiter(barrier):
117+
barrier.wait(3)
118+
def noop():
119+
pass
120+
barrier = threading.Barrier(2)
121+
called_back_1 = threading.Event()
122+
called_back_2 = threading.Event()
123+
with self.executor_type(max_workers=1) as pool:
124+
125+
# Submit two futures, the first of which will block and prevent the
126+
# second from running
127+
f1 = pool.submit(waiter, barrier)
128+
f2 = pool.submit(noop)
129+
f1.add_done_callback(lambda f: called_back_1.set())
130+
f2.add_done_callback(lambda f: called_back_2.set())
131+
fs = {f1, f2}
132+
133+
completed_iter = futures.as_completed(fs, timeout=0)
134+
self.assertRaises(TimeoutError, next, completed_iter)
135+
136+
# Shutdown the pool, cancelling unstarted task
137+
pool.shutdown(wait=False, cancel_futures=True)
138+
self.assertTrue(f1.running())
139+
self.assertTrue(f2.cancelled())
140+
self.assertFalse(called_back_1.is_set())
141+
self.assertTrue(called_back_2.is_set())
142+
143+
completed_iter = futures.as_completed(fs, timeout=0)
144+
f = next(completed_iter)
145+
self.assertIs(f, f2)
146+
self.assertRaises(TimeoutError, next, completed_iter)
147+
148+
result = futures.wait(fs, timeout=0)
149+
self.assertEqual(result.not_done, {f1})
150+
self.assertEqual(result.done, {f2})
151+
152+
# Unblock and wait for the first future to complete
153+
barrier.wait(3)
154+
called_back_1.wait(3)
155+
self.assertTrue(f1.done())
156+
self.assertTrue(called_back_1.is_set())
157+
158+
completed = set(futures.as_completed(fs, timeout=0))
159+
self.assertEqual(set(fs), completed)
160+
161+
result = futures.wait(fs, timeout=0)
162+
self.assertEqual(result.not_done, set())
163+
self.assertEqual(result.done, set(fs))
164+
115165

116166
def setUpModule():
117167
setup_module()
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
Ensure :class:`concurrent.futures.ThreadPoolExecutor` notifies any futures
2+
it cancels on shutdown.

0 commit comments

Comments
 (0)