diff --git a/Pipfile b/Pipfile index 915b0f7..4ff9cb1 100644 --- a/Pipfile +++ b/Pipfile @@ -6,6 +6,9 @@ name = "pypi" [packages] tornado = "*" coloredlogs = "*" +boto3 = "*" +PyYAML = "*" +SQLAlchemy = "*" [dev-packages] diff --git a/Pipfile.lock b/Pipfile.lock index 65780ae..f2d5668 100644 --- a/Pipfile.lock +++ b/Pipfile.lock @@ -1,7 +1,7 @@ { "_meta": { "hash": { - "sha256": "58dec9ae9b9fda6e3430fe601c100edb07debda39e6f5530e0c8a5c3286938cf" + "sha256": "2ecd89c76c2fb319746f9616100668ec8f556d502ec72fe2644bc5b491307fa7" }, "pipfile-spec": 6, "requires": { @@ -19,6 +19,21 @@ "a209065": { "path": "./../../AxiDraw_API_v253r3" }, + "boto3": { + "hashes": [ + "sha256:839285fbd6f3ab16170af449ae9e33d0eccf97ca22de17d9ff68b8da2310ea06", + "sha256:d93f1774c4bc66e02acdda2067291acb9e228a035435753cb75f83ad2904cbe3" + ], + "index": "pypi", + "version": "==1.9.253" + }, + "botocore": { + "hashes": [ + "sha256:3baf129118575602ada9926f5166d82d02273c250d0feb313fc270944b27c48b", + "sha256:dc080aed4f9b220a9e916ca29ca97a9d37e8e1d296fe89cbaeef929bf0c8066b" + ], + "version": "==1.12.253" + }, "coloredlogs": { "hashes": [ "sha256:34fad2e342d5a559c31b6c889e8d14f97cb62c47d9a2ae7b5ed14ea10a79eff8", @@ -27,6 +42,14 @@ "index": "pypi", "version": "==10.0" }, + "docutils": { + "hashes": [ + "sha256:6c4f696463b79f1fb8ba0c594b63840ebd41f059e92b31957c46b74a4599b6d0", + "sha256:9e4d7ecfc600058e07ba661411a2b7de2fd0fafa17d1a7f7361cd47b1175c827", + "sha256:a2aeea129088da402665e92e0b25b04b073c04b2dce4ab65caaa38b7ce2e1a99" + ], + "version": "==0.15.2" + }, "humanfriendly": { "hashes": [ "sha256:23057b10ad6f782e7bc3a20e3cb6768ab919f619bbdc0dd75691121bbde5591d", @@ -34,6 +57,61 @@ ], "version": "==4.18" }, + "jmespath": { + "hashes": [ + "sha256:3720a4b1bd659dd2eecad0666459b9788813e032b83e7ba58578e48254e0a0e6", + "sha256:bde2aef6f44302dfb30320115b17d030798de8c4110e28d5cf6cf91a7a31074c" + ], + "version": "==0.9.4" + }, + "python-dateutil": { + "hashes": [ + "sha256:7e6584c74aeed623791615e26efd690f29817a27c73085b78e4bad02493df2fb", + "sha256:c89805f6f4d64db21ed966fda138f8a5ed7a4fdbc1a8ee329ce1b74e3c74da9e" + ], + "markers": "python_version >= '2.7'", + "version": "==2.8.0" + }, + "pyyaml": { + "hashes": [ + "sha256:0113bc0ec2ad727182326b61326afa3d1d8280ae1122493553fd6f4397f33df9", + "sha256:01adf0b6c6f61bd11af6e10ca52b7d4057dd0be0343eb9283c878cf3af56aee4", + "sha256:5124373960b0b3f4aa7df1707e63e9f109b5263eca5976c66e08b1c552d4eaf8", + "sha256:5ca4f10adbddae56d824b2c09668e91219bb178a1eee1faa56af6f99f11bf696", + "sha256:7907be34ffa3c5a32b60b95f4d95ea25361c951383a894fec31be7252b2b6f34", + "sha256:7ec9b2a4ed5cad025c2278a1e6a19c011c80a3caaac804fd2d329e9cc2c287c9", + "sha256:87ae4c829bb25b9fe99cf71fbb2140c448f534e24c998cc60f39ae4f94396a73", + "sha256:9de9919becc9cc2ff03637872a440195ac4241c80536632fffeb6a1e25a74299", + "sha256:a5a85b10e450c66b49f98846937e8cfca1db3127a9d5d1e31ca45c3d0bef4c5b", + "sha256:b0997827b4f6a7c286c01c5f60384d218dca4ed7d9efa945c3e1aa623d5709ae", + "sha256:b631ef96d3222e62861443cc89d6563ba3eeb816eeb96b2629345ab795e53681", + "sha256:bf47c0607522fdbca6c9e817a6e81b08491de50f3766a7a0e6a5be7905961b41", + "sha256:f81025eddd0327c7d4cfe9b62cf33190e1e736cc6e97502b3ec425f574b3e7a8" + ], + "index": "pypi", + "version": "==5.1.2" + }, + "s3transfer": { + "hashes": [ + "sha256:6efc926738a3cd576c2a79725fed9afde92378aa5c6a957e3af010cb019fac9d", + "sha256:b780f2411b824cb541dbcd2c713d0cb61c7d1bcadae204cdddda2b35cef493ba" + ], + "version": "==0.2.1" + }, + "six": { + "hashes": [ + "sha256:3350809f0555b11f552448330d0b52d5f24c91a322ea4a15ef22629740f3761c", + "sha256:d16a0141ec1a18405cd4ce8b4613101da75da0e9a7aec5bdd4fa804d0e0eba73" + ], + "version": "==1.12.0" + }, + "sqlalchemy": { + "hashes": [ + "sha256:0f0768b5db594517e1f5e1572c73d14cf295140756431270d89496dc13d5e46c" + ], + "index": "pypi", + "version": "==1.3.10" + }, "tornado": { "hashes": [ "sha256:349884248c36801afa19e342a77cc4458caca694b0eda633f5878e458a44cb2c", @@ -46,6 +124,14 @@ ], "index": "pypi", "version": "==6.0.3" + }, + "urllib3": { + "hashes": [ + "sha256:3de946ffbed6e6746608990594d08faac602528ac7015ac28d33cee6a45b7398", + "sha256:9a107b99a5393caf59c7aa3c1249c16e6879447533d0887f4336dde834c7be86" + ], + "markers": "python_version >= '3.4'", + "version": "==1.25.6" } }, "develop": {} diff --git a/config.yml b/config.yml new file mode 100644 index 0000000..edbe538 --- /dev/null +++ b/config.yml @@ -0,0 +1,13 @@ +amazon: + user_id: ABCDEFGHIJKJLMNOP + user_secret: "213j234/234sksjdfus83jd" + mturk_sandbox: true + mturk_region: us-east-1 + sqs_url: "https://sqs.eu-west-3.amazonaws.com/60123456789/your_queue" + sqs_region_name: "eu-west-3" + task_xml: "mt_task.xml" +hit_db: store.db +hour_rate_aim: 15 +hit_lifetime: 54000 ;15*60*60 +hit_assignment_duration: 300 ; 5*60 +hit_autoapprove_delay: 3600 \ No newline at end of file diff --git a/server.py b/server_test.py similarity index 100% rename from server.py rename to server_test.py diff --git a/sorteerhoed.py b/sorteerhoed.py new file mode 100644 index 0000000..f251125 --- /dev/null +++ b/sorteerhoed.py @@ -0,0 +1,53 @@ +import argparse +import logging + +import coloredlogs + +from sorteerhoed.central_management import CentralManagement + + +if __name__ == '__main__': + argParser = argparse.ArgumentParser( + description='Start up the Sorteerhoed server') + argParser.add_argument( + '--config', + '-c', + required=True, + type=str, + help='The yaml config file to load' + ) + argParser.add_argument( + '--verbose', + '-v', + action='count', default=0 + ) + + args = argParser.parse_args() + + loglevel = logging.NOTSET if args.verbose > 1 else logging.DEBUG if args.verbose > 0 else logging.INFO + + coloredlogs.install( + level=loglevel, +# default: "%(asctime)s %(hostname)s %(name)s[%(process)d] %(levelname)s %(message)s" + fmt="%(asctime)s %(hostname)s %(name)s[%(process)d,%(threadName)s] %(levelname)s %(message)s" + ) + + # File logging + formatter = logging.Formatter(fmt='%(asctime)s %(module)s:%(lineno)d %(levelname)8s | %(message)s', + datefmt='%Y/%m/%d %H:%M:%S') # %I:%M:%S %p AM|PM format + logFileHandler = logging.handlers.RotatingFileHandler( + 'mt_server.log', + maxBytes=1024*512, + backupCount=5 + ) + logFileHandler.setFormatter(formatter) + + logger = logging.getLogger("sorteerhoed") + logger.addHandler( + logFileHandler + ) + logger.info("Start server") + + command = CentralManagement(debug_mode=args.verbose > 0) + command.loadConfig(args.config) + command.start() diff --git a/sorteerhoed/HITStore.py b/sorteerhoed/HITStore.py new file mode 100644 index 0000000..a5d8197 --- /dev/null +++ b/sorteerhoed/HITStore.py @@ -0,0 +1,103 @@ +import logging +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy import Column, Integer, String, DateTime +from sqlalchemy.orm import relationship +from sqlalchemy.sql.schema import ForeignKey, Sequence +from sqlalchemy.engine import create_engine +from sqlalchemy.orm.session import sessionmaker +import datetime +from contextlib import contextmanager +import uuid +import os +import coloredlogs +import argparse +from sqlalchemy.sql.functions import func + +mainLogger = logging.getLogger("sorteerhoed") +logger = mainLogger.getChild("store") + +Base = declarative_base() + +""" +HIT lifetime: + + +created +accepted +(returned!) +working +awaiting amazon confirmation (submitted on page) +submitted + +Actions: +creating Hit (creating hit with scanned image) + +Scanning + +""" + +class HIT(Base): + __tablename__ = 'hits' + id = Column(Integer, Sequence('hit_id'), primary_key=True) # our sequential hit id + hit_id = Column(String(255)) # amazon's hit id + created_at = Column(DateTime, default=datetime.datetime.now()) + updated_at = Column(DateTime, default=datetime.datetime.now()) + uniqid = Column(String(32), default=uuid.uuid4().hex) + assignment_id = Column(String(255), default = None) + worker_id = Column(String(255), default = None) + accept_time = Column(DateTime, default=None) + open_page_at = Column(DateTime, default=None) + submit_page_at = Column(DateTime, default=None) + submit_hit_at = Column(DateTime, default=None) + answer = Column(String(255), default=None) + turk_ip = Column(String(255), default=None) + turk_country = Column(String(255), default=None) + turk_screen_width = Column(Integer, default = None) + turk_screen_height = Column(Integer, default = None) + + +class Store: + def __init__(self, db_filename, logLevel=0): + path = os.path.abspath(db_filename) + if logLevel <= logging.DEBUG: + logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO) + + self.engine = create_engine('sqlite:///'+path, echo=False) + Base.metadata.create_all(self.engine) + self.Session = sessionmaker(bind=self.engine) + + @contextmanager + def getSession(self): + """Provide a transactional scope around a series of operations.""" + session = self.Session() + try: + yield session + session.commit() + except: + session.rollback() + raise + finally: + session.close() + + def getHits(self, session): + return session.query(Source).order_by(HIT.created_at.desc()) + + def addHIT(self, hit: HIT): + with self.getSession() as s: + s.add(hit) + s.flush() + s.refresh(hit) + logging.info(f"Added {hit.id}") + +# def rmSource(self, id: int): +# with self.getSession() as session: +# source = session.query(Source).get(id) +# if not source: +# logging.warning(f"Source nr {id} not found") +# else: +# logging.info(f"Deleting source {source.id}: {source.url}") +# session.delete(source) +# +# def getRandomNewsItem(self, session) -> NewsItem: +# return session.query(NewsItem).order_by(func.random()).limit(1).first() + diff --git a/sorteerhoed/Signal.py b/sorteerhoed/Signal.py new file mode 100644 index 0000000..f65e676 --- /dev/null +++ b/sorteerhoed/Signal.py @@ -0,0 +1,10 @@ +class Signal: + """ + An event, with possible parameters. + + Named 'signal' to avoid confusion with threading.Event + """ + + def __init__(self, name: str, params: dict): + self.name = name + self.params = params \ No newline at end of file diff --git a/sorteerhoed/__init__.py b/sorteerhoed/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/sorteerhoed/central_management.py b/sorteerhoed/central_management.py new file mode 100644 index 0000000..e928f4c --- /dev/null +++ b/sorteerhoed/central_management.py @@ -0,0 +1,184 @@ +import logging +import yaml +from sorteerhoed import HITStore +import os +import subprocess +import boto3 +import threading +from queue import Queue +from sorteerhoed.plotter import Plotter +import queue +from sorteerhoed.sqs import SqsListener +from sorteerhoed.webserver import Server +import time + + +class CentralManagement(): + """ + Central management reads config and controls process flow + + The HIT Store is the archive of hits + mturk thread communicates with mturk + server thread is tornado communicating with the turkers and with the status interface on the installation + Plotter thread reads plotter queue and sends it to there + Scanner is for now a simpe imagescan command + SQS: thread that listens for updates from Amazon + """ + def __init__(self, debug_mode): + self.logger = logging.getLogger("sorteerhoed").getChild('management') + self.debug = debug_mode + self.currentHit = None + + self.eventQueue = Queue() + self.isRunning = threading.Event() + + + def loadConfig(self, filename): + with open(filename, 'r') as fp: + self.logger.debug('Load config from {}'.format(filename)) + self.config = yaml.safe_load(fp) + + varDb = os.path.join( +# self.config['storage_dir'], + 'hit_store.db' + ) + self.store = HITStore.Store(varDb, logLevel=logging.DEBUG if self.debug else logging.INFO) + + self.logger.debug(f"Loaded configuration: {self.config}") +# self.amazon = +# self.server + + +# self.panopticon = Panopticon(self, self.config, self.voiceStorage) + def start(self): + self.isRunning.set() + + try: + + # M-turk connection + MTURK_SANDBOX = 'https://mturk-requester-sandbox.us-east-1.amazonaws.com' + # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/mturk.html#MTurk.Client + self.mturk = boto3.client('mturk', + aws_access_key_id = self.config['amazon']['user_id'], + aws_secret_access_key = self.config['amazon']['user_secret'], + region_name='us-east-1', + endpoint_url = MTURK_SANDBOX + ) + + self.logger.info(f"Mechanical turk: {self.mturk.get_account_balance()}") + + + + self.sqs = SqsListener(self.config, self.eventQueue, self.isRunning) + sqsThread = threading.Thread(target=self.sqs.start) + sqsThread.start() + + # the plotter itself + self.plotter = Plotter(self.config, self.eventQueue, self.isRunning) + plotterThread = threading.Thread(target=self.plotter.start) + plotterThread.start() + + # webserver for turks and status + self.server = Server(self.config, self.eventQueue, self.isRunning, self.plotter.q) + serverThread = threading.Thread(target=self.server.start) + serverThread.start() + + # event listener: + dispatcherThread = threading.Thread(target=self.eventListener) + dispatcherThread.start() + + self.makeHit() + + finally: + self.isRunning.clear() + self.server.stop() + + + def eventListener(self): + while self.isRunning.is_set(): + try: + signal = self.eventQueue.get(True, 1) + except queue.Empty: + pass +# self.logger.log(5, "Empty queue.") + else: + """ + Possible events: + - SQS events: accept/abandoned/returned/submitted + - webserver events: open, draw, submit + - scan complete + - HIT created + - Plotter complete + - + """ + print(signal) + # handle singals/events: + # TODO: next steps + # TODO: update status + + + def makeHit(self): + self.currentHit = HITStore.HIT() + self.store.addHIT(self.currentHit) + + self.logger(f"Make HIT {self.currentHit.id}") + + question = open(self.config['amazon']['task_xml'], mode='r').read().replace("{HIT_NR}",self.currentHit.id) + new_hit = self.mturk.create_hit( + Title = 'Trace the drawn line', + Description = 'Draw a line over the sketched line in the image', + Keywords = 'polygons, trace, draw', + Reward = '0.15', # TODO: make variable + MaxAssignments = 1, + LifetimeInSeconds = self.config['hit_lifetime'], + AssignmentDurationInSeconds = self.config['hit_assignment_duration'], + AutoApprovalDelayInSeconds = self.config['hit_autoapprove_delay'], + Question = question, + ) + + self.logger.info("Created hit:", new_hit) + self.logger.info("https://workersandbox.mturk.com/mturk/preview?groupId=" + new_hit['HIT']['HITGroupId']) + + self.currentHit.hit_id = new_hit['HIT']['HITId'] + + print(self.currentHit) + + # mturk.send_test_event_notification() + if self.config['amazon']['sqs_url']: + notification_info= self.mturk.update_notification_settings( + HITTypeId=new_hit['HIT']['HITTypeId'], + Notification = { + 'Destination' : self.config['amazon']['sqs_url'], + 'Transport': 'SQS', + 'Version': '2014-08-15', + 'EventTypes': [ + 'AssignmentAccepted', + 'AssignmentAbandoned', + 'AssignmentReturned', + 'AssignmentSubmitted', +# 'AssignmentRejected', + # 'AssignmentApproved', +# 'HITCreated', + # 'HITExpired', + # 'HITReviewable', + # 'HITExtended', + # 'HITDisposed', + # 'Ping', + ] + }, + Active=True + ) + self.logger.debug(notification_info) + + def scanImage(self) -> str: + """ + Run scanimage on scaner and returns a string with the filename + """ + cmd = [ + 'sudo', 'scanimage' + ] + filename = "" + proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + o, e = proc.communicate(60) + + exec \ No newline at end of file diff --git a/sorteerhoed/plotter.py b/sorteerhoed/plotter.py new file mode 100644 index 0000000..b7dd08d --- /dev/null +++ b/sorteerhoed/plotter.py @@ -0,0 +1,78 @@ +import queue +import logging +from pyaxidraw import axidraw +from queue import Queue +from threading import Event +from sorteerhoed.Signal import Signal +import time + +class Plotter: + def __init__(self, config, eventQ: Queue, runningEvent: Event): + self.config = config + self.eventQ = eventQ + self.q = Queue() + self.isRunning = runningEvent + self.logger = logging.getLogger("sorteerhoed").getChild("plotter") + + def start(self): + self.axiDrawCueListener() + + def axiDrawCueListener(self): + if self.config['dummy_plotter']: + while self.isRunning.is_set(): + plotterRan = False + try: + move = self.q.get(True, 1) + plotterRan = True + except queue.Empty as e: + self.logger.log(5, "Empty queue.") + if plotterRan: + plotterRan = False + self.eventQ.put(Signal('plotter.finished')) + else: + time.sleep(.05) + self.logging.debug(f'Dummy plotter move: {move}') + self.logger.info("Stopping dummy plotter") + else: + ad = axidraw.AxiDraw() + + ad.interactive() + + connected = ad.connect() + if not connected: + raise Exception("Cannot connect to Axidraw") + try: + ad.options.units = 1 # set to use centimeters instead of inches + ad.options.accel = 100; + ad.options.speed_penup = 100 + ad.options.speed_pendown = 100 + ad.options.model = 2 # A3, set to 1 for A4 + + ad.moveto(0,0) + + plotterWidth = 22 + plotterHeight = 18 # 16? + + plotterRan = False + while self.isRunning.is_set(): + # TODO: set timeout on .get() with catch block, so we can escape if no moves come in + try: + move = self.q.get(True, 1) + plotterRan = True + except queue.Empty as e: + self.logger.log(5, "Empty queue.") + if plotterRan: + plotterRan = False + self.eventQ.put(Signal('plotter.finished')) + else: + ad.moveto(move[0]* plotterWidth, move[1]*plotterHeight) + self.logging.debug(f'handler! {move}') + except Exception as e: + self.logger.exception(e) + finally: + self.logger.warning("Close Axidraw connection") + ad.moveto(0,0) + ad.disconnect() + + # send shutdown signal (if not already set) + self.isRunning.clear() \ No newline at end of file diff --git a/sorteerhoed/sqs.py b/sorteerhoed/sqs.py new file mode 100644 index 0000000..8d4febd --- /dev/null +++ b/sorteerhoed/sqs.py @@ -0,0 +1,49 @@ +import boto3 +from queue import Queue +from threading import Event +import logging +import time +from sorteerhoed.Signal import Signal + +class SqsListener: + def __init__(self, config, eventQ: Queue, runningEvent: Event): + self.isRunning = runningEvent + self.eventQ = eventQ + self.config = config + self.logger = logging.getLogger('sorteerhoed').getChild('sqs') + + def start(self): + # create a boto3 client + sqs = boto3.client('sqs', + aws_access_key_id = self.config['amazon']['user_id'], + aws_secret_access_key = self.config['amazon']['user_secret'], + region_name=self.config['amazon']['sqs_region_name'], + endpoint_url=self.config['amazon']['sqs_endpoint_url'] + ) + + while self.isRunning.is_set(): + messages = sqs.receive_message( + QueueUrl=self.config['amazon']['sqs_url'], + MaxNumberOfMessages=1, + WaitTimeSeconds=20) + if 'Messages' in messages: # when the queue is exhausted, the response dict contains no 'Messages' key + for message in messages['Messages']: # 'Messages' is a list + # process the messages + self.debug(f"received: {message}") + try: + for event in message['Body']['Events']: + self.eventQ.put(Signal( + f"sqs.{event['EventType']}", + {'event': event} + )) + except Exception: + pass + # next, we delete the message from the queue so no one else will process it again + sqs.delete_message( + QueueUrl=self.config['amazon']['sqs_url'], + ReceiptHandle=message['ReceiptHandle'] + ) + else: + self.logger.debug('SQS is empty') + time.sleep(1) + self.logger.info("Stopping SQS") \ No newline at end of file diff --git a/sorteerhoed/webserver.py b/sorteerhoed/webserver.py new file mode 100644 index 0000000..4fdc85e --- /dev/null +++ b/sorteerhoed/webserver.py @@ -0,0 +1,215 @@ +import argparse +import json +import logging +import os +import tornado.ioloop +import tornado.web +import tornado.websocket +from urllib.parse import urlparse +import uuid + +import coloredlogs +import glob + +from pyaxidraw import axidraw # import module +from threading import Thread, Event +from queue import Queue, Empty +import threading +from server_test import generated_image_dir +import asyncio + + +logger = logging.getLogger("sorteerhoed").getChild("webserver") + +class StaticFileWithHeaderHandler(tornado.web.StaticFileHandler): + def set_extra_headers(self, path): + """For subclass to add extra headers to the response""" + if path[-5:] == '.html': + self.set_header("Access-Control-Allow-Origin", "*") + if path[-4:] == '.svg': + self.set_header("Content-Type", "image/svg+xml") + +class WebSocketHandler(tornado.websocket.WebSocketHandler): + CORS_ORIGINS = ['localhost', '.mturk.com', 'here.rubenvandeven.com'] + connections = set() + + def initialize(self, draw_q: Queue, generated_image_dir: str): + self.draw_q = draw_q + self.generated_image_dir = generated_image_dir + + def check_origin(self, origin): + parsed_origin = urlparse(origin) + # parsed_origin.netloc.lower() gives localhost:3333 + valid = any([parsed_origin.hostname.endswith(origin) for origin in self.CORS_ORIGINS]) + return valid + + # the client connected + def open(self, p = None): + self.__class__.connections.add(self) + logger.info(f"New client connected: {self.request.remote_ip}") + self.strokes = [] +# self.write_message("hello!") + + # the client sent the message + def on_message(self, message): + logger.debug(f"recieve: {message}") + try: + msg = json.loads(message) + # TODO: sanitize input: min/max, limit strokes + if msg['action'] == 'move': + # TODO: min/max input + point = [float(msg['direction'][0]),float(msg['direction'][1]), bool(msg['mouse'])] + self.strokes.append(point) + self.draw_q.put(point) + + elif msg['action'] == 'up': + logger.info(f'up: {msg}') + point = [msg['direction'][0],msg['direction'][1], 1] + self.strokes.append(point) + + elif msg['action'] == 'submit': + logger.info(f'up: {msg}') + id = self.submit_strokes() + if not id: + self.write_message(json.dumps('error')) + return + + self.write_message(json.dumps({ + 'action': 'submitted', + 'msg': f"Submission ok, please refer to your submission as: {id}" + })) + elif msg['action'] == 'down': + # not used, implicit in move? + pass + else: + # self.send({'alert': 'Unknown request: {}'.format(message)}) + logger.warn('Unknown request: {}'.format(message)) + + except Exception as e: + # self.send({'alert': 'Invalid request: {}'.format(e)}) + logger.exception(e) + + # client disconnected + def on_close(self): + self.__class__.rmConnection(self) + logger.info(f"Client disconnected: {self.request.remote_ip}") + + def submit_strokes(self): + if len(self.strokes) < 1: + return False + + d = strokes2D(self.strokes) + svg = f""" + + + + """ + + id = uuid.uuid4().hex + + filename = os.path.join(self.generated_image_dir , id+'.svg') + with open(filename, 'w') as fp: + logger.info(f"Wrote {filename}") + fp.write(svg) + + return id + + @classmethod + def rmConnection(cls, client): + if client not in cls.connections: + return + cls.connections.remove(client) + + +class LatestImageHandler(tornado.web.RequestHandler): + + def initialize(self, generated_image_dir: str): + self.generated_image_dir = generated_image_dir + + def get(self): + self.set_header('Cache-Control', 'no-store, no-cache, must-revalidate, max-age=0') + self.set_header("Content-Type", "image/svg+xml") + + list_of_files = glob.glob(os.path.join(self.generated_image_dir,'*.svg')) + latest_file = max(list_of_files, key=os.path.getctime) + with open(latest_file, 'r') as fp: + self.write(fp.read()) + + +def strokes2D(strokes): + # strokes to a d attribute for a path + d = ""; + last_stroke = None; + cmd = ""; + for stroke in strokes: + if not last_stroke: + d += f"M{stroke[0]},{stroke[1]} " + cmd = 'M' + else: + if last_stroke[2] == 1: + d += " m" + cmd = 'm' + elif cmd != 'l': + d+=' l ' + cmd = 'l' + + rel_stroke = [stroke[0] - last_stroke[0], stroke[1] - last_stroke[1]]; + d += f"{rel_stroke[0]},{rel_stroke[1]} " + last_stroke = stroke; + return d; + + + +class Server: + """ + Server for HIT -> plotter events + As well as for the Status interface + + TODO: change to have the HIT_id as param to the page. Load hit from storage with previous image + """ + def __init__(self, config, eventQ: Queue, runningEvent: Event, plotterQ: Queue): + self.isRunning = runningEvent + self.eventQ = eventQ + self.config = config + self.logger = logger + + self.plotterQ = plotterQ # communicate directly to plotter (skip main thread) + + #self.config['server']['port'] + self.generated_image_dir = os.path.join('www','generated') + self.static_file_dir = os.path.join('www') + + self.server_loop = None + + def start(self): + try: + asyncio.set_event_loop(asyncio.new_event_loop()) + application = tornado.web.Application([ + (r"/ws(.*)", WebSocketHandler, {'draw_q': self.plotterQ, 'generated_image_dir': self.generated_image_dir}), + (r"/latest.svg", LatestImageHandler, {'generated_image_dir': self.generated_image_dir}), # TODO: have js request the right image, based on a 'start' button. This way we can trace the history of a drawing + (r"/(.*)", StaticFileWithHeaderHandler, + {"path": self.static_file_dir, "default_filename": 'index.html'}), + ], debug=True, autoreload=False) + application.listen(self.config['server']['port']) + self.server_loop = tornado.ioloop.IOLoop.current() + if self.isRunning.is_set(): + self.server_loop.start() + finally: + self.logger.info("Stopping webserver") + self.isRunning.clear() + + def stop(self): + if self.server_loop: + self.logger.debug("Got call to stop") + self.server_loop.asyncio_loop.call_soon_threadsafe(self._stop) + + + def _stop(self): + self.server_loop.stop()