@@ -86,7 +86,7 @@ def _produce(cls, payload):
8686 rmq_settings = cls ._get_common_settings ()
8787 exchange = rmq_settings [- 1 ]
8888 # Decided not to create context-manager to stay within the class
89- _ , channel = cls ._get_producer_rmq_objects (* rmq_settings )
89+ _ , channel = cls ._get_producer_rmq_objects (* rmq_settings , signal_type = payload . signal_type )
9090
9191 cls ._produce_message (channel , exchange , payload )
9292 cls .log_produced (payload )
@@ -183,23 +183,36 @@ def _get_consumer_rmq_objects(cls, host, port, creds, exchange, queue_name, pref
183183 return connection , channel
184184
185185 @classmethod
186- def _get_producer_rmq_objects (cls , host , port , creds , exchange ):
187- if cls ._producer_connection is None :
188- connection = BlockingConnection (
189- ConnectionParameters (
190- host = host ,
191- port = port ,
192- credentials = creds ,
193- blocked_connection_timeout = 10 ,
194- ),
195- )
196- channel = connection .channel ()
197- cls ._declare_exchange (channel , exchange )
186+ def _get_producer_rmq_objects (cls , host , port , creds , exchange , signal_type = None ):
187+ """
188+ Use shared connection in case of sync mode, otherwise create new connection for each
189+ message
190+ """
191+ if signal_type == SignalType .SYNC :
192+ if cls ._producer_connection is None :
193+ connection , channel = cls ._create_connection (host , port , creds , exchange )
194+
195+ cls ._producer_connection = connection
196+ cls ._producer_channel = channel
197+
198+ return cls ._producer_connection , cls ._producer_channel
199+ else :
200+ return cls ._create_connection (host , port , creds , exchange )
198201
199- cls ._producer_connection = connection
200- cls ._producer_channel = channel
202+ @classmethod
203+ def _create_connection (cls , host , port , creds , exchange ):
204+ connection = BlockingConnection (
205+ ConnectionParameters (
206+ host = host ,
207+ port = port ,
208+ credentials = creds ,
209+ blocked_connection_timeout = 10 ,
210+ ),
211+ )
212+ channel = connection .channel ()
213+ cls ._declare_exchange (channel , exchange )
201214
202- return cls . _producer_connection , cls . _producer_channel
215+ return connection , channel
203216
204217 @staticmethod
205218 def _declare_exchange (channel , exchange ):
0 commit comments