From d3bf3d47ea49309881abf10070cd113dd44ebf7a Mon Sep 17 00:00:00 2001 From: Ruben van de Ven Date: Mon, 13 Jan 2020 13:49:59 +0100 Subject: [PATCH] Refactored to work with separate assignment table --- sorteerhoed/HITStore.py | 163 ++++++++++++++------ sorteerhoed/central_management.py | 243 +++++++++++++++++------------- sorteerhoed/webserver.py | 182 +++++++++++----------- www/assignment.js | 2 +- 4 files changed, 345 insertions(+), 245 deletions(-) diff --git a/sorteerhoed/HITStore.py b/sorteerhoed/HITStore.py index 28b8397..3fc48e4 100644 --- a/sorteerhoed/HITStore.py +++ b/sorteerhoed/HITStore.py @@ -9,9 +9,6 @@ import datetime from contextlib import contextmanager import uuid import os -import coloredlogs -import argparse -from sqlalchemy.sql.functions import func mainLogger = logging.getLogger("sorteerhoed") logger = mainLogger.getChild("store") @@ -43,6 +40,7 @@ class HIT(Base): created_at = Column(DateTime, default=datetime.datetime.utcnow) updated_at = Column(DateTime, default=datetime.datetime.utcnow) scanned_at = Column(DateTime, default=None) + plotted_at = Column(DateTime, default=None) deleted_at = Column(DateTime, default=None) assignments = relationship("Assignment", back_populates="hit", order_by="Assignment.created_at") fee = Column(Float(precision=2), default=None) @@ -53,6 +51,9 @@ class HIT(Base): def getImagePath(self): return os.path.join('scanimation/interfaces/frames', f"{self.id:06d}.jpg") + + def getImageUrl(self): + return os.path.join('/frames', f"{self.id:06d}.jpg") def getSvgImageUrl(self): return f"scans/{self.id:06d}.svg" @@ -68,23 +69,44 @@ class HIT(Base): def getAssignmentById(self, assignmentId): for a in self.assignments: if a.assignment_id == assignmentId: - return + return a + return None def getStatus(self): + assignment = self.getLastAssignment() + if self.deleted_at: + return "deleted" + if not self.hit_id: + return "creating" + if not assignment: + return "awaiting worker" if self.scanned_at: - return "completed" - if self.submit_hit_at: - return "submission confirmed" - if self.submit_page_at: - return "submitted by worker" - if self.open_page_at: - return "working" - if self.accept_time: - return "accepted by worker" - # on abandon: - if self.worker_id: - return "abandoned by worker" - return "awaiting worker" + return "scanned" + return assignment.getStatus() + + def toDict(self) -> dict: + values = {c.name: getattr(self, c.name) for c in self.__table__.columns} + assignment = self.getLastAssignment() + values['assignment'] = assignment.toDict() if assignment else None + values['state'] = self.getStatus() + values['scan_image'] = self.getImageUrl() if self.scanned_at else None + values['svg_image'] = self.getSvgImageUrl() if self.isSubmitted() else None + return values + + def delete(self): + self.deleted_at = datetime.datetime.utcnow() + + def isSubmitted(self) -> bool: + a = self.getLastAssignment() + if not a: + return False + return bool(a.submit_page_at) + + def isConfirmed(self) -> bool: + a = self.getLastAssignment() + if not a: + return False + return bool(a.confirmed_at) class Assignment(Base): __tablename__ = 'assignments' @@ -99,7 +121,7 @@ class Assignment(Base): assignment_id = Column(String(255), default = None) worker_id = Column(String(255), default = None) - accept_at = Column(DateTime, default=None) + accept_at = Column(DateTime, default=None) # accept time acccording to SQS # open_page_at = Column(DateTime, default=None) submit_page_at = Column(DateTime, default=None) # Submit the page confirmed_at = Column(DateTime, default=None) # validate with UUID when getting Message from Amazon @@ -109,6 +131,24 @@ class Assignment(Base): answer = Column(String(255), default=None) turk_ip = Column(String(255), default=None) turk_country = Column(String(255), default=None) + turk_os = Column(String(255), default=None) + turk_browser = Column(String(255), default=None) + + def getStatus(self): + if self.rejected_at: + return "rejected" + if self.abandoned_at: + return "abandoned" + if not self.submit_page_at: + return "working" + if not self.confirmed_at: + return "submitted" + return "confirmed" + + def toDict(self) -> dict: + values = {c.name: getattr(self, c.name) for c in self.__table__.columns} + return values + class Store: def __init__(self, db_filename, logLevel=0): @@ -122,6 +162,19 @@ class Store: self.session = self.Session() self.currentHit = None # mirrors Centralmanagmenet, stored here so we can quickly access it from webserver classes + self.updateHooks = [] + + def registerUpdateHook(self, hook): + if hook not in self.updateHooks: + logger.info(f"Register update hook: {hook}") + self.updateHooks.append(hook) + + def triggerUpdateHooks(self, hit = None): + for hook in self.updateHooks: + if callable(hook): # it's a method + hook(hit) + else: # assume it's an object + hook.update(hit) @contextmanager def getSession(self): @@ -133,8 +186,8 @@ class Store: self.session.rollback() raise - def getHits(self, session): - return self.session.query(Source).order_by(HIT.created_at.desc()) + def getHits(self): + return self.session.query(HIT).order_by(HIT.created_at.desc()) def getHitById(self, hitId): return self.session.query(HIT).\ @@ -147,8 +200,19 @@ class Store: def getLastSubmittedHit(self): return self.session.query(HIT).\ - filter(HIT.submit_page_at!=None).\ - order_by(HIT.submit_page_at.desc()).first() + join(Assignment).\ + filter(Assignment.submit_page_at!=None).\ + order_by(HIT.created_at.desc()).first() + + def getNewestHits(self, n = 2) -> list: + hits = list( + self.session.query(HIT).\ + filter(HIT.deleted_at==None).\ + order_by(HIT.created_at.desc()).limit(2) + ) + # select DESC, because we want latest, then reverse list to get in right order + hits.reverse() + return hits def createHIT(self) -> HIT: with self.getSession() as s: @@ -157,40 +221,57 @@ class Store: s.flush() s.refresh(hit) logger.info(f"Created HIT {hit.id}") + + self.triggerUpdateHooks(hit) + return hit - def newAssignment(self, hit: HIT) -> Assignment: + def newAssignment(self, hit: HIT, assignmentId) -> Assignment: + # TODO: reset() central management if has pending lastAssignment() + with self.getSession() as s: assignment = Assignment() + assignment.assignment_id = assignmentId hit.assignments.append(assignment) s.add(assignment) s.flush() s.refresh(hit) logger.info(f"Created Assignment {assignment.id}") + + self.triggerUpdateHooks(hit) + return assignment def saveHIT(self, hit): with self.getSession() as s: logger.info(f"Updating hit! {hit.id}") # s.flush() + self.triggerUpdateHooks(hit) - def addHIT(self, hit: HIT): + def saveAssignment(self, assignment): with self.getSession() as s: - s.add(hit) - s.flush() - s.refresh(hit) - logger.info(f"Added {hit.id}") + logger.info(f"Updating assignment! {assignment.id}") +# s.flush() + self.triggerUpdateHooks(assignment.hit) + +# def addHIT(self, hit: HIT): +# with self.getSession() as s: +# s.add(hit) +# s.flush() +# s.refresh(hit) +# logger.info(f"Added {hit.id}") def getAvgDurationOfPreviousNHits(self, n) -> int: - latest_hits = self.session.query(HIT).\ - filter(HIT.submit_hit_at!=None).\ - filter(HIT.accept_time!=None).\ - order_by(HIT.submit_hit_at.desc()).limit(n) + latest_assignments = self.session.query(Assignment).\ + filter(Assignment.created_at!=None).\ + filter(Assignment.submit_page_at!=None).\ + order_by(Assignment.created_at.desc()).limit(n) durations = [] - for hit in latest_hits: - durations.append((hit.submit_hit_at - hit.accept_time).total_seconds()) + + for assignment in latest_assignments: + durations.append((assignment.submit_page_at - assignment.created_at).total_seconds()) if not len(durations): - return int(2.5*60) + return int(2.5*60) # default to 2.5 minutes return int(sum(durations) / len(durations)) def getEstimatedHitDuration(self): @@ -203,15 +284,3 @@ class Store: return self.session.query(HIT).\ filter(HIT.submit_hit_at != None).\ order_by(HIT.submit_hit_at.desc()).limit(n) - -# def rmSource(self, id: int): -# with self.getSession() as session: -# source = session.query(Source).get(id) -# if not source: -# logging.warning(f"Source nr {id} not found") -# else: -# logging.info(f"Deleting source {source.id}: {source.url}") -# session.delete(source) -# -# def getRandomNewsItem(self, session) -> NewsItem: -# return session.query(NewsItem).order_by(func.random()).limit(1).first() diff --git a/sorteerhoed/central_management.py b/sorteerhoed/central_management.py index 6515097..81c5c85 100644 --- a/sorteerhoed/central_management.py +++ b/sorteerhoed/central_management.py @@ -17,10 +17,14 @@ from PIL import Image import datetime from shutil import copyfile import colorsys +import tqdm class Level(object): - # Level effect adapted from https://stackoverflow.com/a/3125421 + """ + Level image effect adapted from https://stackoverflow.com/a/3125421 + """ + def __init__(self, minv, maxv, gamma): self.minv= minv/255.0 self.maxv= maxv/255.0 @@ -81,6 +85,7 @@ class CentralManagement(): self.isScanning = threading.Event() self.scanLock = threading.Lock() self.notPaused = threading.Event() + self.lightStatus = 0 def loadConfig(self, filename, args): @@ -94,6 +99,7 @@ class CentralManagement(): 'hit_store.db' ) self.store = HITStore.Store(varDb, logLevel=logging.DEBUG if self.debug else logging.INFO) + self.store.registerUpdateHook(self.updateLightHook) # change light based on status self.logger.debug(f"Loaded configuration: {self.config}") @@ -142,6 +148,10 @@ class CentralManagement(): ExpireAt=datetime.datetime.fromisoformat('2015-01-01') ) self.mturk.delete_hit(HITId=pending_hit['HITId']) + staleHit = self.store.getHitByRemoteId(pending_hit['HITId']) + staleHit.delete() + self.store.saveHIT(staleHit) + self.sqs = SqsListener(self.config, self.eventQueue, self.isRunning) sqsThread = threading.Thread(target=self.sqs.start, name='sqs') @@ -160,8 +170,6 @@ class CentralManagement(): # event listener: dispatcherThread = threading.Thread(target=self.eventListener, name='dispatcher') dispatcherThread.start() -# -# self.eventQueue.put(Signal('start', {'ding':'test'})) @@ -177,8 +185,9 @@ class CentralManagement(): self.expireCurrentHit() def expireCurrentHit(self): - if self.currentHit and self.currentHit.hit_id: # hit pending - self.logger.warn(f"Delete hit: {self.currentHit.hit_id}") + if self.currentHit and not self.currentHit.isConfirmed(): + if self.currentHit.hit_id: # hit pending at Amazon + self.logger.warn(f"Expire hit: {self.currentHit.hit_id}") self.mturk.update_expiration_for_hit( HITId=self.currentHit.hit_id, ExpireAt=datetime.datetime.fromisoformat('2015-01-01') @@ -187,7 +196,12 @@ class CentralManagement(): self.mturk.delete_hit(HITId=self.currentHit.hit_id) except Exception as e: self.logger.exception(e) - + + if not self.currentHit.isSubmitted(): + self.currentHit.delete() + self.store.saveHIT(self.currentHit) + + def eventListener(self): while self.isRunning.is_set(): try: @@ -206,27 +220,27 @@ class CentralManagement(): - Plotter complete - """ - #TODO: make level debug() self.logger.info(f"SIGNAL: {signal}") if signal.name == 'start': self.makeHit() self.lastHitTime = datetime.datetime.now() elif signal.name == 'hit.scan': + # start a scan if signal.params['id'] != self.currentHit.id: - self.logger.info(f"Hit.scanned had wrong id: {signal}") + self.logger.info(f"Hit.scan had wrong id: {signal}") continue - self.statusPageQueue.add(dict(hit_id=signal.params['id'], transition='scanning')) +# self.statusPageQueue.add(dict(hit_id=signal.params['id'], transition='scanning')) elif signal.name == 'hit.scanned': # TODO: wrap up hit & make new HIT - if signal.params['id'] != self.currentHit.id: + if signal.params['hit_id'] != self.currentHit.id: self.logger.info(f"Hit.scanned had wrong id: {signal}") continue self.currentHit.scanned_at = datetime.datetime.utcnow() time_diff = datetime.datetime.now() - self.lastHitTime to_wait = 10 - time_diff.total_seconds() - self.statusPageQueue.add(dict(hit_id=self.currentHit.id, state='scan')) +# self.statusPageQueue.add(dict(hit_id=self.currentHit.id, state='scan')) if to_wait > 0: self.logger.warn(f"Sleep until next hit: {to_wait}s") @@ -237,10 +251,11 @@ class CentralManagement(): self.makeHit() self.lastHitTime = datetime.datetime.now() elif signal.name == 'hit.creating': - self.statusPageQueue.add(dict(hit_id=signal.params['id'], transition='create_hit')) +# self.statusPageQueue.add(dict(hit_id=signal.params['id'], transition='create_hit')) + pass elif signal.name == 'hit.created': - self.statusPageQueue.add(dict(hit_id=signal.params['id'], remote_id=signal.params['remote_id'], state='hit')) - +# self.statusPageQueue.add(dict(hit_id=signal.params['id'], remote_id=signal.params['remote_id'], state='hit')) + pass elif signal.name == 'scan.start': pass elif signal.name == 'scan.finished': @@ -251,38 +266,41 @@ class CentralManagement(): # Create new assignment if signal.params['hit_id'] != self.currentHit.id: continue - - assignment = self.store.newAssignment(self.currentHit) - assignment.assignment_id = signal.params['assignment_id'] - self.store.saveAssignment(assignment) - - self.statusPageQueue.add(dict(hit_id=self.currentHit.id, assignment_id=assignment.assignment_id, state='assignment')) - + assignment = self.currentHit.getAssignmentById(signal.params['assignment_id']) + elif signal.name == 'assignment.info': assignment = self.currentHit.getAssignmentById(signal.params['assignment_id']) if not assignment: self.logger.warning(f"assignment.info assignment.id not for current hit assignments: {signal}") + change = False for name, value in signal.params.items(): if name == 'ip': assignment.turk_ip = value if name == 'location': assignment.turk_country = value - + if name == 'os': + assignment.turk_os = value + if name == 'browser': + assignment.turk_browser = value + change = True self.logger.debug(f'Set assignment: {name} to {value}') - self.server.statusPage.set(name, value) - + + if change: + self.store.saveAssignment(assignment) + elif signal.name == 'server.open': self.currentHit.open_page_at = datetime.datetime.utcnow() self.store.saveHIT(self.currentHit) self.setLight(True) - self.server.statusPage.set('state', self.currentHit.getStatus()) - self.server.statusPage.set('hit_opened', self.currentHit.open_page_at) - elif signal.name == 'server.submit': - self.currentHit.submit_page_at = datetime.datetime.utcnow() - self.store.saveHIT(self.currentHit) + elif signal.name == 'assignment.submit': + a = self.currentHit.getLastAssignment() + if a.assignment_id != signal.params['assignment_id']: + self.logger.critical(f"Submit of invalid assignment_id: {signal}") + + a.submit_page_at = datetime.datetime.utcnow() + self.store.saveAssignment(a) self.plotter.park() - self.server.statusPage.set('hit_opened', self.currentHit.open_page_at) # park always triggers a plotter.finished after being processed elif signal.name[:4] == 'sqs.': @@ -293,49 +311,37 @@ class CentralManagement(): else: sqsHit = self.currentHit updateStatus = True + + sqsAssignment = sqsHit.getAssignmentById(signal.params['event']['AssignmentId']) + if not sqsAssignment: + self.logger.critical(f"Invalid assignmentId given for hit: {signal.params['event']}") + continue if signal.name == 'sqs.AssignmentAccepted': self.logger.info(f'Set status progress to accepted') - sqsHit.accept_time = datetime.datetime.strptime(signal.params['event']['EventTimestamp'],"%Y-%m-%dT%H:%M:%SZ") - sqsHit.worker_id = signal.params['event']['WorkerId'] - if updateStatus: - self.server.statusPage.set('worker_id', sqsHit.worker_id) + sqsAssignment.accept_at = datetime.datetime.strptime(signal.params['event']['EventTimestamp'],"%Y-%m-%dT%H:%M:%SZ") + sqsAssignment.worker_id = signal.params['event']['WorkerId'] # {'event': {'HITGroupId': '301G7MYOAJ85NEW128ZDGF5DSBW53S', 'EventType': 'AssignmentAccepted', 'EventTimestamp': '2019-10-23T20:16:10Z', 'HITId': '3IH9TRB0FBAKKZFP3JUD6D9YWQ1I1F', 'AssignmentId': '3BF51CHDTWLN3ZGHRKDUHFKPWIJ0H3', 'WorkerId': 'A1CK46PK9VEUH5', 'HITTypeId': '3EYXOXDEN7RX0YSMN4UMVN01AYKZJ0'}} elif signal.name == 'sqs.AssignmentAbandoned': self.logger.info(f'Set status progress to abandoned') #{'event': {'HITGroupId': '301G7MYOAJ85NEW128ZDGF5DSBW53S', 'EventType': 'AssignmentAbandoned', 'EventTimestamp': '2019-10-23T20:23:06Z', 'HITId': '3JHB4BPSFKKFQ263K4EFULI3LC79QJ', 'AssignmentId': '3U088ZLJVL450PB6MJZUIIUCB6VW0Y', 'WorkerId': 'A1CK46PK9VEUH5', 'HITTypeId': '3EYXOXDEN7RX0YSMN4UMVN01AYKZJ0'}} - sqsHit.accept_time = None - sqsHit.open_page_at = None - if self.currentHit.id == sqsHit.id: - if not sqsHit.submit_page_at: - self.reset() - else: - sqsHit.submit_hit_at = datetime.datetime.utcnow() # fake submit - if updateStatus: - self.setLight(False) + sqsAssignment.abandoned_at = datetime.datetime.strptime(signal.params['event']['EventTimestamp'],"%Y-%m-%dT%H:%M:%SZ") +# if updateStatus: +# self.setLight(False) self.mturk.create_worker_block(WorkerId=signal.params['event']['WorkerId'], Reason='Accepted task without working on it.') elif signal.name == 'sqs.AssignmentReturned': self.logger.info(f'Set status progress to returned') - sqsHit.accept_time = None - sqsHit.open_page_at = None - if self.currentHit.id == sqsHit.id: - if not sqsHit.submit_page_at: - self.reset() - else: - sqsHit.submit_hit_at = datetime.datetime.utcnow() # fake submit - if updateStatus: - self.setLight(False) + sqsAssignment.returned_at = datetime.datetime.strptime(signal.params['event']['EventTimestamp'],"%Y-%m-%dT%H:%M:%SZ") # {'event': {'HITGroupId': '301G7MYOAJ85NEW128ZDGF5DSBW53S', 'EventType': 'AssignmentReturned', 'EventTimestamp': '2019-10-23T20:16:47Z', 'HITId': '3IH9TRB0FBAKKZFP3JUD6D9YWQ1I1F', 'AssignmentId': '3BF51CHDTWLN3ZGHRKDUHFKPWIJ0H3', 'WorkerId': 'A1CK46PK9VEUH5', 'HITTypeId': '3EYXOXDEN7RX0YSMN4UMVN01AYKZJ0'}} elif signal.name == 'sqs.AssignmentSubmitted': # {'MessageId': '4b37dfdf-6a12-455d-a111-9a361eb54d88', 'ReceiptHandle': 'AQEBHc0yAdIrEmAV3S8TIoDCRxrItDEvjy0VQko56/Lb+ifszC0gdZ0Bbed24HGHZYr5DSnWkgBJ/H59ZXxFS1iVEH9sC8+YrmKKOTrKvW3gj6xYiBU2fBb8JRq+sEiNSxWLw2waxr1VYdpn/SWcoOJCv6PlS7P9EB/2IQ++rCklhVwV7RfARHy4J87bjk5R3+uEXUUi00INhCeunCbu642Mq4c239TFRHq3mwM6gkdydK+AP1MrXGKKAE1W5nMbwEWAwAN8KfoM1NkkUg5rTSYWmxxZMdVs/QRNcMFKVSf1bop2eCALSoG6l3Iu7+UXIl4HLh+rHp4bc8NoftbUJUii8YXeiNGU3wCM9T1kOerwYVgksK93KQrioD3ee8navYExQRXne2+TrUZUDkxRIdtPGA==', 'MD5OfBody': '01ccb1efe47a84b68704c4dc611a4d8d', 'Body': '{"Events":[{"Answer":"surveycode<\\/QuestionIdentifier>test<\\/FreeText><\\/Answer><\\/QuestionFormAnswers>","HITGroupId":"301G7MYOAJ85NEW128ZDGF5DSBW53S","EventType":"AssignmentSubmitted","EventTimestamp":"2019-10-30T08:01:43Z","HITId":"3NSCTNUR2ZY42ZXASI4CS5YWV0S5AB","AssignmentId":"3ZAZR5XV02TTOCBR9MCLCNQV1XKCZL","WorkerId":"A1CK46PK9VEUH5","HITTypeId":"3EYXOXDEN7RX0YSMN4UMVN01AYKZJ0"}],"EventDocId":"34af4cd7f2829216f222d4b6e66f3a3ff9ad8ea6","SourceAccount":"600103077174","CustomerId":"A1CK46PK9VEUH5","EventDocVersion":"2014-08-15"}'} self.logger.info(f'Set status progress to submitted') - # TODO: validate the content of the submission by parsing signal.params['event']['Answer'] and comparing it with sqsHit.uuid - sqsHit.answer = signal.params['event']['Answer'] - if sqsHit.uuid not in sqsHit.answer: - self.logger.critical(f"Not a valid answer given?! {sqsHit.answer}") + sqsAssignment.answer = signal.params['event']['Answer'] + if sqsAssignment.uuid not in sqsAssignment.answer: + self.logger.critical(f"Not a valid answer given?! {sqsAssignment.answer}") - if not sqsHit.submit_page_at: + if not sqsAssignment.submit_page_at: # page not submitted, hit is. Nevertheless, create new hit. try: self.mturk.reject_assignment(AssignmentId=signal.params['event']['AssignmentId'], RequesterFeedback='Did not do the assignment') @@ -343,7 +349,8 @@ class CentralManagement(): self.logger.exception(e) self.makeHit() else: - sqsHit.submit_hit_at = datetime.datetime.strptime(signal.params['event']['EventTimestamp'],"%Y-%m-%dT%H:%M:%SZ") + sqsAssignment.confirmed_at = datetime.datetime.strptime(signal.params['event']['EventTimestamp'],"%Y-%m-%dT%H:%M:%SZ") + # block de worker na succesvolle submit, om dubbele workers te voorkomen # TODO: Disabled after worker mail, use quals instead #self.mturk.create_worker_block(WorkerId=signal.params['event']['WorkerId'], Reason='Every worker can only work once on the taks.') @@ -351,21 +358,19 @@ class CentralManagement(): self.store.saveHIT(sqsHit) - - if updateStatus: - self.logger.warning(f'update status: {sqsHit.getStatus()}') - # TODO: have HITStore/HIT take care of this by emitting a signal - # only update status if it is the currentHit - self.server.statusPage.set('state', sqsHit.getStatus()) - else: - self.logger.warning('DO NOT update status') + elif signal.name == 'plotter.finished': - if self.currentHit and self.currentHit.submit_page_at: - self.setLight(False) + # is _always_ triggered after submit due to plotter.park() + if self.currentHit and self.currentHit.isSubmitted(): + self.currentHit.plotted_at = datetime.datetime.utcnow() + self.store.saveHIT(self.currentHit) + + self.logger.info("Start scan thread") scan = threading.Thread(target=self.scanImage, name='scan') scan.start() - self.server.statusPage.set('hit_submitted', self.currentHit.submit_page_at) - self.server.statusPage.set('state', self.currentHit.getStatus()) + elif signal.name == 'plotter.parked': + # should this have the code from plotter.finished? + pass else: self.logger.critical(f"Unknown signal: {signal.name}") except Exception as e: @@ -373,11 +378,10 @@ class CentralManagement(): self.logger.exception(e) def makeHit(self): - self.expireCurrentHit() # expire hit if it is there + self.expireCurrentHit() # expire hit if it is there self.eventQueue.put(Signal('hit.creating', {'id': self.currentHit.id if self.currentHit else 'start'})) - self.server.statusPage.reset() self.reloadConfig() # reload new config values if they are set # self.notPaused.wait() @@ -387,7 +391,6 @@ class CentralManagement(): self.logger.info(f"Make HIT {self.currentHit.id}") -# question = open(self.config['amazon']['task_xml'], mode='r').read().replace("{HIT_NR}",str(self.currentHit.id)) question = ''' https://guest.rubenvandeven.com:8888/draw?id={HIT_NR} @@ -422,7 +425,6 @@ class CentralManagement(): self.currentHit.hit_id = new_hit['HIT']['HITId'] self.store.saveHIT(self.currentHit) - # TODO: have HITStore/HIT take care of this by emitting a signal # self.server.statusPage.set('hit_id', new_hit['HIT']['HITId']) # self.server.statusPage.set('hit_created', self.currentHit.created_at) # self.server.statusPage.set('fee', f"${self.currentHit.fee:.2f}") @@ -489,47 +491,62 @@ class CentralManagement(): """ Run scanimage on scaner and returns a string with the filename """ - - cmd = [ - 'sudo', 'scanimage', '-d', 'epkowa', '--format', 'tiff', - '--resolution=100', # lower res, faster (more powerful) scan & wipe - '-l','25' #y axis, margin from top of the scanner, hence increasing this, moves the scanned image upwards - ,'-t','22', # x axis, margin from left side scanner (seen from the outside) - '-x',str(181), - '-y',str(245) - ] - self.logger.info(f"{cmd}") - filename = self.currentHit.getImagePath() - + with self.scanLock: + if self.config['dummy_plotter']: + self.eventQueue.put(Signal('hit.scan', {'id':self.currentHit.id})) + self.eventQueue.put(Signal('scan.start')) + self.logger.warning("Fake scanner for a few seconds") + for i in tqdm.tqdm(range(5)): + time.sleep(1) + self.eventQueue.put(Signal('hit.scanned', {'hit_id':self.currentHit.id})) + self.eventQueue.put(Signal('scan.finished')) + return + + + cmd = [ + 'sudo', 'scanimage', '-d', 'epkowa', '--format', 'tiff', + '--resolution=100', # lower res, faster (more powerful) scan & wipe + '-l','25' #y axis, margin from top of the scanner, hence increasing this, moves the scanned image upwards + ,'-t','22', # x axis, margin from left side scanner (seen from the outside) + '-x',str(181), + '-y',str(245) + ] + self.logger.info(f"{cmd}") + filename = self.currentHit.getImagePath() + self.eventQueue.put(Signal('hit.scan', {'id':self.currentHit.id})) self.eventQueue.put(Signal('scan.start')) proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) # opens connection to scanner, but only starts scanning when output becomes ready: o, e = proc.communicate(80) - if e: - self.logger.critical(f"Scanner caused: {e.decode()}") - #TODO: should clear self.isRunning.clear() ? - - try: - f = io.BytesIO(o) - img = Image.open(f) - img = img.transpose(Image.ROTATE_90).transpose(Image.FLIP_TOP_BOTTOM) - tunedImg = Level.level_image(img, self.config['level']['min'], self.config['level']['max'], self.config['level']['gamma']) - tunedImg.save(filename) - except Exception as e: - self.logger.critical("Cannot create image from scan. Did scanner work?") - self.logger.exception(e) - # TODO: create - copyfile('www/basic.svg', filename) - - time.sleep(5) # sleep a few seconds for scanner to return to start position - - self.eventQueue.put(Signal('hit.scanned', {'id':self.currentHit.id})) - self.eventQueue.put(Signal('scan.finished')) + if e: + self.logger.critical(f"Scanner caused: {e.decode()}") + # Should this clear self.isRunning.clear() ? + + try: + f = io.BytesIO(o) + img = Image.open(f) + img = img.transpose(Image.ROTATE_90).transpose(Image.FLIP_TOP_BOTTOM) + tunedImg = Level.level_image(img, self.config['level']['min'], self.config['level']['max'], self.config['level']['gamma']) + tunedImg.save(filename) + except Exception as e: + self.logger.critical("Cannot create image from scan. Did scanner work?") + self.logger.exception(e) + copyfile('www/basic.svg', filename) + + time.sleep(5) # sleep a few seconds for scanner to return to start position + + self.eventQueue.put(Signal('hit.scanned', {'hit_id':self.currentHit.id})) + self.eventQueue.put(Signal('scan.finished')) def setLight(self, on): value = 1 if on else 0 + + if self.lightStatus == value: + return + self.lightStatus = value + cmd = [ 'usbrelay', f'HURTM_1={value}' ] @@ -537,3 +554,17 @@ class CentralManagement(): code = subprocess.call(cmd) if code > 0: self.logger.warning(f"Error on light change: {code}") + + def updateLightHook(self, hit = None): + # ignore hit attribute, which comes from the HITstore + self.setLight(self.getLightStatus()) + + def getLightStatus(self) -> bool: + if not self.currentHit: + return False + a = self.currentHit.getLastAssignment() + if not a: + return False + if self.currentHit.plotted_at: # wait till plotter is done + return False + return True diff --git a/sorteerhoed/webserver.py b/sorteerhoed/webserver.py index ec5b5c2..bbd2b9f 100644 --- a/sorteerhoed/webserver.py +++ b/sorteerhoed/webserver.py @@ -21,6 +21,14 @@ import html logger = logging.getLogger("sorteerhoed").getChild("webserver") + +class DateTimeEncoder(json.JSONEncoder): + def default(self, o): + if isinstance(o, datetime.datetime): + return o.isoformat(timespec='seconds') + + return super().default(self, o) + class StaticFileWithHeaderHandler(tornado.web.StaticFileHandler): def set_extra_headers(self, path): """For subclass to add extra headers to the response""" @@ -36,7 +44,6 @@ class StaticFileWithHeaderHandler(tornado.web.StaticFileHandler): self.set_header("Content-Type", "image/svg+xml") - class WebSocketHandler(tornado.websocket.WebSocketHandler): """ Websocket from the workers @@ -66,12 +73,15 @@ class WebSocketHandler(tornado.websocket.WebSocketHandler): self.hit = self.store.currentHit - - self.assignment_id = int(self.get_query_argument('assignment_id')) - + self.assignment_id = str(self.get_query_argument('assignmentId')) + self.assignment = self.hit.getLastAssignment() + + if self.assignment.assignment_id != self.assignment_id: + raise Exception(f"Opening websocket for invalid assignment {self.assignment_id}") + self.timeout = datetime.datetime.now() + datetime.timedelta(seconds=self.store.getHitTimeout()) - if self.hit.submit_hit_at: + if self.hit.isSubmitted(): raise Exception("Opening websocket for already submitted hit") #logger.info(f"New client connected: {self.request.remote_ip} for {self.hit.id}/{self.hit.hit_id}") @@ -85,6 +95,7 @@ class WebSocketHandler(tornado.websocket.WebSocketHandler): if self.assignment_id != self.hit.getLastAssignment().assignment_id: logger.critical(f"Skip message for non-last assignment {message}") + return if datetime.datetime.now() > self.timeout: logger.critical("Close websocket after timeout (abandon?)") @@ -93,7 +104,6 @@ class WebSocketHandler(tornado.websocket.WebSocketHandler): try: msg = json.loads(message) - # TODO: sanitize input: min/max, limit strokes if msg['action'] == 'move': # TODO: min/max input point = [float(msg['direction'][0]),float(msg['direction'][1]), bool(msg['mouse'])] @@ -127,8 +137,8 @@ class WebSocketHandler(tornado.websocket.WebSocketHandler): self.write_message(json.dumps({ 'action': 'submitted', - 'msg': f"Submission ok, please copy this token to your HIT at Mechanical Turk: {self.hit.uuid}", - 'code': str(self.hit.uuid) + 'msg': f"Submission ok, please copy this token to your HIT at Mechanical Turk: {self.assignment.uuid}", + 'code': str(self.assignment.uuid) })) self.close() @@ -136,8 +146,9 @@ class WebSocketHandler(tornado.websocket.WebSocketHandler): # not used, implicit in move? pass elif msg['action'] == 'info': - self.eventQ.put(Signal('hit.info', dict( + self.eventQ.put(Signal('assignment.info', dict( hit_id=self.hit.id, + assignment_id=self.assignment_id, resolution=msg['resolution'], browser=msg['browser'] ))) @@ -159,32 +170,35 @@ class WebSocketHandler(tornado.websocket.WebSocketHandler): if len(self.strokes) < 1: return False - self.eventQ.put(Signal("server.submit", dict(hit_id = self.hit.id))) + self.eventQ.put(Signal("assignment.submit", dict( + hit_id = self.hit.id, + assignment_id=self.assignment_id))) - if self.config['dummy_plotter']: - d = strokes2D(self.strokes) - svg = f""" - - - - """ - - filename = self.hit.getImagePath() - logger.info(f"Write to {filename}") - with open(filename, 'w') as fp: - fp.write(svg) +# deprecated: now done at scanner method: +# if self.config['dummy_plotter']: +# d = strokes2D(self.strokes) +# svg = f""" +# +# +# +# """ +# +# filename = self.hit.getImagePath() +# logger.info(f"Write to {filename}") +# with open(filename, 'w') as fp: +# fp.write(svg) # we fake a hit.scanned event - self.eventQ.put(Signal('hit.scanned', {'hit_id':self.hit.id})) +# self.eventQ.put(Signal('hit.scanned', {'hit_id':self.hit.id})) - return self.hit.uuid + return self.assignment.uuid @classmethod def rmConnection(cls, client): @@ -196,11 +210,9 @@ class WebSocketHandler(tornado.websocket.WebSocketHandler): class StatusWebSocketHandler(tornado.websocket.WebSocketHandler): CORS_ORIGINS = ['localhost'] connections = set() - queue = queue.Queue() def initialize(self, statusPage): self.statusPage = statusPage - pass def check_origin(self, origin): parsed_origin = urlparse(origin) @@ -209,13 +221,9 @@ class StatusWebSocketHandler(tornado.websocket.WebSocketHandler): return valid # the client connected - def open(self, p = None): + def open(self): self.__class__.connections.add(self) - for prop, value in self.statusPage.__dict__.items(): - self.write_message(json.dumps({ - 'property': prop, - 'value': value.isoformat(timespec='seconds') if type(value) is datetime.datetime else value - })) + self.write_message(json.dumps(self.statusPage.fetch(), cls=DateTimeEncoder)) # client disconnected @@ -230,13 +238,10 @@ class StatusWebSocketHandler(tornado.websocket.WebSocketHandler): cls.connections.remove(client) @classmethod - def update_for_all(cls, prop, value): - logger.debug(f"update for all {prop} {value}") + def update_for_all(cls, data): + logger.debug(f"update for all {data}") for connection in cls.connections: - connection.write_message(json.dumps({ - 'property': prop, - 'value': value.isoformat(timespec='seconds') if type(value) is datetime.datetime else value - })) + connection.write_message(json.dumps(data, cls=DateTimeEncoder)) def strokes2D(strokes): # strokes to a d attribute for a path @@ -283,7 +288,7 @@ class DrawPageHandler(tornado.web.RequestHandler): except Exception: self.write("HIT not found") else: - if hit.submit_page_at: + if hit.isSubmitted(): self.write("HIT already submitted") return @@ -294,6 +299,15 @@ class DrawPageHandler(tornado.web.RequestHandler): previewOnly = False if assignmentId == 'ASSIGNMENT_ID_NOT_AVAILABLE': previewOnly = True + + if len(assignmentId) and not previewOnly: + # process/create assignment + assignment = self.store.currentHit.getAssignmentById(assignmentId) + if not assignment: + # new assignment + logger.warning(f"Create new assignment {assignmentId}") + assignment = self.store.newAssignment(self.store.currentHit, assignmentId) + self.store.saveAssignment(assignment) previous_hit = self.store.getLastSubmittedHit() if not previous_hit: @@ -330,7 +344,8 @@ class DrawPageHandler(tornado.web.RequestHandler): self.eventQ.put(Signal('hit.assignment', dict( hit_id=hit.id, ip=ip, assignment_id=assignmentId ))) - # self.eventQ.put(Signal('hit.info', dict(hit_id=hit.id, ip=ip))) + + self.eventQ.put(Signal('assignment.info', dict(assignment_id=assignmentId, ip=ip))) try: geoip = self.geoip_reader.country(ip) @@ -383,50 +398,35 @@ class StatusPage(): """ Properties for on the status page, which are send over websockets the moment they are altered. - """ - def __init__(self): - self.reset() - - def reset(self): - logger.info("Resetting status") - self.hit_id = None - self.worker_id = None - self.ip = None - self.location = None - self.browser = None - self.os = None - self.resolution = None - self.state = None - self.fee = None - self.hit_created = None - self.hit_opened = None - self.hit_submitted = None - - def clearAssignment(self): - logger.info("Resetting hit assignment") - self.worker_id = None - self.ip = None - self.location = None - self.browser = None - self.os = None - self.resolution = None - self.hit_created = None - - def __setattr__(self, name, value): - if name in self.__dict__ and self.__dict__[name] == value: - logger.debug(f"Ignore setting status of {name}: it already is set to {value}") - return - - self.__dict__[name] =value - logger.info(f"Update status: {name}: {value}") + """ + + def __init__(self, store: HITStore): + self.store = store + self.store.registerUpdateHook(self) + + def update(self, hit = None): + """ + Send the given HIT formatted to the websocket clients + + If no hit is given, load the last 2 items + """ + if hit: + data = [hit.toDict()] + else: + hits = self.store.getNewestHits(2) + data = [hit.toDict() for hit in hits] + if Server.loop: - Server.loop.asyncio_loop.call_soon_threadsafe(StatusWebSocketHandler.update_for_all, name, value) + Server.loop.asyncio_loop.call_soon_threadsafe(StatusWebSocketHandler.update_for_all, data) else: logger.warn("Status: no server loop to call update command") - - - def set(self, name, value): - return self.__setattr__(name, value) + + def fetch(self): + """ + Fetch latest, used on connection of status page + """ + hits = self.store.getNewestHits(2) + return [hit.toDict() for hit in hits] @@ -434,8 +434,6 @@ class Server: """ Server for HIT -> plotter events As well as for the Status interface - - TODO: change to have the HIT_id as param to the page. Load hit from storage with previous image """ loop = None @@ -452,7 +450,7 @@ class Server: self.server_loop = None self.store = store - self.statusPage = StatusPage() + self.statusPage = StatusPage(store) def start(self): @@ -489,6 +487,8 @@ class Server: store = self.store, path=self.web_root, )), + (r"/frames/(.*)", StaticFileWithHeaderHandler, + {"path": 'scanimation/interfaces/frames'}), (r"/(.*)", StaticFileWithHeaderHandler, {"path": self.web_root}), ], debug=True, autoreload=False) diff --git a/www/assignment.js b/www/assignment.js index 3a491b9..a59ce68 100644 --- a/www/assignment.js +++ b/www/assignment.js @@ -51,7 +51,7 @@ let draw = function(e) { strokeEl.setAttribute('d', d); } - console.log([pos['x'], pos['y']], isDrawing); + //console.log([pos['x'], pos['y']], isDrawing); socket.send(JSON.stringify({ 'action': 'move', 'direction': [pos['x'], pos['y']],