import datetime
import io
import logging
import os
import queue
import subprocess
import threading
import time
from queue import Queue
from shutil import copyfile

import boto3
import yaml
from PIL import Image

from sorteerhoed import HITStore
from sorteerhoed.plotter import Plotter
from sorteerhoed.Signal import Signal
from sorteerhoed.sqs import SqsListener
from sorteerhoed.webserver import Server


class CentralManagement():
    """
    Central management reads the config and controls the process flow.

    Components wired together by this class:

    - The HIT store (``self.store``) is the archive of HITs.
    - ``self.mturk`` is the boto3 Mechanical Turk client.
    - The server thread is the webserver communicating with the turkers and
      with the status interface on the installation.
    - The plotter thread reads the plotter queue and sends it to the plotter.
    - The scanner is, for now, a simple ``scanimage`` command run in a
      short-lived thread.
    - The SQS thread listens for updates from Amazon.

    All components report back through ``self.eventQueue``; the dispatcher
    thread (``eventListener``) consumes that queue.
    """

    # Seconds to wait for `scanimage` before it is considered hung.
    SCAN_TIMEOUT = 80
    # Timestamp format of the 'EventTimestamp' field in SQS notifications.
    SQS_TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
    # Signals whose handlers dereference self.currentHit ('sqs.*' as well).
    HIT_SIGNALS = (
        'hit.scanned',
        'hit.info',
        'server.open',
        'server.submit',
        'plotter.finished',
    )

    def __init__(self, debug_mode):
        """
        Set up logger, event queue and synchronisation primitives.

        :param debug_mode: when truthy, the HIT store logs at DEBUG level.
        """
        self.logger = logging.getLogger("sorteerhoed").getChild('management')
        self.debug = debug_mode
        self.currentHit = None

        self.eventQueue = Queue()
        self.isRunning = threading.Event()
        self.isScanning = threading.Event()
        self.scanLock = threading.Lock()

    def loadConfig(self, filename):
        """
        Load the YAML configuration from ``filename`` and open the HIT store.

        :param filename: path to the YAML config file.
        """
        with open(filename, 'r') as fp:
            self.logger.debug('Load config from {}'.format(filename))
            self.config = yaml.safe_load(fp)

        # The store currently lives in the working directory; a configurable
        # self.config['storage_dir'] prefix is not (yet) applied.
        varDb = os.path.join('hit_store.db')
        self.store = HITStore.Store(varDb, logLevel=logging.DEBUG if self.debug else logging.INFO)

        # Never write the AWS secret to the log.
        self.logger.debug(f"Loaded configuration: {self._redactedConfig()}")

    def _redactedConfig(self):
        """Return a copy of the config with the Amazon secret key masked."""
        config = self.config
        if not isinstance(config, dict):
            return config
        redacted = dict(config)
        amazon = redacted.get('amazon')
        if isinstance(amazon, dict):
            redacted['amazon'] = {
                key: ('***' if key == 'user_secret' else value)
                for key, value in amazon.items()
            }
        return redacted

    def start(self):
        """
        Connect to Mechanical Turk, start all worker threads and block until
        ``self.isRunning`` is cleared.

        On exit (normal or through an exception) the running flag is cleared
        and the webserver, if it was created, is stopped.
        """
        self.isRunning.set()

        try:
            # M-turk connection
            MTURK_SANDBOX = 'https://mturk-requester-sandbox.us-east-1.amazonaws.com'
            # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/mturk.html#MTurk.Client
            self.mturk = boto3.client(
                'mturk',
                aws_access_key_id=self.config['amazon']['user_id'],
                aws_secret_access_key=self.config['amazon']['user_secret'],
                region_name='us-east-1',
                endpoint_url=MTURK_SANDBOX,
            )

            self.logger.info(f"Mechanical turk: {self.mturk.get_account_balance()}")

            self.sqs = SqsListener(self.config, self.eventQueue, self.isRunning)
            sqsThread = threading.Thread(target=self.sqs.start, name='sqs')
            sqsThread.start()

            # the plotter itself
            self.plotter = Plotter(self.config, self.eventQueue, self.isRunning, self.scanLock)
            plotterThread = threading.Thread(target=self.plotter.start, name='plotter')
            plotterThread.start()

            # webserver for turks and status
            self.server = Server(self.config, self.eventQueue, self.isRunning, self.plotter.q, self.store)
            serverThread = threading.Thread(target=self.server.start, name='server')
            serverThread.start()

            # event listener:
            dispatcherThread = threading.Thread(target=self.eventListener, name='dispatcher')
            dispatcherThread.start()

            # Kick off the flow: the 'start' signal makes the first HIT.
            self.eventQueue.put(Signal('start', {'ding': 'test'}))

            while self.isRunning.is_set():
                time.sleep(.5)
        finally:
            self.logger.warning("Stopping Central Managment")
            self.isRunning.clear()
            # The server only exists if setup got that far; do not let a
            # missing attribute mask the exception that brought us here.
            server = getattr(self, 'server', None)
            if server is not None:
                server.stop()

    def eventListener(self):
        """
        Dispatcher loop: take signals from the event queue and handle them
        until ``self.isRunning`` is cleared.

        Possible events:
        - SQS events: accept/abandoned/returned/submitted
        - webserver events: open, draw, submit
        - scan complete
        - HIT created
        - Plotter complete
        """
        while self.isRunning.is_set():
            try:
                signal = self.eventQueue.get(True, 1)
            except queue.Empty:
                # Nothing to do; loop so the running flag is re-checked.
                continue

            # TODO: make level debug()
            self.logger.warning(f"SIGNAL: {signal}")
            try:
                self._handleSignal(signal)
            except Exception:
                # One failing handler must not kill the dispatcher thread:
                # nothing else consumes the event queue.
                self.logger.exception(f"Error while handling signal: {signal}")

    def _handleSignal(self, signal):
        """Route a single signal to the matching handler."""
        name = signal.name

        needsHit = name in self.HIT_SIGNALS or name.startswith('sqs.')
        if needsHit and self.currentHit is None:
            self.logger.warning(f"Ignoring signal {name}: there is no current HIT yet")
            return

        if name == 'start':
            self.makeHit()
        elif name == 'hit.scanned':
            # TODO: wrap up hit & make new HIT
            self.currentHit.scanned_at = datetime.datetime.utcnow()
            self.server.statusPage.set('state', self.currentHit.getStatus())
            self.makeHit()
        elif name in ('scan.start', 'scan.finished'):
            pass
        elif name == 'hit.info':
            self._handleHitInfo(signal)
        elif name == 'server.open':
            self.currentHit.open_page_at = datetime.datetime.utcnow()
            self.store.saveHIT(self.currentHit)
            self.setLight(True)
            self.server.statusPage.set('state', self.currentHit.getStatus())
            self.server.statusPage.set('hit_opened', self.currentHit.open_page_at)
        elif name == 'server.submit':
            self.currentHit.submit_page_at = datetime.datetime.utcnow()
            self.store.saveHIT(self.currentHit)
            # park always triggers a plotter.finished after being processed
            self.plotter.park()
            self.server.statusPage.set('state', self.currentHit.getStatus())
            self.server.statusPage.set('hit_opened', self.currentHit.open_page_at)
        elif name.startswith('sqs.'):
            self._handleSqsEvent(signal)
        elif name == 'plotter.finished':
            # Only scan when the turker actually submitted the drawing.
            if self.currentHit.submit_page_at:
                self.setLight(False)
                scan = threading.Thread(target=self.scanImage, name='scan')
                scan.start()
                self.server.statusPage.set('hit_submitted', self.currentHit.submit_page_at)
        else:
            self.logger.critical(f"Unknown signal: {signal.name}")

    def _handleHitInfo(self, signal):
        """Copy turker info from a 'hit.info' signal onto the current HIT and the status page."""
        if signal.params['hit_id'] != self.currentHit.id:
            self.logger.warning(f"hit.info hit_id != currenthit.id: {signal}")
            return

        for name, value in signal.params.items():
            if name == 'hit_id':
                continue
            if name == 'ip':
                self.currentHit.turk_ip = value
            if name == 'location':
                self.currentHit.turk_country = value
            self.logger.debug(f'Set status: {name} to {value}')
            self.server.statusPage.set(name, value)

    def _handleSqsEvent(self, signal):
        """
        Apply an Amazon SQS assignment notification to the HIT it refers to.

        Example event payload (``signal.params['event']``)::

            {'HITGroupId': '...', 'EventType': 'AssignmentAccepted',
             'EventTimestamp': '2019-10-23T20:16:10Z', 'HITId': '...',
             'AssignmentId': '...', 'WorkerId': '...', 'HITTypeId': '...'}

        'AssignmentSubmitted' events additionally carry an 'Answer' field.
        """
        event = signal.params['event']

        # The event carries Amazon's HITId, which makeHit() stores in
        # currentHit.hit_id (currentHit.id is the local id).
        # NOTE(review): this used to compare against currentHit.id, which
        # presumably never matches a remote id - confirm against HITStore.
        if event['HITId'] != self.currentHit.hit_id:
            self.logger.warning(f"SQS hit.info hit_id != currenthit.id: {signal}, update status for older HIT")
            sqsHit = self.store.getHitByRemoteId(event['HITId'])
            updateStatus = False
            if sqsHit is None:
                self.logger.warning(f"No HIT found for remote id {event['HITId']}, ignoring {signal.name}")
                return
        else:
            sqsHit = self.currentHit
            updateStatus = True

        if signal.name == 'sqs.AssignmentAccepted':
            self.logger.info('Set status progress to accepted')
            sqsHit.accept_time = datetime.datetime.strptime(event['EventTimestamp'], self.SQS_TIMESTAMP_FORMAT)
            sqsHit.worker_id = event['WorkerId']
            if updateStatus:
                self.server.statusPage.set('worker_id', sqsHit.worker_id)
        elif signal.name == 'sqs.AssignmentAbandoned':
            self.logger.info('Set status progress to abandoned')
            self._releaseAssignment(sqsHit, updateStatus)
        elif signal.name == 'sqs.AssignmentReturned':
            self.logger.info('Set status progress to returned')
            self._releaseAssignment(sqsHit, updateStatus)
        elif signal.name == 'sqs.AssignmentSubmitted':
            self.logger.info('Set status progress to submitted')
            # TODO: validate the content of the submission by parsing
            # event['Answer'] and comparing it with sqsHit.uuid
            sqsHit.answer = event['Answer']
            if sqsHit.uuid not in sqsHit.answer:
                self.logger.critical(f"Not a valid answer given?! {sqsHit.answer}")
            sqsHit.submit_hit_at = datetime.datetime.strptime(event['EventTimestamp'], self.SQS_TIMESTAMP_FORMAT)

        self.store.saveHIT(sqsHit)

        if updateStatus:
            # TODO: have HITStore/HIT take care of this by emitting a signal
            # only update status if it is the currentHit
            self.server.statusPage.set('state', sqsHit.getStatus())

    def _releaseAssignment(self, sqsHit, updateStatus):
        """Shared handling for abandoned and returned assignments."""
        sqsHit.accept_time = None
        sqsHit.open_page_at = None

        if self.currentHit.id == sqsHit.id:
            if not sqsHit.submit_page_at:
                # Nothing was submitted: clean up so the HIT can be redone.
                self.reset()
            else:
                sqsHit.submit_hit_at = datetime.datetime.utcnow()  # fake submit

        if updateStatus:
            self.setLight(False)

    def makeHit(self):
        """
        Create a new HIT locally and on Mechanical Turk and make it the
        current HIT.

        The reward is derived from the average duration of the previous five
        HITs and the configured ``hour_rate_aim``. When an SQS url is
        configured, assignment notifications are enabled for the HIT type.
        """
        self.server.statusPage.reset()
        self.currentHit = self.store.createHIT()
        self.store.currentHit = self.currentHit

        self.logger.info(f"Make HIT {self.currentHit.id}")

        # Read the question template and close the file again.
        with open(self.config['amazon']['task_xml'], mode='r') as fp:
            question = fp.read().replace("{HIT_NR}", str(self.currentHit.id))

        estimatedHitDuration = self.store.getAvgDurationOfPreviousNHits(5)

        fee = (self.config['hour_rate_aim'] / 3600.) * estimatedHitDuration
        self.currentHit.fee = fee
        self.logger.info(f"Based on average duration of {estimatedHitDuration} fee should be {fee}/hit to get hourly rate of {self.config['hour_rate_aim']}")
        new_hit = self.mturk.create_hit(
            Title='Trace the drawn line',
            Description='Draw a line over the sketched line in the image',
            Keywords='polygons, trace, draw',
            Reward="{:.2f}".format(fee),
            MaxAssignments=1,
            LifetimeInSeconds=self.config['hit_lifetime'],
            AssignmentDurationInSeconds=self.config['hit_assignment_duration'],
            AutoApprovalDelayInSeconds=self.config['hit_autoapprove_delay'],
            Question=question,
        )

        self.logger.info(f"Created hit: {new_hit}")
        self.logger.info("https://workersandbox.mturk.com/mturk/preview?groupId=" + new_hit['HIT']['HITGroupId'])

        # Remember Amazon's id so SQS events can be matched to this HIT.
        self.currentHit.hit_id = new_hit['HIT']['HITId']
        self.store.saveHIT(self.currentHit)
        # TODO: have HITStore/HIT take care of this by emitting a signal
        self.server.statusPage.set('hit_id', new_hit['HIT']['HITId'])
        self.server.statusPage.set('hit_created', self.currentHit.created_at)

        # mturk.send_test_event_notification()
        if self.config['amazon']['sqs_url']:
            notification_info = self.mturk.update_notification_settings(
                HITTypeId=new_hit['HIT']['HITTypeId'],
                Notification={
                    'Destination': self.config['amazon']['sqs_url'],
                    'Transport': 'SQS',
                    'Version': '2014-08-15',
                    'EventTypes': [
                        'AssignmentAccepted',
                        'AssignmentAbandoned',
                        'AssignmentReturned',
                        'AssignmentSubmitted',
                        # 'AssignmentRejected',
                        # 'AssignmentApproved',
                        # 'HITCreated',
                        # 'HITExpired',
                        # 'HITReviewable',
                        # 'HITExtended',
                        # 'HITDisposed',
                        # 'Ping',
                    ]
                },
                Active=True
            )
            self.logger.debug(notification_info)

    def _runScanner(self, cmd, stdout):
        """
        Run a scanner command and return ``(stdout_bytes, stderr_bytes)``.

        Never raises for a scanner that cannot be started or that hangs:
        both cases are reported through a non-empty stderr value so callers
        can treat them like any other scanner error.

        :param cmd: argument list for ``subprocess.Popen``.
        :param stdout: ``subprocess.PIPE`` to capture the scan, or
            ``subprocess.DEVNULL`` to discard it (stdout is then ``None``).
        """
        try:
            proc = subprocess.Popen(cmd, stdout=stdout, stderr=subprocess.PIPE)
        except OSError as exc:
            return b'', f"Cannot start {cmd[0]}: {exc}".encode()

        try:
            # opens connection to scanner, but only starts scanning when
            # output becomes ready:
            return proc.communicate(timeout=self.SCAN_TIMEOUT)
        except subprocess.TimeoutExpired:
            out, err = b'', b''
            try:
                # communicate() does not kill the child on timeout.
                proc.kill()
                # Bounded wait: a grandchild may still hold the pipes open.
                out, err = proc.communicate(timeout=5)
            except (OSError, subprocess.TimeoutExpired) as exc:
                self.logger.warning(f"Could not clean up scanner process: {exc}")
            timeoutMsg = f"scanner command timed out after {self.SCAN_TIMEOUT}s".encode()
            return out, (err or b'') + timeoutMsg

    def cleanDrawing(self):
        """
        Run a scan whose output is discarded, to reset the installation
        after a returned or abandoned assignment.

        Emits 'scan.start' before and 'scan.finished' after the scan; on a
        scanner error 'system.reset' is emitted as well.
        """
        # Scan to reset
        cmd = [
            'sudo', 'scanimage', '-d', 'epkowa'
        ]
        with self.scanLock:
            self.eventQueue.put(Signal('scan.start'))
            try:
                _, e = self._runScanner(cmd, stdout=subprocess.DEVNULL)
                if e:
                    self.logger.critical(f"Scanner caused: {e.decode(errors='replace')}")
                    # NOTE(review): the dispatcher has no handler for
                    # 'system.reset'; it is logged there as an unknown signal.
                    self.eventQueue.put(Signal('system.reset'))
            finally:
                self.eventQueue.put(Signal('scan.finished'))

    def reset(self) -> None:
        """Start ``cleanDrawing`` in a background thread (for returns & abandons)."""
        # TODO: for returns & abandons
        scan = threading.Thread(target=self.cleanDrawing, name='reset')
        scan.start()

    def scanImage(self) -> str:
        """
        Run scanimage on the scanner, store the result as the image of the
        current HIT and return a string with the filename.

        When no image can be created from the scanner output, a placeholder
        ('www/basic.svg') is copied to the filename instead. 'hit.scanned'
        and 'scan.finished' are always emitted so the flow continues.
        """
        cmd = [
            'sudo', 'scanimage', '-d', 'epkowa', '--format', 'jpeg',
            '--resolution=100', '-l', '20', '-t', '30', '-x', str(185),
            '-y', str(245)
        ]
        self.logger.info(f"{cmd}")
        # Pin the HIT this scan belongs to for the whole run of the thread.
        hit = self.currentHit
        filename = hit.getImagePath()

        try:
            with self.scanLock:
                self.eventQueue.put(Signal('scan.start'))
                o, e = self._runScanner(cmd, stdout=subprocess.PIPE)
                if e:
                    self.logger.critical(f"Scanner caused: {e.decode(errors='replace')}")
                    # TODO: should clear self.isRunning.clear() ?

            try:
                img = Image.open(io.BytesIO(o))
                img = img.transpose(Image.ROTATE_90)
                img.save(filename)
            except Exception as exc:
                self.logger.critical("Cannot create image from scan. Did scanner work?")
                self.logger.exception(exc)
                # TODO: create
                copyfile('www/basic.svg', filename)
        finally:
            # Without these signals no new HIT is made, so emit them even
            # when scanning or the fallback copy failed.
            self.eventQueue.put(Signal('hit.scanned', {'hit_id': hit.id}))
            self.eventQueue.put(Signal('scan.finished'))

        return filename

    def setLight(self, on):
        """
        Switch the light on the USB relay.

        :param on: truthy to switch the relay on, falsy to switch it off.
        """
        value = 1 if on else 0
        cmd = [
            'usbrelay', f'HURTM_1={value}'
        ]
        self.logger.info(f"Trigger light {cmd}")
        try:
            code = subprocess.call(cmd)
        except OSError as exc:
            # e.g. usbrelay is not installed; the light is not essential.
            self.logger.warning(f"Error on light change: {exc}")
            return
        # Negative codes mean the command was killed by a signal.
        if code != 0:
            self.logger.warning(f"Error on light change: {code}")