""" The panopticon provides a way to observe (& control) all running Hugveys trough a web interface """ import logging import tornado import string import random import tornado.websocket import tornado.web import tornado.ioloop import os from pytz.reference import Central import asyncio import json from urllib.parse import urlparse from hugvey import central_command from hugvey.voice import VoiceStorage from multiprocessing import Queue import threading mainLogger = logging.getLogger("hugvey") logger = mainLogger.getChild("panopticon") web_dir = os.path.join(os.path.split(__file__)[0], '..', 'www') def getWebSocketHandler(central_command): class WebSocketHandler(tornado.websocket.WebSocketHandler): CORS_ORIGINS = ['localhost', 'hugveycmd.local', 'r3.local','192.168.1.155'] connections = set() selections = {} def check_origin(self, origin): parsed_origin = urlparse(origin) # parsed_origin.netloc.lower() gives localhost:3333 valid = parsed_origin.hostname in self.CORS_ORIGINS return valid # the client connected def open(self, p = None): WebSocketHandler.connections.add(self) logger.info("New client connected") self.write_message("hello!") # the client sent the message def on_message(self, message): logger.debug(f"recieve: {message}") try: msg = json.loads(message) if msg['action'] == 'init': self.msgInit() elif msg['action'] == 'selection': self.selectHugvey(msg['selected_id']) # elif msg['action'] == 'get_status': # self.msgStatus(msg['selected_id']) elif msg['action'] == 'block': self.msgBlock(msg['hugvey']) elif msg['action'] == 'unblock': self.msgUnblock(msg['hugvey']) elif msg['action'] == 'resume': self.msgResume(msg['hugvey']) elif msg['action'] == 'pause': self.msgPause(msg['hugvey']) elif msg['action'] == 'restart': self.msgRestart(msg['hugvey']) elif msg['action'] == 'finish': self.msgFinish(msg['hugvey']) elif msg['action'] == 'change_language': self.msgChangeLanguage(msg['hugvey'], msg['lang_code']) elif msg['action'] == 'change_language_for_available': self.msgChangeLanguageForAllAvailableHugveys(msg['lang_code']) elif msg['action'] == 'change_light': self.msgChangeLightId(msg['hugvey'], int(msg['light_id'])) elif msg['action'] == 'change_light_status': self.msgChangeLightStatus(msg['hugvey'], bool(msg['light_status'])) elif msg['action'] == 'play_msg': self.msgPlayMsg(msg['hugvey'], msg['msg_id'], msg['reloadStory']) elif msg['action'] == 'loop_time': self.msgSetLoopTime(msg['time']) else: # self.send({'alert': 'Unknown request: {}'.format(message)}) logger.warn('Unknown request: {}'.format(message)) except Exception as e: # self.send({'alert': 'Invalid request: {}'.format(e)}) logger.exception(e) def send(self, message): # Possible useless method: use self.write_message() j = json.dumps(message) self.write_message(j) # client disconnected def on_close(self): self.__class__.rmConnection(self) logger.info("Client disconnected") @classmethod def getStatusMsg(cls): msg = central_command.getStatusSummary(cls.selections.values()) msg['action'] = 'status' return msg @classmethod def broadcastStatus(cls): cls.write_to_clients(cls.getStatusMsg()) def msgInit(self): msg = self.getStatusMsg() self.send(msg) def selectHugvey(self, hv_id): if hv_id is None: self.__class__.selections.pop(self, None) else: self.__class__.selections[self] = int(hv_id) def msgBlock(self, hv_id): if central_command.hugveys[hv_id].eventQueue: central_command.hugveys[hv_id].eventQueue.put_nowait({'event': 'block'}) def msgUnblock(self, hv_id): if central_command.hugveys[hv_id].eventQueue: central_command.hugveys[hv_id].eventQueue.put_nowait({'event': 'unblock'}) def msgResume(self, hv_id): if central_command.hugveys[hv_id].eventQueue: central_command.hugveys[hv_id].eventQueue.put_nowait({'event': 'resume'}) def msgPause(self, hv_id): if central_command.hugveys[hv_id].eventQueue: central_command.hugveys[hv_id].eventQueue.put_nowait({'event': 'pause'}) def msgRestart(self, hv_id): if central_command.hugveys[hv_id].eventQueue: central_command.hugveys[hv_id].eventQueue.put_nowait({'event': 'restart'}) def msgFinish(self, hv_id): if central_command.hugveys[hv_id].eventQueue: central_command.hugveys[hv_id].eventQueue.put_nowait({'event': 'finish'}) def msgChangeLanguage(self, hv_id, lang_code): if central_command.hugveys[hv_id].eventQueue: central_command.hugveys[hv_id].eventQueue.put_nowait({'event': 'change_language', 'lang_code': lang_code}) def msgChangeLanguageForAllAvailableHugveys(self, lang_code): """ Set language for all future hugvey runs (so after a 'finish') """ central_command.setFutureLanguage(lang_code) def msgSetLoopTime(self, loop_time): parts = loop_time.split(":") if len(parts) == 2: h, m = parts s = 0 elif len(parts) == 3: h, m, s = parts else: logger.critical(f"Invalid time to set: {loop_time}") return seconds = int(h)*3600+int(m)*60+int(s) central_command.setLoopTime(seconds) def msgChangeLightId(self, hv_id, lightId): central_command.setLightForHugvey(hv_id, lightId) # if central_command.hugveys[hv_id].eventQueue: # central_command.hugveys[hv_id].eventQueue.put_nowait({'event': 'change_light', 'light_id': lightId}) def msgChangeLightStatus(self, hv_id, status): if central_command.hugveys[hv_id].eventQueue: central_command.hugveys[hv_id].eventQueue.put_nowait({'event': 'change_light_status', 'status': status}) def msgPlayMsg(self, hv_id, msg_id, reloadStory): if central_command.hugveys[hv_id].eventQueue: central_command.hugveys[hv_id].eventQueue.put_nowait({'event': 'play_msg', 'msg_id': msg_id, 'reloadStory':bool(reloadStory)}) @classmethod def rmConnection(cls, client): if client not in cls.connections: return if client in cls.selections: cls.selections.pop(client, None) cls.connections.remove(client) @classmethod def write_to_clients(wsHandlerClass, msg): if msg is None: logger.critical("Tried to send 'none' to Panopticon") return toRemove = [] for client in wsHandlerClass.connections: try: client.write_message(msg) except tornado.websocket.WebSocketClosedError as e: logger.warning(f"Not properly closed websocket connection") toRemove.append(client) # If we remove it here from the set we get an exception about changing set size during iteration for client in toRemove: wsHandlerClass.rmConnection(client) return WebSocketHandler class NonCachingStaticFileHandler(tornado.web.StaticFileHandler): def set_extra_headers(self, path): # Disable cache self.set_header('Cache-Control', 'no-store, no-cache, must-revalidate, max-age=0') def getUploadHandler(central_command): class UploadHandler(tornado.web.RequestHandler): def set_default_headers(self): # headers for CORS self.set_header("Access-Control-Allow-Origin", "*") self.set_header("Access-Control-Allow-Headers", "x-requested-with") self.set_header('Access-Control-Allow-Methods', 'POST, GET, OPTIONS') def options(self): # OPTIONS request for CORS self.set_status(204) self.finish() def post(self): logger.info('upload') langCode = self.get_argument("language") langFile = os.path.join(central_command.config['web']['files_dir'] , central_command.languageFiles[langCode]) storyData = json.loads(self.request.files['json'][0]['body']) # print(json.dumps(storyData)) # self.finish() # return if 'audio' in self.request.files: msgId = self.get_argument("message_id") audioFile = self.request.files['audio'][0] original_fname = audioFile['filename'] fname = ''.join(random.choice(string.ascii_lowercase + string.digits) for x in range(10)) ext = os.path.splitext(original_fname)[1] audioFilename = os.path.join(central_command.config['web']['files_dir'], langCode, fname + ext) for i, data in enumerate(storyData): if data['@id'] != msgId: continue if 'audio' in storyData[i] and storyData[i]['audio'] is not None and os.path.exists(storyData[i]['audio']['file']): logger.info(f"Remove previous file {storyData[i]['audio']['file']} ({storyData[i]['audio']['original_name']})") os.unlink(storyData[i]['audio']['file']) storyData[i]['audio'] = { 'file': audioFilename, 'original_name': original_fname } with open(audioFilename, 'wb') as fp: logger.info(f'Save {original_fname} to {audioFilename}') fp.write(audioFile['body']) break # logger.info(os.path.abspath(langFile)) langFile = os.path.abspath(langFile) with open(langFile, 'w') as json_fp: logger.info(f'Save story to {langFile} {json_fp}') json.dump(storyData, json_fp, indent=2) # Reload language files for new instances central_command.loadLanguages() self.finish() return UploadHandler def getVoiceHandler(voiceStorage): class VoiceHandler(tornado.web.RequestHandler): async def get(self): # TODO: we should be using ZMQ here... text = self.get_argument('text') lang_code = self.get_argument('lang') isVariable = True if int(self.get_argument('variable')) >0 else False # TODO: make zmq socket request/reply pattern: fn = await voiceStorage.requestFile(lang_code, text, isVariable) if not fn: raise Exception(f"No Filename for text: {text}") if int(self.get_argument('filename')) == 1: self.set_header("Content-Type","text/plain") self.write(fn) else: self.set_header("Content-Type","audio/wave") with open(fn, 'rb') as fp: self.write(fp.read()) self.finish() return VoiceHandler class InfoHandler(tornado.web.RequestHandler): def initialize(self, command): self.command = command async def get(self): self.write(json.dumps(self.command.hugvey_ids)) class StaticFileWithHeaderHandler(tornado.web.StaticFileHandler): def set_extra_headers(self, path): """For subclass to add extra headers to the response""" if path[-5:] == '.html': self.set_header("Access-Control-Allow-Origin", "*") class Panopticon(object): def __init__(self, central_command, config, voiceStorage): self.command = central_command self.config = config self.voiceStorage = voiceStorage self.wsHandler = getWebSocketHandler(self.command) self.application = tornado.web.Application([ (r"/ws(.*)", self.wsHandler), (r"/local/(.*)", NonCachingStaticFileHandler, {"path": config['web']['files_dir']}), (r"/upload", getUploadHandler(self.command)), (r"/voice", getVoiceHandler(self.voiceStorage)), (r"/count", InfoHandler, {'command': self.command}), (r"/(.*)", StaticFileWithHeaderHandler, {"path": web_dir, "default_filename": 'index.html'}), ], debug=True) # self.loop.configure(evt_loop) def start(self): evt_loop = asyncio.new_event_loop() asyncio.set_event_loop(evt_loop) self.loop = tornado.ioloop.IOLoop.current() self.application.listen(self.config['web']['port']) thread = threading.Thread( target=self.broadcastLoggingQueueToWs, kwargs={'wsHandler': self.wsHandler, 'q': self.command.logQueue}, name=f"panopticon/logws") thread.start() task = evt_loop.create_task( self.statusSender(self.wsHandler)) logger.info(f"Start Panopticon on http://localhost:{self.config['web']['port']}") self.loop.start() def stop(self): self.loop.stop() async def statusSender(self, wsHandler): while True: try: self.wsHandler.broadcastStatus() await asyncio.sleep(3) except Exception as e: logger.exception(e) def broadcastLoggingQueueToWs(self, wsHandler, q: Queue): while True: record = q.get() # record: logging.LogRecord assert isinstance(record, logging.LogRecord) hugvey_id = record.name.split('.')[1] items = record.msg.split(':', 2) msg = {'action': 'log', 'id':hugvey_id, 'type': items[0], 'lvl':record.levelno, 'lvlname':record.levelname} if len(items) > 1: msg['info'] = items[1] if len(items) > 2: msg['args'] = items[2] j = json.dumps(msg) logger.debug(j) self.loop.add_callback(wsHandler.write_to_clients, j)