Fix crash of HITs and enable expiration of hit after shutdown

This commit is contained in:
Ruben van de Ven 2019-11-02 22:31:16 +01:00
parent 43c327823e
commit ecafb1db80
4 changed files with 165 additions and 128 deletions

View file

@ -161,6 +161,9 @@ class Store:
def getEstimatedHitDuration(self): def getEstimatedHitDuration(self):
return self.getAvgDurationOfPreviousNHits(5) return self.getAvgDurationOfPreviousNHits(5)
def getHitTimeout(self):
return max(300, self.getAvgDurationOfPreviousNHits(5)*2)
def getHITs(self, n = 100): def getHITs(self, n = 100):
return self.session.query(HIT).\ return self.session.query(HIT).\
filter(HIT.submit_hit_at != None).\ filter(HIT.submit_hit_at != None).\

View file

@ -79,6 +79,18 @@ class CentralManagement():
self.logger.info(f"Mechanical turk account balance: {self.mturk.get_account_balance()['AvailableBalance']}") self.logger.info(f"Mechanical turk account balance: {self.mturk.get_account_balance()['AvailableBalance']}")
# clear any pending hits:
pending_hits = self.mturk.list_hits(MaxResults=100)
for pending_hit in pending_hits['HITs']:
# print(pending_hit['HITId'], pending_hit['HITStatus'])
if pending_hit['HITStatus'] == 'Assignable':
self.logger.warn(f"Expire stale hit: {pending_hit['HITId']}: {pending_hit['HITStatus']}")
self.mturk.update_expiration_for_hit(
HITId=pending_hit['HITId'],
ExpireAt=datetime.datetime.fromisoformat('2015-01-01')
)
self.mturk.delete_hit(HITId=pending_hit['HITId'])
self.sqs = SqsListener(self.config, self.eventQueue, self.isRunning) self.sqs = SqsListener(self.config, self.eventQueue, self.isRunning)
sqsThread = threading.Thread(target=self.sqs.start, name='sqs') sqsThread = threading.Thread(target=self.sqs.start, name='sqs')
sqsThread.start() sqsThread.start()
@ -103,13 +115,23 @@ class CentralManagement():
while self.isRunning.is_set(): while self.isRunning.is_set():
time.sleep(.5) time.sleep(.5)
except Exception as e:
self.logger.exception(e)
finally: finally:
self.logger.warning("Stopping Central Managment") self.logger.warning("Stopping Central Managment")
self.isRunning.clear() self.isRunning.clear()
self.server.stop() self.server.stop()
self.expireCurrentHit()
def expireCurrentHit(self):
if self.currentHit and self.currentHit.hit_id: # hit pending
self.logger.warn(f"Delete hit: {self.currentHit.hit_id}")
self.mturk.update_expiration_for_hit(
HITId=self.currentHit.hit_id,
ExpireAt=datetime.datetime.fromisoformat('2015-01-01')
)
self.mturk.delete_hit(HITId=self.currentHit.hit_id)
def eventListener(self): def eventListener(self):
while self.isRunning.is_set(): while self.isRunning.is_set():
try: try:
@ -118,6 +140,7 @@ class CentralManagement():
pass pass
# self.logger.log(5, "Empty queue.") # self.logger.log(5, "Empty queue.")
else: else:
try:
""" """
Possible events: Possible events:
- SQS events: accept/abandoned/returned/submitted - SQS events: accept/abandoned/returned/submitted
@ -237,9 +260,13 @@ class CentralManagement():
self.server.statusPage.set('state', self.currentHit.getStatus()) self.server.statusPage.set('state', self.currentHit.getStatus())
else: else:
self.logger.critical(f"Unknown signal: {signal.name}") self.logger.critical(f"Unknown signal: {signal.name}")
except Exception as e:
self.logger.critical(f"Exception on handling {signal}")
self.logger.exception(e)
def makeHit(self): def makeHit(self):
self.expireCurrentHit() # expire hit if it is there
self.server.statusPage.reset() self.server.statusPage.reset()
self.currentHit = self.store.createHIT() self.currentHit = self.store.createHIT()
self.store.currentHit = self.currentHit self.store.currentHit = self.currentHit
@ -249,7 +276,7 @@ class CentralManagement():
question = open(self.config['amazon']['task_xml'], mode='r').read().replace("{HIT_NR}",str(self.currentHit.id)) question = open(self.config['amazon']['task_xml'], mode='r').read().replace("{HIT_NR}",str(self.currentHit.id))
estimatedHitDuration = self.store.getEstimatedHitDuration() estimatedHitDuration = self.store.getEstimatedHitDuration()
fee = (self.config['hour_rate_aim']/3600.) * estimatedHitDuration fee = max(.2, (self.config['hour_rate_aim']/3600.) * estimatedHitDuration)
self.currentHit.fee = fee self.currentHit.fee = fee
self.logger.info(f"Based on average duration of {estimatedHitDuration} fee should be {fee}/hit to get hourly rate of {self.config['hour_rate_aim']}") self.logger.info(f"Based on average duration of {estimatedHitDuration} fee should be {fee}/hit to get hourly rate of {self.config['hour_rate_aim']}")
new_hit = self.mturk.create_hit( new_hit = self.mturk.create_hit(
@ -259,7 +286,7 @@ class CentralManagement():
Reward = "{:.2f}".format(fee), Reward = "{:.2f}".format(fee),
MaxAssignments = 1, MaxAssignments = 1,
LifetimeInSeconds = self.config['hit_lifetime'], LifetimeInSeconds = self.config['hit_lifetime'],
AssignmentDurationInSeconds = estimatedHitDuration * 2, # give people twice as long as we expect them to take AssignmentDurationInSeconds = self.store.getHitTimeout(),
AutoApprovalDelayInSeconds = self.config['hit_autoapprove_delay'], AutoApprovalDelayInSeconds = self.config['hit_autoapprove_delay'],
Question = question, Question = question,
) )

View file

@ -63,7 +63,7 @@ class WebSocketHandler(tornado.websocket.WebSocketHandler):
self.hit = self.store.currentHit self.hit = self.store.currentHit
self.timeout = datetime.datetime.now() + datetime.timedelta(seconds=self.store.getEstimatedHitDuration() * 2) self.timeout = datetime.datetime.now() + datetime.timedelta(seconds=self.store.getHitTimeout())
if self.hit.submit_hit_at: if self.hit.submit_hit_at:
raise Exception("Opening websocket for already submitted hit") raise Exception("Opening websocket for already submitted hit")
@ -116,7 +116,7 @@ class WebSocketHandler(tornado.websocket.WebSocketHandler):
xmlns:svg="http://www.w3.org/2000/svg" xmlns:svg="http://www.w3.org/2000/svg"
xmlns="http://www.w3.org/2000/svg" xmlns="http://www.w3.org/2000/svg"
version="1.0" viewBox="0 0 {self.config['scanner']['width']}0 {self.config['scanner']['height']}0" width="{self.config['scanner']['width']}mm" height="{self.config['scanner']['height']}mm" preserveAspectRatio="none"> version="1.0" viewBox="0 0 {self.config['scanner']['width']}0 {self.config['scanner']['height']}0" width="{self.config['scanner']['width']}mm" height="{self.config['scanner']['height']}mm" preserveAspectRatio="none">
<path d="{d}" style='stroke:gray;stroke-width:1mm;fill:none;' id="stroke" /> <path d="{d}" style='stroke:gray;stroke-width:2mm;fill:none;' id="stroke" />
</svg> </svg>
""" """
@ -125,7 +125,7 @@ class WebSocketHandler(tornado.websocket.WebSocketHandler):
self.write_message(json.dumps({ self.write_message(json.dumps({
'action': 'submitted', 'action': 'submitted',
'msg': f"Submission ok, please refer to your submission as: {self.hit.uuid}" 'msg': f"Submission ok, please copy this token to your HIT at Mechanical Turk: {self.hit.uuid}"
})) }))
self.close() self.close()

View file

@ -29,7 +29,7 @@
path { path {
fill: none; fill: none;
stroke: red; stroke: red;
stroke-width: 2px; stroke-width: 3mm;
} }
body.submitted path{ body.submitted path{
stroke:darkgray; stroke:darkgray;
@ -116,7 +116,7 @@
} }
#info{ #info{
position: absolute; position: absolute;
bottom: 5px; bottom: 15px;
width: 600px; width: 600px;
left: calc(50% - 250px); left: calc(50% - 250px);
z-index: 999; z-index: 999;
@ -124,6 +124,13 @@
.buttons{ .buttons{
text-align: center; text-align: center;
} }
#submit{
background: lightblue;
border: solid 1px blue;
border-radius: 5px;
font-size: 110%;
padding: 5px 10px;
}
</style> </style>
</head> </head>
<body> <body>