Further implementation of flow

This commit is contained in:
Ruben van de Ven 2019-10-30 12:44:25 +01:00
parent a9e8ec4069
commit 4d3eed56e9
4 changed files with 130 additions and 29 deletions

View file

@ -40,8 +40,8 @@ class HIT(Base):
__tablename__ = 'hits' __tablename__ = 'hits'
id = Column(Integer, Sequence('hit_id'), primary_key=True) # our sequential hit id id = Column(Integer, Sequence('hit_id'), primary_key=True) # our sequential hit id
hit_id = Column(String(255)) # amazon's hit id hit_id = Column(String(255)) # amazon's hit id
created_at = Column(DateTime, default=datetime.datetime.now()) created_at = Column(DateTime, default=datetime.datetime.utcnow)
updated_at = Column(DateTime, default=datetime.datetime.now()) updated_at = Column(DateTime, default=datetime.datetime.utcnow)
uuid = Column(String(32), default=lambda : uuid.uuid4().hex) uuid = Column(String(32), default=lambda : uuid.uuid4().hex)
assignment_id = Column(String(255), default = None) assignment_id = Column(String(255), default = None)
worker_id = Column(String(255), default = None) worker_id = Column(String(255), default = None)
@ -54,6 +54,7 @@ class HIT(Base):
turk_country = Column(String(255), default=None) turk_country = Column(String(255), default=None)
turk_screen_width = Column(Integer, default = None) turk_screen_width = Column(Integer, default = None)
turk_screen_height = Column(Integer, default = None) turk_screen_height = Column(Integer, default = None)
scanned_at = Column(DateTime, default=None)
def getImagePath(self): def getImagePath(self):
@ -62,6 +63,20 @@ class HIT(Base):
def getImageUrl(self): def getImageUrl(self):
return f"scans/{self.id}.png" return f"scans/{self.id}.png"
def getStatus(self):
if self.scanned_at:
return "completed"
if self.submit_hit_at:
return "submission confirmed"
if self.submit_page_at:
return "submitted by worker"
if self.open_page_at:
return "started working"
if self.accept_time:
return "accepted by worker"
return "created"
class Store: class Store:
def __init__(self, db_filename, logLevel=0): def __init__(self, db_filename, logLevel=0):
@ -122,7 +137,17 @@ class Store:
s.refresh(hit) s.refresh(hit)
logger.info(f"Added {hit.id}") logger.info(f"Added {hit.id}")
def getAvgDurationOfPreviousNHits(self, n) -> int:
latest_hits = self.session.query(HIT).\
filter(HIT.submit_hit_at!=None).\
filter(HIT.accept_time!=None).\
order_by(HIT.submit_hit_at.desc()).limit(n)
durations = []
for hit in latest_hits:
durations.append((hit.submit_hit_at - hit.accept_time).total_seconds())
if not len(durations):
return int(2.5*60)
return int(sum(durations) / len(durations))
# def rmSource(self, id: int): # def rmSource(self, id: int):
# with self.getSession() as session: # with self.getSession() as session:

View file

@ -35,6 +35,7 @@ class CentralManagement():
self.eventQueue = Queue() self.eventQueue = Queue()
self.isRunning = threading.Event() self.isRunning = threading.Event()
self.isScanning = threading.Event()
def loadConfig(self, filename): def loadConfig(self, filename):
@ -125,53 +126,94 @@ class CentralManagement():
pass pass
elif signal.name == 'hit.scanned': elif signal.name == 'hit.scanned':
# TODO: wrap up hit & make new HIT # TODO: wrap up hit & make new HIT
pass self.makeHit()
elif signal.name == 'scan.start':
self.isScanning.set()
elif signal.name == 'scan.finished':
self.isScanning.clear()
elif signal.name == 'hit.info': elif signal.name == 'hit.info':
if signal.params['hit_id'] != self.currentHit.id: if signal.params['hit_id'] != self.currentHit.id:
self.logger.warning(f"hit.info hit_id != currenthit.id: {signal}") self.logger.warning(f"hit.info hit_id != currenthit.id: {signal}")
continue continue
for name, value in enumerate(signal.params): for name, value in signal.params.items():
if name == 'hit_id':
continue
self.logger.debug(f'Set status: {name} to {value}') self.logger.debug(f'Set status: {name} to {value}')
self.server.statusPage.set(name, value) self.server.statusPage.set(name, value)
elif signal.name == 'server.open':
self.currentHit.open_page_at = datetime.datetime.utcnow()
self.store.saveHIT(self.currentHit)
# TODO: turn on light!
elif signal.name == 'server.submit': elif signal.name == 'server.submit':
self.currentHit.submit_page_at = datetime.datetime.now() self.currentHit.submit_page_at = datetime.datetime.utcnow()
self.store.saveHIT(self.currentHit) self.store.saveHIT(self.currentHit)
self.plotter.park() self.plotter.park()
# TODO: turn off light!
# park should alway triggers a plotter.finished after being processed # park should alway triggers a plotter.finished after being processed
elif signal.name == 'sqs.AssignmentAccepted': elif signal.name[:4] == 'sqs.':
# {'event': {'HITGroupId': '301G7MYOAJ85NEW128ZDGF5DSBW53S', 'EventType': 'AssignmentAccepted', 'EventTimestamp': '2019-10-23T20:16:10Z', 'HITId': '3IH9TRB0FBAKKZFP3JUD6D9YWQ1I1F', 'AssignmentId': '3BF51CHDTWLN3ZGHRKDUHFKPWIJ0H3', 'WorkerId': 'A1CK46PK9VEUH5', 'HITTypeId': '3EYXOXDEN7RX0YSMN4UMVN01AYKZJ0'}} if signal.params['event']['HITId'] != self.currentHit.id:
pass self.logger.warning(f"SQS hit.info hit_id != currenthit.id: {signal}, update status for older HIT")
elif signal.name == 'sqs.AssignmentAbandoned': sqsHit = self.store.getHitByRemoteId(signal.params['event']['HITId'])
#{'event': {'HITGroupId': '301G7MYOAJ85NEW128ZDGF5DSBW53S', 'EventType': 'AssignmentAbandoned', 'EventTimestamp': '2019-10-23T20:23:06Z', 'HITId': '3JHB4BPSFKKFQ263K4EFULI3LC79QJ', 'AssignmentId': '3U088ZLJVL450PB6MJZUIIUCB6VW0Y', 'WorkerId': 'A1CK46PK9VEUH5', 'HITTypeId': '3EYXOXDEN7RX0YSMN4UMVN01AYKZJ0'}} updateStatus = False
pass else:
elif signal.name == 'sqs.AssignmentReturned': sqsHit = self.currentHit
# {'event': {'HITGroupId': '301G7MYOAJ85NEW128ZDGF5DSBW53S', 'EventType': 'AssignmentReturned', 'EventTimestamp': '2019-10-23T20:16:47Z', 'HITId': '3IH9TRB0FBAKKZFP3JUD6D9YWQ1I1F', 'AssignmentId': '3BF51CHDTWLN3ZGHRKDUHFKPWIJ0H3', 'WorkerId': 'A1CK46PK9VEUH5', 'HITTypeId': '3EYXOXDEN7RX0YSMN4UMVN01AYKZJ0'}} updateStatus = True
pass
elif signal.name == 'sqs.AssignmentSubmitted': if signal.name == 'sqs.AssignmentAccepted':
pass self.logger.info(f'Set status progress to accepted')
sqsHit.accept_time = datetime.datetime.strptime(signal.params['event']['EventTimestamp'],"%Y-%m-%dT%H:%M:%SZ")
sqsHit.worker_id = signal.params['event']['WorkerId']
if updateStatus:
self.server.statusPage.set('worker_id', sqsHit.worker_id)
# {'event': {'HITGroupId': '301G7MYOAJ85NEW128ZDGF5DSBW53S', 'EventType': 'AssignmentAccepted', 'EventTimestamp': '2019-10-23T20:16:10Z', 'HITId': '3IH9TRB0FBAKKZFP3JUD6D9YWQ1I1F', 'AssignmentId': '3BF51CHDTWLN3ZGHRKDUHFKPWIJ0H3', 'WorkerId': 'A1CK46PK9VEUH5', 'HITTypeId': '3EYXOXDEN7RX0YSMN4UMVN01AYKZJ0'}}
elif signal.name == 'sqs.AssignmentAbandoned':
self.logger.info(f'Set status progress to abandoned')
sqsHit.accept_time = None
sqsHit.open_page_at = None
self.reset()
#{'event': {'HITGroupId': '301G7MYOAJ85NEW128ZDGF5DSBW53S', 'EventType': 'AssignmentAbandoned', 'EventTimestamp': '2019-10-23T20:23:06Z', 'HITId': '3JHB4BPSFKKFQ263K4EFULI3LC79QJ', 'AssignmentId': '3U088ZLJVL450PB6MJZUIIUCB6VW0Y', 'WorkerId': 'A1CK46PK9VEUH5', 'HITTypeId': '3EYXOXDEN7RX0YSMN4UMVN01AYKZJ0'}}
elif signal.name == 'sqs.AssignmentReturned':
self.logger.info(f'Set status progress to returned')
sqsHit.accept_time = None
sqsHit.open_page_at = None
self.reset()
# {'event': {'HITGroupId': '301G7MYOAJ85NEW128ZDGF5DSBW53S', 'EventType': 'AssignmentReturned', 'EventTimestamp': '2019-10-23T20:16:47Z', 'HITId': '3IH9TRB0FBAKKZFP3JUD6D9YWQ1I1F', 'AssignmentId': '3BF51CHDTWLN3ZGHRKDUHFKPWIJ0H3', 'WorkerId': 'A1CK46PK9VEUH5', 'HITTypeId': '3EYXOXDEN7RX0YSMN4UMVN01AYKZJ0'}}
elif signal.name == 'sqs.AssignmentSubmitted':
# {'MessageId': '4b37dfdf-6a12-455d-a111-9a361eb54d88', 'ReceiptHandle': 'AQEBHc0yAdIrEmAV3S8TIoDCRxrItDEvjy0VQko56/Lb+ifszC0gdZ0Bbed24HGHZYr5DSnWkgBJ/H59ZXxFS1iVEH9sC8+YrmKKOTrKvW3gj6xYiBU2fBb8JRq+sEiNSxWLw2waxr1VYdpn/SWcoOJCv6PlS7P9EB/2IQ++rCklhVwV7RfARHy4J87bjk5R3+uEXUUi00INhCeunCbu642Mq4c239TFRHq3mwM6gkdydK+AP1MrXGKKAE1W5nMbwEWAwAN8KfoM1NkkUg5rTSYWmxxZMdVs/QRNcMFKVSf1bop2eCALSoG6l3Iu7+UXIl4HLh+rHp4bc8NoftbUJUii8YXeiNGU3wCM9T1kOerwYVgksK93KQrioD3ee8navYExQRXne2+TrUZUDkxRIdtPGA==', 'MD5OfBody': '01ccb1efe47a84b68704c4dc611a4d8d', 'Body': '{"Events":[{"Answer":"<?xml version=\\"1.0\\" encoding=\\"ASCII\\"?><QuestionFormAnswers xmlns=\\"http://mechanicalturk.amazonaws.com/AWSMechanicalTurkDataSchemas/2005-10-01/QuestionFormAnswers.xsd\\"><Answer><QuestionIdentifier>surveycode<\\/QuestionIdentifier><FreeText>test<\\/FreeText><\\/Answer><\\/QuestionFormAnswers>","HITGroupId":"301G7MYOAJ85NEW128ZDGF5DSBW53S","EventType":"AssignmentSubmitted","EventTimestamp":"2019-10-30T08:01:43Z","HITId":"3NSCTNUR2ZY42ZXASI4CS5YWV0S5AB","AssignmentId":"3ZAZR5XV02TTOCBR9MCLCNQV1XKCZL","WorkerId":"A1CK46PK9VEUH5","HITTypeId":"3EYXOXDEN7RX0YSMN4UMVN01AYKZJ0"}],"EventDocId":"34af4cd7f2829216f222d4b6e66f3a3ff9ad8ea6","SourceAccount":"600103077174","CustomerId":"A1CK46PK9VEUH5","EventDocVersion":"2014-08-15"}'}
self.logger.info(f'Set status progress to submitted')
# TODO: validate the content of the submission by parsing signal.params['event']['Answer'] and comparing it with sqsHit.uuid
sqsHit.submit_hit_at = datetime.datetime.strptime(signal.params['event']['EventTimestamp'],"%Y-%m-%dT%H:%M:%SZ")
self.store.saveHIT(sqsHit)
if updateStatus:
# TODO: have HITStore/HIT take care of this by emitting a signal
# only update status if it is the currentHit
self.server.statusPage.set('state', sqsHit.getStatus())
elif signal.name == 'plotter.finished': elif signal.name == 'plotter.finished':
if self.currentHit.submit_page_at: if self.currentHit.submit_page_at:
# TODO: scan! scan = threading.Thread(target=self.scanImage, name='scan')
pass scan.start()
elif signal.name == '': else:
pass self.logger.critical(f"Unknown signal: {signal.name}")
# handle singals/events:
# TODO: next steps
# TODO: update status
def makeHit(self): def makeHit(self):
self.server.statusPage.reset()
self.currentHit = self.store.createHIT() self.currentHit = self.store.createHIT()
self.logger.info(f"Make HIT {self.currentHit.id}") self.logger.info(f"Make HIT {self.currentHit.id}")
question = open(self.config['amazon']['task_xml'], mode='r').read().replace("{HIT_NR}",str(self.currentHit.id)) question = open(self.config['amazon']['task_xml'], mode='r').read().replace("{HIT_NR}",str(self.currentHit.id))
estimatedHitDuration = self.store.getAvgDurationOfPreviousNHits(5)
fee = (self.config['hour_rate_aim']/3600.) * estimatedHitDuration
self.logger.debug(f"Based on average duration of {estimatedHitDuration} fee should be {fee}/hit to get hourly rate of {self.config['hour_rate_aim']}")
new_hit = self.mturk.create_hit( new_hit = self.mturk.create_hit(
Title = 'Trace the drawn line', Title = 'Trace the drawn line',
Description = 'Draw a line over the sketched line in the image', Description = 'Draw a line over the sketched line in the image',
Keywords = 'polygons, trace, draw', Keywords = 'polygons, trace, draw',
Reward = '0.15', # TODO: make variable Reward = "{:.2f}".format(fee),
MaxAssignments = 1, MaxAssignments = 1,
LifetimeInSeconds = self.config['hit_lifetime'], LifetimeInSeconds = self.config['hit_lifetime'],
AssignmentDurationInSeconds = self.config['hit_assignment_duration'], AssignmentDurationInSeconds = self.config['hit_assignment_duration'],
@ -183,7 +225,10 @@ class CentralManagement():
self.logger.info("https://workersandbox.mturk.com/mturk/preview?groupId=" + new_hit['HIT']['HITGroupId']) self.logger.info("https://workersandbox.mturk.com/mturk/preview?groupId=" + new_hit['HIT']['HITGroupId'])
self.currentHit.hit_id = new_hit['HIT']['HITId'] self.currentHit.hit_id = new_hit['HIT']['HITId']
self.store.saveHIT(self.currentHit) self.store.saveHIT(self.currentHit)
# TODO: have HITStore/HIT take care of this by emitting a signal
self.server.statusPage.set('hit_id', new_hit['HIT']['HITId'])
# mturk.send_test_event_notification() # mturk.send_test_event_notification()
if self.config['amazon']['sqs_url']: if self.config['amazon']['sqs_url']:
@ -212,10 +257,31 @@ class CentralManagement():
) )
self.logger.debug(notification_info) self.logger.debug(notification_info)
def cleanDrawing(self):
self.eventQueue.put(Signal('scan.start'))
# Scan to reset
cmd = [
'sudo', 'scanimage', '-d', 'epkowa'
]
proc = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)
# opens connection to scanner, but only starts scanning when output becomes ready:
o, e = proc.communicate(80)
if e:
self.logger.critical(f"Scanner caused: {e.decode()}")
self.eventQueue.put(Signal('system.reset'))
self.eventQueue.put(Signal('scan.finished'))
def reset(self) -> str:
# TODO: for returns & abandons
scan = threading.Thread(target=self.cleanDrawing, name='reset')
scan.start()
def scanImage(self) -> str: def scanImage(self) -> str:
""" """
Run scanimage on scaner and returns a string with the filename Run scanimage on scaner and returns a string with the filename
""" """
self.eventQueue.put(Signal('scan.start'))
cmd = [ cmd = [
'sudo', 'scanimage', '-d', 'epkowa' 'sudo', 'scanimage', '-d', 'epkowa'
] ]
@ -225,10 +291,12 @@ class CentralManagement():
o, e = proc.communicate(80) o, e = proc.communicate(80)
if e: if e:
self.logger.critical(f"Scanner caused: {e.decode()}") self.logger.critical(f"Scanner caused: {e.decode()}")
#TODO: should clear self.isRunning.clear() ?
f = io.BytesIO(o) f = io.BytesIO(o)
img = Image.open(f) img = Image.open(f)
img.save(filename) img.save(filename)
self.eventQueue.put(Signal('hit.scanned', {'hit_id':self.currentHit.id})) self.eventQueue.put(Signal('hit.scanned', {'hit_id':self.currentHit.id}))
self.eventQueue.put(Signal('scan.finished'))

View file

@ -8,6 +8,7 @@ import time
class Plotter: class Plotter:
def __init__(self, config, eventQ: Queue, runningEvent: Event): def __init__(self, config, eventQ: Queue, runningEvent: Event):
#TODO: scanningEvent -> CentralManagement.isScanning -> prevent plotter move during scan, failsafe
self.config = config self.config = config
self.eventQ = eventQ self.eventQ = eventQ
self.q = Queue() self.q = Queue()

View file

@ -54,9 +54,10 @@ class WebSocketHandler(tornado.websocket.WebSocketHandler):
logger.info(f"New client connected: {self.request.remote_ip} for {self.hit.id}/{self.hit.hit_id}") logger.info(f"New client connected: {self.request.remote_ip} for {self.hit.id}/{self.hit.hit_id}")
self.eventQ.put(Signal('hit.info', dict(hit_id=self.hit.id, ip=self.request.remote_ip))) self.eventQ.put(Signal('hit.info', dict(hit_id=self.hit.id, ip=self.request.remote_ip)))
self.eventQ.put(Signal('server.open', dict(hit_id=self.hit.id)))
self.strokes = [] self.strokes = []
# Gather some initial information:
ua = self.request.headers.get('User-Agent', None) ua = self.request.headers.get('User-Agent', None)
if ua: if ua:
ua_info = httpagentparser.detect(ua) ua_info = httpagentparser.detect(ua)
@ -258,6 +259,7 @@ class StatusPage():
self.reset() self.reset()
def reset(self): def reset(self):
logger.info("Resetting status")
self.hit_id = None self.hit_id = None
self.worker_id = None self.worker_id = None
self.ip = None self.ip = None
@ -270,8 +272,13 @@ class StatusPage():
self.hit_created = None self.hit_created = None
self.hit_opened = None self.hit_opened = None
def __setattr__(self, name, value): def __setattr__(self, name, value):
if name in self.__dict__ and self.__dict__[name] == value:
logger.debug(f"Ignore setting status of {name}: it already is set to {value}")
return
self.__dict__[name] =value self.__dict__[name] =value
logger.info(f"Update status: {name}: {value}")
StatusWebSocketHandler.update_for_all(name, value) StatusWebSocketHandler.update_for_all(name, value)
def set(self, name, value): def set(self, name, value):