-
Notifications
You must be signed in to change notification settings - Fork 6.9k
[core] Implement a thread pool and call the CPython API on all threads within the same concurrency group #52575
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 7 commits
4aa0cee
0a2b539
2b692dc
1950902
e63ba54
f93ee4a
26feff9
ec0c28b
d39ac82
940cc81
6cf1eab
5356c7c
1130e71
40c0c04
a5d444e
430ef36
fd48457
3a641b4
4f19b33
8123c32
77879b7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -197,8 +197,12 @@ def get_thread_local(self) -> Tuple[Any, int]: | |
|
|
||
| class TestThreadingLocalData: | ||
| """ | ||
| This test verifies that synchronous tasks can access thread local data | ||
| that was set by previous synchronous tasks. | ||
| This test verifies that synchronous tasks can access thread-local data that | ||
| was set by previous synchronous tasks when the concurrency group has only | ||
| one thread. For concurrency groups with multiple threads, it doesn't promise | ||
| access to the same thread-local data because Ray currently doesn't expose APIs | ||
| for users to specify which thread the task will be scheduled on in the same | ||
| concurrency group. | ||
| """ | ||
|
|
||
| def test_tasks_on_default_executor(self, ray_start_regular_shared): | ||
|
|
@@ -236,6 +240,39 @@ def test_tasks_on_different_executors(self, ray_start_regular_shared): | |
| assert value == "f2" | ||
|
|
||
|
|
||
| def test_multiple_threads_in_same_group(ray_start_regular_shared): | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| """ | ||
| This test verifies that all threads in the same concurrency group are still | ||
| alive from the Python interpreter's perspective even if Ray tasks have finished, so that | ||
| thread-local data will not be garbage collected. | ||
| """ | ||
|
|
||
| @ray.remote | ||
| class Actor: | ||
| def __init__(self): | ||
| self.data = 0 | ||
| self._thread_local_data = threading.local() | ||
|
|
||
| def set_thread_local(self, value: Any) -> int: | ||
| # If the thread-local data were garbage collected after the previous | ||
| # task on the same thread finished, `self.data` would be incremented | ||
| # more than once for the same thread. | ||
| if not hasattr(self._thread_local_data, "value"): | ||
| self._thread_local_data.value = self.data | ||
| self.data += 1 | ||
| assert self._thread_local_data.value <= self.data | ||
|
|
||
| def get_data(self) -> int: | ||
| return self.data | ||
|
|
||
| max_concurrency = 5 | ||
| a = Actor.options(max_concurrency=max_concurrency).remote() | ||
| for _ in range(200): | ||
| for i in range(max_concurrency): | ||
| ray.get(a.set_thread_local.remote(i)) | ||
| assert ray.get(a.get_data.remote()) == max_concurrency | ||
kevin85421 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
|
|
||
| def test_invalid_concurrency_group(): | ||
| """Verify that when a concurrency group has max concurrency set to 0, | ||
| an error is raised when the actor is created. This test uses | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -14,21 +14,57 @@ | |
|
|
||
| #include "ray/core_worker/transport/thread_pool.h" | ||
|
|
||
| #include <boost/asio/post.hpp> | ||
| #include <future> | ||
| #include <memory> | ||
| #include <utility> | ||
|
|
||
| namespace ray { | ||
| namespace core { | ||
|
|
||
| BoundedExecutor::BoundedExecutor(int max_concurrency) { | ||
| BoundedExecutor::BoundedExecutor( | ||
| int max_concurrency, | ||
| std::function<std::function<void()>()> initialize_thread_callback) | ||
kevin85421 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| : work_guard_(boost::asio::make_work_guard(io_context_)) { | ||
| RAY_CHECK(max_concurrency > 0) << "max_concurrency must be greater than 0"; | ||
| pool_ = std::make_unique<boost::asio::thread_pool>(max_concurrency); | ||
| threads_.reserve(max_concurrency); | ||
| for (int i = 0; i < max_concurrency; i++) { | ||
| std::promise<void> init_promise; | ||
| auto init_future = init_promise.get_future(); | ||
| threads_.emplace_back([this, initialize_thread_callback, &init_promise]() { | ||
kevin85421 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| std::function<void()> releaser; | ||
| if (initialize_thread_callback) { | ||
| releaser = initialize_thread_callback(); | ||
| } | ||
| init_promise.set_value(); | ||
| io_context_.run(); | ||
kevin85421 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| if (releaser) { | ||
| releaser(); | ||
| } | ||
| }); | ||
| init_future.wait(); | ||
kevin85421 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } | ||
| } | ||
|
|
||
| void BoundedExecutor::Post(std::function<void()> fn) { | ||
| boost::asio::post(io_context_, std::move(fn)); | ||
| } | ||
|
|
||
| /// Stop the thread pool. | ||
| void BoundedExecutor::Stop() { pool_->stop(); } | ||
| void BoundedExecutor::Stop() { | ||
| work_guard_.reset(); | ||
| io_context_.stop(); | ||
| } | ||
|
|
||
| /// Join the thread pool. | ||
| void BoundedExecutor::Join() { pool_->join(); } | ||
| void BoundedExecutor::Join() { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should this implement a timeout to avoid hanging in shutdown if one of the threads is hanging in user code?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what would a reasonable timeout be? this is running arbitrary user code right. Also user can always sigint and signal checking will exit out.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I’d prefer not to implement a timeout for
|
||
| work_guard_.reset(); | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maintain the previous behavior. We can’t assume that It's fine to call |
||
| for (auto &thread : threads_) { | ||
| if (thread.joinable()) { | ||
| thread.join(); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| } // namespace core | ||
| } // namespace ray | ||
Uh oh!
There was an error while loading. Please reload this page.