1- import threading
21import time
2+ import logging
33
44from honeybadger import honeybadger
55from honeybadger .plugins import Plugin , default_plugin_manager
66from honeybadger .utils import filter_dict , extract_honeybadger_config , get_duration
77
8- _listener_started = False
8+ logger = logging . getLogger ( __name__ )
99
1010
1111class CeleryPlugin (Plugin ):
@@ -67,7 +67,13 @@ def init_app(self):
6767 """
6868 Initialize honeybadger and listen for errors.
6969 """
70- from celery .signals import task_failure , task_postrun , task_prerun , worker_ready
70+ from celery .signals import (
71+ task_failure ,
72+ task_postrun ,
73+ task_prerun ,
74+ before_task_publish ,
75+ worker_process_init ,
76+ )
7177
7278 self ._task_starts = {}
7379 self ._initialize_honeybadger (self .app .conf )
@@ -79,9 +85,9 @@ def init_app(self):
7985 if honeybadger .config .insights_enabled :
8086 # Enable task events, as we need to listen to
8187 # task-finished events
82- self .app . conf . worker_send_task_events = True
88+ worker_process_init . connect ( self ._on_worker_process_init , weak = False )
8389 task_prerun .connect (self ._on_task_prerun , weak = False )
84- worker_ready .connect (self ._start_task_event_listener , weak = False )
90+ before_task_publish .connect (self ._on_before_task_publish , weak = False )
8591
8692 def _initialize_honeybadger (self , config ):
8793 """
@@ -96,35 +102,34 @@ def _initialize_honeybadger(self, config):
96102 honeybadger .configure (** config_kwargs )
97103 honeybadger .config .set_12factor_config () # environment should override celery settings
98104
99- def _start_task_event_listener (self , * args , ** kwargs ):
100- # only start the listener once
101- global _listener_started
102- if _listener_started :
103- return
104- _listener_started = True
105-
106- from celery .events import EventReceiver # type: ignore[import]
107-
108- def run ():
109- with self .app .connection () as conn :
110- recv = EventReceiver (
111- conn , handlers = {"task-finished" : self ._on_task_finished }
112- )
113- recv .capture (limit = None , timeout = None , wakeup = False )
114-
115- self ._listen_thread = threading .Thread (target = run , daemon = True )
116- self ._listen_thread .start ()
117-
118- def _on_task_finished (self , payload , ** kwargs ):
119- honeybadger .event ("celery.task_finished" , payload ["payload" ])
105+ def _on_worker_process_init (self , * args , ** kwargs ):
106+ # Restart the events worker to ensure it is running in the new worker
107+ # process.
108+ try :
109+ honeybadger .events_worker .restart ()
110+ except Exception as e :
111+ logger .warning (f"Warning: Failed to restart Honeybadger events worker: { e } " )
112+
113+ def _on_before_task_publish (self , sender = None , body = None , headers = None , ** kwargs ):
114+ # Inject Honeybadger event context into task headers
115+ if headers is not None :
116+ current_context = honeybadger ._get_event_context ()
117+ if current_context :
118+ headers ["honeybadger_event_context" ] = current_context
120119
121120 def _on_task_prerun (self , task_id = None , task = None , * args , ** kwargs ):
122121 self ._task_starts [task_id ] = time .time ()
123122
123+ if task :
124+ context = getattr (task .request , "honeybadger_event_context" , None )
125+ if context :
126+ honeybadger .set_event_context (context )
127+
124128 def _on_task_postrun (self , task_id = None , task = None , * args , ** kwargs ):
125129 """
126130 Callback executed after a task is finished.
127131 """
132+
128133 insights_config = honeybadger .config .insights_config
129134
130135 exclude = insights_config .celery .exclude_tasks
@@ -159,7 +164,7 @@ def _on_task_postrun(self, task_id=None, task=None, *args, **kwargs):
159164 remove_keys = True ,
160165 )
161166
162- task . send_event ( "task-finished " , payload = payload )
167+ honeybadger . event ( "celery.task_finished " , payload = payload )
163168
164169 honeybadger .reset_context ()
165170
@@ -180,13 +185,15 @@ def tearDown(self):
180185 task_failure .disconnect (self ._on_task_failure )
181186
182187 if honeybadger .config .insights_enabled :
183- from celery .signals import worker_ready , task_prerun
188+ from celery .signals import (
189+ task_prerun ,
190+ worker_process_init ,
191+ before_task_publish ,
192+ )
184193
185194 task_prerun .disconnect (self ._on_task_prerun )
186- worker_ready .disconnect (self ._start_task_event_listener )
187-
188- if hasattr (self , "_listen_thread" ):
189- self ._listen_thread .join (timeout = 1 )
195+ worker_process_init .disconnect (self ._on_worker_process_init , weak = False )
196+ before_task_publish .disconnect (self ._on_before_task_publish , weak = False )
190197
191198 # Keep the misspelled method for backward compatibility
192199 def tearDowm (self ):
0 commit comments