54 lines
No EOL
2.2 KiB
Python
54 lines
No EOL
2.2 KiB
Python
import boto3
|
|
from queue import Queue
|
|
from threading import Event
|
|
import logging
|
|
import time
|
|
from sorteerhoed.Signal import Signal
|
|
import json
|
|
|
|
class SqsListener:
    """Poll an Amazon SQS queue and forward the events it carries.

    Each received message is expected to have a JSON ``Body`` holding an
    ``Events`` list. Every event with an ``EventType`` is wrapped in a
    ``Signal`` whose first argument is ``"sqs.<EventType>"`` and put on
    the local event queue. Messages are deleted from SQS after handling,
    whether or not their content could be parsed.
    """

    def __init__(self, config, eventQ: Queue, runningEvent: Event):
        """Store the collaborators; nothing is contacted until ``start()``.

        Args:
            config: Mapping with an ``'amazon'`` section providing
                ``user_id``, ``user_secret``, ``sqs_region_name``,
                ``sqs_endpoint_url`` and ``sqs_url``.
            eventQ: Queue that receives one ``Signal`` per SQS event.
            runningEvent: The polling loop runs for as long as this
                event is set.
        """
        self.isRunning = runningEvent
        self.eventQ = eventQ
        self.config = config
        self.logger = logging.getLogger('sorteerhoed').getChild('sqs')

    def _parse_events(self, message):
        """Return ``(signal name, params)`` pairs for the events in *message*.

        A message whose body cannot be decoded, or that has no ``Events``
        entry, is logged and yields an empty list. A single malformed
        event is logged and skipped, so that the remaining events of the
        same message are not lost.
        """
        try:
            events = list(json.loads(message['Body'])['Events'])
        except Exception as e:
            # Best effort: a broken message must never stop the listener.
            self.logger.exception(e)
            return []

        parsed = []
        for event in events:
            try:
                name = f"sqs.{event['EventType']}"
            except (KeyError, TypeError) as e:
                # KeyError: no 'EventType'; TypeError: event is not a mapping.
                self.logger.exception(e)
                continue
            parsed.append((name, {'event': event}))
        return parsed

    def _enqueue_events(self, message):
        """Put a ``Signal`` on the event queue for each event in *message*."""
        for name, params in self._parse_events(message):
            try:
                self.eventQ.put(Signal(name, params))
            except Exception as e:
                # Keep going: one failing signal should not drop the others.
                self.logger.exception(e)

    def start(self):
        """Run the polling loop until the running event is cleared.

        Blocks the calling thread. Errors raised by the SQS client itself
        (``receive_message`` / ``delete_message``) are not caught here and
        propagate to the caller.
        """
        amazon = self.config['amazon']
        sqs = boto3.client(
            'sqs',
            aws_access_key_id=amazon['user_id'],
            aws_secret_access_key=amazon['user_secret'],
            region_name=amazon['sqs_region_name'],
            endpoint_url=amazon['sqs_endpoint_url'],
        )
        queue_url = amazon['sqs_url']

        while self.isRunning.is_set():
            # Fetch at most one message, waiting up to 20 seconds for it.
            response = sqs.receive_message(
                QueueUrl=queue_url,
                MaxNumberOfMessages=1,
                WaitTimeSeconds=20,
            )

            # When the queue is exhausted the response has no 'Messages' key.
            if 'Messages' in response:
                for message in response['Messages']:
                    self.logger.info(f"received: {message}")
                    self._enqueue_events(message)
                    # Delete the message so that it is not processed again,
                    # even when its content could not be parsed.
                    sqs.delete_message(
                        QueueUrl=queue_url,
                        ReceiptHandle=message['ReceiptHandle'],
                    )
            else:
                self.logger.debug('SQS is empty')
                # NOTE(review): the pause is assumed to belong to the
                # empty-queue branch only - confirm against the original.
                time.sleep(1)

        self.logger.info("Stopping SQS")