hugvey/hugvey/panopticon.py

255 lines
9.7 KiB
Python
Raw Normal View History

"""
The panopticon provides a way to observe (& control) all running Hugveys through a web interface
"""
import logging
import tornado
2019-01-23 21:38:27 +00:00
import string
import random
2019-01-18 18:39:35 +00:00
import tornado.websocket
import tornado.web
import tornado.ioloop
import os
from pytz.reference import Central
import asyncio
import json
2019-01-23 14:26:44 +00:00
from urllib.parse import urlparse
2019-01-23 21:38:27 +00:00
from hugvey import central_command
2019-02-18 19:38:54 +00:00
from hugvey.voice import VoiceStorage
from multiprocessing import Queue
import threading
# Module logger: the "panopticon" child of the shared "hugvey" logger.
mainLogger = logging.getLogger("hugvey")
logger = mainLogger.getChild("panopticon")

# Directory holding the static web interface: `www`, one level above the
# directory that contains this file.
web_dir = os.path.join(os.path.dirname(__file__), '..', 'www')
2019-01-18 18:39:35 +00:00
def getWebSocketHandler(central_command):
    """
    Build the websocket handler class used by the panopticon web interface.

    The class is created inside this factory so its methods can reach
    `central_command` through the closure.

    :param central_command: object providing `getStatusSummary()` and a
        `hugveys` mapping whose values expose an `eventQueue`.
    :return: the `WebSocketHandler` class (not an instance).
    """
    class WebSocketHandler(tornado.websocket.WebSocketHandler):
        # Hostnames that are allowed to open a websocket connection.
        CORS_ORIGINS = ['localhost']
        # All currently open handlers; shared on the class so that
        # write_to_clients() can broadcast to every client.
        connections = set()

        def check_origin(self, origin):
            """Accept the connection only if the Origin hostname is allowed."""
            parsed_origin = urlparse(origin)
            # parsed_origin.netloc.lower() gives localhost:3333
            return parsed_origin.hostname in self.CORS_ORIGINS

        # the client connected
        def open(self, p=None):
            """Register the new connection and greet the client."""
            WebSocketHandler.connections.add(self)
            logger.info("New client connected")
            self.write_message("hello!")

        # the client sent the message
        def on_message(self, message):
            """
            Decode a JSON request and dispatch on its 'action' key.

            Any failure (invalid JSON, missing key, unknown hugvey id, ...)
            is reported back to the client as an 'alert' and logged.
            """
            logger.debug(f"recieve: {message}")
            try:
                msg = json.loads(message)
                action = msg['action']
                if action == 'init':
                    self.msgInit()
                elif action == 'get_status':
                    self.msgStatus()
                elif action == 'resume':
                    self.msgResume(msg['hugvey'])
                elif action == 'pause':
                    self.msgPause(msg['hugvey'])
                elif action == 'restart':
                    self.msgRestart(msg['hugvey'])
                elif action == 'change_language':
                    self.msgChangeLanguage(msg['hugvey'], msg['lang_code'])
                elif action == 'play_msg':
                    self.msgPlayMsg(msg['hugvey'], msg['msg_id'])
                else:
                    self.send({'alert': 'Unknown request: {}'.format(message)})
                    # Logger.warn() is deprecated in favour of warning().
                    logger.warning('Unknown request: {}'.format(message))
            except Exception as e:
                self.send({'alert': 'Invalid request: {}'.format(e)})
                logger.exception(e)

        def send(self, message):
            """Serialise `message` to JSON and write it to this client."""
            # Possible useless method: use self.write_message()
            j = json.dumps(message)
            self.write_message(j)

        # client disconnected
        def on_close(self):
            """Forget the connection once the client is gone."""
            # discard() instead of remove(): do not raise when this handler
            # was never registered.
            WebSocketHandler.connections.discard(self)
            logger.info("Client disconnected")

        def getStatusMsg(self):
            """Return the status summary, tagged with action 'status'."""
            msg = central_command.getStatusSummary()
            msg['action'] = 'status'
            return msg

        def msgStatus(self):
            """Send the current status to this client."""
            self.send(self.getStatusMsg())

        def msgInit(self):
            """Answer an 'init' request with the current status."""
            msg = self.getStatusMsg()
            self.send(msg)

        def msgResume(self, hv_id):
            """Queue a 'resume' event for hugvey `hv_id`."""
            central_command.hugveys[hv_id].eventQueue.put_nowait({'event': 'resume'})

        def msgPause(self, hv_id):
            """Queue a 'pause' event for hugvey `hv_id`."""
            central_command.hugveys[hv_id].eventQueue.put_nowait({'event': 'pause'})

        def msgRestart(self, hv_id):
            """Queue a 'restart' event for hugvey `hv_id`."""
            central_command.hugveys[hv_id].eventQueue.put_nowait({'event': 'restart'})

        def msgChangeLanguage(self, hv_id, lang_code):
            """Queue a 'change_language' event for hugvey `hv_id`."""
            central_command.hugveys[hv_id].eventQueue.put_nowait({'event': 'change_language', 'lang_code': lang_code})

        def msgPlayMsg(self, hv_id, msg_id):
            """Queue a 'play_msg' event for hugvey `hv_id`."""
            central_command.hugveys[hv_id].eventQueue.put_nowait({'event': 'play_msg', 'msg_id': msg_id})

        @classmethod
        def write_to_clients(wsHandlerClass, msg):
            """
            Broadcast `msg` to every connected client.

            A client whose socket is already closed is skipped so that one
            stale connection does not abort the broadcast to the others.
            """
            # Iterate over a snapshot so the set may change while writing.
            for client in list(wsHandlerClass.connections):
                try:
                    client.write_message(msg)
                except tornado.websocket.WebSocketClosedError:
                    # The connection is already closed; on_close() takes
                    # care of removing it from the set.
                    continue

    return WebSocketHandler
2019-01-18 18:39:35 +00:00
2019-01-24 13:27:04 +00:00
class NonCachingStaticFileHandler(tornado.web.StaticFileHandler):
    """Static file handler that asks clients not to cache what it serves."""

    def set_extra_headers(self, path):
        """Add a Cache-Control header that disables caching for `path`."""
        # Disable cache
        cache_policy = 'no-store, no-cache, must-revalidate, max-age=0'
        self.set_header('Cache-Control', cache_policy)
2019-01-23 21:38:27 +00:00
def getUploadHandler(central_command):
    """
    Build the request handler class that stores uploaded story files.

    :param central_command: object providing `config['web']['files_dir']`,
        a `languageFiles` mapping (language code -> story file name) and
        `loadLanguages()`.
    :return: the `UploadHandler` class (not an instance).
    """
    def isInsideDir(path, directory):
        """Return True if `path` resolves to a location within `directory`."""
        directory = os.path.realpath(directory)
        try:
            return os.path.commonpath([os.path.realpath(path), directory]) == directory
        except ValueError:
            # commonpath() raises e.g. for paths on different drives.
            return False

    class UploadHandler(tornado.web.RequestHandler):
        def set_default_headers(self):
            """Allow cross-origin requests to this endpoint."""
            # headers for CORS
            self.set_header("Access-Control-Allow-Origin", "*")
            self.set_header("Access-Control-Allow-Headers", "x-requested-with")
            self.set_header('Access-Control-Allow-Methods', 'POST, GET, OPTIONS')

        def options(self):
            """Answer the CORS preflight request with an empty 204."""
            # OPTIONS request for CORS
            self.set_status(204)
            self.finish()

        def post(self):
            """
            Save an uploaded story (and optionally one audio file).

            Expects a `language` argument and a `json` file holding the
            story as a list of messages. When an `audio` file is attached,
            `message_id` names the message (by its '@id') the audio belongs
            to; the audio is stored under a random name in the language's
            sub-directory and referenced from that message.
            """
            logger.debug('upload')
            langCode = self.get_argument("language")
            filesDir = central_command.config['web']['files_dir']
            langFile = os.path.join(filesDir, central_command.languageFiles[langCode])
            storyData = json.loads(self.request.files['json'][0]['body'])

            if 'audio' in self.request.files:
                msgId = self.get_argument("message_id")
                audioFile = self.request.files['audio'][0]
                original_fname = audioFile['filename']
                fname = ''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(10))
                ext = os.path.splitext(original_fname)[1]
                audioDir = os.path.join(filesDir, langCode)
                audioFilename = os.path.join(audioDir, fname + ext)

                for data in storyData:
                    if data['@id'] != msgId:
                        continue

                    previous = data.get('audio')
                    if previous and os.path.exists(previous['file']):
                        # The path comes from the uploaded JSON, i.e. from
                        # the client: never delete anything outside of the
                        # files directory.
                        if isInsideDir(previous['file'], filesDir):
                            logger.info(f"Remove previous file {previous['file']} ({previous.get('original_name')})")
                            os.unlink(previous['file'])
                        else:
                            logger.warning(f"Refuse to remove {previous['file']}: outside of {filesDir}")

                    data['audio'] = {
                        'file': audioFilename,
                        'original_name': original_fname
                    }
                    # The language directory may not exist yet.
                    os.makedirs(audioDir, exist_ok=True)
                    with open(audioFilename, 'wb') as fp:
                        logger.info(f'Save {original_fname} to {audioFilename}')
                        fp.write(audioFile['body'])
                    break
                else:
                    logger.warning(f"No message with id {msgId}: uploaded audio {original_fname} is not saved")

            logger.debug(os.path.abspath(langFile))
            with open(langFile, 'w') as json_fp:
                logger.info(f'Save story to {langFile} {json_fp}')
                json.dump(storyData, json_fp, indent=2)

            # Reload language files for new instances
            central_command.loadLanguages()
            self.finish()

    return UploadHandler
2019-01-18 18:39:35 +00:00
2019-02-18 19:38:54 +00:00
def getVoiceHandler(voiceStorage):
    """
    Build the request handler class that serves voice files for a text.

    :param voiceStorage: object whose awaitable `requestFile(text, isVariable)`
        yields the file name for the given text.
    :return: the `VoiceHandler` class (not an instance).
    """
    class VoiceHandler(tornado.web.RequestHandler):
        async def get(self):
            """
            Respond with the voice file for the `text` argument.

            `variable` (int, > 0 means true) is passed on to the storage.
            With `filename` equal to 1 the file name is returned as plain
            text; otherwise the file contents are returned as audio.
            """
            # TODO: we should be using ZMQ here...
            text = self.get_argument('text')
            is_variable = int(self.get_argument('variable')) > 0
            voice_file = await voiceStorage.requestFile(text, is_variable)
            if not voice_file:
                raise Exception(f"No Filename for text: {text}")

            if int(self.get_argument('filename')) != 1:
                # Send the audio itself.
                self.set_header("Content-Type", "audio/wave")
                with open(voice_file, 'rb') as audio_fp:
                    self.write(audio_fp.read())
            else:
                # Only the name of the file was asked for.
                self.set_header("Content-Type", "text/plain")
                self.write(voice_file)
            self.finish()

    return VoiceHandler
2019-02-18 19:38:54 +00:00
class Panopticon(object):
    """
    Web server that exposes the running hugveys through a web interface.

    It serves the static interface, a websocket endpoint, an upload endpoint
    and a voice endpoint, and forwards log records from the command's
    `logQueue` to all websocket clients.
    """

    def __init__(self, central_command, config):
        """
        Create the tornado application and start listening.

        :param central_command: the command object; handed to the websocket
            and upload handlers, and its `logQueue` is read by start().
        :param config: mapping providing `web.files_dir`, `web.port` and
            `voice.token`.
        """
        self.command = central_command
        self.config = config
        # Only set by start(); lets stop() detect that nothing is running.
        self.loop = None

        voice_dir = os.path.join(self.config['web']['files_dir'], 'voices')
        self.voiceStorage = VoiceStorage(voice_dir, self.config['voice']['token'])
        self.wsHandler = getWebSocketHandler(self.command)

        self.application = tornado.web.Application([
            (r"/ws(.*)", self.wsHandler),
            (r"/local/(.*)", NonCachingStaticFileHandler,
                {"path": config['web']['files_dir']}),
            (r"/upload", getUploadHandler(self.command)),
            (r"/voice", getVoiceHandler(self.voiceStorage)),
            (r"/(.*)", tornado.web.StaticFileHandler,
                {"path": web_dir, "default_filename": 'index.html'}),
        ], debug=True)

        self.application.listen(config['web']['port'])
        # self.loop.configure(evt_loop)

    def start(self):
        """
        Run the IO loop; blocks until stop() is called.

        A fresh asyncio event loop is installed for the calling thread, and
        a background thread is started that relays log records to the
        websocket clients.
        """
        evt_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(evt_loop)

        self.loop = tornado.ioloop.IOLoop.current()

        # Daemon thread: the relay loop never returns, so a regular thread
        # would keep the interpreter alive after the IO loop has stopped.
        thread = threading.Thread(
            target=self.broadcastLoggingQueueToWs,
            kwargs={'wsHandler': self.wsHandler, 'q': self.command.logQueue},
            name="panopticon/logws",
            daemon=True)
        thread.start()

        logger.info(f"Start Panopticon on http://localhost:{self.config['web']['port']}")
        self.loop.start()

    def stop(self):
        """Stop the IO loop; may be called from any thread."""
        if self.loop is None:
            # start() was never called: nothing to stop.
            return
        # add_callback() is the thread-safe way to reach the IO loop;
        # calling IOLoop.stop() directly from another thread is not.
        self.loop.add_callback(self.loop.stop)

    @staticmethod
    def _logRecordToMsg(record):
        """
        Convert a log record into the 'log' message sent to the clients.

        The second component of the dotted logger name is used as the id.
        The record's message is split on ':' into at most three parts:
        'type', and when present 'info' and 'args'.

        :return: the message dict, or None when `record` is not a
            `logging.LogRecord` or its logger name has no second component.
        """
        if not isinstance(record, logging.LogRecord):
            return None
        name_parts = record.name.split('.')
        if len(name_parts) < 2:
            return None

        # record.msg may be any object (e.g. an exception), hence str().
        items = str(record.msg).split(':', 2)
        msg = {'action': 'log', 'id': name_parts[1], 'type': items[0]}
        if len(items) > 1:
            msg['info'] = items[1]
        if len(items) > 2:
            msg['args'] = items[2]
        return msg

    def broadcastLoggingQueueToWs(self, wsHandler, q: Queue):
        """
        Forever read log records from `q` and broadcast them as JSON.

        Meant to run in its own thread: the actual write is scheduled on the
        IO loop with add_callback(). Records that cannot be converted are
        skipped instead of ending the thread.

        :param wsHandler: handler class providing `write_to_clients(msg)`.
        :param q: queue yielding `logging.LogRecord` objects.
        """
        while True:
            record = q.get()
            msg = self._logRecordToMsg(record)
            if msg is None:
                logger.warning("Ignoring unusable log record: %r", record)
                continue

            j = json.dumps(msg)
            # NOTE(review): kept as a plain print; presumably not routed
            # through `logger` so relayed records are not logged again -
            # confirm.
            print(j)
            self.loop.add_callback(wsHandler.write_to_clients, j)