77#include " communicator.h"
88#include " recorders.h"
99
10- namespace celerity ::detail {
11- struct runtime_testspy ;
12- }
13-
1410namespace celerity ::detail::divergence_checker_detail {
1511using task_hash = size_t ;
1612using divergence_map = std::unordered_map<task_hash, std::vector<node_id>>;
@@ -72,29 +68,32 @@ class divergence_block_chain {
7268 std::mutex m_task_records_mutex;
7369
7470 std::chrono::time_point<std::chrono::steady_clock> m_last_cleared = std::chrono::steady_clock::now();
71+ std::chrono::seconds m_time_of_last_warning = std::chrono::seconds(0 );
7572
7673 std::unique_ptr<communicator> m_communicator;
7774
78- void divergence_out (const divergence_map& check_map, const int task_num);
75+ void reprot_divergence (const divergence_map& check_map, const int task_num);
7976
8077 void add_new_hashes ();
8178 void clear (const int min_progress);
8279 std::pair<int , int > collect_hash_counts ();
8380 per_node_task_hashes collect_hashes (const int min_hash_count) const ;
84- divergence_map create_check_map (const per_node_task_hashes& task_hashes, const int task_num) const ;
81+ divergence_map create_divergence_map (const per_node_task_hashes& task_hashes, const int task_num) const ;
8582
86- void check_for_deadlock () const ;
83+ void check_for_deadlock ();
8784
88- static void log_node_divergences (const divergence_map& check_map, const int task_num );
85+ static void log_node_divergences (const divergence_map& check_map, const int task_id );
8986 static void log_task_record (const divergence_map& check_map, const task_record& task, const task_hash hash);
9087 void log_task_record_once (const divergence_map& check_map, const int task_num);
9188
9289 void add_new_task (const task_record& task);
9390 task_record thread_save_get_task_record (const size_t task_num);
9491};
92+ }; // namespace celerity::detail::divergence_checker_detail
9593
94+ namespace celerity ::detail {
9695class divergence_checker {
97- friend struct ::celerity::detail:: runtime_testspy;
96+ friend struct runtime_testspy ;
9897
9998 public:
10099 divergence_checker (task_recorder& task_recorder, std::unique_ptr<communicator> comm, bool test_mode = false )
@@ -111,6 +110,10 @@ class divergence_checker {
111110 ~divergence_checker () { stop (); }
112111
113112 private:
113+ std::thread m_thread;
114+ bool m_is_running = false ;
115+ divergence_checker_detail::divergence_block_chain m_block_chain;
116+
114117 void start () {
115118 m_thread = std::thread (&divergence_checker::run, this );
116119 m_is_running = true ;
@@ -129,9 +132,5 @@ class divergence_checker {
129132 std::this_thread::sleep_for (std::chrono::milliseconds (100 ));
130133 }
131134 }
132-
133- std::thread m_thread;
134- bool m_is_running = false ;
135- divergence_block_chain m_block_chain;
136135};
137- }; // namespace celerity::detail::divergence_checker_detail
136+ };
0 commit comments