From 4d3eed56e968092e8c14d9c66559e21499d7163f Mon Sep 17 00:00:00 2001 From: Ruben van de Ven Date: Wed, 30 Oct 2019 12:44:25 +0100 Subject: [PATCH] Further implementation of flow --- sorteerhoed/HITStore.py | 31 +++++++- sorteerhoed/central_management.py | 114 ++++++++++++++++++++++++------ sorteerhoed/plotter.py | 1 + sorteerhoed/webserver.py | 13 +++- 4 files changed, 130 insertions(+), 29 deletions(-) diff --git a/sorteerhoed/HITStore.py b/sorteerhoed/HITStore.py index 639ac01..5ed18be 100644 --- a/sorteerhoed/HITStore.py +++ b/sorteerhoed/HITStore.py @@ -40,8 +40,8 @@ class HIT(Base): __tablename__ = 'hits' id = Column(Integer, Sequence('hit_id'), primary_key=True) # our sequential hit id hit_id = Column(String(255)) # amazon's hit id - created_at = Column(DateTime, default=datetime.datetime.now()) - updated_at = Column(DateTime, default=datetime.datetime.now()) + created_at = Column(DateTime, default=datetime.datetime.utcnow) + updated_at = Column(DateTime, default=datetime.datetime.utcnow) uuid = Column(String(32), default=lambda : uuid.uuid4().hex) assignment_id = Column(String(255), default = None) worker_id = Column(String(255), default = None) @@ -54,6 +54,7 @@ class HIT(Base): turk_country = Column(String(255), default=None) turk_screen_width = Column(Integer, default = None) turk_screen_height = Column(Integer, default = None) + scanned_at = Column(DateTime, default=None) def getImagePath(self): @@ -62,6 +63,20 @@ class HIT(Base): def getImageUrl(self): return f"scans/{self.id}.png" + def getStatus(self): + if self.scanned_at: + return "completed" + if self.submit_hit_at: + return "submission confirmed" + if self.submit_page_at: + return "submitted by worker" + if self.open_page_at: + return "started working" + if self.accept_time: + return "accepted by worker" + return "created" + + class Store: def __init__(self, db_filename, logLevel=0): @@ -122,7 +137,17 @@ class Store: s.refresh(hit) logger.info(f"Added {hit.id}") - + def getAvgDurationOfPreviousNHits(self, n) -> int: + latest_hits = self.session.query(HIT).\ + filter(HIT.submit_hit_at!=None).\ + filter(HIT.accept_time!=None).\ + order_by(HIT.submit_hit_at.desc()).limit(n) + durations = [] + for hit in latest_hits: + durations.append((hit.submit_hit_at - hit.accept_time).total_seconds()) + if not len(durations): + return int(2.5*60) + return int(sum(durations) / len(durations)) # def rmSource(self, id: int): # with self.getSession() as session: diff --git a/sorteerhoed/central_management.py b/sorteerhoed/central_management.py index 092cc87..d37e4cd 100644 --- a/sorteerhoed/central_management.py +++ b/sorteerhoed/central_management.py @@ -35,6 +35,7 @@ class CentralManagement(): self.eventQueue = Queue() self.isRunning = threading.Event() + self.isScanning = threading.Event() def loadConfig(self, filename): @@ -125,53 +126,94 @@ class CentralManagement(): pass elif signal.name == 'hit.scanned': # TODO: wrap up hit & make new HIT - pass + self.makeHit() + elif signal.name == 'scan.start': + self.isScanning.set() + elif signal.name == 'scan.finished': + self.isScanning.clear() elif signal.name == 'hit.info': if signal.params['hit_id'] != self.currentHit.id: self.logger.warning(f"hit.info hit_id != currenthit.id: {signal}") continue - for name, value in enumerate(signal.params): + for name, value in signal.params.items(): + if name == 'hit_id': + continue self.logger.debug(f'Set status: {name} to {value}') self.server.statusPage.set(name, value) + elif signal.name == 'server.open': + self.currentHit.open_page_at = datetime.datetime.utcnow() + self.store.saveHIT(self.currentHit) + # TODO: turn on light! elif signal.name == 'server.submit': - self.currentHit.submit_page_at = datetime.datetime.now() + self.currentHit.submit_page_at = datetime.datetime.utcnow() self.store.saveHIT(self.currentHit) self.plotter.park() + # TODO: turn off light! # park should alway triggers a plotter.finished after being processed - elif signal.name == 'sqs.AssignmentAccepted': - # {'event': {'HITGroupId': '301G7MYOAJ85NEW128ZDGF5DSBW53S', 'EventType': 'AssignmentAccepted', 'EventTimestamp': '2019-10-23T20:16:10Z', 'HITId': '3IH9TRB0FBAKKZFP3JUD6D9YWQ1I1F', 'AssignmentId': '3BF51CHDTWLN3ZGHRKDUHFKPWIJ0H3', 'WorkerId': 'A1CK46PK9VEUH5', 'HITTypeId': '3EYXOXDEN7RX0YSMN4UMVN01AYKZJ0'}} - pass - elif signal.name == 'sqs.AssignmentAbandoned': - #{'event': {'HITGroupId': '301G7MYOAJ85NEW128ZDGF5DSBW53S', 'EventType': 'AssignmentAbandoned', 'EventTimestamp': '2019-10-23T20:23:06Z', 'HITId': '3JHB4BPSFKKFQ263K4EFULI3LC79QJ', 'AssignmentId': '3U088ZLJVL450PB6MJZUIIUCB6VW0Y', 'WorkerId': 'A1CK46PK9VEUH5', 'HITTypeId': '3EYXOXDEN7RX0YSMN4UMVN01AYKZJ0'}} - pass - elif signal.name == 'sqs.AssignmentReturned': - # {'event': {'HITGroupId': '301G7MYOAJ85NEW128ZDGF5DSBW53S', 'EventType': 'AssignmentReturned', 'EventTimestamp': '2019-10-23T20:16:47Z', 'HITId': '3IH9TRB0FBAKKZFP3JUD6D9YWQ1I1F', 'AssignmentId': '3BF51CHDTWLN3ZGHRKDUHFKPWIJ0H3', 'WorkerId': 'A1CK46PK9VEUH5', 'HITTypeId': '3EYXOXDEN7RX0YSMN4UMVN01AYKZJ0'}} - pass - elif signal.name == 'sqs.AssignmentSubmitted': - pass + elif signal.name[:4] == 'sqs.': + if signal.params['event']['HITId'] != self.currentHit.id: + self.logger.warning(f"SQS hit.info hit_id != currenthit.id: {signal}, update status for older HIT") + sqsHit = self.store.getHitByRemoteId(signal.params['event']['HITId']) + updateStatus = False + else: + sqsHit = self.currentHit + updateStatus = True + + if signal.name == 'sqs.AssignmentAccepted': + self.logger.info(f'Set status progress to accepted') + sqsHit.accept_time = datetime.datetime.strptime(signal.params['event']['EventTimestamp'],"%Y-%m-%dT%H:%M:%SZ") + sqsHit.worker_id = signal.params['event']['WorkerId'] + if updateStatus: + self.server.statusPage.set('worker_id', sqsHit.worker_id) + # {'event': {'HITGroupId': '301G7MYOAJ85NEW128ZDGF5DSBW53S', 'EventType': 'AssignmentAccepted', 'EventTimestamp': '2019-10-23T20:16:10Z', 'HITId': '3IH9TRB0FBAKKZFP3JUD6D9YWQ1I1F', 'AssignmentId': '3BF51CHDTWLN3ZGHRKDUHFKPWIJ0H3', 'WorkerId': 'A1CK46PK9VEUH5', 'HITTypeId': '3EYXOXDEN7RX0YSMN4UMVN01AYKZJ0'}} + elif signal.name == 'sqs.AssignmentAbandoned': + self.logger.info(f'Set status progress to abandoned') + sqsHit.accept_time = None + sqsHit.open_page_at = None + self.reset() + #{'event': {'HITGroupId': '301G7MYOAJ85NEW128ZDGF5DSBW53S', 'EventType': 'AssignmentAbandoned', 'EventTimestamp': '2019-10-23T20:23:06Z', 'HITId': '3JHB4BPSFKKFQ263K4EFULI3LC79QJ', 'AssignmentId': '3U088ZLJVL450PB6MJZUIIUCB6VW0Y', 'WorkerId': 'A1CK46PK9VEUH5', 'HITTypeId': '3EYXOXDEN7RX0YSMN4UMVN01AYKZJ0'}} + elif signal.name == 'sqs.AssignmentReturned': + self.logger.info(f'Set status progress to returned') + sqsHit.accept_time = None + sqsHit.open_page_at = None + self.reset() + # {'event': {'HITGroupId': '301G7MYOAJ85NEW128ZDGF5DSBW53S', 'EventType': 'AssignmentReturned', 'EventTimestamp': '2019-10-23T20:16:47Z', 'HITId': '3IH9TRB0FBAKKZFP3JUD6D9YWQ1I1F', 'AssignmentId': '3BF51CHDTWLN3ZGHRKDUHFKPWIJ0H3', 'WorkerId': 'A1CK46PK9VEUH5', 'HITTypeId': '3EYXOXDEN7RX0YSMN4UMVN01AYKZJ0'}} + elif signal.name == 'sqs.AssignmentSubmitted': + # {'MessageId': '4b37dfdf-6a12-455d-a111-9a361eb54d88', 'ReceiptHandle': 'AQEBHc0yAdIrEmAV3S8TIoDCRxrItDEvjy0VQko56/Lb+ifszC0gdZ0Bbed24HGHZYr5DSnWkgBJ/H59ZXxFS1iVEH9sC8+YrmKKOTrKvW3gj6xYiBU2fBb8JRq+sEiNSxWLw2waxr1VYdpn/SWcoOJCv6PlS7P9EB/2IQ++rCklhVwV7RfARHy4J87bjk5R3+uEXUUi00INhCeunCbu642Mq4c239TFRHq3mwM6gkdydK+AP1MrXGKKAE1W5nMbwEWAwAN8KfoM1NkkUg5rTSYWmxxZMdVs/QRNcMFKVSf1bop2eCALSoG6l3Iu7+UXIl4HLh+rHp4bc8NoftbUJUii8YXeiNGU3wCM9T1kOerwYVgksK93KQrioD3ee8navYExQRXne2+TrUZUDkxRIdtPGA==', 'MD5OfBody': '01ccb1efe47a84b68704c4dc611a4d8d', 'Body': '{"Events":[{"Answer":"surveycode<\\/QuestionIdentifier>test<\\/FreeText><\\/Answer><\\/QuestionFormAnswers>","HITGroupId":"301G7MYOAJ85NEW128ZDGF5DSBW53S","EventType":"AssignmentSubmitted","EventTimestamp":"2019-10-30T08:01:43Z","HITId":"3NSCTNUR2ZY42ZXASI4CS5YWV0S5AB","AssignmentId":"3ZAZR5XV02TTOCBR9MCLCNQV1XKCZL","WorkerId":"A1CK46PK9VEUH5","HITTypeId":"3EYXOXDEN7RX0YSMN4UMVN01AYKZJ0"}],"EventDocId":"34af4cd7f2829216f222d4b6e66f3a3ff9ad8ea6","SourceAccount":"600103077174","CustomerId":"A1CK46PK9VEUH5","EventDocVersion":"2014-08-15"}'} + self.logger.info(f'Set status progress to submitted') + # TODO: validate the content of the submission by parsing signal.params['event']['Answer'] and comparing it with sqsHit.uuid + sqsHit.submit_hit_at = datetime.datetime.strptime(signal.params['event']['EventTimestamp'],"%Y-%m-%dT%H:%M:%SZ") + + self.store.saveHIT(sqsHit) + + if updateStatus: + # TODO: have HITStore/HIT take care of this by emitting a signal + # only update status if it is the currentHit + self.server.statusPage.set('state', sqsHit.getStatus()) elif signal.name == 'plotter.finished': if self.currentHit.submit_page_at: - # TODO: scan! - pass - elif signal.name == '': - pass + scan = threading.Thread(target=self.scanImage, name='scan') + scan.start() + else: + self.logger.critical(f"Unknown signal: {signal.name}") - # handle singals/events: - # TODO: next steps - # TODO: update status - def makeHit(self): + self.server.statusPage.reset() self.currentHit = self.store.createHIT() self.logger.info(f"Make HIT {self.currentHit.id}") question = open(self.config['amazon']['task_xml'], mode='r').read().replace("{HIT_NR}",str(self.currentHit.id)) + estimatedHitDuration = self.store.getAvgDurationOfPreviousNHits(5) + + fee = (self.config['hour_rate_aim']/3600.) * estimatedHitDuration + self.logger.debug(f"Based on average duration of {estimatedHitDuration} fee should be {fee}/hit to get hourly rate of {self.config['hour_rate_aim']}") new_hit = self.mturk.create_hit( Title = 'Trace the drawn line', Description = 'Draw a line over the sketched line in the image', Keywords = 'polygons, trace, draw', - Reward = '0.15', # TODO: make variable + Reward = "{:.2f}".format(fee), MaxAssignments = 1, LifetimeInSeconds = self.config['hit_lifetime'], AssignmentDurationInSeconds = self.config['hit_assignment_duration'], @@ -183,7 +225,10 @@ class CentralManagement(): self.logger.info("https://workersandbox.mturk.com/mturk/preview?groupId=" + new_hit['HIT']['HITGroupId']) self.currentHit.hit_id = new_hit['HIT']['HITId'] + self.store.saveHIT(self.currentHit) + # TODO: have HITStore/HIT take care of this by emitting a signal + self.server.statusPage.set('hit_id', new_hit['HIT']['HITId']) # mturk.send_test_event_notification() if self.config['amazon']['sqs_url']: @@ -212,10 +257,31 @@ class CentralManagement(): ) self.logger.debug(notification_info) + def cleanDrawing(self): + self.eventQueue.put(Signal('scan.start')) + # Scan to reset + cmd = [ + 'sudo', 'scanimage', '-d', 'epkowa' + ] + proc = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE) + # opens connection to scanner, but only starts scanning when output becomes ready: + o, e = proc.communicate(80) + if e: + self.logger.critical(f"Scanner caused: {e.decode()}") + + self.eventQueue.put(Signal('system.reset')) + self.eventQueue.put(Signal('scan.finished')) + + def reset(self) -> str: + # TODO: for returns & abandons + scan = threading.Thread(target=self.cleanDrawing, name='reset') + scan.start() + def scanImage(self) -> str: """ Run scanimage on scaner and returns a string with the filename """ + self.eventQueue.put(Signal('scan.start')) cmd = [ 'sudo', 'scanimage', '-d', 'epkowa' ] @@ -225,10 +291,12 @@ class CentralManagement(): o, e = proc.communicate(80) if e: self.logger.critical(f"Scanner caused: {e.decode()}") + #TODO: should clear self.isRunning.clear() ? f = io.BytesIO(o) img = Image.open(f) img.save(filename) self.eventQueue.put(Signal('hit.scanned', {'hit_id':self.currentHit.id})) + self.eventQueue.put(Signal('scan.finished')) diff --git a/sorteerhoed/plotter.py b/sorteerhoed/plotter.py index a869da9..a6bc475 100644 --- a/sorteerhoed/plotter.py +++ b/sorteerhoed/plotter.py @@ -8,6 +8,7 @@ import time class Plotter: def __init__(self, config, eventQ: Queue, runningEvent: Event): + #TODO: scanningEvent -> CentralManagement.isScanning -> prevent plotter move during scan, failsafe self.config = config self.eventQ = eventQ self.q = Queue() diff --git a/sorteerhoed/webserver.py b/sorteerhoed/webserver.py index 8254eb7..21db072 100644 --- a/sorteerhoed/webserver.py +++ b/sorteerhoed/webserver.py @@ -54,9 +54,10 @@ class WebSocketHandler(tornado.websocket.WebSocketHandler): logger.info(f"New client connected: {self.request.remote_ip} for {self.hit.id}/{self.hit.hit_id}") self.eventQ.put(Signal('hit.info', dict(hit_id=self.hit.id, ip=self.request.remote_ip))) + self.eventQ.put(Signal('server.open', dict(hit_id=self.hit.id))) self.strokes = [] - - + + # Gather some initial information: ua = self.request.headers.get('User-Agent', None) if ua: ua_info = httpagentparser.detect(ua) @@ -258,6 +259,7 @@ class StatusPage(): self.reset() def reset(self): + logger.info("Resetting status") self.hit_id = None self.worker_id = None self.ip = None @@ -270,8 +272,13 @@ class StatusPage(): self.hit_created = None self.hit_opened = None - def __setattr__(self, name, value): + def __setattr__(self, name, value): + if name in self.__dict__ and self.__dict__[name] == value: + logger.debug(f"Ignore setting status of {name}: it already is set to {value}") + return + self.__dict__[name] =value + logger.info(f"Update status: {name}: {value}") StatusWebSocketHandler.update_for_all(name, value) def set(self, name, value):