guest_worker/sorteerhoed/sqs.py

54 lines
2.2 KiB
Python
Raw Normal View History

2019-10-23 10:56:28 +02:00
import boto3
from queue import Queue
from threading import Event
import logging
import time
from sorteerhoed.Signal import Signal
2019-10-23 22:33:37 +02:00
import json
2019-10-23 10:56:28 +02:00
class SqsListener:
def __init__(self, config, eventQ: Queue, runningEvent: Event):
self.isRunning = runningEvent
self.eventQ = eventQ
self.config = config
self.logger = logging.getLogger('sorteerhoed').getChild('sqs')
def start(self):
# create a boto3 client
sqs = boto3.client('sqs',
aws_access_key_id = self.config['amazon']['user_id'],
aws_secret_access_key = self.config['amazon']['user_secret'],
region_name=self.config['amazon']['sqs_region_name'],
endpoint_url=self.config['amazon']['sqs_endpoint_url']
)
while self.isRunning.is_set():
messages = sqs.receive_message(
QueueUrl=self.config['amazon']['sqs_url'],
MaxNumberOfMessages=1,
WaitTimeSeconds=20)
if 'Messages' in messages: # when the queue is exhausted, the response dict contains no 'Messages' key
for message in messages['Messages']: # 'Messages' is a list
# process the messages
2019-10-23 22:33:37 +02:00
self.logger.info(f"received: {message}")
2019-10-23 10:56:28 +02:00
try:
2019-10-23 22:33:37 +02:00
body = json.loads(message['Body'])
# self.logger.critical(f"Try: {body['Events']}")
for event in body['Events']:
# self.logger.warning(f"{event}")
2019-10-23 10:56:28 +02:00
self.eventQ.put(Signal(
f"sqs.{event['EventType']}",
{'event': event}
))
2019-10-23 22:33:37 +02:00
except Exception as e:
self.logger.exception(e)
2019-10-23 10:56:28 +02:00
pass
# next, we delete the message from the queue so no one else will process it again
sqs.delete_message(
QueueUrl=self.config['amazon']['sqs_url'],
ReceiptHandle=message['ReceiptHandle']
)
else:
self.logger.debug('SQS is empty')
time.sleep(1)
self.logger.info("Stopping SQS")