From ecafb1db80c49c60b1c10c71afa37be938930952 Mon Sep 17 00:00:00 2001 From: Ruben van de Ven Date: Sat, 2 Nov 2019 22:31:16 +0100 Subject: [PATCH] Fix crash of HITs and enable expiration of hit after shutdown --- sorteerhoed/HITStore.py | 3 + sorteerhoed/central_management.py | 273 ++++++++++++++++-------------- sorteerhoed/webserver.py | 6 +- www/index.html | 11 +- 4 files changed, 165 insertions(+), 128 deletions(-) diff --git a/sorteerhoed/HITStore.py b/sorteerhoed/HITStore.py index c3caf9d..986e7d1 100644 --- a/sorteerhoed/HITStore.py +++ b/sorteerhoed/HITStore.py @@ -161,6 +161,9 @@ class Store: def getEstimatedHitDuration(self): return self.getAvgDurationOfPreviousNHits(5) + def getHitTimeout(self): + return max(300, self.getAvgDurationOfPreviousNHits(5)*2) + def getHITs(self, n = 100): return self.session.query(HIT).\ filter(HIT.submit_hit_at != None).\ diff --git a/sorteerhoed/central_management.py b/sorteerhoed/central_management.py index dfa062a..9213dfa 100644 --- a/sorteerhoed/central_management.py +++ b/sorteerhoed/central_management.py @@ -79,6 +79,18 @@ class CentralManagement(): self.logger.info(f"Mechanical turk account balance: {self.mturk.get_account_balance()['AvailableBalance']}") + # clear any pending hits: + pending_hits = self.mturk.list_hits(MaxResults=100) + for pending_hit in pending_hits['HITs']: +# print(pending_hit['HITId'], pending_hit['HITStatus']) + if pending_hit['HITStatus'] == 'Assignable': + self.logger.warn(f"Expire stale hit: {pending_hit['HITId']}: {pending_hit['HITStatus']}") + self.mturk.update_expiration_for_hit( + HITId=pending_hit['HITId'], + ExpireAt=datetime.datetime.fromisoformat('2015-01-01') + ) + self.mturk.delete_hit(HITId=pending_hit['HITId']) + self.sqs = SqsListener(self.config, self.eventQueue, self.isRunning) sqsThread = threading.Thread(target=self.sqs.start, name='sqs') sqsThread.start() @@ -103,13 +115,23 @@ class CentralManagement(): while self.isRunning.is_set(): time.sleep(.5) - + except Exception as e: + self.logger.exception(e) finally: self.logger.warning("Stopping Central Managment") self.isRunning.clear() self.server.stop() - + self.expireCurrentHit() + + def expireCurrentHit(self): + if self.currentHit and self.currentHit.hit_id: # hit pending + self.logger.warn(f"Delete hit: {self.currentHit.hit_id}") + self.mturk.update_expiration_for_hit( + HITId=self.currentHit.hit_id, + ExpireAt=datetime.datetime.fromisoformat('2015-01-01') + ) + self.mturk.delete_hit(HITId=self.currentHit.hit_id) def eventListener(self): while self.isRunning.is_set(): try: @@ -118,128 +140,133 @@ class CentralManagement(): pass # self.logger.log(5, "Empty queue.") else: - """ - Possible events: - - SQS events: accept/abandoned/returned/submitted - - webserver events: open, draw, submit - - scan complete - - HIT created - - Plotter complete - - - """ - #TODO: make level debug() - self.logger.info(f"SIGNAL: {signal}") - if signal.name == 'start': - self.makeHit() - pass - elif signal.name == 'hit.scanned': - # TODO: wrap up hit & make new HIT - self.currentHit.scanned_at = datetime.datetime.utcnow() - self.server.statusPage.set('state', self.currentHit.getStatus()) - self.makeHit() - elif signal.name == 'scan.start': - pass - elif signal.name == 'scan.finished': - pass - elif signal.name == 'hit.info': - if signal.params['hit_id'] != self.currentHit.id: - self.logger.warning(f"hit.info hit_id != currenthit.id: {signal}") - continue - for name, value in signal.params.items(): - if name == 'hit_id': - continue - if name == 'ip': - self.currentHit.turk_ip = value - if name == 'location': - self.currentHit.turk_country = value - - self.logger.debug(f'Set status: {name} to {value}') - self.server.statusPage.set(name, value) - elif signal.name == 'server.open': - self.currentHit.open_page_at = datetime.datetime.utcnow() - self.store.saveHIT(self.currentHit) - self.setLight(True) - self.server.statusPage.set('state', self.currentHit.getStatus()) - self.server.statusPage.set('hit_opened', self.currentHit.open_page_at) - elif signal.name == 'server.submit': - self.currentHit.submit_page_at = datetime.datetime.utcnow() - self.store.saveHIT(self.currentHit) - self.plotter.park() - self.server.statusPage.set('hit_opened', self.currentHit.open_page_at) - # park always triggers a plotter.finished after being processed - - elif signal.name[:4] == 'sqs.': - if signal.params['event']['HITId'] != self.currentHit.hit_id: - self.logger.warning(f"SQS hit.info hit_id != currenthit.id: {signal}, update status for older HIT") - sqsHit = self.store.getHitByRemoteId(signal.params['event']['HITId']) - updateStatus = False - else: - sqsHit = self.currentHit - updateStatus = True - - if signal.name == 'sqs.AssignmentAccepted': - self.logger.info(f'Set status progress to accepted') - sqsHit.accept_time = datetime.datetime.strptime(signal.params['event']['EventTimestamp'],"%Y-%m-%dT%H:%M:%SZ") - sqsHit.worker_id = signal.params['event']['WorkerId'] - if updateStatus: - self.server.statusPage.set('worker_id', sqsHit.worker_id) - # {'event': {'HITGroupId': '301G7MYOAJ85NEW128ZDGF5DSBW53S', 'EventType': 'AssignmentAccepted', 'EventTimestamp': '2019-10-23T20:16:10Z', 'HITId': '3IH9TRB0FBAKKZFP3JUD6D9YWQ1I1F', 'AssignmentId': '3BF51CHDTWLN3ZGHRKDUHFKPWIJ0H3', 'WorkerId': 'A1CK46PK9VEUH5', 'HITTypeId': '3EYXOXDEN7RX0YSMN4UMVN01AYKZJ0'}} - elif signal.name == 'sqs.AssignmentAbandoned': - self.logger.info(f'Set status progress to abandoned') - #{'event': {'HITGroupId': '301G7MYOAJ85NEW128ZDGF5DSBW53S', 'EventType': 'AssignmentAbandoned', 'EventTimestamp': '2019-10-23T20:23:06Z', 'HITId': '3JHB4BPSFKKFQ263K4EFULI3LC79QJ', 'AssignmentId': '3U088ZLJVL450PB6MJZUIIUCB6VW0Y', 'WorkerId': 'A1CK46PK9VEUH5', 'HITTypeId': '3EYXOXDEN7RX0YSMN4UMVN01AYKZJ0'}} - sqsHit.accept_time = None - sqsHit.open_page_at = None - if self.currentHit.id == sqsHit.id: - if not sqsHit.submit_page_at: - self.reset() - else: - sqsHit.submit_hit_at = datetime.datetime.utcnow() # fake submit - if updateStatus: - self.setLight(False) - - elif signal.name == 'sqs.AssignmentReturned': - self.logger.info(f'Set status progress to returned') - sqsHit.accept_time = None - sqsHit.open_page_at = None - if self.currentHit.id == sqsHit.id: - if not sqsHit.submit_page_at: - self.reset() - else: - sqsHit.submit_hit_at = datetime.datetime.utcnow() # fake submit - if updateStatus: - self.setLight(False) - # {'event': {'HITGroupId': '301G7MYOAJ85NEW128ZDGF5DSBW53S', 'EventType': 'AssignmentReturned', 'EventTimestamp': '2019-10-23T20:16:47Z', 'HITId': '3IH9TRB0FBAKKZFP3JUD6D9YWQ1I1F', 'AssignmentId': '3BF51CHDTWLN3ZGHRKDUHFKPWIJ0H3', 'WorkerId': 'A1CK46PK9VEUH5', 'HITTypeId': '3EYXOXDEN7RX0YSMN4UMVN01AYKZJ0'}} - elif signal.name == 'sqs.AssignmentSubmitted': - # {'MessageId': '4b37dfdf-6a12-455d-a111-9a361eb54d88', 'ReceiptHandle': 'AQEBHc0yAdIrEmAV3S8TIoDCRxrItDEvjy0VQko56/Lb+ifszC0gdZ0Bbed24HGHZYr5DSnWkgBJ/H59ZXxFS1iVEH9sC8+YrmKKOTrKvW3gj6xYiBU2fBb8JRq+sEiNSxWLw2waxr1VYdpn/SWcoOJCv6PlS7P9EB/2IQ++rCklhVwV7RfARHy4J87bjk5R3+uEXUUi00INhCeunCbu642Mq4c239TFRHq3mwM6gkdydK+AP1MrXGKKAE1W5nMbwEWAwAN8KfoM1NkkUg5rTSYWmxxZMdVs/QRNcMFKVSf1bop2eCALSoG6l3Iu7+UXIl4HLh+rHp4bc8NoftbUJUii8YXeiNGU3wCM9T1kOerwYVgksK93KQrioD3ee8navYExQRXne2+TrUZUDkxRIdtPGA==', 'MD5OfBody': '01ccb1efe47a84b68704c4dc611a4d8d', 'Body': '{"Events":[{"Answer":"surveycode<\\/QuestionIdentifier>test<\\/FreeText><\\/Answer><\\/QuestionFormAnswers>","HITGroupId":"301G7MYOAJ85NEW128ZDGF5DSBW53S","EventType":"AssignmentSubmitted","EventTimestamp":"2019-10-30T08:01:43Z","HITId":"3NSCTNUR2ZY42ZXASI4CS5YWV0S5AB","AssignmentId":"3ZAZR5XV02TTOCBR9MCLCNQV1XKCZL","WorkerId":"A1CK46PK9VEUH5","HITTypeId":"3EYXOXDEN7RX0YSMN4UMVN01AYKZJ0"}],"EventDocId":"34af4cd7f2829216f222d4b6e66f3a3ff9ad8ea6","SourceAccount":"600103077174","CustomerId":"A1CK46PK9VEUH5","EventDocVersion":"2014-08-15"}'} - self.logger.info(f'Set status progress to submitted') - # TODO: validate the content of the submission by parsing signal.params['event']['Answer'] and comparing it with sqsHit.uuid - sqsHit.answer = signal.params['event']['Answer'] - if sqsHit.uuid not in sqsHit.answer: - self.logger.critical(f"Not a valid answer given?! {sqsHit.answer}") - - sqsHit.submit_hit_at = datetime.datetime.strptime(signal.params['event']['EventTimestamp'],"%Y-%m-%dT%H:%M:%SZ") - - self.store.saveHIT(sqsHit) - - if updateStatus: - self.logger.warning(f'update status: {sqsHit.getStatus()}') - # TODO: have HITStore/HIT take care of this by emitting a signal - # only update status if it is the currentHit - self.server.statusPage.set('state', sqsHit.getStatus()) - else: - self.logger.warning('DO NOT update status') - elif signal.name == 'plotter.finished': - if self.currentHit and self.currentHit.submit_page_at: - self.setLight(False) - scan = threading.Thread(target=self.scanImage, name='scan') - scan.start() - self.server.statusPage.set('hit_submitted', self.currentHit.submit_page_at) + try: + """ + Possible events: + - SQS events: accept/abandoned/returned/submitted + - webserver events: open, draw, submit + - scan complete + - HIT created + - Plotter complete + - + """ + #TODO: make level debug() + self.logger.info(f"SIGNAL: {signal}") + if signal.name == 'start': + self.makeHit() + pass + elif signal.name == 'hit.scanned': + # TODO: wrap up hit & make new HIT + self.currentHit.scanned_at = datetime.datetime.utcnow() self.server.statusPage.set('state', self.currentHit.getStatus()) - else: - self.logger.critical(f"Unknown signal: {signal.name}") - + self.makeHit() + elif signal.name == 'scan.start': + pass + elif signal.name == 'scan.finished': + pass + elif signal.name == 'hit.info': + if signal.params['hit_id'] != self.currentHit.id: + self.logger.warning(f"hit.info hit_id != currenthit.id: {signal}") + continue + for name, value in signal.params.items(): + if name == 'hit_id': + continue + if name == 'ip': + self.currentHit.turk_ip = value + if name == 'location': + self.currentHit.turk_country = value + + self.logger.debug(f'Set status: {name} to {value}') + self.server.statusPage.set(name, value) + elif signal.name == 'server.open': + self.currentHit.open_page_at = datetime.datetime.utcnow() + self.store.saveHIT(self.currentHit) + self.setLight(True) + self.server.statusPage.set('state', self.currentHit.getStatus()) + self.server.statusPage.set('hit_opened', self.currentHit.open_page_at) + elif signal.name == 'server.submit': + self.currentHit.submit_page_at = datetime.datetime.utcnow() + self.store.saveHIT(self.currentHit) + self.plotter.park() + self.server.statusPage.set('hit_opened', self.currentHit.open_page_at) + # park always triggers a plotter.finished after being processed + + elif signal.name[:4] == 'sqs.': + if signal.params['event']['HITId'] != self.currentHit.hit_id: + self.logger.warning(f"SQS hit.info hit_id != currenthit.id: {signal}, update status for older HIT") + sqsHit = self.store.getHitByRemoteId(signal.params['event']['HITId']) + updateStatus = False + else: + sqsHit = self.currentHit + updateStatus = True + + if signal.name == 'sqs.AssignmentAccepted': + self.logger.info(f'Set status progress to accepted') + sqsHit.accept_time = datetime.datetime.strptime(signal.params['event']['EventTimestamp'],"%Y-%m-%dT%H:%M:%SZ") + sqsHit.worker_id = signal.params['event']['WorkerId'] + if updateStatus: + self.server.statusPage.set('worker_id', sqsHit.worker_id) + # {'event': {'HITGroupId': '301G7MYOAJ85NEW128ZDGF5DSBW53S', 'EventType': 'AssignmentAccepted', 'EventTimestamp': '2019-10-23T20:16:10Z', 'HITId': '3IH9TRB0FBAKKZFP3JUD6D9YWQ1I1F', 'AssignmentId': '3BF51CHDTWLN3ZGHRKDUHFKPWIJ0H3', 'WorkerId': 'A1CK46PK9VEUH5', 'HITTypeId': '3EYXOXDEN7RX0YSMN4UMVN01AYKZJ0'}} + elif signal.name == 'sqs.AssignmentAbandoned': + self.logger.info(f'Set status progress to abandoned') + #{'event': {'HITGroupId': '301G7MYOAJ85NEW128ZDGF5DSBW53S', 'EventType': 'AssignmentAbandoned', 'EventTimestamp': '2019-10-23T20:23:06Z', 'HITId': '3JHB4BPSFKKFQ263K4EFULI3LC79QJ', 'AssignmentId': '3U088ZLJVL450PB6MJZUIIUCB6VW0Y', 'WorkerId': 'A1CK46PK9VEUH5', 'HITTypeId': '3EYXOXDEN7RX0YSMN4UMVN01AYKZJ0'}} + sqsHit.accept_time = None + sqsHit.open_page_at = None + if self.currentHit.id == sqsHit.id: + if not sqsHit.submit_page_at: + self.reset() + else: + sqsHit.submit_hit_at = datetime.datetime.utcnow() # fake submit + if updateStatus: + self.setLight(False) + + elif signal.name == 'sqs.AssignmentReturned': + self.logger.info(f'Set status progress to returned') + sqsHit.accept_time = None + sqsHit.open_page_at = None + if self.currentHit.id == sqsHit.id: + if not sqsHit.submit_page_at: + self.reset() + else: + sqsHit.submit_hit_at = datetime.datetime.utcnow() # fake submit + if updateStatus: + self.setLight(False) + # {'event': {'HITGroupId': '301G7MYOAJ85NEW128ZDGF5DSBW53S', 'EventType': 'AssignmentReturned', 'EventTimestamp': '2019-10-23T20:16:47Z', 'HITId': '3IH9TRB0FBAKKZFP3JUD6D9YWQ1I1F', 'AssignmentId': '3BF51CHDTWLN3ZGHRKDUHFKPWIJ0H3', 'WorkerId': 'A1CK46PK9VEUH5', 'HITTypeId': '3EYXOXDEN7RX0YSMN4UMVN01AYKZJ0'}} + elif signal.name == 'sqs.AssignmentSubmitted': + # {'MessageId': '4b37dfdf-6a12-455d-a111-9a361eb54d88', 'ReceiptHandle': 'AQEBHc0yAdIrEmAV3S8TIoDCRxrItDEvjy0VQko56/Lb+ifszC0gdZ0Bbed24HGHZYr5DSnWkgBJ/H59ZXxFS1iVEH9sC8+YrmKKOTrKvW3gj6xYiBU2fBb8JRq+sEiNSxWLw2waxr1VYdpn/SWcoOJCv6PlS7P9EB/2IQ++rCklhVwV7RfARHy4J87bjk5R3+uEXUUi00INhCeunCbu642Mq4c239TFRHq3mwM6gkdydK+AP1MrXGKKAE1W5nMbwEWAwAN8KfoM1NkkUg5rTSYWmxxZMdVs/QRNcMFKVSf1bop2eCALSoG6l3Iu7+UXIl4HLh+rHp4bc8NoftbUJUii8YXeiNGU3wCM9T1kOerwYVgksK93KQrioD3ee8navYExQRXne2+TrUZUDkxRIdtPGA==', 'MD5OfBody': '01ccb1efe47a84b68704c4dc611a4d8d', 'Body': '{"Events":[{"Answer":"surveycode<\\/QuestionIdentifier>test<\\/FreeText><\\/Answer><\\/QuestionFormAnswers>","HITGroupId":"301G7MYOAJ85NEW128ZDGF5DSBW53S","EventType":"AssignmentSubmitted","EventTimestamp":"2019-10-30T08:01:43Z","HITId":"3NSCTNUR2ZY42ZXASI4CS5YWV0S5AB","AssignmentId":"3ZAZR5XV02TTOCBR9MCLCNQV1XKCZL","WorkerId":"A1CK46PK9VEUH5","HITTypeId":"3EYXOXDEN7RX0YSMN4UMVN01AYKZJ0"}],"EventDocId":"34af4cd7f2829216f222d4b6e66f3a3ff9ad8ea6","SourceAccount":"600103077174","CustomerId":"A1CK46PK9VEUH5","EventDocVersion":"2014-08-15"}'} + self.logger.info(f'Set status progress to submitted') + # TODO: validate the content of the submission by parsing signal.params['event']['Answer'] and comparing it with sqsHit.uuid + sqsHit.answer = signal.params['event']['Answer'] + if sqsHit.uuid not in sqsHit.answer: + self.logger.critical(f"Not a valid answer given?! {sqsHit.answer}") + + sqsHit.submit_hit_at = datetime.datetime.strptime(signal.params['event']['EventTimestamp'],"%Y-%m-%dT%H:%M:%SZ") + + self.store.saveHIT(sqsHit) + + if updateStatus: + self.logger.warning(f'update status: {sqsHit.getStatus()}') + # TODO: have HITStore/HIT take care of this by emitting a signal + # only update status if it is the currentHit + self.server.statusPage.set('state', sqsHit.getStatus()) + else: + self.logger.warning('DO NOT update status') + elif signal.name == 'plotter.finished': + if self.currentHit and self.currentHit.submit_page_at: + self.setLight(False) + scan = threading.Thread(target=self.scanImage, name='scan') + scan.start() + self.server.statusPage.set('hit_submitted', self.currentHit.submit_page_at) + self.server.statusPage.set('state', self.currentHit.getStatus()) + else: + self.logger.critical(f"Unknown signal: {signal.name}") + except Exception as e: + self.logger.critical(f"Exception on handling {signal}") + self.logger.exception(e) def makeHit(self): + self.expireCurrentHit() # expire hit if it is there + self.server.statusPage.reset() self.currentHit = self.store.createHIT() self.store.currentHit = self.currentHit @@ -249,7 +276,7 @@ class CentralManagement(): question = open(self.config['amazon']['task_xml'], mode='r').read().replace("{HIT_NR}",str(self.currentHit.id)) estimatedHitDuration = self.store.getEstimatedHitDuration() - fee = (self.config['hour_rate_aim']/3600.) * estimatedHitDuration + fee = max(.2, (self.config['hour_rate_aim']/3600.) * estimatedHitDuration) self.currentHit.fee = fee self.logger.info(f"Based on average duration of {estimatedHitDuration} fee should be {fee}/hit to get hourly rate of {self.config['hour_rate_aim']}") new_hit = self.mturk.create_hit( @@ -259,7 +286,7 @@ class CentralManagement(): Reward = "{:.2f}".format(fee), MaxAssignments = 1, LifetimeInSeconds = self.config['hit_lifetime'], - AssignmentDurationInSeconds = estimatedHitDuration * 2, # give people twice as long as we expect them to take + AssignmentDurationInSeconds = self.store.getHitTimeout(), AutoApprovalDelayInSeconds = self.config['hit_autoapprove_delay'], Question = question, ) diff --git a/sorteerhoed/webserver.py b/sorteerhoed/webserver.py index 622154a..4095b09 100644 --- a/sorteerhoed/webserver.py +++ b/sorteerhoed/webserver.py @@ -63,7 +63,7 @@ class WebSocketHandler(tornado.websocket.WebSocketHandler): self.hit = self.store.currentHit - self.timeout = datetime.datetime.now() + datetime.timedelta(seconds=self.store.getEstimatedHitDuration() * 2) + self.timeout = datetime.datetime.now() + datetime.timedelta(seconds=self.store.getHitTimeout()) if self.hit.submit_hit_at: raise Exception("Opening websocket for already submitted hit") @@ -116,7 +116,7 @@ class WebSocketHandler(tornado.websocket.WebSocketHandler): xmlns:svg="http://www.w3.org/2000/svg" xmlns="http://www.w3.org/2000/svg" version="1.0" viewBox="0 0 {self.config['scanner']['width']}0 {self.config['scanner']['height']}0" width="{self.config['scanner']['width']}mm" height="{self.config['scanner']['height']}mm" preserveAspectRatio="none"> - + """ @@ -125,7 +125,7 @@ class WebSocketHandler(tornado.websocket.WebSocketHandler): self.write_message(json.dumps({ 'action': 'submitted', - 'msg': f"Submission ok, please refer to your submission as: {self.hit.uuid}" + 'msg': f"Submission ok, please copy this token to your HIT at Mechanical Turk: {self.hit.uuid}" })) self.close() diff --git a/www/index.html b/www/index.html index c0ea839..59e174e 100644 --- a/www/index.html +++ b/www/index.html @@ -29,7 +29,7 @@ path { fill: none; stroke: red; - stroke-width: 2px; + stroke-width: 3mm; } body.submitted path{ stroke:darkgray; @@ -116,7 +116,7 @@ } #info{ position: absolute; - bottom: 5px; + bottom: 15px; width: 600px; left: calc(50% - 250px); z-index: 999; @@ -124,6 +124,13 @@ .buttons{ text-align: center; } + #submit{ + background: lightblue; + border: solid 1px blue; + border-radius: 5px; + font-size: 110%; + padding: 5px 10px; + }