import logging
import datetime
import os
import uuid
from contextlib import contextmanager

import country_converter
from sqlalchemy import Column, Integer, String, DateTime, Float
from sqlalchemy.engine import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship
from sqlalchemy.orm.session import sessionmaker, object_session
from sqlalchemy.sql.schema import ForeignKey, Sequence
from svgpathtools import svg2paths

mainLogger = logging.getLogger("sorteerhoed")
logger = mainLogger.getChild("store")

Base = declarative_base()

cc = country_converter.CountryConverter()

"""
HIT lifetime:
    created
    accepted (returned!)
    working
    awaiting amazon confirmation (submitted on page)
    submitted

Actions:
    creating Hit (creating hit with scanned image)
    Scanning
"""


def _countryCodeFor(country):
    """
    Translate a stored country name into an ISO2 code.

    Returns None when no country is set, '--' for the literal 'Unknown',
    and otherwise whatever country_converter yields for the name.
    """
    if not country:
        return None
    if country == 'Unknown':
        return '--'
    return cc.convert([country], to='ISO2')


class HIT(Base):
    """
    A single task, tracked locally under our own sequential id and (once
    registered) under Amazon's hit id. Its state is derived from its
    timestamps and from its most recent Assignment.
    """
    __tablename__ = 'hits'

    id = Column(Integer, Sequence('hit_id'), primary_key=True)  # our sequential hit id
    hit_id = Column(String(255))  # amazon's hit id
    created_at = Column(DateTime, default=datetime.datetime.utcnow)
    # onupdate keeps the timestamp current on every UPDATE; a value assigned
    # explicitly by the caller still takes precedence.
    updated_at = Column(DateTime, default=datetime.datetime.utcnow,
                        onupdate=datetime.datetime.utcnow)
    scanned_at = Column(DateTime, default=None)
    plotted_at = Column(DateTime, default=None)
    deleted_at = Column(DateTime, default=None)  # soft-delete marker, see delete()

    assignments = relationship("Assignment", back_populates="hit", order_by="Assignment.created_at")

    fee = Column(Float(precision=2), default=None)

    # previous hit so we can load the correct image
    # previous_hit_id = Column(Integer, ForeignKey('hits.id'), default=None)
    # previous_hit = relationship("HIT")

    def getImagePath(self):
        """Filesystem path of the scanned frame (zero-padded id, .jpg)."""
        return os.path.join('scanimation/interfaces/frames', f"{self.id:06d}.jpg")

    def getImageUrl(self):
        """URL of the scanned frame. Built as a string so it always uses '/'."""
        return f"/frames/{self.id:06d}.jpg"

    def getSvgImageUrl(self):
        """URL of the submitted drawing (SVG)."""
        return f"/scans/{self.id:06d}.svg"

    def getSvgImagePath(self):
        """Filesystem path of the submitted drawing (SVG)."""
        # os.path.join on svgImageUrl leads to invalid absolute url
        return f'www/scans/{self.id:06d}.svg'

    def getLastAssignment(self):
        """Return the most recently created Assignment, or None if there is none."""
        if not self.assignments:
            return None
        # the relationship is ordered by Assignment.created_at
        return self.assignments[-1]

    def getAssignmentById(self, assignmentId):
        """Return the Assignment whose assignment_id equals assignmentId, or None."""
        for a in self.assignments:
            if a.assignment_id == assignmentId:
                return a
        return None

    def getStatus(self):
        """
        Return a status string for this HIT: "deleted", "creating",
        "awaiting worker", "scanned", or the status of the last Assignment.
        The checks are evaluated in exactly that order.
        """
        if self.deleted_at:
            return "deleted"
        if not self.hit_id:
            return "creating"

        assignment = self.getLastAssignment()
        if not assignment:
            return "awaiting worker"
        if self.scanned_at:
            return "scanned"
        return assignment.getStatus()

    def _getPathLength(self):
        """
        Rounded length of the first path in the SVG file, or None when the
        file is missing or cannot be parsed (best effort).
        """
        svgPath = self.getSvgImagePath()
        if not os.path.exists(svgPath):
            return None
        try:
            paths, _ = svg2paths(svgPath)
            return round(paths[0].length())
        except Exception:
            # Deliberately best-effort: a broken/empty SVG must not break
            # serialisation. (Was a bare except, which also swallowed
            # KeyboardInterrupt/SystemExit.)
            logger.debug(f"Could not determine path length of {svgPath}", exc_info=True)
            return None

    def toDict(self) -> dict:
        """
        Serialise the HIT: all table columns plus 'assignment', 'state',
        'scan_image', 'svg_image', 'preceding_assignments' and 'path_length'.
        """
        values = {c.name: getattr(self, c.name) for c in self.__table__.columns}
        assignment = self.getLastAssignment()
        values['assignment'] = assignment.toDict() if assignment else None
        values['state'] = self.getStatus()
        values['scan_image'] = self.getImageUrl() if self.scanned_at else None
        values['svg_image'] = self.getSvgImageUrl() if self.isSubmitted() else None
        values['preceding_assignments'] = [a.toShortDict() for a in self.getBasedOnAssignments()]
        # a fixed entry is always appended at the end of the list
        values['preceding_assignments'].append({
            'worker_id': 'Ruben van de Ven & Merijn van Moll',
            'turk_country': 'the Netherlands',
            'turk_country_code': 'NL'
        })
        # only look at the file when there is a submitted drawing
        values['path_length'] = self._getPathLength() if values['svg_image'] else None
        return values

    def delete(self):
        """Soft-delete: only sets deleted_at, the row itself is kept."""
        self.deleted_at = datetime.datetime.utcnow()

    def isSubmitted(self) -> bool:
        """True if the last Assignment has a submit_page_at timestamp."""
        a = self.getLastAssignment()
        if not a:
            return False
        return bool(a.submit_page_at)

    def isConfirmed(self) -> bool:
        """True if the last Assignment has a confirmed_at timestamp."""
        a = self.getLastAssignment()
        if not a:
            return False
        return bool(a.confirmed_at)

    def getBasedOnAssignments(self):
        """
        Get preceding assignments, one per worker, excluding the one who did this HIT

        Returns a (lazy) query, newest first. Requires this HIT to be
        attached to a session.
        """
        assignment = self.getLastAssignment()
        session = object_session(self)
        q = session.query(Assignment).\
            filter(Assignment.submit_page_at < self.created_at).\
            filter(Assignment.worker_id.isnot(None))
        if assignment and assignment.worker_id:
            q = q.filter(Assignment.worker_id != assignment.worker_id)
        return q.\
            group_by(Assignment.worker_id).\
            order_by(Assignment.created_at.desc())


class Assignment(Base):
    """
    One attempt at a HIT. State is derived from the *_at timestamps,
    see getStatus().
    """
    __tablename__ = 'assignments'

    id = Column(Integer, Sequence('assignment_id'), primary_key=True)  # our sequential assignment id
    hit_id = Column(Integer, ForeignKey('hits.id'))  # our sequential hit id
    hit = relationship("HIT", back_populates="assignments")

    # The lambda resolves `uuid` at call time in module scope, so it reaches
    # the uuid module even though this class attribute carries the same name.
    uuid = Column(String(32), default=lambda: uuid.uuid4().hex)

    created_at = Column(DateTime, default=datetime.datetime.utcnow)
    # onupdate keeps the timestamp current on every UPDATE; a value assigned
    # explicitly by the caller still takes precedence.
    updated_at = Column(DateTime, default=datetime.datetime.utcnow,
                        onupdate=datetime.datetime.utcnow)

    # amazon's assignment id (was declared twice; the earlier declaration was
    # silently overwritten by this one, so only this one is kept)
    assignment_id = Column(String(255), default=None)
    worker_id = Column(String(255), default=None)

    accept_at = Column(DateTime, default=None)  # accept time according to SQS
    # open_page_at = Column(DateTime, default=None)
    submit_page_at = Column(DateTime, default=None)  # Submit the page
    confirmed_at = Column(DateTime, default=None)  # validate with UUID when getting Message from Amazon
    abandoned_at = Column(DateTime, default=None)
    rejected_at = Column(DateTime, default=None)

    answer = Column(String(255), default=None)

    turk_ip = Column(String(255), default=None)
    turk_country = Column(String(255), default=None)
    turk_os = Column(String(255), default=None)
    turk_browser = Column(String(255), default=None)

    def getStatus(self):
        """
        Return "rejected", "abandoned", "working", "submitted" or
        "confirmed"; the checks are evaluated in that order.
        """
        if self.rejected_at:
            return "rejected"
        if self.abandoned_at:
            return "abandoned"
        if not self.submit_page_at:
            return "working"
        if not self.confirmed_at:
            return "submitted"
        return "confirmed"

    def toDict(self) -> dict:
        """Serialise all table columns plus the derived 'turk_country_code'."""
        values = {c.name: getattr(self, c.name) for c in self.__table__.columns}
        values['turk_country_code'] = _countryCodeFor(self.turk_country)
        return values

    def toShortDict(self) -> dict:
        """Serialise only worker_id, turk_country and turk_country_code."""
        return {
            'worker_id': self.worker_id,
            'turk_country': self.turk_country,
            'turk_country_code': _countryCodeFor(self.turk_country),
        }

    def getOriginalAssignmentId(self):
        """
        Initial assumption: assignmentId would be unique for each worker. But it is not.
        Hence assignment_id now gets the value assignmentId_workerId (confusing to say the least!)
        Sometimes we want to recover the assignment id from this combination

        Returns None when no assignment_id has been set yet.
        """
        if self.assignment_id is None:
            return None
        return self.assignment_id.split('_')[0]


class Store:
    """
    SQLite-backed storage for HITs and Assignments. Holds one long-lived
    session and notifies registered hooks after each write.
    """

    def __init__(self, db_filename, logLevel=0):
        """
        Open (and, where needed, create the tables of) the database file.

        db_filename: path of the SQLite file.
        logLevel: when <= logging.DEBUG, SQLAlchemy engine logging is set to
            INFO. Note that the default of 0 satisfies this condition.
        """
        path = os.path.abspath(db_filename)
        if logLevel <= logging.DEBUG:
            logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)

        self.engine = create_engine('sqlite:///' + path, echo=False,
                                    connect_args={'check_same_thread': False})
        Base.metadata.create_all(self.engine)
        self.Session = sessionmaker(bind=self.engine)
        self.session = self.Session()

        # mirrors CentralManagement, stored here so we can quickly access it from webserver classes
        self.currentHit = None

        self.updateHooks = []
        # NOTE: seeding a fresh database with an initial HIT/assignment is disabled.

    def registerUpdateHook(self, hook):
        """Register a callable, or an object with update(hit), once."""
        if hook not in self.updateHooks:
            logger.info(f"Register update hook: {hook}")
            self.updateHooks.append(hook)

    def triggerUpdateHooks(self, hit=None):
        """Call every registered hook with the given hit."""
        for hook in self.updateHooks:
            if callable(hook):
                # it's a method
                hook(hit)
            else:
                # assume it's an object
                hook.update(hit)

    @contextmanager
    def getSession(self):
        """Provide a transactional scope around a series of operations."""
        try:
            yield self.session
            self.session.commit()
        except BaseException:
            # same reach as the former bare except: roll back on anything,
            # then let the error propagate
            self.session.rollback()
            raise

    def getHits(self):
        """Query for all HITs, newest first."""
        return self.session.query(HIT).order_by(HIT.created_at.desc())

    def getHitById(self, hitId):
        """Return the HIT with our id; .one() raises unless exactly one row matches."""
        return self.session.query(HIT).\
            filter(HIT.id == hitId).one()

    def getHitByRemoteId(self, amazonHitId):
        """Return the HIT with Amazon's hit id; .one() raises unless exactly one row matches."""
        return self.session.query(HIT).\
            filter(HIT.hit_id == amazonHitId).one()

    def getLastSubmittedHit(self):
        """Return the newest HIT that has a submitted assignment, or None."""
        return self.session.query(HIT).\
            join(Assignment).\
            filter(Assignment.submit_page_at.isnot(None)).\
            order_by(HIT.created_at.desc()).first()

    def getNewestHits(self, n=2) -> list:
        """
        Return the n newest non-deleted HITs in chronological order
        (oldest of them first). n=None returns all of them.
        """
        q = self.session.query(HIT).\
            filter(HIT.deleted_at.is_(None)).\
            order_by(HIT.created_at.desc())

        if n is not None:
            q = q.limit(n)

        hits = list(q)
        # select DESC, because we want latest, then reverse list to get in right order
        hits.reverse()
        return hits

    def createHIT(self) -> HIT:
        """Insert a new HIT, commit it, notify the hooks and return it."""
        with self.getSession() as s:
            hit = HIT()
            s.add(hit)
            s.flush()
            s.refresh(hit)
            logger.info(f"Created HIT {hit.id}")

        # hooks run once the transaction has been committed
        self.triggerUpdateHooks(hit)
        return hit

    def newAssignment(self, hit: HIT, assignmentId) -> Assignment:
        """Attach a new Assignment with the given id to hit, commit, notify the hooks."""
        # TODO: reset() central management if has pending lastAssignment()
        with self.getSession() as s:
            assignment = Assignment()
            assignment.assignment_id = assignmentId
            hit.assignments.append(assignment)
            s.add(assignment)
            s.flush()
            s.refresh(hit)
            logger.info(f"Created Assignment {assignment.id}")

        # hooks run once the transaction has been committed
        self.triggerUpdateHooks(hit)
        return assignment

    def saveHIT(self, hit):
        """Commit pending changes of the shared session and notify the hooks."""
        with self.getSession():
            logger.info(f"Updating hit! {hit.id}")

        self.triggerUpdateHooks(hit)

    def saveAssignment(self, assignment):
        """Commit pending changes of the shared session and notify the hooks with the assignment's HIT."""
        with self.getSession():
            logger.info(f"Updating assignment! {assignment.id}")

        self.triggerUpdateHooks(assignment.hit)

    def getAvgDurationOfPreviousNHits(self, n) -> int:
        """
        Average number of seconds between created_at and submit_page_at of
        the n most recently created, submitted assignments. Falls back to
        150 seconds when there are none.
        """
        latest_assignments = self.session.query(Assignment).\
            filter(Assignment.created_at.isnot(None)).\
            filter(Assignment.submit_page_at.isnot(None)).\
            order_by(Assignment.created_at.desc()).limit(n)

        durations = [
            (assignment.submit_page_at - assignment.created_at).total_seconds()
            for assignment in latest_assignments
        ]

        if not durations:
            return int(2.5 * 60)  # default to 2.5 minutes

        return int(sum(durations) / len(durations))

    def getEstimatedHitDuration(self):
        """Estimated duration in seconds, based on the last 5 submitted assignments."""
        return self.getAvgDurationOfPreviousNHits(5)

    def getHitTimeout(self):
        """Timeout in seconds; currently fixed."""
        return 160
        # max(160, self.getAvgDurationOfPreviousNHits(5)*2)

    def getHITs(self, n=100):
        """
        Query for up to n HITs that have at least one submitted assignment,
        newest first.

        HIT has no submit_hit_at column (the previous implementation raised
        AttributeError), so submission is determined through
        Assignment.submit_page_at.
        """
        return self.session.query(HIT).\
            filter(HIT.assignments.any(Assignment.submit_page_at.isnot(None))).\
            order_by(HIT.created_at.desc()).limit(n)