77#include  " communicator.h" 
88#include  " recorders.h" 
99
10- namespace  celerity ::detail {
11- struct  runtime_testspy ;
12- }
13- 
1410namespace  celerity ::detail::divergence_checker_detail {
1511using  task_hash = size_t ;
1612using  divergence_map = std::unordered_map<task_hash, std::vector<node_id>>;
@@ -67,34 +63,38 @@ class divergence_block_chain {
6763	std::vector<task_record> m_task_records;
6864	size_t  m_tasks_checked = 0 ;
6965	size_t  m_hashes_added = 0 ;
66+ 	task_hash m_last_hash = 0 ;
7067
7168	std::vector<int > m_per_node_hash_counts;
7269	std::mutex m_task_records_mutex;
7370
7471	std::chrono::time_point<std::chrono::steady_clock> m_last_cleared = std::chrono::steady_clock::now();
72+ 	std::chrono::seconds m_time_of_last_warning = std::chrono::seconds(0 );
7573
7674	std::unique_ptr<communicator> m_communicator;
7775
78- 	void  divergence_out (const  divergence_map& check_map, const  int  task_num);
76+ 	void  reprot_divergence (const  divergence_map& check_map, const  int  task_num);
7977
8078	void  add_new_hashes ();
8179	void  clear (const  int  min_progress);
8280	std::pair<int , int > collect_hash_counts ();
8381	per_node_task_hashes collect_hashes (const  int  min_hash_count) const ;
84- 	divergence_map create_check_map (const  per_node_task_hashes& task_hashes, const  int  task_num) const ;
82+ 	divergence_map create_divergence_map (const  per_node_task_hashes& task_hashes, const  int  task_num) const ;
8583
86- 	void  check_for_deadlock ()  const ;
84+ 	void  check_for_deadlock ();
8785
88- 	static  void  log_node_divergences (const  divergence_map& check_map, const  int  task_num );
86+ 	static  void  log_node_divergences (const  divergence_map& check_map, const  int  task_id );
8987	static  void  log_task_record (const  divergence_map& check_map, const  task_record& task, const  task_hash hash);
9088	void  log_task_record_once (const  divergence_map& check_map, const  int  task_num);
9189
9290	void  add_new_task (const  task_record& task);
9391	task_record thread_save_get_task_record (const  size_t  task_num);
9492};
93+ }; //  namespace celerity::detail::divergence_checker_detail
9594
95+ namespace  celerity ::detail {
9696class  divergence_checker  {
97- 	friend  struct  ::celerity::detail:: runtime_testspy;
97+ 	friend  struct  runtime_testspy ;
9898
9999  public: 
100100	divergence_checker (task_recorder& task_recorder, std::unique_ptr<communicator> comm, bool  test_mode = false )
@@ -111,6 +111,10 @@ class divergence_checker {
111111	~divergence_checker () { stop (); }
112112
113113  private: 
114+ 	std::thread m_thread;
115+ 	bool  m_is_running = false ;
116+ 	divergence_checker_detail::divergence_block_chain m_block_chain;
117+ 
114118	void  start () {
115119		m_thread = std::thread (&divergence_checker::run, this );
116120		m_is_running = true ;
@@ -129,9 +133,5 @@ class divergence_checker {
129133			std::this_thread::sleep_for (std::chrono::milliseconds (100 ));
130134		}
131135	}
132- 
133- 	std::thread m_thread;
134- 	bool  m_is_running = false ;
135- 	divergence_block_chain m_block_chain;
136136};
137- };  //  namespace celerity::detail::divergence_checker_detail 
137+ };
0 commit comments