WIP. assignment has separate table. and prepare for more advanced status display

This commit is contained in:
mt 2020-01-10 18:03:18 +01:00
parent 5d8ef56686
commit ac04fb6082
4 changed files with 302 additions and 137 deletions

View file

@ -32,7 +32,7 @@ submitted
Actions: Actions:
creating Hit (creating hit with scanned image) creating Hit (creating hit with scanned image)
Scanning Scanning
""" """
@ -42,35 +42,34 @@ class HIT(Base):
hit_id = Column(String(255)) # amazon's hit id hit_id = Column(String(255)) # amazon's hit id
created_at = Column(DateTime, default=datetime.datetime.utcnow) created_at = Column(DateTime, default=datetime.datetime.utcnow)
updated_at = Column(DateTime, default=datetime.datetime.utcnow) updated_at = Column(DateTime, default=datetime.datetime.utcnow)
uuid = Column(String(32), default=lambda : uuid.uuid4().hex)
assignment_id = Column(String(255), default = None)
worker_id = Column(String(255), default = None)
accept_time = Column(DateTime, default=None)
open_page_at = Column(DateTime, default=None)
submit_page_at = Column(DateTime, default=None)
submit_hit_at = Column(DateTime, default=None)
answer = Column(String(255), default=None)
turk_ip = Column(String(255), default=None)
turk_country = Column(String(255), default=None)
turk_screen_width = Column(Integer, default = None)
turk_screen_height = Column(Integer, default = None)
scanned_at = Column(DateTime, default=None) scanned_at = Column(DateTime, default=None)
deleted_at = Column(DateTime, default=None)
assignments = relationship("Assignment", back_populates="hit", order_by="Assignment.created_at")
fee = Column(Float(precision=2), default=None) fee = Column(Float(precision=2), default=None)
abandoned = False
# previous hit so we can load the corrent image
# previous_hit_id = Column(Integer, ForeignKey('hits.id'), default=None)
# previous_hit = relationship("HIT")
def getImagePath(self): def getImagePath(self):
return os.path.join('scanimation/interfaces/frames', f"{self.id:06d}.jpg") return os.path.join('scanimation/interfaces/frames', f"{self.id:06d}.jpg")
# def getImageUrl(self):
# return f"{self.id}.jpg"
def getSvgImageUrl(self): def getSvgImageUrl(self):
return f"scans/{self.id:06d}.svg" return f"scans/{self.id:06d}.svg"
def getSvgImagePath(self): def getSvgImagePath(self):
return os.path.join('www', self.getSvgImageUrl()) return os.path.join('www', self.getSvgImageUrl())
def getLastAssignment(self):
if not len(self.assignments):
return None
return self.assignments[-1]
def getAssignmentById(self, assignmentId):
for a in self.assignments:
if a.assignment_id == assignmentId:
return
def getStatus(self): def getStatus(self):
if self.scanned_at: if self.scanned_at:
return "completed" return "completed"
@ -86,22 +85,44 @@ class HIT(Base):
if self.worker_id: if self.worker_id:
return "abandoned by worker" return "abandoned by worker"
return "awaiting worker" return "awaiting worker"
class Assignment(Base):
__tablename__ = 'assignments'
id = Column(Integer, Sequence('assignment_id'), primary_key=True) # our sequential hit id
assignment_id = Column(String(255)) # amazon's assignment id
hit_id = Column(Integer, ForeignKey('hits.id')) # our sequential hit id
hit = relationship("HIT", back_populates="assignments")
uuid = Column(String(32), default=lambda : uuid.uuid4().hex)
created_at = Column(DateTime, default=datetime.datetime.utcnow)
updated_at = Column(DateTime, default=datetime.datetime.utcnow)
assignment_id = Column(String(255), default = None)
worker_id = Column(String(255), default = None)
accept_at = Column(DateTime, default=None)
# open_page_at = Column(DateTime, default=None)
submit_page_at = Column(DateTime, default=None) # Submit the page
confirmed_at = Column(DateTime, default=None) # validate with UUID when getting Message from Amazon
abandoned_at = Column(DateTime, default=None)
rejected_at = Column(DateTime, default=None)
answer = Column(String(255), default=None)
turk_ip = Column(String(255), default=None)
turk_country = Column(String(255), default=None)
class Store: class Store:
def __init__(self, db_filename, logLevel=0): def __init__(self, db_filename, logLevel=0):
path = os.path.abspath(db_filename) path = os.path.abspath(db_filename)
if logLevel <= logging.DEBUG: if logLevel <= logging.DEBUG:
logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO) logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)
self.engine = create_engine('sqlite:///'+path, echo=False, connect_args={'check_same_thread': False}) self.engine = create_engine('sqlite:///'+path, echo=False, connect_args={'check_same_thread': False})
Base.metadata.create_all(self.engine) Base.metadata.create_all(self.engine)
self.Session = sessionmaker(bind=self.engine) self.Session = sessionmaker(bind=self.engine)
self.session = self.Session() self.session = self.Session()
self.currentHit = None # mirrors Centralmanagmenet, stored here so we can quickly access it from webserver classes self.currentHit = None # mirrors Centralmanagmenet, stored here so we can quickly access it from webserver classes
@contextmanager @contextmanager
def getSession(self): def getSession(self):
"""Provide a transactional scope around a series of operations.""" """Provide a transactional scope around a series of operations."""
@ -111,25 +132,25 @@ class Store:
except: except:
self.session.rollback() self.session.rollback()
raise raise
def getHits(self, session): def getHits(self, session):
return self.session.query(Source).order_by(HIT.created_at.desc()) return self.session.query(Source).order_by(HIT.created_at.desc())
def getHitById(self, hitId): def getHitById(self, hitId):
return self.session.query(HIT).\ return self.session.query(HIT).\
filter(HIT.id==hitId).one() filter(HIT.id==hitId).one()
def getHitByRemoteId(self, amazonHitId): def getHitByRemoteId(self, amazonHitId):
return self.session.query(HIT).\ return self.session.query(HIT).\
filter(HIT.hit_id==amazonHitId).one() filter(HIT.hit_id==amazonHitId).one()
def getLastSubmittedHit(self): def getLastSubmittedHit(self):
return self.session.query(HIT).\ return self.session.query(HIT).\
filter(HIT.submit_page_at!=None).\ filter(HIT.submit_page_at!=None).\
order_by(HIT.submit_page_at.desc()).first() order_by(HIT.submit_page_at.desc()).first()
def createHIT(self): def createHIT(self) -> HIT:
with self.getSession() as s: with self.getSession() as s:
hit = HIT() hit = HIT()
s.add(hit) s.add(hit)
@ -137,19 +158,29 @@ class Store:
s.refresh(hit) s.refresh(hit)
logger.info(f"Created HIT {hit.id}") logger.info(f"Created HIT {hit.id}")
return hit return hit
def newAssignment(self, hit: HIT) -> Assignment:
with self.getSession() as s:
assignment = Assignment()
hit.assignments.append(assignment)
s.add(assignment)
s.flush()
s.refresh(hit)
logger.info(f"Created Assignment {assignment.id}")
return assignment
def saveHIT(self, hit): def saveHIT(self, hit):
with self.getSession() as s: with self.getSession() as s:
logger.info(f"Updating hit! {hit.id}") logger.info(f"Updating hit! {hit.id}")
# s.flush() # s.flush()
def addHIT(self, hit: HIT): def addHIT(self, hit: HIT):
with self.getSession() as s: with self.getSession() as s:
s.add(hit) s.add(hit)
s.flush() s.flush()
s.refresh(hit) s.refresh(hit)
logger.info(f"Added {hit.id}") logger.info(f"Added {hit.id}")
def getAvgDurationOfPreviousNHits(self, n) -> int: def getAvgDurationOfPreviousNHits(self, n) -> int:
latest_hits = self.session.query(HIT).\ latest_hits = self.session.query(HIT).\
filter(HIT.submit_hit_at!=None).\ filter(HIT.submit_hit_at!=None).\
@ -161,18 +192,18 @@ class Store:
if not len(durations): if not len(durations):
return int(2.5*60) return int(2.5*60)
return int(sum(durations) / len(durations)) return int(sum(durations) / len(durations))
def getEstimatedHitDuration(self): def getEstimatedHitDuration(self):
return self.getAvgDurationOfPreviousNHits(5) return self.getAvgDurationOfPreviousNHits(5)
def getHitTimeout(self): def getHitTimeout(self):
return 160 # max(160, self.getAvgDurationOfPreviousNHits(5)*2) return 160 # max(160, self.getAvgDurationOfPreviousNHits(5)*2)
def getHITs(self, n = 100): def getHITs(self, n = 100):
return self.session.query(HIT).\ return self.session.query(HIT).\
filter(HIT.submit_hit_at != None).\ filter(HIT.submit_hit_at != None).\
order_by(HIT.submit_hit_at.desc()).limit(n) order_by(HIT.submit_hit_at.desc()).limit(n)
# def rmSource(self, id: int): # def rmSource(self, id: int):
# with self.getSession() as session: # with self.getSession() as session:
# source = session.query(Source).get(id) # source = session.query(Source).get(id)
@ -181,7 +212,6 @@ class Store:
# else: # else:
# logging.info(f"Deleting source {source.id}: {source.url}") # logging.info(f"Deleting source {source.id}: {source.url}")
# session.delete(source) # session.delete(source)
# #
# def getRandomNewsItem(self, session) -> NewsItem: # def getRandomNewsItem(self, session) -> NewsItem:
# return session.query(NewsItem).order_by(func.random()).limit(1).first() # return session.query(NewsItem).order_by(func.random()).limit(1).first()

View file

@ -76,6 +76,7 @@ class CentralManagement():
self.lastHitTime = None self.lastHitTime = None
self.eventQueue = Queue() self.eventQueue = Queue()
self.statusPageQueue = Queue()
self.isRunning = threading.Event() self.isRunning = threading.Event()
self.isScanning = threading.Event() self.isScanning = threading.Event()
self.scanLock = threading.Lock() self.scanLock = threading.Lock()
@ -210,38 +211,67 @@ class CentralManagement():
if signal.name == 'start': if signal.name == 'start':
self.makeHit() self.makeHit()
self.lastHitTime = datetime.datetime.now() self.lastHitTime = datetime.datetime.now()
pass elif signal.name == 'hit.scan':
if signal.params['id'] != self.currentHit.id:
self.logger.info(f"Hit.scanned had wrong id: {signal}")
continue
self.statusPageQueue.add(dict(hit_id=signal.params['id'], transition='scanning'))
elif signal.name == 'hit.scanned': elif signal.name == 'hit.scanned':
# TODO: wrap up hit & make new HIT # TODO: wrap up hit & make new HIT
if signal.params['id'] != self.currentHit.id:
self.logger.info(f"Hit.scanned had wrong id: {signal}")
continue
self.currentHit.scanned_at = datetime.datetime.utcnow() self.currentHit.scanned_at = datetime.datetime.utcnow()
self.server.statusPage.set('state', self.currentHit.getStatus())
time_diff = datetime.datetime.now() - self.lastHitTime time_diff = datetime.datetime.now() - self.lastHitTime
to_wait = 10 - time_diff.total_seconds() to_wait = 10 - time_diff.total_seconds()
self.statusPageQueue.add(dict(hit_id=self.currentHit.id, state='scan'))
if to_wait > 0: if to_wait > 0:
self.logger.warn(f"Sleep until next hit: {to_wait}s") self.logger.warn(f"Sleep until next hit: {to_wait}s")
time.sleep(to_wait) time.sleep(to_wait)
else: else:
self.logger.info(f"No need to wait: {to_wait}s") self.logger.info(f"No need to wait: {to_wait}s")
self.makeHit() self.makeHit()
self.lastHitTime = datetime.datetime.now() self.lastHitTime = datetime.datetime.now()
elif signal.name == 'hit.creating':
self.statusPageQueue.add(dict(hit_id=signal.params['id'], transition='create_hit'))
elif signal.name == 'hit.created':
self.statusPageQueue.add(dict(hit_id=signal.params['id'], remote_id=signal.params['remote_id'], state='hit'))
elif signal.name == 'scan.start': elif signal.name == 'scan.start':
pass pass
elif signal.name == 'scan.finished': elif signal.name == 'scan.finished':
# probably see hit.scanned
pass pass
elif signal.name == 'hit.info':
if signal.params['hit_id'] != self.currentHit.id:
self.logger.warning(f"hit.info hit_id != currenthit.id: {signal}")
continue
for name, value in signal.params.items():
if name == 'hit_id':
continue
if name == 'ip':
self.currentHit.turk_ip = value
if name == 'location':
self.currentHit.turk_country = value
self.logger.debug(f'Set status: {name} to {value}') elif signal.name == 'hit.assignment':
# Create new assignment
if signal.params['hit_id'] != self.currentHit.id:
continue
assignment = self.store.newAssignment(self.currentHit)
assignment.assignment_id = signal.params['assignment_id']
self.store.saveAssignment(assignment)
self.statusPageQueue.add(dict(hit_id=self.currentHit.id, assignment_id=assignment.assignment_id, state='assignment'))
elif signal.name == 'assignment.info':
assignment = self.currentHit.getAssignmentById(signal.params['assignment_id'])
if not assignment:
self.logger.warning(f"assignment.info assignment.id not for current hit assignments: {signal}")
for name, value in signal.params.items():
if name == 'ip':
assignment.turk_ip = value
if name == 'location':
assignment.turk_country = value
self.logger.debug(f'Set assignment: {name} to {value}')
self.server.statusPage.set(name, value) self.server.statusPage.set(name, value)
elif signal.name == 'server.open': elif signal.name == 'server.open':
self.currentHit.open_page_at = datetime.datetime.utcnow() self.currentHit.open_page_at = datetime.datetime.utcnow()
self.store.saveHIT(self.currentHit) self.store.saveHIT(self.currentHit)
@ -345,12 +375,13 @@ class CentralManagement():
def makeHit(self): def makeHit(self):
self.expireCurrentHit() # expire hit if it is there self.expireCurrentHit() # expire hit if it is there
self.eventQueue.put(Signal('hit.creating', {'id': self.currentHit.id if self.currentHit else 'start'}))
self.server.statusPage.reset() self.server.statusPage.reset()
self.reloadConfig() # reload new config values if they are set self.reloadConfig() # reload new config values if they are set
# self.notPaused.wait() # self.notPaused.wait()
self.currentHit = self.store.createHIT() self.currentHit = self.store.createHIT()
self.store.currentHit = self.currentHit self.store.currentHit = self.currentHit
@ -392,10 +423,12 @@ class CentralManagement():
self.store.saveHIT(self.currentHit) self.store.saveHIT(self.currentHit)
# TODO: have HITStore/HIT take care of this by emitting a signal # TODO: have HITStore/HIT take care of this by emitting a signal
self.server.statusPage.set('hit_id', new_hit['HIT']['HITId']) # self.server.statusPage.set('hit_id', new_hit['HIT']['HITId'])
self.server.statusPage.set('hit_created', self.currentHit.created_at) # self.server.statusPage.set('hit_created', self.currentHit.created_at)
self.server.statusPage.set('fee', f"${self.currentHit.fee:.2f}") # self.server.statusPage.set('fee', f"${self.currentHit.fee:.2f}")
self.server.statusPage.set('state', self.currentHit.getStatus()) # self.server.statusPage.set('state', self.currentHit.getStatus())
self.eventQueue.put(Signal('hit.created', {'id': self.currentHit.id, 'remote_id': self.currentHit.hit_id}))
# mturk.send_test_event_notification() # mturk.send_test_event_notification()
if self.config['amazon']['sqs_url']: if self.config['amazon']['sqs_url']:
@ -432,8 +465,8 @@ class CentralManagement():
'sudo', 'scanimage', '-d', 'epkowa','--resolution=100', 'sudo', 'scanimage', '-d', 'epkowa','--resolution=100',
'-l','25' #y axis, margin from top of the scanner, hence increasing this, moves the scanned image upwards '-l','25' #y axis, margin from top of the scanner, hence increasing this, moves the scanned image upwards
,'-t','22', # x axis, margin from left side scanner (seen from the outside) ,'-t','22', # x axis, margin from left side scanner (seen from the outside)
'-x',str(181), '-x',str(181),
'-y',str(245) '-y',str(245)
] ]
proc = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE) proc = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)
# opens connection to scanner, but only starts scanning when output becomes ready: # opens connection to scanner, but only starts scanning when output becomes ready:
@ -469,6 +502,7 @@ class CentralManagement():
filename = self.currentHit.getImagePath() filename = self.currentHit.getImagePath()
with self.scanLock: with self.scanLock:
self.eventQueue.put(Signal('hit.scan', {'id':self.currentHit.id}))
self.eventQueue.put(Signal('scan.start')) self.eventQueue.put(Signal('scan.start'))
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# opens connection to scanner, but only starts scanning when output becomes ready: # opens connection to scanner, but only starts scanning when output becomes ready:
@ -491,7 +525,7 @@ class CentralManagement():
time.sleep(5) # sleep a few seconds for scanner to return to start position time.sleep(5) # sleep a few seconds for scanner to return to start position
self.eventQueue.put(Signal('hit.scanned', {'hit_id':self.currentHit.id})) self.eventQueue.put(Signal('hit.scanned', {'id':self.currentHit.id}))
self.eventQueue.put(Signal('scan.finished')) self.eventQueue.put(Signal('scan.finished'))
def setLight(self, on): def setLight(self, on):

View file

@ -0,0 +1,89 @@
import datetime
class State():
def __init__(self, hit_id):
self.time = datetime.datetime.now()
self.hit_id = params['hit_id']
def transition(self, transitionName, params = {}):
raise Exception("Not implemented")
class StateMachine:
def __init__(self, initalState):
self.history = [('init',initialState)]
def current(self):
return self.history[-1][1]
def transition(self, transitionName, params):
# TODO: update Store & Interface
if transitionName not in self.current().availableTransitions:
raise Exception("Invalid transition")
newState = self.current().transition(transitionName, params)
if not newState:
raise RuntimeException(f"Invalid transition {transitionName} for {self.current()}")
self.history.append((transitionName, newState))
def getStateForHit(self, hit_id, stateCls = None):
states = [s for s in self.history if s[1].hit_id == hit_id and (stateCls is None or isinstance(s[1], stateCls))]
if len(states < 1):
return None
return states[-1]
class HITCreated(State):
availableTransitions = ['accept']
self.state = None
self.fee = None
self.hit_created = None
self.hit_opened = None
self.hit_submitted = None
def transition(self, transitionName, params = {}):
if transitionName == 'accept':
return HITAssigned(params['hit_id'], params['assignment_id'])
class HITAssigned(State):
availableTransitions = ['reject', 'abandon', 'submit']
def __init__(self, hit_id):
self.assignment_id = None
self.worker_id = None
self.ip = None
self.location = None
self.browser = None
self.os = None
self.resolution = None
def transition(self, transitionName, params = {}):
if transitionName == 'reject' or transitionName == 'abandon':
return HITAbandonedRejected(params['hit_id'])
if transitionName == 'submit':
return HITSubmitted(params['hit_id'])
class HITAbandonedRejected(State):
availableTransitions = ['accept']
def transition(self, transitionName, params = {}):
if transitionName == 'accept':
return HITAssigned(params['hit_id'])
class HITSubmitted(State):
availableTransitions = ['scan']
def transition(self, transitionName, params = {}):
if transitionName == 'scan':
return Scanning(params['hit_id'])
class Scanning(State):
availableTransitions = ['scan_complete', 'scan_failed']
def transition(self, transitionName, params = {}):
if transitionName == 'scan_complete':
return ImageAvailable(params['hit_id'])
if transitionName == 'scan_failed':
raise Exception("Scan failed, unknown state")
class ImageAvailable(State):
availableTransitions = ['create_hit']
def transition(self, transitionName, params = {}):
if transitionName == 'create_hit':
return HITCreated(params['hit_id'])

View file

@ -34,13 +34,16 @@ class StaticFileWithHeaderHandler(tornado.web.StaticFileHandler):
print(mime) print(mime)
if mime == 'image/svg+xml': if mime == 'image/svg+xml':
self.set_header("Content-Type", "image/svg+xml") self.set_header("Content-Type", "image/svg+xml")
class WebSocketHandler(tornado.websocket.WebSocketHandler): class WebSocketHandler(tornado.websocket.WebSocketHandler):
"""
Websocket from the workers
"""
CORS_ORIGINS = ['localhost', '.mturk.com', 'here.rubenvandeven.com', 'guest.rubenvandeven.com'] CORS_ORIGINS = ['localhost', '.mturk.com', 'here.rubenvandeven.com', 'guest.rubenvandeven.com']
connections = set() connections = set()
def initialize(self, config, plotterQ: Queue, eventQ: Queue, store: HITStore): def initialize(self, config, plotterQ: Queue, eventQ: Queue, store: HITStore):
self.config = config self.config = config
self.plotterQ = plotterQ self.plotterQ = plotterQ
@ -60,35 +63,34 @@ class WebSocketHandler(tornado.websocket.WebSocketHandler):
if hit_id != self.store.currentHit.id: if hit_id != self.store.currentHit.id:
self.close() self.close()
return return
self.hit = self.store.currentHit self.hit = self.store.currentHit
self.assignment_id = int(self.get_query_argument('assignment_id'))
self.timeout = datetime.datetime.now() + datetime.timedelta(seconds=self.store.getHitTimeout()) self.timeout = datetime.datetime.now() + datetime.timedelta(seconds=self.store.getHitTimeout())
if self.hit.submit_hit_at: if self.hit.submit_hit_at:
raise Exception("Opening websocket for already submitted hit") raise Exception("Opening websocket for already submitted hit")
#logger.info(f"New client connected: {self.request.remote_ip} for {self.hit.id}/{self.hit.hit_id}") #logger.info(f"New client connected: {self.request.remote_ip} for {self.hit.id}/{self.hit.hit_id}")
self.eventQ.put(Signal('server.open', dict(hit_id=self.hit.id))) self.eventQ.put(Signal('server.open', dict(assignment_id=self.assignment_id)))
self.strokes = [] self.strokes = []
# Gather some initial information:
ua = self.request.headers.get('User-Agent', None)
if ua:
ua_info = httpagentparser.detect(ua)
self.eventQ.put(Signal('hit.info', dict(hit_id=self.hit.id, os=ua_info['os']['name'], browser=ua_info['browser']['name'])))
# self.write_message("hello!")
# the client sent the message # the client sent the message
def on_message(self, message): def on_message(self, message):
logger.debug(f"recieve: {message}") logger.debug(f"recieve: {message}")
if self.assignment_id != self.hit.getLastAssignment().assignment_id:
logger.critical(f"Skip message for non-last assignment {message}")
if datetime.datetime.now() > self.timeout: if datetime.datetime.now() > self.timeout:
logger.critical("Close websocket after timeout (abandon?)") logger.critical("Close websocket after timeout (abandon?)")
self.close() self.close()
return return
try: try:
msg = json.loads(message) msg = json.loads(message)
# TODO: sanitize input: min/max, limit strokes # TODO: sanitize input: min/max, limit strokes
@ -97,12 +99,12 @@ class WebSocketHandler(tornado.websocket.WebSocketHandler):
point = [float(msg['direction'][0]),float(msg['direction'][1]), bool(msg['mouse'])] point = [float(msg['direction'][0]),float(msg['direction'][1]), bool(msg['mouse'])]
self.strokes.append(point) self.strokes.append(point)
self.plotterQ.put(point) self.plotterQ.put(point)
elif msg['action'] == 'up': elif msg['action'] == 'up':
logger.info(f'up: {msg}') logger.info(f'up: {msg}')
point = [msg['direction'][0],msg['direction'][1], 1] point = [msg['direction'][0],msg['direction'][1], 1]
self.strokes.append(point) self.strokes.append(point)
elif msg['action'] == 'submit': elif msg['action'] == 'submit':
logger.info(f'submit: {msg}') logger.info(f'submit: {msg}')
id = self.submit_strokes() id = self.submit_strokes()
@ -112,24 +114,24 @@ class WebSocketHandler(tornado.websocket.WebSocketHandler):
#store svg: #store svg:
d = html.escape(msg['d']) d = html.escape(msg['d'])
svg = f"""<?xml version="1.0" encoding="UTF-8" standalone="no"?> svg = f"""<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<svg <svg
xmlns:svg="http://www.w3.org/2000/svg" xmlns:svg="http://www.w3.org/2000/svg"
xmlns="http://www.w3.org/2000/svg" xmlns="http://www.w3.org/2000/svg"
version="1.0" viewBox="0 0 {self.config['scanner']['width']}0 {self.config['scanner']['height']}0" width="{self.config['scanner']['width']}mm" height="{self.config['scanner']['height']}mm" preserveAspectRatio="none"> version="1.0" viewBox="0 0 {self.config['scanner']['width']}0 {self.config['scanner']['height']}0" width="{self.config['scanner']['width']}mm" height="{self.config['scanner']['height']}mm" preserveAspectRatio="none">
<path d="{d}" style='stroke:gray;stroke-width:2mm;fill:none;' id="stroke" /> <path d="{d}" style='stroke:gray;stroke-width:2mm;fill:none;' id="stroke" />
</svg> </svg>
""" """
with open(self.store.currentHit.getSvgImagePath(), 'w') as fp: with open(self.store.currentHit.getSvgImagePath(), 'w') as fp:
fp.write(svg) fp.write(svg)
self.write_message(json.dumps({ self.write_message(json.dumps({
'action': 'submitted', 'action': 'submitted',
'msg': f"Submission ok, please copy this token to your HIT at Mechanical Turk: {self.hit.uuid}", 'msg': f"Submission ok, please copy this token to your HIT at Mechanical Turk: {self.hit.uuid}",
'code': str(self.hit.uuid) 'code': str(self.hit.uuid)
})) }))
self.close() self.close()
elif msg['action'] == 'down': elif msg['action'] == 'down':
# not used, implicit in move? # not used, implicit in move?
pass pass
@ -152,13 +154,13 @@ class WebSocketHandler(tornado.websocket.WebSocketHandler):
def on_close(self): def on_close(self):
self.__class__.rmConnection(self) self.__class__.rmConnection(self)
logger.info(f"Client disconnected: {self.request.remote_ip}") logger.info(f"Client disconnected: {self.request.remote_ip}")
def submit_strokes(self): def submit_strokes(self):
if len(self.strokes) < 1: if len(self.strokes) < 1:
return False return False
self.eventQ.put(Signal("server.submit", dict(hit_id = self.hit.id))) self.eventQ.put(Signal("server.submit", dict(hit_id = self.hit.id)))
if self.config['dummy_plotter']: if self.config['dummy_plotter']:
d = strokes2D(self.strokes) d = strokes2D(self.strokes)
svg = f"""<?xml version="1.0" encoding="UTF-8" standalone="no"?> svg = f"""<?xml version="1.0" encoding="UTF-8" standalone="no"?>
@ -173,15 +175,15 @@ class WebSocketHandler(tornado.websocket.WebSocketHandler):
<path d="{d}" style="stroke:black;stroke-width:2;fill:none;" /> <path d="{d}" style="stroke:black;stroke-width:2;fill:none;" />
</svg> </svg>
""" """
filename = self.hit.getImagePath() filename = self.hit.getImagePath()
logger.info(f"Write to {filename}") logger.info(f"Write to {filename}")
with open(filename, 'w') as fp: with open(filename, 'w') as fp:
fp.write(svg) fp.write(svg)
# we fake a hit.scanned event # we fake a hit.scanned event
self.eventQ.put(Signal('hit.scanned', {'hit_id':self.hit.id})) self.eventQ.put(Signal('hit.scanned', {'hit_id':self.hit.id}))
return self.hit.uuid return self.hit.uuid
@classmethod @classmethod
@ -195,7 +197,7 @@ class StatusWebSocketHandler(tornado.websocket.WebSocketHandler):
CORS_ORIGINS = ['localhost'] CORS_ORIGINS = ['localhost']
connections = set() connections = set()
queue = queue.Queue() queue = queue.Queue()
def initialize(self, statusPage): def initialize(self, statusPage):
self.statusPage = statusPage self.statusPage = statusPage
pass pass
@ -214,19 +216,19 @@ class StatusWebSocketHandler(tornado.websocket.WebSocketHandler):
'property': prop, 'property': prop,
'value': value.isoformat(timespec='seconds') if type(value) is datetime.datetime else value 'value': value.isoformat(timespec='seconds') if type(value) is datetime.datetime else value
})) }))
# client disconnected # client disconnected
def on_close(self): def on_close(self):
self.__class__.rmConnection(self) self.__class__.rmConnection(self)
logger.info(f"Client disconnected: {self.request.remote_ip}") logger.info(f"Client disconnected: {self.request.remote_ip}")
@classmethod @classmethod
def rmConnection(cls, client): def rmConnection(cls, client):
if client not in cls.connections: if client not in cls.connections:
return return
cls.connections.remove(client) cls.connections.remove(client)
@classmethod @classmethod
def update_for_all(cls, prop, value): def update_for_all(cls, prop, value):
logger.debug(f"update for all {prop} {value}") logger.debug(f"update for all {prop} {value}")
@ -235,7 +237,7 @@ class StatusWebSocketHandler(tornado.websocket.WebSocketHandler):
'property': prop, 'property': prop,
'value': value.isoformat(timespec='seconds') if type(value) is datetime.datetime else value 'value': value.isoformat(timespec='seconds') if type(value) is datetime.datetime else value
})) }))
def strokes2D(strokes): def strokes2D(strokes):
# strokes to a d attribute for a path # strokes to a d attribute for a path
d = ""; d = "";
@ -252,7 +254,7 @@ def strokes2D(strokes):
elif cmd != 'l': elif cmd != 'l':
d+=' l ' d+=' l '
cmd = 'l' cmd = 'l'
rel_stroke = [stroke[0] - last_stroke[0], stroke[1] - last_stroke[1]]; rel_stroke = [stroke[0] - last_stroke[0], stroke[1] - last_stroke[1]];
d += f"{rel_stroke[0]},{rel_stroke[1]} " d += f"{rel_stroke[0]},{rel_stroke[1]} "
last_stroke = stroke; last_stroke = stroke;
@ -270,7 +272,7 @@ class DrawPageHandler(tornado.web.RequestHandler):
self.left_padding = left_padding self.left_padding = left_padding
self.eventQ = eventQ self.eventQ = eventQ
self.geoip_reader = geoip_reader self.geoip_reader = geoip_reader
def get(self): def get(self):
try: try:
hit_id = int(self.get_query_argument('id')) hit_id = int(self.get_query_argument('id'))
@ -284,15 +286,15 @@ class DrawPageHandler(tornado.web.RequestHandler):
if hit.submit_page_at: if hit.submit_page_at:
self.write("HIT already submitted") self.write("HIT already submitted")
return return
assignmentId = self.get_query_argument('assignmentId', '') assignmentId = self.get_query_argument('assignmentId', '')
if len(assignmentId) < 1: if len(assignmentId) < 1:
logger.critical("Accessing page without assignment id. Allowing it for debug purposes... fingers crossed?") logger.critical("Accessing page without assignment id. Allowing it for debug purposes... fingers crossed?")
previewOnly = False previewOnly = False
if assignmentId == 'ASSIGNMENT_ID_NOT_AVAILABLE': if assignmentId == 'ASSIGNMENT_ID_NOT_AVAILABLE':
previewOnly = True previewOnly = True
previous_hit = self.store.getLastSubmittedHit() previous_hit = self.store.getLastSubmittedHit()
if not previous_hit: if not previous_hit:
# start with basic svg # start with basic svg
@ -301,7 +303,7 @@ class DrawPageHandler(tornado.web.RequestHandler):
else: else:
image = previous_hit.getSvgImageUrl() image = previous_hit.getSvgImageUrl()
logger.info(f"Image url: {image}") logger.info(f"Image url: {image}")
self.set_header("Access-Control-Allow-Origin", "*") self.set_header("Access-Control-Allow-Origin", "*")
contents = open(os.path.join(self.path, 'index.html'), 'r').read() contents = open(os.path.join(self.path, 'index.html'), 'r').read()
contents = contents.replace("{IMAGE_URL}", image)\ contents = contents.replace("{IMAGE_URL}", image)\
@ -313,34 +315,42 @@ class DrawPageHandler(tornado.web.RequestHandler):
.replace("{LEFT_PADDING}", str(self.left_padding))\ .replace("{LEFT_PADDING}", str(self.left_padding))\
.replace("{SCRIPT}", '' if previewOnly else '<script type="text/javascript" src="/assignment.js"></script>')\ .replace("{SCRIPT}", '' if previewOnly else '<script type="text/javascript" src="/assignment.js"></script>')\
.replace("{ASSIGNMENT}", '' if previewOnly else str(assignmentId)) # TODO: fix unsafe inserting of GET variable .replace("{ASSIGNMENT}", '' if previewOnly else str(assignmentId)) # TODO: fix unsafe inserting of GET variable
self.write(contents) self.write(contents)
if 'X-Forwarded-For' in self.request.headers: if 'X-Forwarded-For' in self.request.headers:
ip = self.request.headers['X-Forwarded-For'] ip = self.request.headers['X-Forwarded-For']
else: else:
ip = self.request.remote_ip ip = self.request.remote_ip
logger.info(f"Request from {ip}") logger.info(f"Request from {ip}")
if not previewOnly: if not previewOnly:
self.eventQ.put(Signal('hit.info', dict(hit_id=hit.id, ip=ip))) self.eventQ.put(Signal('hit.assignment', dict(
hit_id=hit.id, ip=ip, assignment_id=assignmentId
)))
# self.eventQ.put(Signal('hit.info', dict(hit_id=hit.id, ip=ip)))
try: try:
geoip = self.geoip_reader.country(ip) geoip = self.geoip_reader.country(ip)
logger.debug(f"Geo {geoip}") logger.debug(f"Geo {geoip}")
self.eventQ.put(Signal('hit.info', dict(hit_id=hit.id, location=geoip.country.name))) self.eventQ.put(Signal('assignment.info', dict(assignment_id=assignmentId, location=geoip.country.name)))
except Exception as e: except Exception as e:
logger.exception(e) logger.exception(e)
logger.info("No geo IP possible") logger.info("No geo IP possible")
self.eventQ.put(Signal('hit.info', dict(hit_id=hit.id, location='Unknown'))) self.eventQ.put(Signal('assignment.info', dict(assignment_id=assignmentId, location='Unknown')))
ua = self.request.headers.get('User-Agent', None)
if ua:
ua_info = httpagentparser.detect(ua)
self.eventQ.put(Signal('assignment.info', dict(assignment_id=assignmentId, os=ua_info['os']['name'], browser=ua_info['browser']['name'])))
class BackendHandler(tornado.web.RequestHandler): class BackendHandler(tornado.web.RequestHandler):
def initialize(self, store: HITStore, path: str): def initialize(self, store: HITStore, path: str):
self.store = store self.store = store
self.path = path self.path = path
def get(self): def get(self):
rows = [] rows = []
for hit in self.store.getHITs(100): for hit in self.store.getHITs(100):
@ -351,9 +361,9 @@ class BackendHandler(tornado.web.RequestHandler):
duration = (f"{duration_m}m" if duration_m else "") + f"{duration_s:02d}s" duration = (f"{duration_m}m" if duration_m else "") + f"{duration_s:02d}s"
else: else:
duration = "-" duration = "-"
fee = f"${hit.fee:.2}" if hit.fee else "-" fee = f"${hit.fee:.2}" if hit.fee else "-"
rows.append( rows.append(
f""" f"""
<tr><td></td><td>{hit.worker_id}</td> <tr><td></td><td>{hit.worker_id}</td>
@ -363,8 +373,8 @@ class BackendHandler(tornado.web.RequestHandler):
<td>{hit.accept_time}</td> <td>{hit.accept_time}</td>
<td>{duration}</td><td></td> <td>{duration}</td><td></td>
""" """
) )
contents = open(os.path.join(self.path, 'backend.html'), 'r').read() contents = open(os.path.join(self.path, 'backend.html'), 'r').read()
contents = contents.replace("{{TBODY}}", "".join(rows)) contents = contents.replace("{{TBODY}}", "".join(rows))
self.write(contents) self.write(contents)
@ -376,7 +386,7 @@ class StatusPage():
""" """
def __init__(self): def __init__(self):
self.reset() self.reset()
def reset(self): def reset(self):
logger.info("Resetting status") logger.info("Resetting status")
self.hit_id = None self.hit_id = None
@ -391,7 +401,7 @@ class StatusPage():
self.hit_created = None self.hit_created = None
self.hit_opened = None self.hit_opened = None
self.hit_submitted = None self.hit_submitted = None
def clearAssignment(self): def clearAssignment(self):
logger.info("Resetting hit assignment") logger.info("Resetting hit assignment")
self.worker_id = None self.worker_id = None
@ -401,54 +411,56 @@ class StatusPage():
self.os = None self.os = None
self.resolution = None self.resolution = None
self.hit_created = None self.hit_created = None
def __setattr__(self, name, value): def __setattr__(self, name, value):
if name in self.__dict__ and self.__dict__[name] == value: if name in self.__dict__ and self.__dict__[name] == value:
logger.debug(f"Ignore setting status of {name}: it already is set to {value}") logger.debug(f"Ignore setting status of {name}: it already is set to {value}")
return return
self.__dict__[name] =value self.__dict__[name] =value
logger.info(f"Update status: {name}: {value}") logger.info(f"Update status: {name}: {value}")
if Server.loop: if Server.loop:
Server.loop.asyncio_loop.call_soon_threadsafe(StatusWebSocketHandler.update_for_all, name, value) Server.loop.asyncio_loop.call_soon_threadsafe(StatusWebSocketHandler.update_for_all, name, value)
else: else:
logger.warn("Status: no server loop to call update command") logger.warn("Status: no server loop to call update command")
def set(self, name, value): def set(self, name, value):
return self.__setattr__(name, value) return self.__setattr__(name, value)
class Server: class Server:
""" """
Server for HIT -> plotter events Server for HIT -> plotter events
As well as for the Status interface As well as for the Status interface
TODO: change to have the HIT_id as param to the page. Load hit from storage with previous image TODO: change to have the HIT_id as param to the page. Load hit from storage with previous image
""" """
loop = None loop = None
def __init__(self, config, eventQ: Queue, runningEvent: Event, plotterQ: Queue, store: HITStore): def __init__(self, config, eventQ: Queue, runningEvent: Event, plotterQ: Queue, store: HITStore):
self.isRunning = runningEvent self.isRunning = runningEvent
self.eventQ = eventQ self.eventQ = eventQ
self.config = config self.config = config
self.logger = logger self.logger = logger
self.plotterQ = plotterQ # communicate directly to plotter (skip main thread) self.plotterQ = plotterQ # communicate directly to plotter (skip main thread)
#self.config['server']['port'] #self.config['server']['port']
self.web_root = os.path.join('www') self.web_root = os.path.join('www')
self.server_loop = None self.server_loop = None
self.store = store self.store = store
self.statusPage = StatusPage() self.statusPage = StatusPage()
def start(self): def start(self):
if not os.path.exists('GeoLite2-Country.mmdb'): if not os.path.exists('GeoLite2-Country.mmdb'):
raise Exception("Please download the GeoLite2 Country database and place the 'GeoLite2-Country.mmdb' file in the project root.") raise Exception("Please download the GeoLite2 Country database and place the 'GeoLite2-Country.mmdb' file in the project root.")
self.geoip_reader = geoip2.database.Reader('GeoLite2-Country.mmdb') self.geoip_reader = geoip2.database.Reader('GeoLite2-Country.mmdb')
try: try:
asyncio.set_event_loop(asyncio.new_event_loop()) asyncio.set_event_loop(asyncio.new_event_loop())
application = tornado.web.Application([ application = tornado.web.Application([
@ -462,7 +474,7 @@ class Server:
(r"/draw", DrawPageHandler, (r"/draw", DrawPageHandler,
dict( dict(
store = self.store, store = self.store,
eventQ = self.eventQ, eventQ = self.eventQ,
path=self.web_root, path=self.web_root,
width=self.config['scanner']['width'], width=self.config['scanner']['width'],
height=self.config['scanner']['height'], height=self.config['scanner']['height'],
@ -488,12 +500,12 @@ class Server:
finally: finally:
self.logger.info("Stopping webserver") self.logger.info("Stopping webserver")
self.isRunning.clear() self.isRunning.clear()
def stop(self): def stop(self):
if self.server_loop: if self.server_loop:
self.logger.debug("Got call to stop") self.logger.debug("Got call to stop")
self.server_loop.asyncio_loop.call_soon_threadsafe(self._stop) self.server_loop.asyncio_loop.call_soon_threadsafe(self._stop)
def _stop(self): def _stop(self):
self.server_loop.stop() self.server_loop.stop()