hugvey/hugvey/panopticon.py

330 lines
13 KiB
Python

"""
The panopticon provides a way to observe (& control) all running Hugveys through a web interface
"""
import logging
import tornado
import string
import random
import tornado.websocket
import tornado.web
import tornado.ioloop
import os
from pytz.reference import Central
import asyncio
import json
from urllib.parse import urlparse
from hugvey import central_command
from hugvey.voice import VoiceStorage
from multiprocessing import Queue
import threading
# Module logger: a child of the shared "hugvey" logger
mainLogger = logging.getLogger("hugvey")
logger = mainLogger.getChild("panopticon")
# Directory of the static web interface files: the 'www' directory that sits
# next to the directory containing this file
web_dir = os.path.join(os.path.split(__file__)[0], '..', 'www')
def getWebSocketHandler(central_command):
    """
    Create the websocket handler class for the panopticon web interface.

    The class is defined inside this factory so that its methods can reach
    the given central_command through the closure.

    :param central_command: object providing getStatusSummary() and a
        'hugveys' collection (indexed by hugvey id) whose items have an
        eventQueue
    :return: the WebSocketHandler class (not an instance)
    """
    class WebSocketHandler(tornado.websocket.WebSocketHandler):
        """
        Websocket endpoint that turns client requests into hugvey events and
        broadcasts messages to all connected clients.
        """
        CORS_ORIGINS = ['localhost', 'hugveycmd.local']
        # all currently connected clients (shared by every handler instance)
        connections = set()
        # maps a connected client to the id of the hugvey it has selected
        selections = {}

        def check_origin(self, origin):
            """Only accept connections whose origin hostname is listed in CORS_ORIGINS."""
            parsed_origin = urlparse(origin)
            # parsed_origin.netloc.lower() gives localhost:3333, so compare
            # on the hostname, which carries no port
            return parsed_origin.hostname in self.CORS_ORIGINS

        # the client connected
        def open(self, p=None):
            """
            Register the new client and greet it.

            :param p: optional value captured by the url pattern; not used
            """
            WebSocketHandler.connections.add(self)
            logger.info("New client connected")
            self.write_message("hello!")

        # the client sent the message
        def on_message(self, message):
            """
            Handle a JSON encoded request of the client.

            The 'action' key of the request selects what is done. Unknown
            actions and failing requests are only logged; nothing is
            reported back to the client.

            :param message: the raw message as received from the client
            """
            logger.debug(f"recieve: {message}")

            try:
                msg = json.loads(message)
                action = msg['action']
                if action == 'init':
                    self.msgInit()
                elif action == 'selection':
                    self.selectHugvey(msg['selected_id'])
                elif action == 'block':
                    self.msgBlock(msg['hugvey'])
                elif action == 'unblock':
                    self.msgUnblock(msg['hugvey'])
                elif action == 'resume':
                    self.msgResume(msg['hugvey'])
                elif action == 'pause':
                    self.msgPause(msg['hugvey'])
                elif action == 'restart':
                    self.msgRestart(msg['hugvey'])
                elif action == 'finish':
                    self.msgFinish(msg['hugvey'])
                elif action == 'change_language':
                    self.msgChangeLanguage(msg['hugvey'], msg['lang_code'])
                elif action == 'change_light':
                    self.msgChangeLightId(msg['hugvey'], int(msg['light_id']))
                elif action == 'change_light_status':
                    self.msgChangeLightStatus(msg['hugvey'], bool(msg['light_status']))
                elif action == 'play_msg':
                    self.msgPlayMsg(msg['hugvey'], msg['msg_id'], msg['reloadStory'])
                else:
                    logger.warning('Unknown request: {}'.format(message))
            except Exception as e:
                # Boundary for everything a client can send: a bad request
                # must not take the connection handler down.
                logger.exception(e)

        def send(self, message):
            """Send the given object to this client, encoded as JSON."""
            # Possible useless method: use self.write_message()
            self.write_message(json.dumps(message))

        # client disconnected
        def on_close(self):
            """Forget the client (and its selection) when it disconnects."""
            WebSocketHandler.rmConnection(self)
            logger.info("Client disconnected")

        @classmethod
        def getStatusMsg(cls):
            """
            Get the status summary of central_command for the hugveys that
            are currently selected by clients, marked with action 'status'.
            """
            msg = central_command.getStatusSummary(cls.selections.values())
            msg['action'] = 'status'
            return msg

        @classmethod
        def broadcastStatus(cls):
            """Send the current status message to all connected clients."""
            cls.write_to_clients(cls.getStatusMsg())

        def msgInit(self):
            """Send the current status message to this client only."""
            self.send(self.getStatusMsg())

        def selectHugvey(self, hv_id):
            """
            Remember which hugvey this client has selected.

            :param hv_id: id of the hugvey (converted to int), or None to
                clear the selection of this client
            """
            if hv_id is None:
                self.__class__.selections.pop(self, None)
            else:
                self.__class__.selections[self] = int(hv_id)

        def _putEvent(self, hv_id, event):
            """Put an event dict on the eventQueue of the given hugvey, without blocking."""
            central_command.hugveys[hv_id].eventQueue.put_nowait(event)

        def msgBlock(self, hv_id):
            """Queue a 'block' event for the given hugvey."""
            self._putEvent(hv_id, {'event': 'block'})

        def msgUnblock(self, hv_id):
            """Queue an 'unblock' event for the given hugvey."""
            self._putEvent(hv_id, {'event': 'unblock'})

        def msgResume(self, hv_id):
            """Queue a 'resume' event for the given hugvey."""
            self._putEvent(hv_id, {'event': 'resume'})

        def msgPause(self, hv_id):
            """Queue a 'pause' event for the given hugvey."""
            self._putEvent(hv_id, {'event': 'pause'})

        def msgRestart(self, hv_id):
            """Queue a 'restart' event for the given hugvey."""
            self._putEvent(hv_id, {'event': 'restart'})

        def msgFinish(self, hv_id):
            """Queue a 'finish' event for the given hugvey."""
            self._putEvent(hv_id, {'event': 'finish'})

        def msgChangeLanguage(self, hv_id, lang_code):
            """Queue a 'change_language' event with the given language code."""
            self._putEvent(hv_id, {'event': 'change_language', 'lang_code': lang_code})

        def msgChangeLightId(self, hv_id, lightId):
            """Queue a 'change_light' event with the given light id."""
            self._putEvent(hv_id, {'event': 'change_light', 'light_id': lightId})

        def msgChangeLightStatus(self, hv_id, status):
            """Queue a 'change_light_status' event with the given status."""
            self._putEvent(hv_id, {'event': 'change_light_status', 'status': status})

        def msgPlayMsg(self, hv_id, msg_id, reloadStory):
            """Queue a 'play_msg' event for the given message id; reloadStory is passed on as a bool."""
            self._putEvent(hv_id, {'event': 'play_msg', 'msg_id': msg_id, 'reloadStory': bool(reloadStory)})

        @classmethod
        def rmConnection(cls, client):
            """Remove a client from the connections and drop its selection; unknown clients are ignored."""
            if client not in cls.connections:
                return

            cls.selections.pop(client, None)
            cls.connections.remove(client)

        @classmethod
        def write_to_clients(cls, msg):
            """
            Send a message to all connected clients.

            Clients whose websocket turns out to be closed are removed from
            the connections.

            :param msg: the message to pass to write_message() of each client
            """
            if msg is None:
                logger.critical("Tried to send 'none' to Panopticon")
                return

            toRemove = []
            for client in cls.connections:
                try:
                    client.write_message(msg)
                except tornado.websocket.WebSocketClosedError:
                    logger.warning("Not properly closed websocket connection")
                    # If we remove it here from the set we get an exception
                    # about changing set size during iteration
                    toRemove.append(client)

            for client in toRemove:
                # must go through the class: rmConnection is not a module
                # level name
                cls.rmConnection(client)

    return WebSocketHandler
class NonCachingStaticFileHandler(tornado.web.StaticFileHandler):
    """Static file handler that tells clients not to cache the files it serves."""

    def set_extra_headers(self, path):
        """Add a Cache-Control header that disables caching to the response."""
        noCacheDirectives = 'no-store, no-cache, must-revalidate, max-age=0'
        self.set_header('Cache-Control', noCacheDirectives)
def getUploadHandler(central_command):
    """
    Create the request handler class that stores an uploaded story (and
    optionally an audio file for one of its messages).

    :param central_command: object providing config['web']['files_dir'],
        the languageFiles lookup (language code to file name) and
        loadLanguages()
    :return: the UploadHandler class (not an instance)
    """
    class UploadHandler(tornado.web.RequestHandler):
        def set_default_headers(self):
            """Set the headers for CORS on every response."""
            # NOTE(review): requests from any origin are allowed here
            self.set_header("Access-Control-Allow-Origin", "*")
            self.set_header("Access-Control-Allow-Headers", "x-requested-with")
            self.set_header('Access-Control-Allow-Methods', 'POST, GET, OPTIONS')

        def options(self):
            """Answer the OPTIONS request for CORS with an empty 204 response."""
            self.set_status(204)
            self.finish()

        def post(self):
            """
            Save the uploaded story json to the file of the given language.

            Request arguments/files:
            - 'language': language code, has to be known in languageFiles
            - 'json' (file): the story, a JSON list of items with an '@id'
            - 'audio' (file, optional) with 'message_id': audio for the
              message with that id. It is stored under a random file name
              and replaces the audio file that message had before.
            """
            logger.info('upload')
            langCode = self.get_argument("language")
            filesDir = central_command.config['web']['files_dir']
            langFile = os.path.join(filesDir, central_command.languageFiles[langCode])
            storyData = json.loads(self.request.files['json'][0]['body'])

            if 'audio' in self.request.files:
                msgId = self.get_argument("message_id")
                audioFile = self.request.files['audio'][0]
                original_fname = audioFile['filename']
                # random name, keeping only the extension of the upload
                fname = ''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(10))
                ext = os.path.splitext(original_fname)[1]
                audioFilename = os.path.join(filesDir, langCode, fname + ext)

                for data in storyData:
                    if data['@id'] != msgId:
                        continue

                    previous = data.get('audio')
                    if previous is not None and os.path.exists(previous['file']):
                        logger.info(f"Remove previous file {previous['file']} ({previous['original_name']})")
                        os.unlink(previous['file'])

                    data['audio'] = {
                        'file': audioFilename,
                        'original_name': original_fname
                    }
                    with open(audioFilename, 'wb') as fp:
                        logger.info(f'Save {original_fname} to {audioFilename}')
                        fp.write(audioFile['body'])
                    break
                else:
                    # no message matched: make it visible that the uploaded
                    # audio is dropped instead of failing silently
                    logger.warning(f"No message with id {msgId} in uploaded story, audio {original_fname} is not saved")

            langFile = os.path.abspath(langFile)
            with open(langFile, 'w') as json_fp:
                logger.info(f'Save story to {langFile} {json_fp}')
                json.dump(storyData, json_fp, indent=2)

            # Reload language files for new instances
            central_command.loadLanguages()
            self.finish()

    return UploadHandler
def getVoiceHandler(voiceStorage):
    """
    Create the request handler class that serves voice files obtained from
    the given voiceStorage.

    :param voiceStorage: object with an awaitable requestFile(lang_code, text, isVariable)
    :return: the VoiceHandler class (not an instance)
    """
    class VoiceHandler(tornado.web.RequestHandler):
        async def get(self):
            """
            Respond with the voice file for the 'text' and 'lang' arguments.

            When the 'filename' argument is 1 the file name is sent as plain
            text, otherwise the contents of the file are sent.
            """
            # TODO: we should be using ZMQ here...
            text = self.get_argument('text')
            lang_code = self.get_argument('lang')
            isVariable = int(self.get_argument('variable')) > 0
            # TODO: make zmq socket request/reply pattern:
            fn = await voiceStorage.requestFile(lang_code, text, isVariable)
            if not fn:
                raise Exception(f"No Filename for text: {text}")

            if int(self.get_argument('filename')) == 1:
                self.set_header("Content-Type","text/plain")
                payload = fn
            else:
                self.set_header("Content-Type","audio/wave")
                with open(fn, 'rb') as fp:
                    payload = fp.read()

            self.write(payload)
            self.finish()

    return VoiceHandler
class StaticFileWithHeaderHandler(tornado.web.StaticFileHandler):
    """Static file handler that allows cross-origin access to the html files it serves."""

    def set_extra_headers(self, path):
        """Add the Access-Control-Allow-Origin header when the path ends in '.html'."""
        if path[-5:] != '.html':
            return
        self.set_header("Access-Control-Allow-Origin", "*")
class Panopticon(object):
    """
    The web server of the panopticon: serves the web interface, the upload
    and voice endpoints, and a websocket over which status and log messages
    are sent to the connected clients.
    """

    def __init__(self, central_command, config, voiceStorage):
        """
        Set up the tornado application and listen on config['web']['port'].

        :param central_command: the command object; handed to the websocket
            and upload handlers, its logQueue is read in start()
        :param config: configuration; config['web'] needs 'files_dir' and 'port'
        :param voiceStorage: handed to the voice handler
        """
        self.command = central_command
        self.config = config
        self.voiceStorage = voiceStorage

        self.wsHandler = getWebSocketHandler(self.command)

        # NOTE(review): debug=True turns on tornado's debug mode; confirm
        # that this is intended outside of development.
        self.application = tornado.web.Application([
            (r"/ws(.*)", self.wsHandler),
            (r"/local/(.*)", NonCachingStaticFileHandler,
                {"path": config['web']['files_dir']}),
            (r"/upload", getUploadHandler(self.command)),
            (r"/voice", getVoiceHandler(self.voiceStorage)),
            (r"/(.*)", StaticFileWithHeaderHandler,
                {"path": web_dir, "default_filename": 'index.html'}),
        ], debug=True)

        self.application.listen(config['web']['port'])

    def start(self):
        """
        Run the panopticon; blocks until the tornado loop is stopped.

        Creates a new asyncio event loop for the calling thread, starts the
        thread that forwards log records to the websocket clients and
        schedules the periodic status broadcast.
        """
        evt_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(evt_loop)

        self.loop = tornado.ioloop.IOLoop.current()

        thread = threading.Thread(
            target=self.broadcastLoggingQueueToWs,
            kwargs={'wsHandler': self.wsHandler, 'q': self.command.logQueue},
            name="panopticon/logws")
        thread.start()

        # Hold on to the task while the loop runs: the event loop itself
        # only keeps a weak reference to its tasks.
        statusTask = evt_loop.create_task(self.statusSender(self.wsHandler))

        logger.info(f"Start Panopticon on http://localhost:{self.config['web']['port']}")
        self.loop.start()

    def stop(self):
        """Stop the tornado loop that was started by start()."""
        self.loop.stop()

    async def statusSender(self, wsHandler):
        """
        Broadcast the status to all websocket clients every 3 seconds, forever.

        :param wsHandler: the websocket handler class to broadcast with
        """
        while True:
            try:
                wsHandler.broadcastStatus()
            except Exception as e:
                logger.exception(e)
            # Sleep outside of the try block: a failing broadcast has to
            # wait for the next round as well instead of retrying (and
            # logging) in a tight loop.
            await asyncio.sleep(3)

    @staticmethod
    def _logRecordToMessage(record):
        """
        Convert a log record into the 'log' message dict for the clients.

        The second component of the dotted logger name is used as 'id'. The
        message is split on ':' into 'type' and, when present, 'info' and
        'args'.

        :raises TypeError: if record is not a logging.LogRecord
        :raises IndexError: if the logger name has no second component
        """
        if not isinstance(record, logging.LogRecord):
            raise TypeError(f"Expected a LogRecord, got {type(record).__name__}")

        hugvey_id = record.name.split('.')[1]
        # str(): msg is not required to be a string
        items = str(record.msg).split(':', 2)
        msg = {
            'action': 'log',
            'id': hugvey_id,
            'type': items[0],
            'lvl': record.levelno,
            'lvlname': record.levelname,
        }
        if len(items) > 1:
            msg['info'] = items[1]
        if len(items) > 2:
            msg['args'] = items[2]
        return msg

    def broadcastLoggingQueueToWs(self, wsHandler, q: Queue):
        """
        Forward the log records from the queue to all websocket clients, forever.

        Meant to run in its own thread: q.get() blocks. The actual sending
        is handed to the tornado loop with add_callback().

        :param wsHandler: the websocket handler class to write with
        :param q: queue that delivers logging.LogRecord objects
        """
        while True:
            record = q.get()
            try:
                j = json.dumps(self._logRecordToMessage(record))
            except Exception as e:
                # A single malformed record must not end this thread, that
                # would silently stop all log forwarding.
                logger.exception(e)
                continue

            logger.debug(j)
            self.loop.add_callback(wsHandler.write_to_clients, j)