Refactored to work with separate assignment table

This commit is contained in:
Ruben van de Ven 2020-01-13 13:49:59 +01:00
parent ac04fb6082
commit d3bf3d47ea
4 changed files with 345 additions and 245 deletions

View File

@ -9,9 +9,6 @@ import datetime
from contextlib import contextmanager
import uuid
import os
import coloredlogs
import argparse
from sqlalchemy.sql.functions import func
mainLogger = logging.getLogger("sorteerhoed")
logger = mainLogger.getChild("store")
@ -43,6 +40,7 @@ class HIT(Base):
created_at = Column(DateTime, default=datetime.datetime.utcnow)
updated_at = Column(DateTime, default=datetime.datetime.utcnow)
scanned_at = Column(DateTime, default=None)
plotted_at = Column(DateTime, default=None)
deleted_at = Column(DateTime, default=None)
assignments = relationship("Assignment", back_populates="hit", order_by="Assignment.created_at")
fee = Column(Float(precision=2), default=None)
@ -53,6 +51,9 @@ class HIT(Base):
def getImagePath(self):
return os.path.join('scanimation/interfaces/frames', f"{self.id:06d}.jpg")
def getImageUrl(self):
return os.path.join('/frames', f"{self.id:06d}.jpg")
def getSvgImageUrl(self):
return f"scans/{self.id:06d}.svg"
@ -68,23 +69,44 @@ class HIT(Base):
def getAssignmentById(self, assignmentId):
for a in self.assignments:
if a.assignment_id == assignmentId:
return
return a
return None
def getStatus(self):
assignment = self.getLastAssignment()
if self.deleted_at:
return "deleted"
if not self.hit_id:
return "creating"
if not assignment:
return "awaiting worker"
if self.scanned_at:
return "completed"
if self.submit_hit_at:
return "submission confirmed"
if self.submit_page_at:
return "submitted by worker"
if self.open_page_at:
return "working"
if self.accept_time:
return "accepted by worker"
# on abandon:
if self.worker_id:
return "abandoned by worker"
return "awaiting worker"
return "scanned"
return assignment.getStatus()
def toDict(self) -> dict:
values = {c.name: getattr(self, c.name) for c in self.__table__.columns}
assignment = self.getLastAssignment()
values['assignment'] = assignment.toDict() if assignment else None
values['state'] = self.getStatus()
values['scan_image'] = self.getImageUrl() if self.scanned_at else None
values['svg_image'] = self.getSvgImageUrl() if self.isSubmitted() else None
return values
def delete(self):
self.deleted_at = datetime.datetime.utcnow()
def isSubmitted(self) -> bool:
a = self.getLastAssignment()
if not a:
return False
return bool(a.submit_page_at)
def isConfirmed(self) -> bool:
a = self.getLastAssignment()
if not a:
return False
return bool(a.confirmed_at)
class Assignment(Base):
__tablename__ = 'assignments'
@ -99,7 +121,7 @@ class Assignment(Base):
assignment_id = Column(String(255), default = None)
worker_id = Column(String(255), default = None)
accept_at = Column(DateTime, default=None)
accept_at = Column(DateTime, default=None) # accept time acccording to SQS
# open_page_at = Column(DateTime, default=None)
submit_page_at = Column(DateTime, default=None) # Submit the page
confirmed_at = Column(DateTime, default=None) # validate with UUID when getting Message from Amazon
@ -109,6 +131,24 @@ class Assignment(Base):
answer = Column(String(255), default=None)
turk_ip = Column(String(255), default=None)
turk_country = Column(String(255), default=None)
turk_os = Column(String(255), default=None)
turk_browser = Column(String(255), default=None)
def getStatus(self):
if self.rejected_at:
return "rejected"
if self.abandoned_at:
return "abandoned"
if not self.submit_page_at:
return "working"
if not self.confirmed_at:
return "submitted"
return "confirmed"
def toDict(self) -> dict:
values = {c.name: getattr(self, c.name) for c in self.__table__.columns}
return values
class Store:
def __init__(self, db_filename, logLevel=0):
@ -122,6 +162,19 @@ class Store:
self.session = self.Session()
self.currentHit = None # mirrors Centralmanagmenet, stored here so we can quickly access it from webserver classes
self.updateHooks = []
def registerUpdateHook(self, hook):
if hook not in self.updateHooks:
logger.info(f"Register update hook: {hook}")
self.updateHooks.append(hook)
def triggerUpdateHooks(self, hit = None):
for hook in self.updateHooks:
if callable(hook): # it's a method
hook(hit)
else: # assume it's an object
hook.update(hit)
@contextmanager
def getSession(self):
@ -133,8 +186,8 @@ class Store:
self.session.rollback()
raise
def getHits(self, session):
return self.session.query(Source).order_by(HIT.created_at.desc())
def getHits(self):
return self.session.query(HIT).order_by(HIT.created_at.desc())
def getHitById(self, hitId):
return self.session.query(HIT).\
@ -147,8 +200,19 @@ class Store:
def getLastSubmittedHit(self):
return self.session.query(HIT).\
filter(HIT.submit_page_at!=None).\
order_by(HIT.submit_page_at.desc()).first()
join(Assignment).\
filter(Assignment.submit_page_at!=None).\
order_by(HIT.created_at.desc()).first()
def getNewestHits(self, n = 2) -> list:
hits = list(
self.session.query(HIT).\
filter(HIT.deleted_at==None).\
order_by(HIT.created_at.desc()).limit(2)
)
# select DESC, because we want latest, then reverse list to get in right order
hits.reverse()
return hits
def createHIT(self) -> HIT:
with self.getSession() as s:
@ -157,40 +221,57 @@ class Store:
s.flush()
s.refresh(hit)
logger.info(f"Created HIT {hit.id}")
self.triggerUpdateHooks(hit)
return hit
def newAssignment(self, hit: HIT) -> Assignment:
def newAssignment(self, hit: HIT, assignmentId) -> Assignment:
# TODO: reset() central management if has pending lastAssignment()
with self.getSession() as s:
assignment = Assignment()
assignment.assignment_id = assignmentId
hit.assignments.append(assignment)
s.add(assignment)
s.flush()
s.refresh(hit)
logger.info(f"Created Assignment {assignment.id}")
self.triggerUpdateHooks(hit)
return assignment
def saveHIT(self, hit):
with self.getSession() as s:
logger.info(f"Updating hit! {hit.id}")
# s.flush()
self.triggerUpdateHooks(hit)
def addHIT(self, hit: HIT):
def saveAssignment(self, assignment):
with self.getSession() as s:
s.add(hit)
s.flush()
s.refresh(hit)
logger.info(f"Added {hit.id}")
logger.info(f"Updating assignment! {assignment.id}")
# s.flush()
self.triggerUpdateHooks(assignment.hit)
# def addHIT(self, hit: HIT):
# with self.getSession() as s:
# s.add(hit)
# s.flush()
# s.refresh(hit)
# logger.info(f"Added {hit.id}")
def getAvgDurationOfPreviousNHits(self, n) -> int:
latest_hits = self.session.query(HIT).\
filter(HIT.submit_hit_at!=None).\
filter(HIT.accept_time!=None).\
order_by(HIT.submit_hit_at.desc()).limit(n)
latest_assignments = self.session.query(Assignment).\
filter(Assignment.created_at!=None).\
filter(Assignment.submit_page_at!=None).\
order_by(Assignment.created_at.desc()).limit(n)
durations = []
for hit in latest_hits:
durations.append((hit.submit_hit_at - hit.accept_time).total_seconds())
for assignment in latest_assignments:
durations.append((assignment.submit_page_at - assignment.created_at).total_seconds())
if not len(durations):
return int(2.5*60)
return int(2.5*60) # default to 2.5 minutes
return int(sum(durations) / len(durations))
def getEstimatedHitDuration(self):
@ -203,15 +284,3 @@ class Store:
return self.session.query(HIT).\
filter(HIT.submit_hit_at != None).\
order_by(HIT.submit_hit_at.desc()).limit(n)
# def rmSource(self, id: int):
# with self.getSession() as session:
# source = session.query(Source).get(id)
# if not source:
# logging.warning(f"Source nr {id} not found")
# else:
# logging.info(f"Deleting source {source.id}: {source.url}")
# session.delete(source)
#
# def getRandomNewsItem(self, session) -> NewsItem:
# return session.query(NewsItem).order_by(func.random()).limit(1).first()

View File

@ -17,10 +17,14 @@ from PIL import Image
import datetime
from shutil import copyfile
import colorsys
import tqdm
class Level(object):
# Level effect adapted from https://stackoverflow.com/a/3125421
"""
Level image effect adapted from https://stackoverflow.com/a/3125421
"""
def __init__(self, minv, maxv, gamma):
self.minv= minv/255.0
self.maxv= maxv/255.0
@ -81,6 +85,7 @@ class CentralManagement():
self.isScanning = threading.Event()
self.scanLock = threading.Lock()
self.notPaused = threading.Event()
self.lightStatus = 0
def loadConfig(self, filename, args):
@ -94,6 +99,7 @@ class CentralManagement():
'hit_store.db'
)
self.store = HITStore.Store(varDb, logLevel=logging.DEBUG if self.debug else logging.INFO)
self.store.registerUpdateHook(self.updateLightHook) # change light based on status
self.logger.debug(f"Loaded configuration: {self.config}")
@ -142,6 +148,10 @@ class CentralManagement():
ExpireAt=datetime.datetime.fromisoformat('2015-01-01')
)
self.mturk.delete_hit(HITId=pending_hit['HITId'])
staleHit = self.store.getHitByRemoteId(pending_hit['HITId'])
staleHit.delete()
self.store.saveHIT(staleHit)
self.sqs = SqsListener(self.config, self.eventQueue, self.isRunning)
sqsThread = threading.Thread(target=self.sqs.start, name='sqs')
@ -160,8 +170,6 @@ class CentralManagement():
# event listener:
dispatcherThread = threading.Thread(target=self.eventListener, name='dispatcher')
dispatcherThread.start()
#
#
self.eventQueue.put(Signal('start', {'ding':'test'}))
@ -177,8 +185,9 @@ class CentralManagement():
self.expireCurrentHit()
def expireCurrentHit(self):
if self.currentHit and self.currentHit.hit_id: # hit pending
self.logger.warn(f"Delete hit: {self.currentHit.hit_id}")
if self.currentHit and not self.currentHit.isConfirmed():
if self.currentHit.hit_id: # hit pending at Amazon
self.logger.warn(f"Expire hit: {self.currentHit.hit_id}")
self.mturk.update_expiration_for_hit(
HITId=self.currentHit.hit_id,
ExpireAt=datetime.datetime.fromisoformat('2015-01-01')
@ -187,7 +196,12 @@ class CentralManagement():
self.mturk.delete_hit(HITId=self.currentHit.hit_id)
except Exception as e:
self.logger.exception(e)
if not self.currentHit.isSubmitted():
self.currentHit.delete()
self.store.saveHIT(self.currentHit)
def eventListener(self):
while self.isRunning.is_set():
try:
@ -206,27 +220,27 @@ class CentralManagement():
- Plotter complete
-
"""
#TODO: make level debug()
self.logger.info(f"SIGNAL: {signal}")
if signal.name == 'start':
self.makeHit()
self.lastHitTime = datetime.datetime.now()
elif signal.name == 'hit.scan':
# start a scan
if signal.params['id'] != self.currentHit.id:
self.logger.info(f"Hit.scanned had wrong id: {signal}")
self.logger.info(f"Hit.scan had wrong id: {signal}")
continue
self.statusPageQueue.add(dict(hit_id=signal.params['id'], transition='scanning'))
# self.statusPageQueue.add(dict(hit_id=signal.params['id'], transition='scanning'))
elif signal.name == 'hit.scanned':
# TODO: wrap up hit & make new HIT
if signal.params['id'] != self.currentHit.id:
if signal.params['hit_id'] != self.currentHit.id:
self.logger.info(f"Hit.scanned had wrong id: {signal}")
continue
self.currentHit.scanned_at = datetime.datetime.utcnow()
time_diff = datetime.datetime.now() - self.lastHitTime
to_wait = 10 - time_diff.total_seconds()
self.statusPageQueue.add(dict(hit_id=self.currentHit.id, state='scan'))
# self.statusPageQueue.add(dict(hit_id=self.currentHit.id, state='scan'))
if to_wait > 0:
self.logger.warn(f"Sleep until next hit: {to_wait}s")
@ -237,10 +251,11 @@ class CentralManagement():
self.makeHit()
self.lastHitTime = datetime.datetime.now()
elif signal.name == 'hit.creating':
self.statusPageQueue.add(dict(hit_id=signal.params['id'], transition='create_hit'))
# self.statusPageQueue.add(dict(hit_id=signal.params['id'], transition='create_hit'))
pass
elif signal.name == 'hit.created':
self.statusPageQueue.add(dict(hit_id=signal.params['id'], remote_id=signal.params['remote_id'], state='hit'))
# self.statusPageQueue.add(dict(hit_id=signal.params['id'], remote_id=signal.params['remote_id'], state='hit'))
pass
elif signal.name == 'scan.start':
pass
elif signal.name == 'scan.finished':
@ -251,38 +266,41 @@ class CentralManagement():
# Create new assignment
if signal.params['hit_id'] != self.currentHit.id:
continue
assignment = self.store.newAssignment(self.currentHit)
assignment.assignment_id = signal.params['assignment_id']
self.store.saveAssignment(assignment)
self.statusPageQueue.add(dict(hit_id=self.currentHit.id, assignment_id=assignment.assignment_id, state='assignment'))
assignment = self.currentHit.getAssignmentById(signal.params['assignment_id'])
elif signal.name == 'assignment.info':
assignment = self.currentHit.getAssignmentById(signal.params['assignment_id'])
if not assignment:
self.logger.warning(f"assignment.info assignment.id not for current hit assignments: {signal}")
change = False
for name, value in signal.params.items():
if name == 'ip':
assignment.turk_ip = value
if name == 'location':
assignment.turk_country = value
if name == 'os':
assignment.turk_os = value
if name == 'browser':
assignment.turk_browser = value
change = True
self.logger.debug(f'Set assignment: {name} to {value}')
self.server.statusPage.set(name, value)
if change:
self.store.saveAssignment(assignment)
elif signal.name == 'server.open':
self.currentHit.open_page_at = datetime.datetime.utcnow()
self.store.saveHIT(self.currentHit)
self.setLight(True)
self.server.statusPage.set('state', self.currentHit.getStatus())
self.server.statusPage.set('hit_opened', self.currentHit.open_page_at)
elif signal.name == 'server.submit':
self.currentHit.submit_page_at = datetime.datetime.utcnow()
self.store.saveHIT(self.currentHit)
elif signal.name == 'assignment.submit':
a = self.currentHit.getLastAssignment()
if a.assignment_id != signal.params['assignment_id']:
self.logger.critical(f"Submit of invalid assignment_id: {signal}")
a.submit_page_at = datetime.datetime.utcnow()
self.store.saveAssignment(a)
self.plotter.park()
self.server.statusPage.set('hit_opened', self.currentHit.open_page_at)
# park always triggers a plotter.finished after being processed
elif signal.name[:4] == 'sqs.':
@ -293,49 +311,37 @@ class CentralManagement():
else:
sqsHit = self.currentHit
updateStatus = True
sqsAssignment = sqsHit.getAssignmentById(signal.params['event']['AssignmentId'])
if not sqsAssignment:
self.logger.critical(f"Invalid assignmentId given for hit: {signal.params['event']}")
continue
if signal.name == 'sqs.AssignmentAccepted':
self.logger.info(f'Set status progress to accepted')
sqsHit.accept_time = datetime.datetime.strptime(signal.params['event']['EventTimestamp'],"%Y-%m-%dT%H:%M:%SZ")
sqsHit.worker_id = signal.params['event']['WorkerId']
if updateStatus:
self.server.statusPage.set('worker_id', sqsHit.worker_id)
sqsAssignment.accept_at = datetime.datetime.strptime(signal.params['event']['EventTimestamp'],"%Y-%m-%dT%H:%M:%SZ")
sqsAssignment.worker_id = signal.params['event']['WorkerId']
# {'event': {'HITGroupId': '301G7MYOAJ85NEW128ZDGF5DSBW53S', 'EventType': 'AssignmentAccepted', 'EventTimestamp': '2019-10-23T20:16:10Z', 'HITId': '3IH9TRB0FBAKKZFP3JUD6D9YWQ1I1F', 'AssignmentId': '3BF51CHDTWLN3ZGHRKDUHFKPWIJ0H3', 'WorkerId': 'A1CK46PK9VEUH5', 'HITTypeId': '3EYXOXDEN7RX0YSMN4UMVN01AYKZJ0'}}
elif signal.name == 'sqs.AssignmentAbandoned':
self.logger.info(f'Set status progress to abandoned')
#{'event': {'HITGroupId': '301G7MYOAJ85NEW128ZDGF5DSBW53S', 'EventType': 'AssignmentAbandoned', 'EventTimestamp': '2019-10-23T20:23:06Z', 'HITId': '3JHB4BPSFKKFQ263K4EFULI3LC79QJ', 'AssignmentId': '3U088ZLJVL450PB6MJZUIIUCB6VW0Y', 'WorkerId': 'A1CK46PK9VEUH5', 'HITTypeId': '3EYXOXDEN7RX0YSMN4UMVN01AYKZJ0'}}
sqsHit.accept_time = None
sqsHit.open_page_at = None
if self.currentHit.id == sqsHit.id:
if not sqsHit.submit_page_at:
self.reset()
else:
sqsHit.submit_hit_at = datetime.datetime.utcnow() # fake submit
if updateStatus:
self.setLight(False)
sqsAssignment.abandoned_at = datetime.datetime.strptime(signal.params['event']['EventTimestamp'],"%Y-%m-%dT%H:%M:%SZ")
# if updateStatus:
# self.setLight(False)
self.mturk.create_worker_block(WorkerId=signal.params['event']['WorkerId'], Reason='Accepted task without working on it.')
elif signal.name == 'sqs.AssignmentReturned':
self.logger.info(f'Set status progress to returned')
sqsHit.accept_time = None
sqsHit.open_page_at = None
if self.currentHit.id == sqsHit.id:
if not sqsHit.submit_page_at:
self.reset()
else:
sqsHit.submit_hit_at = datetime.datetime.utcnow() # fake submit
if updateStatus:
self.setLight(False)
sqsAssignment.returned_at = datetime.datetime.strptime(signal.params['event']['EventTimestamp'],"%Y-%m-%dT%H:%M:%SZ")
# {'event': {'HITGroupId': '301G7MYOAJ85NEW128ZDGF5DSBW53S', 'EventType': 'AssignmentReturned', 'EventTimestamp': '2019-10-23T20:16:47Z', 'HITId': '3IH9TRB0FBAKKZFP3JUD6D9YWQ1I1F', 'AssignmentId': '3BF51CHDTWLN3ZGHRKDUHFKPWIJ0H3', 'WorkerId': 'A1CK46PK9VEUH5', 'HITTypeId': '3EYXOXDEN7RX0YSMN4UMVN01AYKZJ0'}}
elif signal.name == 'sqs.AssignmentSubmitted':
# {'MessageId': '4b37dfdf-6a12-455d-a111-9a361eb54d88', 'ReceiptHandle': 'AQEBHc0yAdIrEmAV3S8TIoDCRxrItDEvjy0VQko56/Lb+ifszC0gdZ0Bbed24HGHZYr5DSnWkgBJ/H59ZXxFS1iVEH9sC8+YrmKKOTrKvW3gj6xYiBU2fBb8JRq+sEiNSxWLw2waxr1VYdpn/SWcoOJCv6PlS7P9EB/2IQ++rCklhVwV7RfARHy4J87bjk5R3+uEXUUi00INhCeunCbu642Mq4c239TFRHq3mwM6gkdydK+AP1MrXGKKAE1W5nMbwEWAwAN8KfoM1NkkUg5rTSYWmxxZMdVs/QRNcMFKVSf1bop2eCALSoG6l3Iu7+UXIl4HLh+rHp4bc8NoftbUJUii8YXeiNGU3wCM9T1kOerwYVgksK93KQrioD3ee8navYExQRXne2+TrUZUDkxRIdtPGA==', 'MD5OfBody': '01ccb1efe47a84b68704c4dc611a4d8d', 'Body': '{"Events":[{"Answer":"<?xml version=\\"1.0\\" encoding=\\"ASCII\\"?><QuestionFormAnswers xmlns=\\"http://mechanicalturk.amazonaws.com/AWSMechanicalTurkDataSchemas/2005-10-01/QuestionFormAnswers.xsd\\"><Answer><QuestionIdentifier>surveycode<\\/QuestionIdentifier><FreeText>test<\\/FreeText><\\/Answer><\\/QuestionFormAnswers>","HITGroupId":"301G7MYOAJ85NEW128ZDGF5DSBW53S","EventType":"AssignmentSubmitted","EventTimestamp":"2019-10-30T08:01:43Z","HITId":"3NSCTNUR2ZY42ZXASI4CS5YWV0S5AB","AssignmentId":"3ZAZR5XV02TTOCBR9MCLCNQV1XKCZL","WorkerId":"A1CK46PK9VEUH5","HITTypeId":"3EYXOXDEN7RX0YSMN4UMVN01AYKZJ0"}],"EventDocId":"34af4cd7f2829216f222d4b6e66f3a3ff9ad8ea6","SourceAccount":"600103077174","CustomerId":"A1CK46PK9VEUH5","EventDocVersion":"2014-08-15"}'}
self.logger.info(f'Set status progress to submitted')
# TODO: validate the content of the submission by parsing signal.params['event']['Answer'] and comparing it with sqsHit.uuid
sqsHit.answer = signal.params['event']['Answer']
if sqsHit.uuid not in sqsHit.answer:
self.logger.critical(f"Not a valid answer given?! {sqsHit.answer}")
sqsAssignment.answer = signal.params['event']['Answer']
if sqsAssignment.uuid not in sqsAssignment.answer:
self.logger.critical(f"Not a valid answer given?! {sqsAssignment.answer}")
if not sqsHit.submit_page_at:
if not sqsAssignment.submit_page_at:
# page not submitted, hit is. Nevertheless, create new hit.
try:
self.mturk.reject_assignment(AssignmentId=signal.params['event']['AssignmentId'], RequesterFeedback='Did not do the assignment')
@ -343,7 +349,8 @@ class CentralManagement():
self.logger.exception(e)
self.makeHit()
else:
sqsHit.submit_hit_at = datetime.datetime.strptime(signal.params['event']['EventTimestamp'],"%Y-%m-%dT%H:%M:%SZ")
sqsAssignment.confirmed_at = datetime.datetime.strptime(signal.params['event']['EventTimestamp'],"%Y-%m-%dT%H:%M:%SZ")
# block de worker na succesvolle submit, om dubbele workers te voorkomen
# TODO: Disabled after worker mail, use quals instead
#self.mturk.create_worker_block(WorkerId=signal.params['event']['WorkerId'], Reason='Every worker can only work once on the taks.')
@ -351,21 +358,19 @@ class CentralManagement():
self.store.saveHIT(sqsHit)
if updateStatus:
self.logger.warning(f'update status: {sqsHit.getStatus()}')
# TODO: have HITStore/HIT take care of this by emitting a signal
# only update status if it is the currentHit
self.server.statusPage.set('state', sqsHit.getStatus())
else:
self.logger.warning('DO NOT update status')
elif signal.name == 'plotter.finished':
if self.currentHit and self.currentHit.submit_page_at:
self.setLight(False)
# is _always_ triggered after submit due to plotter.park()
if self.currentHit and self.currentHit.isSubmitted():
self.currentHit.plotted_at = datetime.datetime.utcnow()
self.store.saveHIT(self.currentHit)
self.logger.info("Start scan thread")
scan = threading.Thread(target=self.scanImage, name='scan')
scan.start()
self.server.statusPage.set('hit_submitted', self.currentHit.submit_page_at)
self.server.statusPage.set('state', self.currentHit.getStatus())
elif signal.name == 'plotter.parked':
# should this have the code from plotter.finished?
pass
else:
self.logger.critical(f"Unknown signal: {signal.name}")
except Exception as e:
@ -373,11 +378,10 @@ class CentralManagement():
self.logger.exception(e)
def makeHit(self):
self.expireCurrentHit() # expire hit if it is there
self.expireCurrentHit() # expire hit if it is there
self.eventQueue.put(Signal('hit.creating', {'id': self.currentHit.id if self.currentHit else 'start'}))
self.server.statusPage.reset()
self.reloadConfig() # reload new config values if they are set
# self.notPaused.wait()
@ -387,7 +391,6 @@ class CentralManagement():
self.logger.info(f"Make HIT {self.currentHit.id}")
# question = open(self.config['amazon']['task_xml'], mode='r').read().replace("{HIT_NR}",str(self.currentHit.id))
question = '''<?xml version="1.0" encoding="UTF-8"?>
<ExternalQuestion xmlns="http://mechanicalturk.amazonaws.com/AWSMechanicalTurkDataSchemas/2006-07-14/ExternalQuestion.xsd">
<ExternalURL>https://guest.rubenvandeven.com:8888/draw?id={HIT_NR}</ExternalURL>
@ -422,7 +425,6 @@ class CentralManagement():
self.currentHit.hit_id = new_hit['HIT']['HITId']
self.store.saveHIT(self.currentHit)
# TODO: have HITStore/HIT take care of this by emitting a signal
# self.server.statusPage.set('hit_id', new_hit['HIT']['HITId'])
# self.server.statusPage.set('hit_created', self.currentHit.created_at)
# self.server.statusPage.set('fee', f"${self.currentHit.fee:.2f}")
@ -489,47 +491,62 @@ class CentralManagement():
"""
Run scanimage on scaner and returns a string with the filename
"""
cmd = [
'sudo', 'scanimage', '-d', 'epkowa', '--format', 'tiff',
'--resolution=100', # lower res, faster (more powerful) scan & wipe
'-l','25' #y axis, margin from top of the scanner, hence increasing this, moves the scanned image upwards
,'-t','22', # x axis, margin from left side scanner (seen from the outside)
'-x',str(181),
'-y',str(245)
]
self.logger.info(f"{cmd}")
filename = self.currentHit.getImagePath()
with self.scanLock:
if self.config['dummy_plotter']:
self.eventQueue.put(Signal('hit.scan', {'id':self.currentHit.id}))
self.eventQueue.put(Signal('scan.start'))
self.logger.warning("Fake scanner for a few seconds")
for i in tqdm.tqdm(range(5)):
time.sleep(1)
self.eventQueue.put(Signal('hit.scanned', {'hit_id':self.currentHit.id}))
self.eventQueue.put(Signal('scan.finished'))
return
cmd = [
'sudo', 'scanimage', '-d', 'epkowa', '--format', 'tiff',
'--resolution=100', # lower res, faster (more powerful) scan & wipe
'-l','25' #y axis, margin from top of the scanner, hence increasing this, moves the scanned image upwards
,'-t','22', # x axis, margin from left side scanner (seen from the outside)
'-x',str(181),
'-y',str(245)
]
self.logger.info(f"{cmd}")
filename = self.currentHit.getImagePath()
self.eventQueue.put(Signal('hit.scan', {'id':self.currentHit.id}))
self.eventQueue.put(Signal('scan.start'))
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# opens connection to scanner, but only starts scanning when output becomes ready:
o, e = proc.communicate(80)
if e:
self.logger.critical(f"Scanner caused: {e.decode()}")
#TODO: should clear self.isRunning.clear() ?
try:
f = io.BytesIO(o)
img = Image.open(f)
img = img.transpose(Image.ROTATE_90).transpose(Image.FLIP_TOP_BOTTOM)
tunedImg = Level.level_image(img, self.config['level']['min'], self.config['level']['max'], self.config['level']['gamma'])
tunedImg.save(filename)
except Exception as e:
self.logger.critical("Cannot create image from scan. Did scanner work?")
self.logger.exception(e)
# TODO: create
copyfile('www/basic.svg', filename)
time.sleep(5) # sleep a few seconds for scanner to return to start position
self.eventQueue.put(Signal('hit.scanned', {'id':self.currentHit.id}))
self.eventQueue.put(Signal('scan.finished'))
if e:
self.logger.critical(f"Scanner caused: {e.decode()}")
# Should this clear self.isRunning.clear() ?
try:
f = io.BytesIO(o)
img = Image.open(f)
img = img.transpose(Image.ROTATE_90).transpose(Image.FLIP_TOP_BOTTOM)
tunedImg = Level.level_image(img, self.config['level']['min'], self.config['level']['max'], self.config['level']['gamma'])
tunedImg.save(filename)
except Exception as e:
self.logger.critical("Cannot create image from scan. Did scanner work?")
self.logger.exception(e)
copyfile('www/basic.svg', filename)
time.sleep(5) # sleep a few seconds for scanner to return to start position
self.eventQueue.put(Signal('hit.scanned', {'hit_id':self.currentHit.id}))
self.eventQueue.put(Signal('scan.finished'))
def setLight(self, on):
value = 1 if on else 0
if self.lightStatus == value:
return
self.lightStatus = value
cmd = [
'usbrelay', f'HURTM_1={value}'
]
@ -537,3 +554,17 @@ class CentralManagement():
code = subprocess.call(cmd)
if code > 0:
self.logger.warning(f"Error on light change: {code}")
def updateLightHook(self, hit = None):
# ignore hit attribute, which comes from the HITstore
self.setLight(self.getLightStatus())
def getLightStatus(self) -> bool:
if not self.currentHit:
return False
a = self.currentHit.getLastAssignment()
if not a:
return False
if self.currentHit.plotted_at: # wait till plotter is done
return False
return True

View File

@ -21,6 +21,14 @@ import html
logger = logging.getLogger("sorteerhoed").getChild("webserver")
class DateTimeEncoder(json.JSONEncoder):
def default(self, o):
if isinstance(o, datetime.datetime):
return o.isoformat(timespec='seconds')
return super().default(self, o)
class StaticFileWithHeaderHandler(tornado.web.StaticFileHandler):
def set_extra_headers(self, path):
"""For subclass to add extra headers to the response"""
@ -36,7 +44,6 @@ class StaticFileWithHeaderHandler(tornado.web.StaticFileHandler):
self.set_header("Content-Type", "image/svg+xml")
class WebSocketHandler(tornado.websocket.WebSocketHandler):
"""
Websocket from the workers
@ -66,12 +73,15 @@ class WebSocketHandler(tornado.websocket.WebSocketHandler):
self.hit = self.store.currentHit
self.assignment_id = int(self.get_query_argument('assignment_id'))
self.assignment_id = str(self.get_query_argument('assignmentId'))
self.assignment = self.hit.getLastAssignment()
if self.assignment.assignment_id != self.assignment_id:
raise Exception(f"Opening websocket for invalid assignment {self.assignment_id}")
self.timeout = datetime.datetime.now() + datetime.timedelta(seconds=self.store.getHitTimeout())
if self.hit.submit_hit_at:
if self.hit.isSubmitted():
raise Exception("Opening websocket for already submitted hit")
#logger.info(f"New client connected: {self.request.remote_ip} for {self.hit.id}/{self.hit.hit_id}")
@ -85,6 +95,7 @@ class WebSocketHandler(tornado.websocket.WebSocketHandler):
if self.assignment_id != self.hit.getLastAssignment().assignment_id:
logger.critical(f"Skip message for non-last assignment {message}")
return
if datetime.datetime.now() > self.timeout:
logger.critical("Close websocket after timeout (abandon?)")
@ -93,7 +104,6 @@ class WebSocketHandler(tornado.websocket.WebSocketHandler):
try:
msg = json.loads(message)
# TODO: sanitize input: min/max, limit strokes
if msg['action'] == 'move':
# TODO: min/max input
point = [float(msg['direction'][0]),float(msg['direction'][1]), bool(msg['mouse'])]
@ -127,8 +137,8 @@ class WebSocketHandler(tornado.websocket.WebSocketHandler):
self.write_message(json.dumps({
'action': 'submitted',
'msg': f"Submission ok, please copy this token to your HIT at Mechanical Turk: {self.hit.uuid}",
'code': str(self.hit.uuid)
'msg': f"Submission ok, please copy this token to your HIT at Mechanical Turk: {self.assignment.uuid}",
'code': str(self.assignment.uuid)
}))
self.close()
@ -136,8 +146,9 @@ class WebSocketHandler(tornado.websocket.WebSocketHandler):
# not used, implicit in move?
pass
elif msg['action'] == 'info':
self.eventQ.put(Signal('hit.info', dict(
self.eventQ.put(Signal('assignment.info', dict(
hit_id=self.hit.id,
assignment_id=self.assignment_id,
resolution=msg['resolution'],
browser=msg['browser']
)))
@ -159,32 +170,35 @@ class WebSocketHandler(tornado.websocket.WebSocketHandler):
if len(self.strokes) < 1:
return False
self.eventQ.put(Signal("server.submit", dict(hit_id = self.hit.id)))
self.eventQ.put(Signal("assignment.submit", dict(
hit_id = self.hit.id,
assignment_id=self.assignment_id)))
if self.config['dummy_plotter']:
d = strokes2D(self.strokes)
svg = f"""<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<svg viewBox="0 0 600 600"
xmlns:dc="http://purl.org/dc/elements/1.1/"
xmlns:cc="http://creativecommons.org/ns#"
xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#"
xmlns:svg="http://www.w3.org/2000/svg"
xmlns="http://www.w3.org/2000/svg"
version="1.1"
>
<path d="{d}" style="stroke:black;stroke-width:2;fill:none;" />
</svg>
"""
filename = self.hit.getImagePath()
logger.info(f"Write to {filename}")
with open(filename, 'w') as fp:
fp.write(svg)
# deprecated: now done at scanner method:
# if self.config['dummy_plotter']:
# d = strokes2D(self.strokes)
# svg = f"""<?xml version="1.0" encoding="UTF-8" standalone="no"?>
# <svg viewBox="0 0 600 600"
# xmlns:dc="http://purl.org/dc/elements/1.1/"
# xmlns:cc="http://creativecommons.org/ns#"
# xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#"
# xmlns:svg="http://www.w3.org/2000/svg"
# xmlns="http://www.w3.org/2000/svg"
# version="1.1"
# >
# <path d="{d}" style="stroke:black;stroke-width:2;fill:none;" />
# </svg>
# """
#
# filename = self.hit.getImagePath()
# logger.info(f"Write to {filename}")
# with open(filename, 'w') as fp:
# fp.write(svg)
# we fake a hit.scanned event
self.eventQ.put(Signal('hit.scanned', {'hit_id':self.hit.id}))
# self.eventQ.put(Signal('hit.scanned', {'hit_id':self.hit.id}))
return self.hit.uuid
return self.assignment.uuid
@classmethod
def rmConnection(cls, client):
@ -196,11 +210,9 @@ class WebSocketHandler(tornado.websocket.WebSocketHandler):
class StatusWebSocketHandler(tornado.websocket.WebSocketHandler):
CORS_ORIGINS = ['localhost']
connections = set()
queue = queue.Queue()
def initialize(self, statusPage):
self.statusPage = statusPage
pass
def check_origin(self, origin):
parsed_origin = urlparse(origin)
@ -209,13 +221,9 @@ class StatusWebSocketHandler(tornado.websocket.WebSocketHandler):
return valid
# the client connected
def open(self, p = None):
def open(self):
self.__class__.connections.add(self)
for prop, value in self.statusPage.__dict__.items():
self.write_message(json.dumps({
'property': prop,
'value': value.isoformat(timespec='seconds') if type(value) is datetime.datetime else value
}))
self.write_message(json.dumps(self.statusPage.fetch(), cls=DateTimeEncoder))
# client disconnected
@ -230,13 +238,10 @@ class StatusWebSocketHandler(tornado.websocket.WebSocketHandler):
cls.connections.remove(client)
@classmethod
def update_for_all(cls, prop, value):
logger.debug(f"update for all {prop} {value}")
def update_for_all(cls, data):
logger.debug(f"update for all {data}")
for connection in cls.connections:
connection.write_message(json.dumps({
'property': prop,
'value': value.isoformat(timespec='seconds') if type(value) is datetime.datetime else value
}))
connection.write_message(json.dumps(data, cls=DateTimeEncoder))
def strokes2D(strokes):
# strokes to a d attribute for a path
@ -283,7 +288,7 @@ class DrawPageHandler(tornado.web.RequestHandler):
except Exception:
self.write("HIT not found")
else:
if hit.submit_page_at:
if hit.isSubmitted():
self.write("HIT already submitted")
return
@ -294,6 +299,15 @@ class DrawPageHandler(tornado.web.RequestHandler):
previewOnly = False
if assignmentId == 'ASSIGNMENT_ID_NOT_AVAILABLE':
previewOnly = True
if len(assignmentId) and not previewOnly:
# process/create assignment
assignment = self.store.currentHit.getAssignmentById(assignmentId)
if not assignment:
# new assignment
logger.warning(f"Create new assignment {assignmentId}")
assignment = self.store.newAssignment(self.store.currentHit, assignmentId)
self.store.saveAssignment(assignment)
previous_hit = self.store.getLastSubmittedHit()
if not previous_hit:
@ -330,7 +344,8 @@ class DrawPageHandler(tornado.web.RequestHandler):
self.eventQ.put(Signal('hit.assignment', dict(
hit_id=hit.id, ip=ip, assignment_id=assignmentId
)))
# self.eventQ.put(Signal('hit.info', dict(hit_id=hit.id, ip=ip)))
self.eventQ.put(Signal('assignment.info', dict(assignment_id=assignmentId, ip=ip)))
try:
geoip = self.geoip_reader.country(ip)
@ -383,50 +398,35 @@ class StatusPage():
"""
Properties for on the status page, which are send over websockets the moment
they are altered.
"""
def __init__(self):
self.reset()
def reset(self):
logger.info("Resetting status")
self.hit_id = None
self.worker_id = None
self.ip = None
self.location = None
self.browser = None
self.os = None
self.resolution = None
self.state = None
self.fee = None
self.hit_created = None
self.hit_opened = None
self.hit_submitted = None
def clearAssignment(self):
logger.info("Resetting hit assignment")
self.worker_id = None
self.ip = None
self.location = None
self.browser = None
self.os = None
self.resolution = None
self.hit_created = None
def __setattr__(self, name, value):
if name in self.__dict__ and self.__dict__[name] == value:
logger.debug(f"Ignore setting status of {name}: it already is set to {value}")
return
self.__dict__[name] =value
logger.info(f"Update status: {name}: {value}")
"""
def __init__(self, store: HITStore):
self.store = store
self.store.registerUpdateHook(self)
def update(self, hit = None):
"""
Send the given HIT formatted to the websocket clients
If no hit is given, load the last 2 items
"""
if hit:
data = [hit.toDict()]
else:
hits = self.store.getNewestHits(2)
data = [hit.toDict() for hit in hits]
if Server.loop:
Server.loop.asyncio_loop.call_soon_threadsafe(StatusWebSocketHandler.update_for_all, name, value)
Server.loop.asyncio_loop.call_soon_threadsafe(StatusWebSocketHandler.update_for_all, data)
else:
logger.warn("Status: no server loop to call update command")
def set(self, name, value):
return self.__setattr__(name, value)
def fetch(self):
"""
Fetch latest, used on connection of status page
"""
hits = self.store.getNewestHits(2)
return [hit.toDict() for hit in hits]
@ -434,8 +434,6 @@ class Server:
"""
Server for HIT -> plotter events
As well as for the Status interface
TODO: change to have the HIT_id as param to the page. Load hit from storage with previous image
"""
loop = None
@ -452,7 +450,7 @@ class Server:
self.server_loop = None
self.store = store
self.statusPage = StatusPage()
self.statusPage = StatusPage(store)
def start(self):
@ -489,6 +487,8 @@ class Server:
store = self.store,
path=self.web_root,
)),
(r"/frames/(.*)", StaticFileWithHeaderHandler,
{"path": 'scanimation/interfaces/frames'}),
(r"/(.*)", StaticFileWithHeaderHandler,
{"path": self.web_root}),
], debug=True, autoreload=False)

View File

@ -51,7 +51,7 @@ let draw = function(e) {
strokeEl.setAttribute('d', d);
}
console.log([pos['x'], pos['y']], isDrawing);
//console.log([pos['x'], pos['y']], isDrawing);
socket.send(JSON.stringify({
'action': 'move',
'direction': [pos['x'], pos['y']],