Compare commits

...

3 Commits

Author SHA1 Message Date
Ruben van de Ven b2626244a4 New HIT status page 2020-01-13 16:13:42 +01:00
Ruben van de Ven d3bf3d47ea Refactored to work with separate assignment table 2020-01-13 13:49:59 +01:00
mt ac04fb6082 WIP. assignment has separate table. and prepare for more advanced status display 2020-01-10 18:03:18 +01:00
8 changed files with 755 additions and 439 deletions

View File

@ -9,9 +9,6 @@ import datetime
from contextlib import contextmanager
import uuid
import os
import coloredlogs
import argparse
from sqlalchemy.sql.functions import func
mainLogger = logging.getLogger("sorteerhoed")
logger = mainLogger.getChild("store")
@ -32,7 +29,7 @@ submitted
Actions:
creating Hit (creating hit with scanned image)
Scanning
Scanning
"""
@ -42,51 +39,116 @@ class HIT(Base):
hit_id = Column(String(255)) # amazon's hit id
created_at = Column(DateTime, default=datetime.datetime.utcnow)
updated_at = Column(DateTime, default=datetime.datetime.utcnow)
uuid = Column(String(32), default=lambda : uuid.uuid4().hex)
assignment_id = Column(String(255), default = None)
worker_id = Column(String(255), default = None)
accept_time = Column(DateTime, default=None)
open_page_at = Column(DateTime, default=None)
submit_page_at = Column(DateTime, default=None)
submit_hit_at = Column(DateTime, default=None)
answer = Column(String(255), default=None)
turk_ip = Column(String(255), default=None)
turk_country = Column(String(255), default=None)
turk_screen_width = Column(Integer, default = None)
turk_screen_height = Column(Integer, default = None)
scanned_at = Column(DateTime, default=None)
plotted_at = Column(DateTime, default=None)
deleted_at = Column(DateTime, default=None)
assignments = relationship("Assignment", back_populates="hit", order_by="Assignment.created_at")
fee = Column(Float(precision=2), default=None)
abandoned = False
# previous hit so we can load the correct image
# previous_hit_id = Column(Integer, ForeignKey('hits.id'), default=None)
# previous_hit = relationship("HIT")
def getImagePath(self):
return os.path.join('scanimation/interfaces/frames', f"{self.id:06d}.jpg")
# def getImageUrl(self):
# return f"{self.id}.jpg"
def getImageUrl(self):
return os.path.join('/frames', f"{self.id:06d}.jpg")
def getSvgImageUrl(self):
return f"scans/{self.id:06d}.svg"
return f"/scans/{self.id:06d}.svg"
def getSvgImagePath(self):
return os.path.join('www', self.getSvgImageUrl())
# os.path.join on svgImageUrl leads to invalid absolute url
return os.path.join(f'www/scans/{self.id:06d}.svg')
def getLastAssignment(self):
    """Return the final element of ``self.assignments``, or None when it is empty."""
    count = len(self.assignments)
    return self.assignments[count - 1] if count else None
def getAssignmentById(self, assignmentId):
    """Look up one of this HIT's assignments by Amazon's assignment id.

    Returns the first entry of ``self.assignments`` whose ``assignment_id``
    equals ``assignmentId``, or None when no entry matches.
    """
    matching = (
        candidate for candidate in self.assignments
        if candidate.assignment_id == assignmentId
    )
    return next(matching, None)
def getStatus(self):
assignment = self.getLastAssignment()
if self.deleted_at:
return "deleted"
if not self.hit_id:
return "creating"
if not assignment:
return "awaiting worker"
if self.scanned_at:
return "completed"
if self.submit_hit_at:
return "submission confirmed"
if self.submit_page_at:
return "submitted by worker"
if self.open_page_at:
return "working"
if self.accept_time:
return "accepted by worker"
# on abandon:
if self.worker_id:
return "abandoned by worker"
return "awaiting worker"
return "scanned"
return assignment.getStatus()
def toDict(self) -> dict:
    """Serialise this HIT to a plain dict.

    Contains every table column by name, plus:
    ``assignment`` (dict of the last assignment, or None),
    ``state`` (result of ``getStatus()``),
    ``scan_image`` (image URL, only once ``scanned_at`` is set) and
    ``svg_image`` (SVG URL, only once the HIT is submitted).
    """
    values = {}
    for column in self.__table__.columns:
        values[column.name] = getattr(self, column.name)

    lastAssignment = self.getLastAssignment()
    if lastAssignment:
        values['assignment'] = lastAssignment.toDict()
    else:
        values['assignment'] = None

    values['state'] = self.getStatus()

    if self.scanned_at:
        values['scan_image'] = self.getImageUrl()
    else:
        values['scan_image'] = None

    if self.isSubmitted():
        values['svg_image'] = self.getSvgImageUrl()
    else:
        values['svg_image'] = None

    return values
def delete(self):
    """Mark this HIT as deleted by stamping ``deleted_at`` with the current UTC time.

    Only the attribute is set here; nothing is removed or saved by this method.
    """
    now = datetime.datetime.utcnow()
    self.deleted_at = now
def isSubmitted(self) -> bool:
    """Tell whether the last assignment has a ``submit_page_at`` value.

    False when the HIT has no assignment at all.
    """
    last = self.getLastAssignment()
    return bool(last.submit_page_at) if last else False
def isConfirmed(self) -> bool:
    """Tell whether the last assignment has a ``confirmed_at`` value.

    False when the HIT has no assignment at all.
    """
    last = self.getLastAssignment()
    return bool(last.confirmed_at) if last else False
class Assignment(Base):
    """Row of the ``assignments`` table; linked to its HIT through ``hit_id`` / ``hit``.

    The timestamp columns record the steps an assignment goes through and
    drive ``getStatus()``.
    """
    __tablename__ = 'assignments'

    id = Column(Integer, Sequence('assignment_id'), primary_key=True) # our sequential assignment id
    hit_id = Column(Integer, ForeignKey('hits.id')) # our sequential hit id
    hit = relationship("HIT", back_populates="assignments")

    uuid = Column(String(32), default=lambda : uuid.uuid4().hex)

    created_at = Column(DateTime, default=datetime.datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.datetime.utcnow)

    # amazon's assignment id. This column used to be declared twice in the
    # class body; the earlier declaration was shadowed by this one, so only
    # this declaration (and its position) is kept.
    assignment_id = Column(String(255), default = None)
    worker_id = Column(String(255), default = None)

    accept_at = Column(DateTime, default=None) # accept time according to SQS
    # open_page_at = Column(DateTime, default=None)
    submit_page_at = Column(DateTime, default=None) # Submit the page
    confirmed_at = Column(DateTime, default=None) # validate with UUID when getting Message from Amazon
    abandoned_at = Column(DateTime, default=None)
    rejected_at = Column(DateTime, default=None)
    # NOTE(review): the sqs.AssignmentReturned handler in CentralManagement
    # assigns `returned_at` on an Assignment, but no such column exists here,
    # so that value is not stored. Confirm whether a column (and a matching
    # status) is intended before adding one, as it changes the schema.

    answer = Column(String(255), default=None)

    turk_ip = Column(String(255), default=None)
    turk_country = Column(String(255), default=None)
    turk_os = Column(String(255), default=None)
    turk_browser = Column(String(255), default=None)

    def getStatus(self):
        """Return the status label derived from the timestamp columns.

        Checked in order: "rejected", "abandoned", "working" (page not yet
        submitted), "submitted" (page submitted, not yet confirmed),
        otherwise "confirmed".
        """
        if self.rejected_at:
            return "rejected"
        if self.abandoned_at:
            return "abandoned"
        if not self.submit_page_at:
            return "working"
        if not self.confirmed_at:
            return "submitted"
        return "confirmed"

    def toDict(self) -> dict:
        """Return a dict mapping every table column name to its current value."""
        values = {c.name: getattr(self, c.name) for c in self.__table__.columns}
        return values
class Store:
@ -94,14 +156,27 @@ class Store:
path = os.path.abspath(db_filename)
if logLevel <= logging.DEBUG:
logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)
self.engine = create_engine('sqlite:///'+path, echo=False, connect_args={'check_same_thread': False})
Base.metadata.create_all(self.engine)
self.Session = sessionmaker(bind=self.engine)
self.session = self.Session()
self.currentHit = None # mirrors Centralmanagmenet, stored here so we can quickly access it from webserver classes
self.updateHooks = []
def registerUpdateHook(self, hook):
    """Add ``hook`` to ``self.updateHooks`` unless it is already registered.

    Hooks are later invoked by ``triggerUpdateHooks``.
    """
    if hook in self.updateHooks:
        return
    logger.info(f"Register update hook: {hook}")
    self.updateHooks.append(hook)
def triggerUpdateHooks(self, hit = None):
    """Notify every registered hook, passing ``hit`` along.

    A callable hook is called directly; any other hook is assumed to be an
    object exposing ``update(hit)``.
    """
    for hook in self.updateHooks:
        notify = hook if callable(hook) else hook.update
        notify(hit)
@contextmanager
def getSession(self):
"""Provide a transactional scope around a series of operations."""
@ -111,77 +186,102 @@ class Store:
except:
self.session.rollback()
raise
def getHits(self, session):
return self.session.query(Source).order_by(HIT.created_at.desc())
def getHits(self):
return self.session.query(HIT).order_by(HIT.created_at.desc())
def getHitById(self, hitId):
    """Fetch the HIT whose own (local) ``id`` equals ``hitId``.

    Uses ``Query.one()``, so exactly one matching row is expected.
    """
    matching = self.session.query(HIT).filter(HIT.id==hitId)
    return matching.one()
def getHitByRemoteId(self, amazonHitId):
    """Fetch the HIT whose ``hit_id`` (Amazon's id) equals ``amazonHitId``.

    Uses ``Query.one()``, so exactly one matching row is expected.
    """
    matching = self.session.query(HIT).filter(HIT.hit_id==amazonHitId)
    return matching.one()
def getLastSubmittedHit(self):
return self.session.query(HIT).\
filter(HIT.submit_page_at!=None).\
order_by(HIT.submit_page_at.desc()).first()
join(Assignment).\
filter(Assignment.submit_page_at!=None).\
order_by(HIT.created_at.desc()).first()
def createHIT(self):
def getNewestHits(self, n = 2) -> list:
    """Return the ``n`` most recently created, non-deleted HITs, oldest first.

    The query selects newest-first so that ``limit`` keeps the latest rows;
    the list is then reversed to hand them back in chronological order.
    """
    newestFirst = self.session.query(HIT).\
        filter(HIT.deleted_at==None).\
        order_by(HIT.created_at.desc()).limit(n)  # was a hard-coded limit(2) that ignored n
    hits = list(newestFirst)
    # select DESC, because we want latest, then reverse list to get in right order
    hits.reverse()
    return hits
def createHIT(self) -> HIT:
# Create a new, empty HIT, add it to the session obtained from getSession(),
# and notify the registered update hooks with it. Returns the new HIT.
with self.getSession() as s:
hit = HIT()
s.add(hit)
# flush + refresh before hit.id is read for the log line below
# (presumably so the generated id is available -- confirm)
s.flush()
s.refresh(hit)
logger.info(f"Created HIT {hit.id}")
self.triggerUpdateHooks(hit)
return hit
def newAssignment(self, hit: HIT, assignmentId) -> Assignment:
# Create an Assignment carrying Amazon's assignment id, append it to
# hit.assignments, add it to the session, and notify the registered
# update hooks with the hit. Returns the new Assignment.
# TODO: reset() central management if has pending lastAssignment()
with self.getSession() as s:
assignment = Assignment()
assignment.assignment_id = assignmentId
hit.assignments.append(assignment)
s.add(assignment)
s.flush()
# note: it is the hit, not the assignment, that is refreshed here
s.refresh(hit)
logger.info(f"Created Assignment {assignment.id}")
self.triggerUpdateHooks(hit)
return assignment
def saveHIT(self, hit):
with self.getSession() as s:
logger.info(f"Updating hit! {hit.id}")
# s.flush()
def addHIT(self, hit: HIT):
self.triggerUpdateHooks(hit)
def saveAssignment(self, assignment):
with self.getSession() as s:
s.add(hit)
s.flush()
s.refresh(hit)
logger.info(f"Added {hit.id}")
logger.info(f"Updating assignment! {assignment.id}")
# s.flush()
self.triggerUpdateHooks(assignment.hit)
# def addHIT(self, hit: HIT):
# with self.getSession() as s:
# s.add(hit)
# s.flush()
# s.refresh(hit)
# logger.info(f"Added {hit.id}")
def getAvgDurationOfPreviousNHits(self, n) -> int:
latest_hits = self.session.query(HIT).\
filter(HIT.submit_hit_at!=None).\
filter(HIT.accept_time!=None).\
order_by(HIT.submit_hit_at.desc()).limit(n)
latest_assignments = self.session.query(Assignment).\
filter(Assignment.created_at!=None).\
filter(Assignment.submit_page_at!=None).\
order_by(Assignment.created_at.desc()).limit(n)
durations = []
for hit in latest_hits:
durations.append((hit.submit_hit_at - hit.accept_time).total_seconds())
for assignment in latest_assignments:
durations.append((assignment.submit_page_at - assignment.created_at).total_seconds())
if not len(durations):
return int(2.5*60)
return int(2.5*60) # default to 2.5 minutes
return int(sum(durations) / len(durations))
def getEstimatedHitDuration(self):
    """Estimate a HIT's duration as the average over the previous 5."""
    sampleSize = 5
    return self.getAvgDurationOfPreviousNHits(sampleSize)
def getHitTimeout(self):
    """Return the fixed HIT timeout value, 160."""
    timeout = 160 # max(160, self.getAvgDurationOfPreviousNHits(5)*2)
    return timeout
def getHITs(self, n = 100):
    """Return a query for at most ``n`` HITs that have ``submit_hit_at`` set, newest first.

    NOTE(review): elsewhere in this change the submission timestamps are
    written on Assignment rather than HIT -- confirm that
    ``HIT.submit_hit_at`` is still populated, otherwise this yields nothing.
    """
    submitted = self.session.query(HIT).filter(HIT.submit_hit_at != None)
    newestFirst = submitted.order_by(HIT.submit_hit_at.desc())
    return newestFirst.limit(n)
# def rmSource(self, id: int):
# with self.getSession() as session:
# source = session.query(Source).get(id)
# if not source:
# logging.warning(f"Source nr {id} not found")
# else:
# logging.info(f"Deleting source {source.id}: {source.url}")
# session.delete(source)
#
# def getRandomNewsItem(self, session) -> NewsItem:
# return session.query(NewsItem).order_by(func.random()).limit(1).first()

View File

@ -17,10 +17,14 @@ from PIL import Image
import datetime
from shutil import copyfile
import colorsys
import tqdm
class Level(object):
# Level effect adapted from https://stackoverflow.com/a/3125421
"""
Level image effect adapted from https://stackoverflow.com/a/3125421
"""
def __init__(self, minv, maxv, gamma):
self.minv= minv/255.0
self.maxv= maxv/255.0
@ -76,10 +80,12 @@ class CentralManagement():
self.lastHitTime = None
self.eventQueue = Queue()
self.statusPageQueue = Queue()
self.isRunning = threading.Event()
self.isScanning = threading.Event()
self.scanLock = threading.Lock()
self.notPaused = threading.Event()
self.lightStatus = 0
def loadConfig(self, filename, args):
@ -93,6 +99,7 @@ class CentralManagement():
'hit_store.db'
)
self.store = HITStore.Store(varDb, logLevel=logging.DEBUG if self.debug else logging.INFO)
self.store.registerUpdateHook(self.updateLightHook) # change light based on status
self.logger.debug(f"Loaded configuration: {self.config}")
@ -141,6 +148,10 @@ class CentralManagement():
ExpireAt=datetime.datetime.fromisoformat('2015-01-01')
)
self.mturk.delete_hit(HITId=pending_hit['HITId'])
staleHit = self.store.getHitByRemoteId(pending_hit['HITId'])
staleHit.delete()
self.store.saveHIT(staleHit)
self.sqs = SqsListener(self.config, self.eventQueue, self.isRunning)
sqsThread = threading.Thread(target=self.sqs.start, name='sqs')
@ -159,8 +170,6 @@ class CentralManagement():
# event listener:
dispatcherThread = threading.Thread(target=self.eventListener, name='dispatcher')
dispatcherThread.start()
#
#
self.eventQueue.put(Signal('start', {'ding':'test'}))
@ -176,8 +185,9 @@ class CentralManagement():
self.expireCurrentHit()
def expireCurrentHit(self):
if self.currentHit and self.currentHit.hit_id: # hit pending
self.logger.warn(f"Delete hit: {self.currentHit.hit_id}")
if self.currentHit and not self.currentHit.isConfirmed():
if self.currentHit.hit_id: # hit pending at Amazon
self.logger.warn(f"Expire hit: {self.currentHit.hit_id}")
self.mturk.update_expiration_for_hit(
HITId=self.currentHit.hit_id,
ExpireAt=datetime.datetime.fromisoformat('2015-01-01')
@ -186,7 +196,12 @@ class CentralManagement():
self.mturk.delete_hit(HITId=self.currentHit.hit_id)
except Exception as e:
self.logger.exception(e)
if not self.currentHit.isSubmitted():
self.currentHit.delete()
self.store.saveHIT(self.currentHit)
def eventListener(self):
while self.isRunning.is_set():
try:
@ -205,54 +220,89 @@ class CentralManagement():
- Plotter complete
-
"""
#TODO: make level debug()
self.logger.info(f"SIGNAL: {signal}")
if signal.name == 'start':
self.makeHit()
self.lastHitTime = datetime.datetime.now()
pass
elif signal.name == 'hit.scan':
# start a scan
if signal.params['id'] != self.currentHit.id:
self.logger.info(f"Hit.scan had wrong id: {signal}")
continue
# self.statusPageQueue.add(dict(hit_id=signal.params['id'], transition='scanning'))
elif signal.name == 'hit.scanned':
# TODO: wrap up hit & make new HIT
if signal.params['hit_id'] != self.currentHit.id:
self.logger.info(f"Hit.scanned had wrong id: {signal}")
continue
self.currentHit.scanned_at = datetime.datetime.utcnow()
self.server.statusPage.set('state', self.currentHit.getStatus())
self.store.saveHIT(self.currentHit)
time_diff = datetime.datetime.now() - self.lastHitTime
to_wait = 10 - time_diff.total_seconds()
# self.statusPageQueue.add(dict(hit_id=self.currentHit.id, state='scan'))
if to_wait > 0:
self.logger.warn(f"Sleep until next hit: {to_wait}s")
time.sleep(to_wait)
else:
self.logger.info(f"No need to wait: {to_wait}s")
self.makeHit()
self.lastHitTime = datetime.datetime.now()
elif signal.name == 'hit.creating':
# self.statusPageQueue.add(dict(hit_id=signal.params['id'], transition='create_hit'))
pass
elif signal.name == 'hit.created':
# self.statusPageQueue.add(dict(hit_id=signal.params['id'], remote_id=signal.params['remote_id'], state='hit'))
pass
elif signal.name == 'scan.start':
pass
elif signal.name == 'scan.finished':
# probably see hit.scanned
pass
elif signal.name == 'hit.info':
if signal.params['hit_id'] != self.currentHit.id:
self.logger.warning(f"hit.info hit_id != currenthit.id: {signal}")
continue
for name, value in signal.params.items():
if name == 'hit_id':
continue
if name == 'ip':
self.currentHit.turk_ip = value
if name == 'location':
self.currentHit.turk_country = value
self.logger.debug(f'Set status: {name} to {value}')
self.server.statusPage.set(name, value)
elif signal.name == 'hit.assignment':
# Create new assignment
if signal.params['hit_id'] != self.currentHit.id:
continue
assignment = self.currentHit.getAssignmentById(signal.params['assignment_id'])
elif signal.name == 'assignment.info':
assignment = self.currentHit.getAssignmentById(signal.params['assignment_id'])
if not assignment:
self.logger.warning(f"assignment.info assignment.id not for current hit assignments: {signal}")
change = False
for name, value in signal.params.items():
if name == 'ip':
assignment.turk_ip = value
if name == 'location':
assignment.turk_country = value
if name == 'os':
assignment.turk_os = value
if name == 'browser':
assignment.turk_browser = value
change = True
self.logger.debug(f'Set assignment: {name} to {value}')
if change:
self.store.saveAssignment(assignment)
elif signal.name == 'server.open':
self.currentHit.open_page_at = datetime.datetime.utcnow()
self.store.saveHIT(self.currentHit)
self.setLight(True)
self.server.statusPage.set('state', self.currentHit.getStatus())
self.server.statusPage.set('hit_opened', self.currentHit.open_page_at)
elif signal.name == 'server.submit':
self.currentHit.submit_page_at = datetime.datetime.utcnow()
self.store.saveHIT(self.currentHit)
elif signal.name == 'assignment.submit':
a = self.currentHit.getLastAssignment()
if a.assignment_id != signal.params['assignment_id']:
self.logger.critical(f"Submit of invalid assignment_id: {signal}")
a.submit_page_at = datetime.datetime.utcnow()
self.store.saveAssignment(a)
self.plotter.park()
self.server.statusPage.set('hit_opened', self.currentHit.open_page_at)
# park always triggers a plotter.finished after being processed
elif signal.name[:4] == 'sqs.':
@ -263,49 +313,37 @@ class CentralManagement():
else:
sqsHit = self.currentHit
updateStatus = True
sqsAssignment = sqsHit.getAssignmentById(signal.params['event']['AssignmentId'])
if not sqsAssignment:
self.logger.critical(f"Invalid assignmentId given for hit: {signal.params['event']}")
continue
if signal.name == 'sqs.AssignmentAccepted':
self.logger.info(f'Set status progress to accepted')
sqsHit.accept_time = datetime.datetime.strptime(signal.params['event']['EventTimestamp'],"%Y-%m-%dT%H:%M:%SZ")
sqsHit.worker_id = signal.params['event']['WorkerId']
if updateStatus:
self.server.statusPage.set('worker_id', sqsHit.worker_id)
sqsAssignment.accept_at = datetime.datetime.strptime(signal.params['event']['EventTimestamp'],"%Y-%m-%dT%H:%M:%SZ")
sqsAssignment.worker_id = signal.params['event']['WorkerId']
# {'event': {'HITGroupId': '301G7MYOAJ85NEW128ZDGF5DSBW53S', 'EventType': 'AssignmentAccepted', 'EventTimestamp': '2019-10-23T20:16:10Z', 'HITId': '3IH9TRB0FBAKKZFP3JUD6D9YWQ1I1F', 'AssignmentId': '3BF51CHDTWLN3ZGHRKDUHFKPWIJ0H3', 'WorkerId': 'A1CK46PK9VEUH5', 'HITTypeId': '3EYXOXDEN7RX0YSMN4UMVN01AYKZJ0'}}
elif signal.name == 'sqs.AssignmentAbandoned':
self.logger.info(f'Set status progress to abandoned')
#{'event': {'HITGroupId': '301G7MYOAJ85NEW128ZDGF5DSBW53S', 'EventType': 'AssignmentAbandoned', 'EventTimestamp': '2019-10-23T20:23:06Z', 'HITId': '3JHB4BPSFKKFQ263K4EFULI3LC79QJ', 'AssignmentId': '3U088ZLJVL450PB6MJZUIIUCB6VW0Y', 'WorkerId': 'A1CK46PK9VEUH5', 'HITTypeId': '3EYXOXDEN7RX0YSMN4UMVN01AYKZJ0'}}
sqsHit.accept_time = None
sqsHit.open_page_at = None
if self.currentHit.id == sqsHit.id:
if not sqsHit.submit_page_at:
self.reset()
else:
sqsHit.submit_hit_at = datetime.datetime.utcnow() # fake submit
if updateStatus:
self.setLight(False)
sqsAssignment.abandoned_at = datetime.datetime.strptime(signal.params['event']['EventTimestamp'],"%Y-%m-%dT%H:%M:%SZ")
# if updateStatus:
# self.setLight(False)
self.mturk.create_worker_block(WorkerId=signal.params['event']['WorkerId'], Reason='Accepted task without working on it.')
elif signal.name == 'sqs.AssignmentReturned':
self.logger.info(f'Set status progress to returned')
sqsHit.accept_time = None
sqsHit.open_page_at = None
if self.currentHit.id == sqsHit.id:
if not sqsHit.submit_page_at:
self.reset()
else:
sqsHit.submit_hit_at = datetime.datetime.utcnow() # fake submit
if updateStatus:
self.setLight(False)
sqsAssignment.returned_at = datetime.datetime.strptime(signal.params['event']['EventTimestamp'],"%Y-%m-%dT%H:%M:%SZ")
# {'event': {'HITGroupId': '301G7MYOAJ85NEW128ZDGF5DSBW53S', 'EventType': 'AssignmentReturned', 'EventTimestamp': '2019-10-23T20:16:47Z', 'HITId': '3IH9TRB0FBAKKZFP3JUD6D9YWQ1I1F', 'AssignmentId': '3BF51CHDTWLN3ZGHRKDUHFKPWIJ0H3', 'WorkerId': 'A1CK46PK9VEUH5', 'HITTypeId': '3EYXOXDEN7RX0YSMN4UMVN01AYKZJ0'}}
elif signal.name == 'sqs.AssignmentSubmitted':
# {'MessageId': '4b37dfdf-6a12-455d-a111-9a361eb54d88', 'ReceiptHandle': 'AQEBHc0yAdIrEmAV3S8TIoDCRxrItDEvjy0VQko56/Lb+ifszC0gdZ0Bbed24HGHZYr5DSnWkgBJ/H59ZXxFS1iVEH9sC8+YrmKKOTrKvW3gj6xYiBU2fBb8JRq+sEiNSxWLw2waxr1VYdpn/SWcoOJCv6PlS7P9EB/2IQ++rCklhVwV7RfARHy4J87bjk5R3+uEXUUi00INhCeunCbu642Mq4c239TFRHq3mwM6gkdydK+AP1MrXGKKAE1W5nMbwEWAwAN8KfoM1NkkUg5rTSYWmxxZMdVs/QRNcMFKVSf1bop2eCALSoG6l3Iu7+UXIl4HLh+rHp4bc8NoftbUJUii8YXeiNGU3wCM9T1kOerwYVgksK93KQrioD3ee8navYExQRXne2+TrUZUDkxRIdtPGA==', 'MD5OfBody': '01ccb1efe47a84b68704c4dc611a4d8d', 'Body': '{"Events":[{"Answer":"<?xml version=\\"1.0\\" encoding=\\"ASCII\\"?><QuestionFormAnswers xmlns=\\"http://mechanicalturk.amazonaws.com/AWSMechanicalTurkDataSchemas/2005-10-01/QuestionFormAnswers.xsd\\"><Answer><QuestionIdentifier>surveycode<\\/QuestionIdentifier><FreeText>test<\\/FreeText><\\/Answer><\\/QuestionFormAnswers>","HITGroupId":"301G7MYOAJ85NEW128ZDGF5DSBW53S","EventType":"AssignmentSubmitted","EventTimestamp":"2019-10-30T08:01:43Z","HITId":"3NSCTNUR2ZY42ZXASI4CS5YWV0S5AB","AssignmentId":"3ZAZR5XV02TTOCBR9MCLCNQV1XKCZL","WorkerId":"A1CK46PK9VEUH5","HITTypeId":"3EYXOXDEN7RX0YSMN4UMVN01AYKZJ0"}],"EventDocId":"34af4cd7f2829216f222d4b6e66f3a3ff9ad8ea6","SourceAccount":"600103077174","CustomerId":"A1CK46PK9VEUH5","EventDocVersion":"2014-08-15"}'}
self.logger.info(f'Set status progress to submitted')
# TODO: validate the content of the submission by parsing signal.params['event']['Answer'] and comparing it with sqsHit.uuid
sqsHit.answer = signal.params['event']['Answer']
if sqsHit.uuid not in sqsHit.answer:
self.logger.critical(f"Not a valid answer given?! {sqsHit.answer}")
sqsAssignment.answer = signal.params['event']['Answer']
if sqsAssignment.uuid not in sqsAssignment.answer:
self.logger.critical(f"Not a valid answer given?! {sqsAssignment.answer}")
if not sqsHit.submit_page_at:
if not sqsAssignment.submit_page_at:
# page not submitted, hit is. Nevertheless, create new hit.
try:
self.mturk.reject_assignment(AssignmentId=signal.params['event']['AssignmentId'], RequesterFeedback='Did not do the assignment')
@ -313,7 +351,8 @@ class CentralManagement():
self.logger.exception(e)
self.makeHit()
else:
sqsHit.submit_hit_at = datetime.datetime.strptime(signal.params['event']['EventTimestamp'],"%Y-%m-%dT%H:%M:%SZ")
sqsAssignment.confirmed_at = datetime.datetime.strptime(signal.params['event']['EventTimestamp'],"%Y-%m-%dT%H:%M:%SZ")
# block the worker after a successful submit, to prevent duplicate workers
# TODO: Disabled after worker mail, use quals instead
#self.mturk.create_worker_block(WorkerId=signal.params['event']['WorkerId'], Reason='Every worker can only work once on the taks.')
@ -321,21 +360,19 @@ class CentralManagement():
self.store.saveHIT(sqsHit)
if updateStatus:
self.logger.warning(f'update status: {sqsHit.getStatus()}')
# TODO: have HITStore/HIT take care of this by emitting a signal
# only update status if it is the currentHit
self.server.statusPage.set('state', sqsHit.getStatus())
else:
self.logger.warning('DO NOT update status')
elif signal.name == 'plotter.finished':
if self.currentHit and self.currentHit.submit_page_at:
self.setLight(False)
# is _always_ triggered after submit due to plotter.park()
if self.currentHit and self.currentHit.isSubmitted():
self.currentHit.plotted_at = datetime.datetime.utcnow()
self.store.saveHIT(self.currentHit)
self.logger.info("Start scan thread")
scan = threading.Thread(target=self.scanImage, name='scan')
scan.start()
self.server.statusPage.set('hit_submitted', self.currentHit.submit_page_at)
self.server.statusPage.set('state', self.currentHit.getStatus())
elif signal.name == 'plotter.parked':
# should this have the code from plotter.finished?
pass
else:
self.logger.critical(f"Unknown signal: {signal.name}")
except Exception as e:
@ -343,20 +380,19 @@ class CentralManagement():
self.logger.exception(e)
def makeHit(self):
self.expireCurrentHit() # expire hit if it is there
self.expireCurrentHit() # expire hit if it is there
self.eventQueue.put(Signal('hit.creating', {'id': self.currentHit.id if self.currentHit else 'start'}))
self.server.statusPage.reset()
self.reloadConfig() # reload new config values if they are set
# self.notPaused.wait()
self.currentHit = self.store.createHIT()
self.store.currentHit = self.currentHit
self.logger.info(f"Make HIT {self.currentHit.id}")
# question = open(self.config['amazon']['task_xml'], mode='r').read().replace("{HIT_NR}",str(self.currentHit.id))
question = '''<?xml version="1.0" encoding="UTF-8"?>
<ExternalQuestion xmlns="http://mechanicalturk.amazonaws.com/AWSMechanicalTurkDataSchemas/2006-07-14/ExternalQuestion.xsd">
<ExternalURL>https://guest.rubenvandeven.com:8888/draw?id={HIT_NR}</ExternalURL>
@ -391,11 +427,12 @@ class CentralManagement():
self.currentHit.hit_id = new_hit['HIT']['HITId']
self.store.saveHIT(self.currentHit)
# TODO: have HITStore/HIT take care of this by emitting a signal
self.server.statusPage.set('hit_id', new_hit['HIT']['HITId'])
self.server.statusPage.set('hit_created', self.currentHit.created_at)
self.server.statusPage.set('fee', f"${self.currentHit.fee:.2f}")
self.server.statusPage.set('state', self.currentHit.getStatus())
# self.server.statusPage.set('hit_id', new_hit['HIT']['HITId'])
# self.server.statusPage.set('hit_created', self.currentHit.created_at)
# self.server.statusPage.set('fee', f"${self.currentHit.fee:.2f}")
# self.server.statusPage.set('state', self.currentHit.getStatus())
self.eventQueue.put(Signal('hit.created', {'id': self.currentHit.id, 'remote_id': self.currentHit.hit_id}))
# mturk.send_test_event_notification()
if self.config['amazon']['sqs_url']:
@ -432,8 +469,8 @@ class CentralManagement():
'sudo', 'scanimage', '-d', 'epkowa','--resolution=100',
'-l','25' #y axis, margin from top of the scanner, hence increasing this, moves the scanned image upwards
,'-t','22', # x axis, margin from left side scanner (seen from the outside)
'-x',str(181),
'-y',str(245)
'-x',str(181),
'-y',str(245)
]
proc = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)
# opens connection to scanner, but only starts scanning when output becomes ready:
@ -456,46 +493,62 @@ class CentralManagement():
"""
Run scanimage on the scanner and return a string with the filename
"""
cmd = [
'sudo', 'scanimage', '-d', 'epkowa', '--format', 'tiff',
'--resolution=100', # lower res, faster (more powerful) scan & wipe
'-l','25' #y axis, margin from top of the scanner, hence increasing this, moves the scanned image upwards
,'-t','22', # x axis, margin from left side scanner (seen from the outside)
'-x',str(181),
'-y',str(245)
]
self.logger.info(f"{cmd}")
filename = self.currentHit.getImagePath()
with self.scanLock:
if self.config['dummy_plotter']:
self.eventQueue.put(Signal('hit.scan', {'id':self.currentHit.id}))
self.eventQueue.put(Signal('scan.start'))
self.logger.warning("Fake scanner for a few seconds")
for i in tqdm.tqdm(range(5)):
time.sleep(1)
self.eventQueue.put(Signal('hit.scanned', {'hit_id':self.currentHit.id}))
self.eventQueue.put(Signal('scan.finished'))
return
cmd = [
'sudo', 'scanimage', '-d', 'epkowa', '--format', 'tiff',
'--resolution=100', # lower res, faster (more powerful) scan & wipe
'-l','25' #y axis, margin from top of the scanner, hence increasing this, moves the scanned image upwards
,'-t','22', # x axis, margin from left side scanner (seen from the outside)
'-x',str(181),
'-y',str(245)
]
self.logger.info(f"{cmd}")
filename = self.currentHit.getImagePath()
self.eventQueue.put(Signal('hit.scan', {'id':self.currentHit.id}))
self.eventQueue.put(Signal('scan.start'))
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# opens connection to scanner, but only starts scanning when output becomes ready:
o, e = proc.communicate(80)
if e:
self.logger.critical(f"Scanner caused: {e.decode()}")
#TODO: should clear self.isRunning.clear() ?
try:
f = io.BytesIO(o)
img = Image.open(f)
img = img.transpose(Image.ROTATE_90).transpose(Image.FLIP_TOP_BOTTOM)
tunedImg = Level.level_image(img, self.config['level']['min'], self.config['level']['max'], self.config['level']['gamma'])
tunedImg.save(filename)
except Exception as e:
self.logger.critical("Cannot create image from scan. Did scanner work?")
self.logger.exception(e)
# TODO: create
copyfile('www/basic.svg', filename)
time.sleep(5) # sleep a few seconds for scanner to return to start position
self.eventQueue.put(Signal('hit.scanned', {'hit_id':self.currentHit.id}))
self.eventQueue.put(Signal('scan.finished'))
if e:
self.logger.critical(f"Scanner caused: {e.decode()}")
# Should this clear self.isRunning.clear() ?
try:
f = io.BytesIO(o)
img = Image.open(f)
img = img.transpose(Image.ROTATE_90).transpose(Image.FLIP_TOP_BOTTOM)
tunedImg = Level.level_image(img, self.config['level']['min'], self.config['level']['max'], self.config['level']['gamma'])
tunedImg.save(filename)
except Exception as e:
self.logger.critical("Cannot create image from scan. Did scanner work?")
self.logger.exception(e)
copyfile('www/basic.svg', filename)
time.sleep(5) # sleep a few seconds for scanner to return to start position
self.eventQueue.put(Signal('hit.scanned', {'hit_id':self.currentHit.id}))
self.eventQueue.put(Signal('scan.finished'))
def setLight(self, on):
value = 1 if on else 0
if self.lightStatus == value:
return
self.lightStatus = value
cmd = [
'usbrelay', f'HURTM_1={value}'
]
@ -503,3 +556,17 @@ class CentralManagement():
code = subprocess.call(cmd)
if code > 0:
self.logger.warning(f"Error on light change: {code}")
def updateLightHook(self, hit = None):
    """Update hook for the HIT store: set the light to its currently desired state.

    The ``hit`` argument supplied by the store is deliberately ignored; the
    desired state comes from ``getLightStatus()``.
    """
    shouldBeOn = self.getLightStatus()
    self.setLight(shouldBeOn)
def getLightStatus(self) -> bool:
    """Decide whether the light should be on.

    On only while there is a current HIT that has at least one assignment
    and has not been plotted yet (``plotted_at`` unset).
    """
    current = self.currentHit
    if not current or not current.getLastAssignment():
        return False
    # wait till plotter is done
    return not current.plotted_at

View File

@ -0,0 +1,89 @@
import datetime
class State():
    """Base class for one step in a HIT's life cycle.

    Stores the creation time and the id of the HIT it belongs to. Subclasses
    list the transition names they accept in ``availableTransitions`` and
    implement ``transition()`` to build the follow-up state.
    """

    # Transition names accepted by this state; overridden by subclasses.
    availableTransitions = []

    def __init__(self, hit_id):
        self.time = datetime.datetime.now()
        # was `params['hit_id']`, but no `params` exists in this scope
        self.hit_id = hit_id

    def transition(self, transitionName, params = None):
        """Return the state that follows ``transitionName``; must be overridden."""
        raise NotImplementedError("Not implemented")


class StateMachine:
    """Keeps the history of states as ``(transitionName, state)`` tuples."""

    def __init__(self, initalState):
        # The parameter keeps its original (misspelled) name so the signature
        # is unchanged; the body previously read an undefined `initialState`.
        self.history = [('init', initalState)]

    def current(self):
        """Return the most recent state."""
        return self.history[-1][1]

    def transition(self, transitionName, params):
        """Apply ``transitionName`` to the current state and record the result.

        Raises Exception when the current state does not list the transition,
        and RuntimeError when the state returns no follow-up state.
        """
        # TODO: update Store & Interface
        if transitionName not in self.current().availableTransitions:
            raise Exception("Invalid transition")

        newState = self.current().transition(transitionName, params)
        if not newState:
            # was `RuntimeException`, which is not a Python builtin
            raise RuntimeError(f"Invalid transition {transitionName} for {self.current()}")

        self.history.append((transitionName, newState))

    def getStateForHit(self, hit_id, stateCls = None):
        """Return the latest history entry for ``hit_id``, or None.

        The entry is the ``(transitionName, state)`` tuple. When ``stateCls``
        is given, only states that are instances of it are considered.
        """
        states = [s for s in self.history if s[1].hit_id == hit_id and (stateCls is None or isinstance(s[1], stateCls))]
        if not states:  # was `len(states < 1)`, which raises TypeError
            return None
        return states[-1]


class HITCreated(State):
    availableTransitions = ['accept']

    def __init__(self, hit_id):
        super().__init__(hit_id)
        # These were `self.x = None` statements in the class body, where no
        # `self` exists; they belong on the instance.
        self.state = None
        self.fee = None
        self.hit_created = None
        self.hit_opened = None
        self.hit_submitted = None

    def transition(self, transitionName, params = None):
        params = params or {}
        if transitionName == 'accept':
            return HITAssigned(params['hit_id'], params['assignment_id'])
        return None


class HITAssigned(State):
    availableTransitions = ['reject', 'abandon', 'submit']

    def __init__(self, hit_id, assignment_id = None):
        # super().__init__ was not called before, leaving hit_id/time unset;
        # assignment_id is optional so single-argument callers keep working.
        super().__init__(hit_id)
        self.assignment_id = assignment_id
        self.worker_id = None
        self.ip = None
        self.location = None
        self.browser = None
        self.os = None
        self.resolution = None

    def transition(self, transitionName, params = None):
        params = params or {}
        if transitionName == 'reject' or transitionName == 'abandon':
            return HITAbandonedRejected(params['hit_id'])
        if transitionName == 'submit':
            return HITSubmitted(params['hit_id'])
        return None


class HITAbandonedRejected(State):
    availableTransitions = ['accept']

    def transition(self, transitionName, params = None):
        params = params or {}
        if transitionName == 'accept':
            # forward the assignment id when the caller supplies one
            return HITAssigned(params['hit_id'], params.get('assignment_id'))
        return None


class HITSubmitted(State):
    availableTransitions = ['scan']

    def transition(self, transitionName, params = None):
        params = params or {}
        if transitionName == 'scan':
            return Scanning(params['hit_id'])
        return None


class Scanning(State):
    availableTransitions = ['scan_complete', 'scan_failed']

    def transition(self, transitionName, params = None):
        params = params or {}
        if transitionName == 'scan_complete':
            return ImageAvailable(params['hit_id'])
        if transitionName == 'scan_failed':
            raise Exception("Scan failed, unknown state")
        return None


class ImageAvailable(State):
    availableTransitions = ['create_hit']

    def transition(self, transitionName, params = None):
        params = params or {}
        if transitionName == 'create_hit':
            return HITCreated(params['hit_id'])
        return None

View File

@ -21,6 +21,14 @@ import html
logger = logging.getLogger("sorteerhoed").getChild("webserver")
class DateTimeEncoder(json.JSONEncoder):
    """JSON encoder that writes ``datetime.datetime`` values as ISO 8601 strings.

    Timestamps are emitted with second precision; every other unsupported
    type falls through to the base class, which raises TypeError.
    """

    def default(self, o):
        if isinstance(o, datetime.datetime):
            return o.isoformat(timespec='seconds')
        # super() is already bound: passing `self` again (as before) raised an
        # argument-count TypeError instead of the "not JSON serializable" one.
        return super().default(o)
class StaticFileWithHeaderHandler(tornado.web.StaticFileHandler):
def set_extra_headers(self, path):
"""For subclass to add extra headers to the response"""
@ -34,13 +42,15 @@ class StaticFileWithHeaderHandler(tornado.web.StaticFileHandler):
print(mime)
if mime == 'image/svg+xml':
self.set_header("Content-Type", "image/svg+xml")
class WebSocketHandler(tornado.websocket.WebSocketHandler):
"""
Websocket from the workers
"""
CORS_ORIGINS = ['localhost', '.mturk.com', 'here.rubenvandeven.com', 'guest.rubenvandeven.com']
connections = set()
def initialize(self, config, plotterQ: Queue, eventQ: Queue, store: HITStore):
self.config = config
self.plotterQ = plotterQ
@ -60,49 +70,51 @@ class WebSocketHandler(tornado.websocket.WebSocketHandler):
if hit_id != self.store.currentHit.id:
self.close()
return
self.hit = self.store.currentHit
self.assignment_id = str(self.get_query_argument('assignmentId'))
self.assignment = self.hit.getLastAssignment()
if self.assignment.assignment_id != self.assignment_id:
raise Exception(f"Opening websocket for invalid assignment {self.assignment_id}")
self.timeout = datetime.datetime.now() + datetime.timedelta(seconds=self.store.getHitTimeout())
if self.hit.submit_hit_at:
if self.hit.isSubmitted():
raise Exception("Opening websocket for already submitted hit")
#logger.info(f"New client connected: {self.request.remote_ip} for {self.hit.id}/{self.hit.hit_id}")
self.eventQ.put(Signal('server.open', dict(hit_id=self.hit.id)))
self.eventQ.put(Signal('server.open', dict(assignment_id=self.assignment_id)))
self.strokes = []
# Gather some initial information:
ua = self.request.headers.get('User-Agent', None)
if ua:
ua_info = httpagentparser.detect(ua)
self.eventQ.put(Signal('hit.info', dict(hit_id=self.hit.id, os=ua_info['os']['name'], browser=ua_info['browser']['name'])))
# self.write_message("hello!")
# the client sent the message
def on_message(self, message):
logger.debug(f"recieve: {message}")
if self.assignment_id != self.hit.getLastAssignment().assignment_id:
logger.critical(f"Skip message for non-last assignment {message}")
return
if datetime.datetime.now() > self.timeout:
logger.critical("Close websocket after timeout (abandon?)")
self.close()
return
try:
msg = json.loads(message)
# TODO: sanitize input: min/max, limit strokes
if msg['action'] == 'move':
# TODO: min/max input
point = [float(msg['direction'][0]),float(msg['direction'][1]), bool(msg['mouse'])]
self.strokes.append(point)
self.plotterQ.put(point)
elif msg['action'] == 'up':
logger.info(f'up: {msg}')
point = [msg['direction'][0],msg['direction'][1], 1]
self.strokes.append(point)
elif msg['action'] == 'submit':
logger.info(f'submit: {msg}')
id = self.submit_strokes()
@ -112,30 +124,31 @@ class WebSocketHandler(tornado.websocket.WebSocketHandler):
#store svg:
d = html.escape(msg['d'])
svg = f"""<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<svg
<svg
xmlns:svg="http://www.w3.org/2000/svg"
xmlns="http://www.w3.org/2000/svg"
version="1.0" viewBox="0 0 {self.config['scanner']['width']}0 {self.config['scanner']['height']}0" width="{self.config['scanner']['width']}mm" height="{self.config['scanner']['height']}mm" preserveAspectRatio="none">
<path d="{d}" style='stroke:gray;stroke-width:2mm;fill:none;' id="stroke" />
</svg>
"""
with open(self.store.currentHit.getSvgImagePath(), 'w') as fp:
fp.write(svg)
self.write_message(json.dumps({
'action': 'submitted',
'msg': f"Submission ok, please copy this token to your HIT at Mechanical Turk: {self.hit.uuid}",
'code': str(self.hit.uuid)
'msg': f"Submission ok, please copy this token to your HIT at Mechanical Turk: {self.assignment.uuid}",
'code': str(self.assignment.uuid)
}))
self.close()
elif msg['action'] == 'down':
# not used, implicit in move?
pass
elif msg['action'] == 'info':
self.eventQ.put(Signal('hit.info', dict(
self.eventQ.put(Signal('assignment.info', dict(
hit_id=self.hit.id,
assignment_id=self.assignment_id,
resolution=msg['resolution'],
browser=msg['browser']
)))
@ -152,37 +165,41 @@ class WebSocketHandler(tornado.websocket.WebSocketHandler):
def on_close(self):
    """
    Remove this handler from the class' connections (via rmConnection) and
    log the remote ip of the disconnected client.
    """
    self.__class__.rmConnection(self)
    logger.info(f"Client disconnected: {self.request.remote_ip}")
    # TODO: abandon assignment??
def submit_strokes(self):
if len(self.strokes) < 1:
return False
self.eventQ.put(Signal("server.submit", dict(hit_id = self.hit.id)))
if self.config['dummy_plotter']:
d = strokes2D(self.strokes)
svg = f"""<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<svg viewBox="0 0 600 600"
xmlns:dc="http://purl.org/dc/elements/1.1/"
xmlns:cc="http://creativecommons.org/ns#"
xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#"
xmlns:svg="http://www.w3.org/2000/svg"
xmlns="http://www.w3.org/2000/svg"
version="1.1"
>
<path d="{d}" style="stroke:black;stroke-width:2;fill:none;" />
</svg>
"""
filename = self.hit.getImagePath()
logger.info(f"Write to {filename}")
with open(filename, 'w') as fp:
fp.write(svg)
self.eventQ.put(Signal("assignment.submit", dict(
hit_id = self.hit.id,
assignment_id=self.assignment_id)))
# deprecated: now done at scanner method:
# if self.config['dummy_plotter']:
# d = strokes2D(self.strokes)
# svg = f"""<?xml version="1.0" encoding="UTF-8" standalone="no"?>
# <svg viewBox="0 0 600 600"
# xmlns:dc="http://purl.org/dc/elements/1.1/"
# xmlns:cc="http://creativecommons.org/ns#"
# xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#"
# xmlns:svg="http://www.w3.org/2000/svg"
# xmlns="http://www.w3.org/2000/svg"
# version="1.1"
# >
# <path d="{d}" style="stroke:black;stroke-width:2;fill:none;" />
# </svg>
# """
#
# filename = self.hit.getImagePath()
# logger.info(f"Write to {filename}")
# with open(filename, 'w') as fp:
# fp.write(svg)
# we fake a hit.scanned event
self.eventQ.put(Signal('hit.scanned', {'hit_id':self.hit.id}))
return self.hit.uuid
# self.eventQ.put(Signal('hit.scanned', {'hit_id':self.hit.id}))
return self.assignment.uuid
@classmethod
def rmConnection(cls, client):
@ -194,11 +211,9 @@ class WebSocketHandler(tornado.websocket.WebSocketHandler):
class StatusWebSocketHandler(tornado.websocket.WebSocketHandler):
CORS_ORIGINS = ['localhost']
connections = set()
queue = queue.Queue()
def initialize(self, statusPage):
self.statusPage = statusPage
pass
def check_origin(self, origin):
parsed_origin = urlparse(origin)
@ -207,35 +222,28 @@ class StatusWebSocketHandler(tornado.websocket.WebSocketHandler):
return valid
# the client connected
def open(self, p = None):
def open(self):
self.__class__.connections.add(self)
for prop, value in self.statusPage.__dict__.items():
self.write_message(json.dumps({
'property': prop,
'value': value.isoformat(timespec='seconds') if type(value) is datetime.datetime else value
}))
self.write_message(json.dumps(self.statusPage.fetch(), cls=DateTimeEncoder))
# client disconnected
def on_close(self):
    """
    Remove this handler from the class' connections (via rmConnection) and
    log the remote ip of the disconnected client.
    """
    self.__class__.rmConnection(self)
    logger.info(f"Client disconnected: {self.request.remote_ip}")
@classmethod
def rmConnection(cls, client):
    """
    Drop client from the class-wide set of connections.

    Does nothing when the client is not registered.
    """
    # connections is a set, so discard() covers the "not present" case
    cls.connections.discard(client)
@classmethod
def update_for_all(cls, prop, value):
logger.debug(f"update for all {prop} {value}")
def update_for_all(cls, data):
logger.debug(f"update for all {data}")
for connection in cls.connections:
connection.write_message(json.dumps({
'property': prop,
'value': value.isoformat(timespec='seconds') if type(value) is datetime.datetime else value
}))
connection.write_message(json.dumps(data, cls=DateTimeEncoder))
def strokes2D(strokes):
# strokes to a d attribute for a path
d = "";
@ -252,7 +260,7 @@ def strokes2D(strokes):
elif cmd != 'l':
d+=' l '
cmd = 'l'
rel_stroke = [stroke[0] - last_stroke[0], stroke[1] - last_stroke[1]];
d += f"{rel_stroke[0]},{rel_stroke[1]} "
last_stroke = stroke;
@ -270,7 +278,7 @@ class DrawPageHandler(tornado.web.RequestHandler):
self.left_padding = left_padding
self.eventQ = eventQ
self.geoip_reader = geoip_reader
def get(self):
try:
hit_id = int(self.get_query_argument('id'))
@ -281,18 +289,27 @@ class DrawPageHandler(tornado.web.RequestHandler):
except Exception:
self.write("HIT not found")
else:
if hit.submit_page_at:
if hit.isSubmitted():
self.write("HIT already submitted")
return
assignmentId = self.get_query_argument('assignmentId', '')
if len(assignmentId) < 1:
logger.critical("Accessing page without assignment id. Allowing it for debug purposes... fingers crossed?")
previewOnly = False
if assignmentId == 'ASSIGNMENT_ID_NOT_AVAILABLE':
previewOnly = True
if len(assignmentId) and not previewOnly:
# process/create assignment
assignment = self.store.currentHit.getAssignmentById(assignmentId)
if not assignment:
# new assignment
logger.warning(f"Create new assignment {assignmentId}")
assignment = self.store.newAssignment(self.store.currentHit, assignmentId)
self.store.saveAssignment(assignment)
previous_hit = self.store.getLastSubmittedHit()
if not previous_hit:
# start with basic svg
@ -301,7 +318,7 @@ class DrawPageHandler(tornado.web.RequestHandler):
else:
image = previous_hit.getSvgImageUrl()
logger.info(f"Image url: {image}")
self.set_header("Access-Control-Allow-Origin", "*")
contents = open(os.path.join(self.path, 'index.html'), 'r').read()
contents = contents.replace("{IMAGE_URL}", image)\
@ -313,34 +330,43 @@ class DrawPageHandler(tornado.web.RequestHandler):
.replace("{LEFT_PADDING}", str(self.left_padding))\
.replace("{SCRIPT}", '' if previewOnly else '<script type="text/javascript" src="/assignment.js"></script>')\
.replace("{ASSIGNMENT}", '' if previewOnly else str(assignmentId)) # TODO: fix unsafe inserting of GET variable
self.write(contents)
if 'X-Forwarded-For' in self.request.headers:
ip = self.request.headers['X-Forwarded-For']
else:
ip = self.request.remote_ip
logger.info(f"Request from {ip}")
if not previewOnly:
self.eventQ.put(Signal('hit.info', dict(hit_id=hit.id, ip=ip)))
self.eventQ.put(Signal('hit.assignment', dict(
hit_id=hit.id, ip=ip, assignment_id=assignmentId
)))
self.eventQ.put(Signal('assignment.info', dict(assignment_id=assignmentId, ip=ip)))
try:
geoip = self.geoip_reader.country(ip)
logger.debug(f"Geo {geoip}")
self.eventQ.put(Signal('hit.info', dict(hit_id=hit.id, location=geoip.country.name)))
self.eventQ.put(Signal('assignment.info', dict(assignment_id=assignmentId, location=geoip.country.name)))
except Exception as e:
logger.exception(e)
logger.info("No geo IP possible")
self.eventQ.put(Signal('hit.info', dict(hit_id=hit.id, location='Unknown')))
self.eventQ.put(Signal('assignment.info', dict(assignment_id=assignmentId, location='Unknown')))
ua = self.request.headers.get('User-Agent', None)
if ua:
ua_info = httpagentparser.detect(ua)
self.eventQ.put(Signal('assignment.info', dict(assignment_id=assignmentId, os=ua_info['os']['name'], browser=ua_info['browser']['name'])))
class BackendHandler(tornado.web.RequestHandler):
def initialize(self, store: HITStore, path: str):
self.store = store
self.path = path
def get(self):
rows = []
for hit in self.store.getHITs(100):
@ -351,9 +377,9 @@ class BackendHandler(tornado.web.RequestHandler):
duration = (f"{duration_m}m" if duration_m else "") + f"{duration_s:02d}s"
else:
duration = "-"
fee = f"${hit.fee:.2}" if hit.fee else "-"
rows.append(
f"""
<tr><td></td><td>{hit.worker_id}</td>
@ -363,8 +389,8 @@ class BackendHandler(tornado.web.RequestHandler):
<td>{hit.accept_time}</td>
<td>{duration}</td><td></td>
"""
)
)
contents = open(os.path.join(self.path, 'backend.html'), 'r').read()
contents = contents.replace("{{TBODY}}", "".join(rows))
self.write(contents)
@ -373,82 +399,67 @@ class StatusPage():
"""
Properties shown on the status page, which are sent over websockets the moment
they are altered.
"""
def __init__(self):
self.reset()
def reset(self):
logger.info("Resetting status")
self.hit_id = None
self.worker_id = None
self.ip = None
self.location = None
self.browser = None
self.os = None
self.resolution = None
self.state = None
self.fee = None
self.hit_created = None
self.hit_opened = None
self.hit_submitted = None
"""
def clearAssignment(self):
logger.info("Resetting hit assignment")
self.worker_id = None
self.ip = None
self.location = None
self.browser = None
self.os = None
self.resolution = None
self.hit_created = None
def __init__(self, store: HITStore):
self.store = store
self.store.registerUpdateHook(self)
def __setattr__(self, name, value):
if name in self.__dict__ and self.__dict__[name] == value:
logger.debug(f"Ignore setting status of {name}: it already is set to {value}")
return
def update(self, hit = None):
"""
Send the given HIT formatted to the websocket clients
self.__dict__[name] =value
logger.info(f"Update status: {name}: {value}")
If no hit is given, load the last 2 items
"""
if hit:
data = [hit.toDict()]
else:
hits = self.store.getNewestHits(2)
data = [hit.toDict() for hit in hits]
if Server.loop:
Server.loop.asyncio_loop.call_soon_threadsafe(StatusWebSocketHandler.update_for_all, name, value)
Server.loop.asyncio_loop.call_soon_threadsafe(StatusWebSocketHandler.update_for_all, data)
else:
logger.warn("Status: no server loop to call update command")
def set(self, name, value):
return self.__setattr__(name, value)
def fetch(self):
    """
    Build the initial payload for a status page that just connected:
    asks the store for its 2 newest hits and returns their toDict() output
    as a list, in the order the store returned them.
    """
    newest = self.store.getNewestHits(2)
    payload = []
    for item in newest:
        payload.append(item.toDict())
    return payload
class Server:
"""
Server for HIT -> plotter events
As well as for the Status interface
TODO: change to have the HIT_id as param to the page. Load hit from storage with previous image
"""
loop = None
def __init__(self, config, eventQ: Queue, runningEvent: Event, plotterQ: Queue, store: HITStore):
self.isRunning = runningEvent
self.eventQ = eventQ
self.config = config
self.logger = logger
self.plotterQ = plotterQ # communicate directly to plotter (skip main thread)
#self.config['server']['port']
self.web_root = os.path.join('www')
self.server_loop = None
self.store = store
self.statusPage = StatusPage()
self.statusPage = StatusPage(store)
def start(self):
if not os.path.exists('GeoLite2-Country.mmdb'):
raise Exception("Please download the GeoLite2 Country database and place the 'GeoLite2-Country.mmdb' file in the project root.")
self.geoip_reader = geoip2.database.Reader('GeoLite2-Country.mmdb')
try:
asyncio.set_event_loop(asyncio.new_event_loop())
application = tornado.web.Application([
@ -462,7 +473,7 @@ class Server:
(r"/draw", DrawPageHandler,
dict(
store = self.store,
eventQ = self.eventQ,
eventQ = self.eventQ,
path=self.web_root,
width=self.config['scanner']['width'],
height=self.config['scanner']['height'],
@ -477,6 +488,8 @@ class Server:
store = self.store,
path=self.web_root,
)),
(r"/frames/(.*)", StaticFileWithHeaderHandler,
{"path": 'scanimation/interfaces/frames'}),
(r"/(.*)", StaticFileWithHeaderHandler,
{"path": self.web_root}),
], debug=True, autoreload=False)
@ -488,12 +501,12 @@ class Server:
finally:
self.logger.info("Stopping webserver")
self.isRunning.clear()
def stop(self):
    """
    Schedule _stop on the server loop in a thread-safe way.

    Does nothing when no server loop is set.
    """
    loop = self.server_loop
    if not loop:
        return
    self.logger.debug("Got call to stop")
    # hand the actual stop over to the loop through call_soon_threadsafe
    loop.asyncio_loop.call_soon_threadsafe(self._stop)
def _stop(self):
    """
    Stop the loop held in self.server_loop.

    stop() schedules this method on that loop with call_soon_threadsafe.
    """
    self.server_loop.stop()

View File

@ -51,7 +51,7 @@ let draw = function(e) {
strokeEl.setAttribute('d', d);
}
console.log([pos['x'], pos['y']], isDrawing);
//console.log([pos['x'], pos['y']], isDrawing);
socket.send(JSON.stringify({
'action': 'move',
'direction': [pos['x'], pos['y']],

View File

@ -5,11 +5,70 @@
<link rel="stylesheet" type="text/css" href="style.css" />
<script src='dateformat.js'></script>
<script src='reconnecting-websocket.min.js'></script>
<script src="https://cdn.jsdelivr.net/npm/vue/dist/vue.js"></script>
</head>
<body>
<div id="wrapper">
<div class='hit' v-for="(hit, hitId) in hits"
:class="[{'hugvey--on': hit.status != 'off'},'hugvey--' + hit.status]"
>
<div class='transition'>
<p v-if="hit.hit_id">Created HIT at {{hit.created_at}}</p>
<p v-else>Creating HIT</p>
</div>
<div class='state' v-if="hit.hit_id">
<h2>Human intelligence task</h2>
<p>{{hit.hit_id}}</p>
<p>Description</p>
</div>
<div class='transition' v-if="hit.hit_id">
<p v-if="!hit.assignment">Wait for human</p>
<template v-else>
<p v-if="hit.assignment.returned_at || hit.assignment.abandoned_at">Assignment abandoned</p>
<p v-else>Assignment accepted</p>
</template>
</div>
<template v-if="hit.assignment">
<div class='state' v-if="hit.assignment"
:class="[{'assignment-hidden': hit.assignment.returned_at || hit.assignment.abandoned_at}]">
<h2>Worker {{hit.assignment.worker_id}}</h2>
<p>{{hit.assignment.turk_browser}} / {{hit.assignment.turk_ip}} / {{hit.assignment.turk_country}} / {{hit.assignment.turk_os}}</p>
</div>
<div class='transition' v-if="hit.assignment.submit_page_at">
<p v-if="!hit.assignment.confirmed_at">Assignment submitted</p>
<p v-else>Confirmed submission</p> <!-- Validated submitted code through SQSmessage -->
</div>
<div class='state' v-if="hit.assignment.submit_page_at">
result: (add duration)
<img :src="hit.svg_image">
</div>
<div class='transition' v-if="hit.assignment.submit_page_at">
<p v-if="!hit.scanned_at">Scanning</p>
<p v-else>Scanned at {{hit.scanned_at}}</p>
</div>
<div class='state' v-if="hit.scanned_at">
scan:
<img :src="hit.scan_image">
</div>
</template>
<!-- <div class="phase" id="waiting_for_human">
<span class="narrative_phase_text">waiting for human worker to accept task</span>
</div>
@ -17,10 +76,10 @@
<div class="phase" id="human_accepted_task">
<span class="narrative_phase_text">task accepted by human worker</span>
</div> -->
<!--
<div class="phase" id="worker_specs">
<span class="grid-item spec_name" id="hit_id_descriptor">human intelligent task id</span>
<span class="grid-item spec_value" id="hit_id">&nbsp;</span>
<span class="grid-item spec_value" id="hit_id">{{hit.id}}</span>
<span class="grid-item spec_name" id="worker_id_descriptor">human worker id</span>
<span class="grid-item spec_value" id="worker_id">&nbsp;</span>
@ -43,11 +102,10 @@
<span class="grid-item spec_name" id="elapsed_time_descriptor">time elapsed</span>
<span class="grid-item spec_value" id="elapsed_time">&nbsp;</span>
</div>
</div>-->
</div>

View File

@ -1,38 +1,22 @@
// DOM STUFF ///////////////////////////////////////////////////////////////////
let divs = {},
spec_names = [
'worker_id',
'ip',
'location',
'browser',
'os',
'state',
'fee',
'hit_created',
'hit_opened',
'hit_submitted',
'elapsed_time',
'hit_id'
]
divs.linkDOM = function(name){
divs[name] = document.getElementById(`${name}`)
}
spec_names.forEach(function(name){
divs.linkDOM(name)
var app = new Vue({
el: '#wrapper',
data: {
message: 'Hello Vue!',
hits: {
}
},
// watch: {
// hits: {
// deep: true
// }
// }
})
let request_time = timeStamp(),
hit_started = false,
elapsed_time,
hit_finished = false
// SOCKET STUFF ////////////////////////////////////////////////////////////////
@ -43,76 +27,29 @@ ws.addEventListener('open', () => {
// ws.send('hi server')
})
ws.addEventListener('message', (event) => {
console.log('message: ' + event.data)
let data = JSON.parse(event.data)
if(data.property === 'hit_opened') {
if(data.value != null){
hit_started = true
hit_finished = false
request_time = new Date()
divs[data.property].innerHTML = `${request_time.format('dd mmm HH:MM:ss')}`
}else{
divs[data.property].innerHTML = `&mdash;`
hit_started = false
}
let hits = JSON.parse(event.data)
let a = {};
for(let hitid in app.hits) {
a[hitid] = app.hits[hitid];
}
else if(data.property === 'hit_submitted'){
hit_finished = true;
}
else if(divs[data.property]){
data.value === null ? divs[data.property].innerHTML = `&mdash;` : divs[data.property].innerHTML = `${data.value}`
for(let hit of hits){
a[hit.id] = hit;
}
app.hits = a;
})
// ANIMATION STUFF /////////////////////////////////////////////////////////////
let frames,
frames_per_sec = 10,
current_frame = 0
function makeAnimation(){
let now,
delta = 0,
last = timeStamp(),
step = 1/frames_per_sec
function frame() {
now = timeStamp()
delta += Math.min(1, (now - last) / 1000)
while(delta > step){
delta -= step
update(step)
}
last = now
requestAnimationFrame(frame)
}
requestAnimationFrame(frame)
}
function update(step){
if(!hit_finished) elapsed_time = `${new Date((Date.now() - request_time)).format('MM"m "ss"s"')}`
if(hit_started){
divs['elapsed_time'].innerHTML = elapsed_time
}else{
divs['elapsed_time'].innerHTML = `&mdash;`
}
}
makeAnimation()
function timeStamp(){return window.performance && window.performance.now ? window.performance.now() : new Date().getTime()}
//
//function update(step){
//
// if(!hit_finished) elapsed_time = `${new Date((Date.now() - request_time)).format('MM"m "ss"s"')}`
// if(hit_started){
// divs['elapsed_time'].innerHTML = elapsed_time
// }else{
// divs['elapsed_time'].innerHTML = `&mdash;`
// }
//}

View File

@ -47,9 +47,9 @@ html, body{
position: absolute;
left: var(--pos-x);
top: var(--pos-y);
bottom: calc(100vh - (var(--pos-y) + var(--height)));
width: var(--width);
height: var(--height);
height: auto;
background: var(--alt-color);
box-sizing: border-box;
@ -57,6 +57,57 @@ html, body{
}
#wrapper .hit{
display:block;
}
.transition{
overflow:hidden;
position:relative;
padding-left: calc(50%);
font-size: 12px;
height: 40px;
animation-duration: 3s;
animation-name: slidein;
}
.transition::before{
content:'⇩'; /*'⇓';*/
display:block;
position:absolute;
font-family:monospace;
font-size: 100px;
top:0;
left:calc(50% - 30px);
line-height:0;
}
.state{
overflow:hidden;
border:solid 1px black;
animation-duration: 3s;
animation-name: slidein;
transition: max-height 1s;
max-height: 200px;
}
.state.assignment-hidden {
/*On abandon etc.*/
max-height: 0px;
}
@keyframes slidein {
from {
max-height:0;
}
to {
max-height: 200px;
}
}
/*
#worker_specs{
display:grid;
grid-template-columns: 1fr ;
@ -97,3 +148,4 @@ html, body{
position: relative;
top: 5px;
}
*/