import colorsys
import datetime
import io
import logging
import os
import queue
import subprocess
import threading
import time
from queue import Queue
from shutil import copyfile

import boto3
import tqdm
import yaml
from PIL import Image

from sorteerhoed import HITStore
from sorteerhoed.plotter import Plotter
from sorteerhoed.Signal import Signal
from sorteerhoed.sqs import SqsListener
from sorteerhoed.webserver import Server


class Level(object):
    """
    Level image effect
    adapted from https://stackoverflow.com/a/3125421
    """

    def __init__(self, minv, maxv, gamma):
        # minv/maxv are given on a 0-255 scale and normalised to 0.0-1.0 here.
        self.minv = minv / 255.0
        self.maxv = maxv / 255.0
        self._interval = self.maxv - self.minv
        self._invgamma = 1.0 / gamma

    def new_level(self, value):
        """Map a brightness value (0.0-1.0) onto the levelled curve."""
        if value <= self.minv:
            return 0.0
        if value >= self.maxv:
            return 1.0
        return ((value - self.minv) / self._interval) ** self._invgamma

    def convert_and_level(self, band_values):
        """Level the HSV value channel of one RGB pixel; returns an RGB int tuple."""
        h, s, v = colorsys.rgb_to_hsv(*(i / 255.0 for i in band_values))
        new_v = self.new_level(v)
        return tuple(int(255 * i) for i in colorsys.hsv_to_rgb(h, s, new_v))

    @classmethod
    def level_image(cls, image, minv=0, maxv=255, gamma=1.0):
        """Level the brightness of image (a PIL.Image instance)
        All values ≤ minv will become 0
        All values ≥ maxv will become 255
        gamma controls the curve for all values between minv and maxv"""

        if image.mode != "RGB":
            raise ValueError("this works with RGB images only")

        new_image = image.copy()

        leveller = cls(minv, maxv, gamma)
        levelled_data = [
            leveller.convert_and_level(data)
            for data in image.getdata()]
        new_image.putdata(levelled_data)
        return new_image


class CentralManagement():
    """
    Central management reads config and controls process flow

    The HIT Store is the archive of hits
    mturk thread communicates with mturk
    server thread is tornado communicating with the turkers and with the status interface on the installation
    Plotter thread reads plotter queue and sends it to there
    Scanner is for now a simple imagescan command
    SQS: thread that listens for updates from Amazon
    """

    # Seconds to wait for `scanimage` before giving up on it.
    SCAN_TIMEOUT = 80

    # Scan area passed to `scanimage`.
    SCAN_AREA_ARGS = [
        '-l', '25',  # y axis, margin from top of the scanner, hence increasing this, moves the scanned image upwards
        '-t', '22',  # x axis, margin from left side scanner (seen from the outside)
        '-x', str(181),
        '-y', str(245),
    ]

    # Timestamp layout of the SQS events, e.g. '2019-10-23T20:16:10Z'.
    SQS_TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"

    def __init__(self, debug_mode):
        self.logger = logging.getLogger("sorteerhoed").getChild('management')

        self.debug = debug_mode
        self.currentHit = None
        self.lastHitTime = None

        self.eventQueue = Queue()
        self.statusPageQueue = Queue()
        self.isRunning = threading.Event()
        self.isScanning = threading.Event()
        self.scanLock = threading.Lock()
        self.notPaused = threading.Event()
        self.lightStatus = 0

        # Set in start(); kept as None so the shutdown path can tell whether
        # the webserver was ever created.
        self.server = None

    def loadConfig(self, filename, args):
        """Remember config file and CLI args, load the config and open the HIT store."""
        self.configFile = filename
        self.args = args

        self.reloadConfig()

        varDb = os.path.join(
            # self.config['storage_dir'],
            'hit_store.db'
        )
        self.store = HITStore.Store(varDb, logLevel=logging.DEBUG if self.debug else logging.INFO)
        self.store.registerUpdateHook(self.updateLightHook)  # change light based on status

        self.logger.debug(f"Loaded configuration: {self.config}")

    def reloadConfig(self):
        """(Re)read the YAML config file and apply the command line overrides."""
        # reload config settings
        with open(self.configFile, 'r') as fp:
            self.logger.debug('Load config from {}'.format(self.configFile))
            self.config = yaml.safe_load(fp)

            if self.args.no_plotter:
                self.config['dummy_plotter'] = True

            self.config['for_real'] = self.args.for_real

        # self.panopticon = Panopticon(self, self.config, self.voiceStorage)

    def start(self):
        """
        Connect to MTurk, expire stale HITs, start the sqs/plotter/server/dispatcher
        threads and block until isRunning is cleared.
        """
        self.isRunning.set()
        self.notPaused.set()

        try:
            # M-turk connection
            MTURK_SANDBOX = 'https://mturk-requester-sandbox.us-east-1.amazonaws.com'
            MTURK_REAL = 'https://mturk-requester.us-east-1.amazonaws.com'
            # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/mturk.html#MTurk.Client
            self.mturk = boto3.client(
                'mturk',
                aws_access_key_id=self.config['amazon']['user_id'],
                aws_secret_access_key=self.config['amazon']['user_secret'],
                region_name='us-east-1',
                endpoint_url=MTURK_REAL if self.config['for_real'] else MTURK_SANDBOX
            )

            self.logger.info(f"Mechanical turk account balance: {self.mturk.get_account_balance()['AvailableBalance']}")
            if not self.config['for_real']:
                self.logger.info("Remove block from sandbox worker account")
                self.mturk.delete_worker_block(WorkerId='A1CK46PK9VEUH5', Reason='Myself on Sandbox')

            # clear any pending hits:
            pending_hits = self.mturk.list_hits(MaxResults=100)
            for pending_hit in pending_hits['HITs']:
                # print(pending_hit['HITId'], pending_hit['HITStatus'])
                if pending_hit['HITStatus'] == 'Assignable':
                    self.logger.warning(f"Expire stale hit: {pending_hit['HITId']}: {pending_hit['HITStatus']}")
                    self.mturk.update_expiration_for_hit(
                        HITId=pending_hit['HITId'],
                        ExpireAt=datetime.datetime.fromisoformat('2015-01-01')
                    )
                    self.mturk.delete_hit(HITId=pending_hit['HITId'])
                    staleHit = self.store.getHitByRemoteId(pending_hit['HITId'])
                    # NOTE(review): assumes the store yields None for a HIT it
                    # does not know (e.g. created with another database) - confirm.
                    if staleHit is not None:
                        staleHit.delete()
                        self.store.saveHIT(staleHit)

            self.sqs = SqsListener(self.config, self.eventQueue, self.isRunning)
            sqsThread = threading.Thread(target=self.sqs.start, name='sqs')
            sqsThread.start()

            # the plotter itself
            self.plotter = Plotter(self.config, self.eventQueue, self.isRunning, self.scanLock)
            plotterThread = threading.Thread(target=self.plotter.start, name='plotter')
            plotterThread.start()

            # webserver for turks and status
            self.server = Server(self.config, self.eventQueue, self.isRunning, self.plotter.q, self.store)
            serverThread = threading.Thread(target=self.server.start, name='server')
            serverThread.start()

            # event listener:
            dispatcherThread = threading.Thread(target=self.eventListener, name='dispatcher')
            dispatcherThread.start()

            self.eventQueue.put(Signal('start', {'ding': 'test'}))

            while self.isRunning.is_set():
                time.sleep(.5)
        except Exception as e:
            self.logger.exception(e)
        finally:
            self.logger.warning("Stopping Central Managment")
            self.isRunning.clear()
            # Start-up may have failed before the server was created.
            if self.server is not None:
                self.server.stop()
            self.expireCurrentHit()

    def expireCurrentHit(self):
        """Expire and delete the current, unconfirmed HIT at Amazon and update the store."""
        if self.currentHit and not self.currentHit.isConfirmed():
            if self.currentHit.hit_id:  # hit pending at Amazon
                self.logger.warning(f"Expire hit: {self.currentHit.hit_id}")
                self.mturk.update_expiration_for_hit(
                    HITId=self.currentHit.hit_id,
                    ExpireAt=datetime.datetime.fromisoformat('2015-01-01')
                )
                try:
                    self.mturk.delete_hit(HITId=self.currentHit.hit_id)
                except Exception as e:
                    self.logger.exception(e)

            if not self.currentHit.isSubmitted():
                self.currentHit.delete()

            self.store.saveHIT(self.currentHit)

    def eventListener(self):
        """
        Dispatcher loop: take Signals from the event queue and act on them
        for as long as isRunning is set. Exceptions raised while handling a
        signal are logged and do not stop the loop.
        """
        while self.isRunning.is_set():
            try:
                signal = self.eventQueue.get(True, 1)
            except queue.Empty:
                # self.logger.log(5, "Empty queue.")
                continue

            try:
                # Possible events:
                # - SQS events: accept/abandoned/returned/submitted
                # - webserver events: open, draw, submit
                # - scan complete
                # - HIT created
                # - Plotter complete
                self.logger.info(f"SIGNAL: {signal}")
                if signal.name == 'start':
                    self.makeHit()
                    self.lastHitTime = datetime.datetime.now()
                elif signal.name == 'hit.scan':
                    # start a scan
                    if signal.params['id'] != self.currentHit.id:
                        self.logger.info(f"Hit.scan had wrong id: {signal}")
                        continue
                    # self.statusPageQueue.add(dict(hit_id=signal.params['id'], transition='scanning'))
                elif signal.name == 'hit.scanned':
                    # TODO: wrap up hit & make new HIT
                    if signal.params['hit_id'] != self.currentHit.id:
                        self.logger.info(f"Hit.scanned had wrong id: {signal}")
                        continue

                    self.currentHit.scanned_at = datetime.datetime.utcnow()
                    self.store.saveHIT(self.currentHit)

                    # leave at least 10 seconds between two HITs
                    time_diff = datetime.datetime.now() - self.lastHitTime
                    to_wait = 10 - time_diff.total_seconds()
                    # self.statusPageQueue.add(dict(hit_id=self.currentHit.id, state='scan'))
                    if to_wait > 0:
                        self.logger.warning(f"Sleep until next hit: {to_wait}s")
                        time.sleep(to_wait)
                    else:
                        self.logger.info(f"No need to wait: {to_wait}s")

                    self.makeHit()
                    self.lastHitTime = datetime.datetime.now()
                elif signal.name == 'hit.creating':
                    # self.statusPageQueue.add(dict(hit_id=signal.params['id'], transition='create_hit'))
                    pass
                elif signal.name == 'hit.created':
                    # self.statusPageQueue.add(dict(hit_id=signal.params['id'], remote_id=signal.params['remote_id'], state='hit'))
                    pass
                elif signal.name == 'scan.start':
                    pass
                elif signal.name == 'scan.finished':
                    # probably see hit.scanned
                    pass
                elif signal.name == 'hit.assignment':
                    # Create new assignment
                    if signal.params['hit_id'] != self.currentHit.id:
                        continue
                    # NOTE(review): the result is not used here; presumably the
                    # lookup registers the assignment on the hit - confirm.
                    self.currentHit.getAssignmentById(signal.params['assignment_id'])
                elif signal.name == 'assignment.info':
                    self._handleAssignmentInfo(signal)
                elif signal.name == 'server.open':
                    self.currentHit.open_page_at = datetime.datetime.utcnow()
                    self.store.saveHIT(self.currentHit)
                    self.setLight(True)
                elif signal.name == 'server.close':
                    if not signal.params['abandoned']:
                        continue
                    a = self.currentHit.getLastAssignment()
                    if a.assignment_id != signal.params['assignment_id']:
                        self.logger.info(f"Close of older assignment_id: {signal}")
                        continue
                    self.logger.critical(f"Websocket closed of active assignment_id: {signal}")
                    a.abandoned_at = datetime.datetime.utcnow()
                    self.store.saveAssignment(a)
                    self.plotter.park()
                    self.setLight(False)
                elif signal.name == 'assignment.submit':
                    a = self.currentHit.getLastAssignment()
                    if a.assignment_id != signal.params['assignment_id']:
                        # NOTE(review): only logged; the last assignment is
                        # still marked as submitted below - confirm intended.
                        self.logger.critical(f"Submit of invalid assignment_id: {signal}")

                    a.submit_page_at = datetime.datetime.utcnow()
                    self.store.saveAssignment(a)
                    self.plotter.park()
                    # park always triggers a plotter.finished after being processed
                elif signal.name[:4] == 'sqs.':
                    self._handleSqsSignal(signal)
                elif signal.name == 'plotter.finished':
                    # is _always_ triggered after submit due to plotter.park()
                    if self.currentHit and self.currentHit.isSubmitted():
                        self.currentHit.plotted_at = datetime.datetime.utcnow()
                        self.store.saveHIT(self.currentHit)

                        self.logger.info("Start scan thread")
                        scan = threading.Thread(target=self.scanImage, name='scan')
                        scan.start()
                elif signal.name == 'plotter.parked':
                    # should this have the code from plotter.finished?
                    pass
                else:
                    self.logger.critical(f"Unknown signal: {signal.name}")
            except Exception as e:
                self.logger.critical(f"Exception on handling {signal}")
                self.logger.exception(e)

    def _handleAssignmentInfo(self, signal):
        """Copy worker info (ip, location, os, browser) from the signal onto its assignment."""
        assignment = self.currentHit.getAssignmentById(signal.params['assignment_id'])
        if not assignment:
            self.logger.warning(f"assignment.info assignment.id not for current hit assignments: {signal}")
            # nothing to update; going on would dereference the missing assignment
            return

        # signal parameter name -> assignment attribute
        fields = {
            'ip': 'turk_ip',
            'location': 'turk_country',
            'os': 'turk_os',
            'browser': 'turk_browser',
        }
        change = False
        for name, value in signal.params.items():
            if name in fields:
                setattr(assignment, fields[name], value)
                change = True
                self.logger.debug(f'Set assignment: {name} to {value}')

        if change:
            self.store.saveAssignment(assignment)

    def _parseSqsTimestamp(self, event):
        """Return the EventTimestamp of an SQS event as a (naive) datetime."""
        return datetime.datetime.strptime(event['EventTimestamp'], self.SQS_TIME_FORMAT)

    def _handleSqsSignal(self, signal):
        """
        Process an 'sqs.*' signal (assignment accepted/abandoned/returned/submitted).

        The event in signal.params['event'] looks like:
        {'HITGroupId': '...', 'EventType': 'AssignmentAccepted',
         'EventTimestamp': '2019-10-23T20:16:10Z', 'HITId': '...',
         'AssignmentId': '...', 'WorkerId': '...', 'HITTypeId': '...'}
        AssignmentSubmitted events additionally carry an 'Answer'.
        """
        event = signal.params['event']

        if event['HITId'] != self.currentHit.hit_id:
            self.logger.warning(f"SQS hit.info hit_id != currenthit.id: {signal}, update status for older HIT")
            sqsHit = self.store.getHitByRemoteId(event['HITId'])
            # NOTE(review): assumes the store yields None for an unknown remote id - confirm.
            if sqsHit is None:
                self.logger.critical(f"Unknown HIT given in SQS event: {event}")
                return
        else:
            sqsHit = self.currentHit

        assId = event['AssignmentId'] + '_' + event['WorkerId']
        sqsAssignment = sqsHit.getAssignmentById(assId)
        if not sqsAssignment:
            self.logger.critical(f"Invalid assignmentId given for hit: {signal.params['event']}")
            return

        if signal.name == 'sqs.AssignmentAccepted':
            self.logger.info('Set status progress to accepted')
            sqsAssignment.accept_at = self._parseSqsTimestamp(event)
            sqsAssignment.worker_id = event['WorkerId']
        elif signal.name == 'sqs.AssignmentAbandoned':
            self.logger.info('Set status progress to abandoned')
            sqsAssignment.abandoned_at = self._parseSqsTimestamp(event)
            # Switching the light off here is disabled.
            self.mturk.create_worker_block(WorkerId=event['WorkerId'], Reason='Accepted task without working on it.')
        elif signal.name == 'sqs.AssignmentReturned':
            self.logger.info('Set status progress to returned')
            sqsAssignment.returned_at = self._parseSqsTimestamp(event)
        elif signal.name == 'sqs.AssignmentSubmitted':
            self.logger.info('Set status progress to submitted')
            sqsAssignment.answer = event['Answer']
            if sqsAssignment.uuid not in sqsAssignment.answer:
                self.logger.critical(f"Not a valid answer given?! {sqsAssignment.answer}")

            if not sqsAssignment.submit_page_at:
                # page not submitted, hit is. Nevertheless, create new hit.
                try:
                    self.mturk.reject_assignment(AssignmentId=event['AssignmentId'], RequesterFeedback='Did not do the assignment')
                except Exception as e:
                    self.logger.exception(e)
                self.makeHit()
            else:
                sqsAssignment.confirmed_at = self._parseSqsTimestamp(event)
                # Blocking the worker after a successful submit (to prevent
                # duplicate workers) is disabled.
                # TODO: Disabled after worker mail, use quals instead

        self.store.saveHIT(sqsHit)

    def makeHit(self):
        """Expire the current HIT (if any) and publish a new one on Mechanical Turk."""
        self.expireCurrentHit()  # expire hit if it is there

        self.eventQueue.put(Signal('hit.creating', {'id': self.currentHit.id if self.currentHit else 'start'}))
        self.reloadConfig()  # reload new config values if they are set

        # self.notPaused.wait()

        self.currentHit = self.store.createHIT()
        self.store.currentHit = self.currentHit

        self.logger.info(f"Make HIT {self.currentHit.id}")

        # MTurk requires the Question to be an XML document; for a page hosted
        # by ourselves that is an ExternalQuestion.
        question = '''<ExternalQuestion xmlns="http://mechanicalturk.amazonaws.com/AWSMechanicalTurkDataSchemas/2006-07-14/ExternalQuestion.xsd">
        <ExternalURL>https://guest.rubenvandeven.com:8888/draw?id={HIT_NR}</ExternalURL>
        <FrameHeight>0</FrameHeight>
        </ExternalQuestion>
        '''.replace("{HIT_NR}", str(self.currentHit.id))

        estimatedHitDuration = self.store.getEstimatedHitDuration()

        # set minimum rate, if they take longer we increase the pay
        fee = max(self.config['minimum_fee'], (self.config['hour_rate_aim'] / 3600.) * estimatedHitDuration)
        self.currentHit.fee = fee
        self.logger.info(f"Based on average duration of {estimatedHitDuration} fee should be {fee}/hit to get hourly rate of {self.config['hour_rate_aim']}")

        new_hit = self.mturk.create_hit(
            Title='Trace the drawn line',
            Description='Draw a line over the sketched line in the image',
            Keywords='polygons, trace, draw',
            Reward="{:.2f}".format(fee),
            MaxAssignments=1,
            LifetimeInSeconds=self.config['hit_lifetime'],
            AssignmentDurationInSeconds=self.store.getHitTimeout(),
            AutoApprovalDelayInSeconds=self.config['hit_autoapprove_delay'],
            Question=question,
        )

        self.logger.info(f"Created hit: {new_hit}")
        if not self.config['for_real']:
            self.logger.info("https://workersandbox.mturk.com/mturk/preview?groupId=" + new_hit['HIT']['HITGroupId'])
        else:
            self.logger.info("https://worker.mturk.com/mturk/preview?groupId=" + new_hit['HIT']['HITGroupId'])

        self.currentHit.hit_id = new_hit['HIT']['HITId']
        self.store.saveHIT(self.currentHit)

        # self.server.statusPage.set('hit_id', new_hit['HIT']['HITId'])
        # self.server.statusPage.set('hit_created', self.currentHit.created_at)
        # self.server.statusPage.set('fee', f"${self.currentHit.fee:.2f}")
        # self.server.statusPage.set('state', self.currentHit.getStatus())

        self.eventQueue.put(Signal('hit.created', {'id': self.currentHit.id, 'remote_id': self.currentHit.hit_id}))

        # mturk.send_test_event_notification()
        if self.config['amazon']['sqs_url']:
            notification_info = self.mturk.update_notification_settings(
                HITTypeId=new_hit['HIT']['HITTypeId'],
                Notification={
                    'Destination': self.config['amazon']['sqs_url'],
                    'Transport': 'SQS',
                    'Version': '2014-08-15',
                    'EventTypes': [
                        'AssignmentAccepted',
                        'AssignmentAbandoned',
                        'AssignmentReturned',
                        'AssignmentSubmitted',
                        # 'AssignmentRejected',
                        # 'AssignmentApproved',
                        # 'HITCreated',
                        # 'HITExpired',
                        # 'HITReviewable',
                        # 'HITExtended',
                        # 'HITDisposed',
                        # 'Ping',
                    ]
                },
                Active=True
            )
            self.logger.debug(notification_info)

    def _runScanner(self, cmd, stdout):
        """
        Run the scan command and return its (stdout, stderr) output.

        stdout is handed to Popen (subprocess.PIPE to capture the image,
        subprocess.DEVNULL to discard it). The command gets SCAN_TIMEOUT
        seconds; after that it is asked to terminate and whatever output
        it produced so far is returned.
        """
        proc = subprocess.Popen(cmd, stdout=stdout, stderr=subprocess.PIPE)
        # opens connection to scanner, but only starts scanning when output becomes ready:
        try:
            # timeout must be passed by keyword: the first positional
            # argument of communicate() is `input`
            return proc.communicate(timeout=self.SCAN_TIMEOUT)
        except subprocess.TimeoutExpired:
            self.logger.critical(f"Scanner did not finish within {self.SCAN_TIMEOUT}s, terminate it")
            try:
                proc.terminate()
            except OSError as e:
                # the command runs through sudo, so signalling it may not be permitted
                self.logger.exception(e)
            return proc.communicate()

    def cleanDrawing(self):
        """Run a scan whose image is discarded, then signal system.reset and scan.finished."""
        with self.scanLock:
            self.eventQueue.put(Signal('scan.start'))
            # Scan to reset
            cmd = [
                'sudo', 'scanimage', '-d', 'epkowa', '--resolution=100',
            ] + self.SCAN_AREA_ARGS
            _, e = self._runScanner(cmd, subprocess.DEVNULL)
            time.sleep(5)  # sleep a few seconds for scanner to return to start position
            if e:
                self.logger.critical(f"Scanner caused: {e.decode()}")

            self.eventQueue.put(Signal('system.reset'))
            self.eventQueue.put(Signal('scan.finished'))

    def reset(self) -> None:
        """Park the plotter and clear the assignment on the status page."""
        self.plotter.park()
        # Very confusing to have scanning on a reset (because often nothing has happened), so don't do it
        # scan = threading.Thread(target=self.cleanDrawing, name='reset')
        # scan.start()
        self.server.statusPage.clearAssignment()

    def scanImage(self) -> None:
        """
        Run scanimage on the scanner and store the levelled result at the image
        path of the current hit. Progress is reported through the signals
        hit.scan, scan.start, hit.scanned and scan.finished.
        With config 'dummy_plotter' set, the scan is only simulated.
        """
        with self.scanLock:
            if self.config['dummy_plotter']:
                self.eventQueue.put(Signal('hit.scan', {'id': self.currentHit.id}))
                self.eventQueue.put(Signal('scan.start'))
                self.logger.warning("Fake scanner for a few seconds")
                for i in tqdm.tqdm(range(5)):
                    time.sleep(1)
                self.eventQueue.put(Signal('hit.scanned', {'hit_id': self.currentHit.id}))
                self.eventQueue.put(Signal('scan.finished'))
                return

            cmd = [
                'sudo', 'scanimage', '-d', 'epkowa', '--format', 'tiff',
                '--resolution=100',  # lower res, faster (more powerful) scan & wipe
            ] + self.SCAN_AREA_ARGS
            self.logger.info(f"{cmd}")
            filename = self.currentHit.getImagePath()

            self.eventQueue.put(Signal('hit.scan', {'id': self.currentHit.id}))
            self.eventQueue.put(Signal('scan.start'))
            o, e = self._runScanner(cmd, subprocess.PIPE)
            if e:
                self.logger.critical(f"Scanner caused: {e.decode()}")
                # Should this clear self.isRunning.clear() ?

            try:
                f = io.BytesIO(o)
                img = Image.open(f)
                img = img.transpose(Image.ROTATE_90).transpose(Image.FLIP_TOP_BOTTOM)
                tunedImg = Level.level_image(img, self.config['level']['min'], self.config['level']['max'], self.config['level']['gamma'])
                tunedImg.save(filename)
            except Exception as e:
                self.logger.critical("Cannot create image from scan. Did scanner work?")
                self.logger.exception(e)
                # fall back to the placeholder drawing
                copyfile('www/basic.svg', filename)

            time.sleep(5)  # sleep a few seconds for scanner to return to start position

            self.eventQueue.put(Signal('hit.scanned', {'hit_id': self.currentHit.id}))
            self.eventQueue.put(Signal('scan.finished'))

    def setLight(self, on):
        """Switch the light relay on or off through `usbrelay`; no-op when already in that state."""
        value = 1 if on else 0

        if self.lightStatus == value:
            return
        self.lightStatus = value

        cmd = [
            'usbrelay', f'HURTM_1={value}'
        ]
        self.logger.info(f"Trigger light {cmd}")
        code = subprocess.call(cmd)
        if code > 0:
            self.logger.warning(f"Error on light change: {code}")

    def updateLightHook(self, hit=None):
        """Update hook registered on the HIT store: sync the light with the current hit."""
        # ignore hit attribute, which comes from the HITstore
        self.setLight(self.getLightStatus())

    def getLightStatus(self) -> bool:
        """Light is on when the current hit has an assignment and is not plotted yet."""
        if not self.currentHit:
            return False

        a = self.currentHit.getLastAssignment()
        if not a:
            return False

        if self.currentHit.plotted_at:  # wait till plotter is done
            return False

        return True