guest_worker/sorteerhoed/sqs.py

49 lines
2.0 KiB
Python

import boto3
from queue import Queue
from threading import Event
import logging
import time
from sorteerhoed.Signal import Signal
class SqsListener:
def __init__(self, config, eventQ: Queue, runningEvent: Event):
self.isRunning = runningEvent
self.eventQ = eventQ
self.config = config
self.logger = logging.getLogger('sorteerhoed').getChild('sqs')
def start(self):
# create a boto3 client
sqs = boto3.client('sqs',
aws_access_key_id = self.config['amazon']['user_id'],
aws_secret_access_key = self.config['amazon']['user_secret'],
region_name=self.config['amazon']['sqs_region_name'],
endpoint_url=self.config['amazon']['sqs_endpoint_url']
)
while self.isRunning.is_set():
messages = sqs.receive_message(
QueueUrl=self.config['amazon']['sqs_url'],
MaxNumberOfMessages=1,
WaitTimeSeconds=20)
if 'Messages' in messages: # when the queue is exhausted, the response dict contains no 'Messages' key
for message in messages['Messages']: # 'Messages' is a list
# process the messages
self.debug(f"received: {message}")
try:
for event in message['Body']['Events']:
self.eventQ.put(Signal(
f"sqs.{event['EventType']}",
{'event': event}
))
except Exception:
pass
# next, we delete the message from the queue so no one else will process it again
sqs.delete_message(
QueueUrl=self.config['amazon']['sqs_url'],
ReceiptHandle=message['ReceiptHandle']
)
else:
self.logger.debug('SQS is empty')
time.sleep(1)
self.logger.info("Stopping SQS")