import boto3 from queue import Queue from threading import Event import logging import time from sorteerhoed.Signal import Signal import json class SqsListener: def __init__(self, config, eventQ: Queue, runningEvent: Event): self.isRunning = runningEvent self.eventQ = eventQ self.config = config self.logger = logging.getLogger('sorteerhoed').getChild('sqs') def start(self): # create a boto3 client sqs = boto3.client('sqs', aws_access_key_id = self.config['amazon']['user_id'], aws_secret_access_key = self.config['amazon']['user_secret'], region_name=self.config['amazon']['sqs_region_name'], endpoint_url=self.config['amazon']['sqs_endpoint_url'] ) while self.isRunning.is_set(): messages = sqs.receive_message( QueueUrl=self.config['amazon']['sqs_url'], MaxNumberOfMessages=1, WaitTimeSeconds=20) if 'Messages' in messages: # when the queue is exhausted, the response dict contains no 'Messages' key for message in messages['Messages']: # 'Messages' is a list # process the messages self.logger.info(f"received: {message}") try: body = json.loads(message['Body']) # self.logger.critical(f"Try: {body['Events']}") for event in body['Events']: # self.logger.warning(f"{event}") self.eventQ.put(Signal( f"sqs.{event['EventType']}", {'event': event} )) except Exception as e: self.logger.exception(e) pass # next, we delete the message from the queue so no one else will process it again sqs.delete_message( QueueUrl=self.config['amazon']['sqs_url'], ReceiptHandle=message['ReceiptHandle'] ) else: self.logger.debug('SQS is empty') time.sleep(1) self.logger.info("Stopping SQS")