11"""
2- script for running sqs listener
3-
42Created December 21st, 2016
53@author: Yaakov Gesher
64@version: 0.9.0
119# start imports
1210# ================
1311
14- import boto3
15- import boto3 .session
1612import json
17- import time
1813import logging
1914import os
2015import sys
21- from sqs_launcher import SqsLauncher
16+ import time
2217from abc import ABCMeta , abstractmethod
2318
19+ import boto3
20+ import boto3 .session
21+
22+ from sqs_launcher import SqsLauncher
23+
2424# ================
2525# start class
2626# ================
2727
2828sqs_logger = logging .getLogger ('sqs_listener' )
2929
30+
3031class SqsListener (object ):
3132 __metaclass__ = ABCMeta
3233
@@ -47,8 +48,8 @@ def __init__(self, queue, **kwargs):
4748 else :
4849 boto3_session = None
4950 if (
50- not os .environ .get ('AWS_ACCOUNT_ID' , None ) and
51- not (boto3 .Session ().get_credentials ().method in ['iam-role' , 'assume-role' , 'assume-role-with-web-identity' ])
51+ not os .environ .get ('AWS_ACCOUNT_ID' , None ) and
52+ not (boto3 .Session ().get_credentials ().method in ['iam-role' , 'assume-role' , 'assume-role-with-web-identity' ])
5253 ):
5354 raise EnvironmentError ('Environment variable `AWS_ACCOUNT_ID` not set and no role found.' )
5455
@@ -74,7 +75,6 @@ def __init__(self, queue, **kwargs):
7475 self ._region_name = kwargs .get ('region_name' , self ._session .region_name )
7576 self ._client = self ._initialize_client ()
7677
77-
7878 def _initialize_client (self ):
7979 # new session for each instantiation
8080 ssl = True
@@ -83,31 +83,30 @@ def _initialize_client(self):
8383
8484 sqs = self ._session .client ('sqs' , region_name = self ._region_name , endpoint_url = self ._endpoint_name , use_ssl = ssl )
8585 queues = sqs .list_queues (QueueNamePrefix = self ._queue_name )
86- mainQueueExists = False
87- errorQueueExists = False
86+ main_queue_exists = False
87+ error_queue_exists = False
8888 if 'QueueUrls' in queues :
8989 for q in queues ['QueueUrls' ]:
9090 qname = q .split ('/' )[- 1 ]
9191 if qname == self ._queue_name :
92- mainQueueExists = True
92+ main_queue_exists = True
9393 if self ._error_queue_name and qname == self ._error_queue_name :
94- errorQueueExists = True
95-
94+ error_queue_exists = True
9695
9796 # create queue if necessary.
9897 # creation is idempotent, no harm in calling on a queue if it already exists.
9998 if self ._queue_url is None :
100- if not mainQueueExists :
99+ if not main_queue_exists :
101100 sqs_logger .warning ("main queue not found, creating now" )
102101
103102 # is this a fifo queue?
104103 if self ._queue_name .endswith (".fifo" ):
105- fifoQueue = "true"
104+ fifo_queue = "true"
106105 q = sqs .create_queue (
107106 QueueName = self ._queue_name ,
108107 Attributes = {
109108 'VisibilityTimeout' : self ._queue_visibility_timeout , # 10 minutes
110- 'FifoQueue' :fifoQueue
109+ 'FifoQueue' : fifo_queue
111110 }
112111 )
113112 else :
@@ -120,7 +119,7 @@ def _initialize_client(self):
120119 )
121120 self ._queue_url = q ['QueueUrl' ]
122121
123- if self ._error_queue_name and not errorQueueExists :
122+ if self ._error_queue_name and not error_queue_exists :
124123 sqs_logger .warning ("error queue not found, creating now" )
125124 q = sqs .create_queue (
126125 QueueName = self ._error_queue_name ,
@@ -162,9 +161,8 @@ def _start_listening(self):
162161
163162 try :
164163 deserialized = self ._deserializer (m_body )
165- except Exception as e :
166- sqs_logger .error ("Unable to parse message" )
167- sqs_logger .exception (e )
164+ except :
165+ sqs_logger .exception ("Unable to parse message" )
168166 continue
169167
170168 if 'MessageAttributes' in m :
@@ -185,12 +183,11 @@ def _start_listening(self):
185183 ReceiptHandle = receipt_handle
186184 )
187185 except Exception as ex :
188- # need exception logtype to log stack trace
189186 sqs_logger .exception (ex )
190187 if self ._error_queue_name :
191188 exc_type , exc_obj , exc_tb = sys .exc_info ()
192189
193- sqs_logger .info ( "Pushing exception to error queue" )
190+ sqs_logger .info ("Pushing exception to error queue" )
194191 error_launcher = SqsLauncher (queue = self ._error_queue_name , create_queue = True )
195192 error_launcher .launch_message (
196193 {
@@ -203,9 +200,9 @@ def _start_listening(self):
203200 time .sleep (self ._poll_interval )
204201
205202 def listen (self ):
206- sqs_logger .info ( "Listening to queue " + self ._queue_name )
203+ sqs_logger .info ("Listening to queue " + self ._queue_name )
207204 if self ._error_queue_name :
208- sqs_logger .info ( "Using error queue " + self ._error_queue_name )
205+ sqs_logger .info ("Using error queue " + self ._error_queue_name )
209206
210207 self ._start_listening ()
211208
0 commit comments