471 lines
16 KiB
Python
471 lines
16 KiB
Python
"""
|
|
"Conscript reporting"
|
|
This server controls all hugveys and the processing of their narratives. It exposes itself for control to the panopticon server.
|
|
"""
|
|
|
|
import os
|
|
import time
|
|
import yaml
|
|
import zmq
|
|
from zmq.asyncio import Context
|
|
|
|
import asyncio
|
|
from hugvey.communication import getTopic, zmqSend, zmqReceive
|
|
from hugvey.panopticon import Panopticon
|
|
from hugvey.story import Story
|
|
from hugvey.voice.google import GoogleVoiceClient
|
|
from hugvey.voice.player import Player
|
|
from hugvey.voice.streamer import AudioStreamer
|
|
import json
|
|
import logging
|
|
import queue
|
|
import threading
|
|
|
|
|
|
# Module-level logger, registered under the name "command".
logger = logging.getLogger("command")
|
|
|
|
# def exceptionEmitter(a):
|
|
# print(a)
|
|
# def decorate(func):
|
|
# print('decorate')
|
|
# async def call(*args, **kwargs):
|
|
# print('call')
|
|
# # pre(func, *args, **kwargs)
|
|
# try:
|
|
# result = await func(*args, **kwargs)
|
|
# except Exception as e:
|
|
# logger.critical(e, "in", func)
|
|
# raise e
|
|
# # post(func, *args, **kwargs)
|
|
# return result
|
|
# return call
|
|
# return decorate
|
|
|
|
|
|
class CentralCommand(object):
    """Central server that tracks every Hugvey and relays events and commands.

    Events from the hugveys arrive on a ZMQ SUB socket (``eventListener``) and
    are handed to a per-hugvey ``HugveyState``. Commands for the hugveys are
    queued on ``commandQueue`` and published on a ZMQ PUB socket
    (``commandSender``). The Panopticon web interface is started in its own
    thread and receives a reference to this object.
    """

    def __init__(self, debug_mode=False):
        """Set up the (still unconfigured) server state.

        @param debug_mode: stored as ``self.debug``; HugveyState reads it to
            decide whether to attach a local audio player.
        """
        self.debug = debug_mode
        self.eventQueue = asyncio.Queue()
        # (hugvey id, message) tuples waiting to be published by commandSender
        self.commandQueue = asyncio.Queue()
        # threading.Event so that other threads can check whether we still run
        self.isRunning = threading.Event()
        # hugvey id -> HugveyState, filled when a hugvey announces itself
        self.hugveys = {}
        self.ctx = Context.instance()
        # prevents duplicate HugveyState creation for the same id
        self.hugveyLock = asyncio.Lock()
        self.start_time = time.time()
        # language code -> filename of the story json
        self.languageFiles = {}

    def loadConfig(self, filename):
        """Load the YAML configuration and everything that depends on it.

        Sets ``self.config`` and ``self.hugvey_ids`` (1..config['hugveys']),
        loads the language files and creates the Panopticon.

        @param filename: path of the YAML config file
        @raise Exception: when a config was already loaded
        """
        if hasattr(self, 'config'):
            raise Exception("Overriding config not supported yet")

        with open(filename, 'r') as fp:
            logger.debug('Load config from {}'.format(filename))
            self.config = yaml.safe_load(fp)

        # hugvey ids are 1-based
        self.hugvey_ids = [i + 1 for i in range(self.config['hugveys'])]

        self.loadLanguages()

        self.panopticon = Panopticon(self, self.config)

    def loadLanguages(self):
        """(Re)load the story json of every language listed in the config.

        Fills ``self.languages`` (code -> parsed json) and
        ``self.languageFiles`` (code -> filename).
        """
        logger.debug('load language files')
        self.languages = {}

        for lang in self.config['languages']:
            lang_filename = os.path.join(
                self.config['web']['files_dir'], lang['file'])
            self.languageFiles[lang['code']] = lang['file']
            with open(lang_filename, 'r') as fp:
                self.languages[lang['code']] = json.load(fp)

    def getHugveyStatus(self, hv_id):
        """Return a status dict for a single hugvey.

        A hugvey that is not instantiated, or has no story yet, is reported
        as ``{'id': hv_id, 'status': 'off'}``.
        """
        status = {'id': hv_id}

        hv = self.hugveys.get(hv_id)
        if hv is None or not hv.story:
            status['status'] = 'off'
            return status

        status['status'] = hv.getStatus()
        status['language'] = hv.language_code
        status['msg'] = hv.story.currentMessage.id if hv.story.currentMessage else None
        status['finished'] = hv.story.isFinished()
        status['history'] = hv.story.getLogSummary()
        # number of entries per history category, except for 'directions'
        status['counts'] = {
            t: len(a) for t, a in status['history'].items() if t != 'directions'}

        return status

    def getStatusSummary(self):
        """Return uptime, configured languages and the status of each hugvey."""
        return {
            'uptime': time.time() - self.start_time,
            'languages': self.config['languages'],
            'hugveys': [self.getHugveyStatus(hv_id) for hv_id in self.hugvey_ids],
        }

    def commandHugvey(self, hv_id, msg):
        """
        prepare command to be picked up by the sender

        @param hv_id: id of the hugvey the command is meant for
        @param msg: the command, queued as-is together with the id
        """
        if threading.current_thread().name != 'MainThread':
            # Threading nightmares! Adding to the queue from another
            # thread/loop (not sure which is the issue) won't trigger asyncio's
            # queue.get(), so we have to do this thread safe, in the right loop.
            self.loop.call_soon_threadsafe(self._queueCommand, hv_id, msg)
        else:
            self._queueCommand(hv_id, msg)

    def _queueCommand(self, hv_id, msg):
        """Put the command on the queue; must run in the command loop's thread."""
        self.commandQueue.put_nowait((hv_id, msg))

    def commandAllHugveys(self, msg):
        """Queue ``msg`` for every configured hugvey id, connected or not."""
        for hv_id in self.hugvey_ids:
            self.commandHugvey(hv_id, msg)

    def commandAllActiveHugveys(self, msg):
        """Queue ``msg`` for every hugvey that has been instantiated."""
        # Iterate over a snapshot: this may be called from another thread
        # while the event loop adds a new hugvey to the dict.
        for hv_id in list(self.hugveys):
            self.commandHugvey(hv_id, msg)

    async def commandSender(self):
        """Publish queued commands on the configured ``cmd_address``.

        Starts by asking every hugvey to 'show_yourself', then forwards
        whatever lands on ``commandQueue`` for as long as ``isRunning`` is set.
        """
        s = self.ctx.socket(zmq.PUB)
        try:
            s.bind(self.config['events']['cmd_address'])

            self.commandAllHugveys({'action': 'show_yourself'})

            # sleep to allow pending connections to connect
            await asyncio.sleep(1)
            logger.info("Ready to publish commands on: {}".format(
                self.config['events']['cmd_address']))
            logger.debug('Already {} items in queue'.format(
                self.commandQueue.qsize()))

            while self.isRunning.is_set():
                hv_id, cmd = await self.commandQueue.get()
                logger.info('Got command to send: {} {}'.format(hv_id, cmd))
                # NOTE(review): the return value of zmqSend is not awaited;
                # confirm that it does not return an awaitable.
                zmqSend(s, hv_id, cmd)

            logger.warning('Stopping command sender')
        finally:
            # also release the socket when the task fails or gets cancelled
            s.close()

    async def instantiateHugvey(self, hugvey_id, msg):
        '''
        Start a HugveyState, according to a show_yourself reply

        'event': 'connection',
        'id': self.hugvey_id,
        'host': socket.gethostname(),
        'ip': self.getIp(),

        A new hugvey gets its own thread running ``HugveyState.start``; a
        hugvey that already exists is only reconfigured with the new host/ip.
        '''
        async with self.hugveyLock:  # lock to prevent duplicates on creation
            if hugvey_id not in self.hugveys:
                logger.info(f'Instantiate hugvey #{hugvey_id}')
                h = HugveyState(hugvey_id, self)
                h.config(msg['host'], msg['ip'])
                self.hugveys[hugvey_id] = h
                thread = threading.Thread(
                    target=h.start, name=f"hugvey#{hugvey_id}")
                thread.start()
            else:
                logger.info(f'Reconfigure hugvey #{hugvey_id}')
                # (re)configure existing hugveys
                self.hugveys[hugvey_id].config(msg['host'], msg['ip'])

    async def eventListener(self):
        """Receive hugvey events on ``listen_address`` and dispatch them.

        Messages from ids outside ``hugvey_ids`` are rejected. A 'connection'
        event from a not yet instantiated hugvey creates its HugveyState; any
        other event of such a hugvey is dropped with a warning. Events of
        known hugveys are queued on their HugveyState.
        """
        s = self.ctx.socket(zmq.SUB)
        try:
            s.bind(self.config['events']['listen_address'])
            logger.info("Listen for events on: {}".format(
                self.config['events']['listen_address']))

            for hv_id in self.hugvey_ids:
                s.subscribe(getTopic(hv_id))

            while self.isRunning.is_set():
                try:
                    hugvey_id, msg = await zmqReceive(s)

                    if hugvey_id not in self.hugvey_ids:
                        logger.critical(
                            "Message from alien Hugvey: {}".format(hugvey_id))
                    elif hugvey_id not in self.hugveys:
                        if msg['event'] == 'connection':
                            # Create a hugvey
                            await self.instantiateHugvey(hugvey_id, msg)
                        else:
                            logger.warning(
                                "Message from uninstantiated Hugvey {}".format(hugvey_id))
                            logger.debug("Message contains: {}".format(msg))
                    else:
                        self.hugveys[hugvey_id].queueEvent(msg)
                except asyncio.CancelledError:
                    # CancelledError is a plain Exception on Python 3.7;
                    # never swallow it, or the task cannot be cancelled.
                    raise
                except Exception as e:
                    # a single bad message should not stop the listener
                    logger.critical("Exception while running event loop:")
                    logger.exception(e)
        finally:
            s.close()

    def start(self):
        """Start listener, sender and Panopticon; blocks in ``run_forever``."""
        self.isRunning.set()
        self.loop = asyncio.get_event_loop()

        self.tasks = {}  # collect tasks so we can cancel in case of error
        self.tasks['eventListener'] = self.loop.create_task(
            self.eventListener())
        self.tasks['commandSender'] = self.loop.create_task(
            self.commandSender())

        # we want the web interface in a separate thread
        self.panopticon_thread = threading.Thread(
            target=self.panopticon.start, name="Panopticon")
        self.panopticon_thread.start()

        self.loop.run_forever()

    def stop(self):
        """Clear the running flag that the listener/sender loops check.

        This does not stop the event loop itself, and a loop that is waiting
        for its next message only notices the flag after that message arrived.
        """
        self.isRunning.clear()
|
|
|
|
|
|
class HugveyState(object):
    """Represents the state of a Hugvey client on the server.
    Manages server connections & voice parsing etc.

    Every HugveyState owns an asyncio event loop. ``start()`` runs that loop
    with three tasks: the audio stream, the event handler and the story.
    """

    STATE_PAUSE = "paused"
    STATE_GONE = "gone"
    STATE_RUNNING = "running"

    def __init__(self, id: int, command: "CentralCommand"):
        """
        @param id: id of the hugvey this state belongs to
        @param command: the CentralCommand that created this state; used for
            its config, languages, debug flag and to send commands back
        """
        self.id = id
        self.command = command
        self.logger = logging.getLogger(f"hugvey{self.id}")
        # private loop, run by start()
        self.loop = asyncio.new_event_loop()
        self.isConfigured = False
        try:
            # Python < 3.10: bind the event to our own loop explicitly
            self.isRunning = asyncio.Event(loop=self.loop)
        except TypeError:
            # Python >= 3.10 removed the loop parameter; the event binds to
            # the loop it is first awaited in, which is self.loop.
            self.isRunning = asyncio.Event()
        # created by handleEvents() inside our own loop
        self.eventQueue = None
        self.language_code = 'en-GB'
        # created by playStory()
        self.story = None
        # streamer, google and (debug only) player are created by getStreamer()
        self.streamer = None
        self.player = None
        self.status = self.STATE_PAUSE
        self.google = None
        self.notShuttingDown = True  # TODO: allow shutdown of object

    def getStatus(self):
        """Return "finished" when the story is finished, else the state.

        Without a story the plain state is returned.
        """
        if self.story is not None and self.story.isFinished():
            return "finished"
        return self.status

    def config(self, hostname, ip):
        """Store the network location of the hugvey.

        Called on first connection and again on reconnection; a
        reconfiguration currently needs no extra handling.
        """
        self.ip = ip
        self.hostname = hostname
        self.logger.info(
            f"Hugvey {self.id} at {self.ip}, host: {self.hostname}")

        self.isConfigured = True

    def sendCommand(self, msg):
        """
        Send message or command to hugvey
        @param msg: The message to be sent. Probably a dict()
        """
        self.command.commandHugvey(self.id, msg)

    def start(self):
        """
        Start the tasks

        Blocks until the audio, event and story tasks have all ended, so it
        is meant to be the target of a dedicated thread.
        """
        self.isRunning.set()
        self.status = self.STATE_RUNNING

        async def runTasks():
            # gather() is called inside the running loop, so it needs no
            # explicit loop argument (that argument is gone since 3.10)
            await asyncio.gather(
                self.catchException(self.processAudio()),
                self.catchException(self.handleEvents()),
                self.catchException(self.playStory()),
            )

        self.loop.run_until_complete(runTasks())

    async def catchException(self, awaitable):
        """Await ``awaitable``, logging any exception instead of raising it.

        Keeps one crashing task from taking the other tasks down with it.
        """
        try:
            await awaitable
        except asyncio.CancelledError:
            # CancelledError is a plain Exception on Python 3.7; let it pass
            raise
        except Exception as e:
            self.logger.exception(e)
            self.logger.critical("Hugvey restart required but not implemented yet")
            # TODO: restart

    def queueEvent(self, msg):
        """Hand an event to this hugvey's own loop; callable from any thread.

        @param msg: the event dict; a 'time' key is added when missing
        """
        if 'time' not in msg:
            # add time, so we can track time passed
            msg['time'] = time.time()

        if self.eventQueue is None:
            self.logger.critical("No event queue to put {}".format(msg))
        else:
            # Allow for both the Hugvey Command, or the Story handle the event.
            self.loop.call_soon_threadsafe(self._queueEvent, msg)

    def _queueEvent(self, msg):
        """Queue the event and append it to the story's events; runs in our loop."""
        self.logger.debug(f"Queue event in hugvey loop: {msg}")
        if self.eventQueue is None:
            # handleEvents() ended after this callback was scheduled
            self.logger.critical("No event queue to put {}".format(msg))
            return
        self.eventQueue.put_nowait(msg)
        if self.story is not None:
            self.story.events.append(msg)

    async def handleEvents(self):
        """Process queued events until the central command stops running.

        Handles 'connection', 'language', 'pause', 'restart', 'resume',
        'change_language' and 'play_msg'; other events are only logged.
        """
        self.eventQueue = asyncio.Queue()  # start event queue here, to avoid loop issues
        while self.command.isRunning.is_set():
            event = await self.eventQueue.get()
            self.logger.info("Received: {}".format(event))
            kind = event['event']

            if kind == 'connection':
                # a reconnecting hugvey that was paused starts over
                if not self.isRunning.is_set():
                    self.restart()
            elif kind == 'language':
                self.setLanguage(event['code'])
            elif kind == 'pause':
                self.pause()
            elif kind == 'restart':
                self.restart()
            elif kind == 'resume':
                self.resume()
            elif kind == 'change_language':
                self.setLanguage(event['lang_code'])
            elif kind == 'play_msg':
                self.logger.info("DO PLAY :-)")
                if not self.story:
                    self.logger.critical("No story to play message in")
                else:
                    # restart first so that story loads the new json
                    self.restart()
                    # wait a tad for the restart loops to complete
                    await asyncio.sleep(.1)
                    msg = self.story.get(event['msg_id'])
                    if not msg:
                        self.logger.critical(
                            "Invalid ID to play: {}".format(event['msg_id']))
                    else:
                        self.story.setCurrentMessage(msg)

        self.eventQueue = None

    def setLanguage(self, language_code):
        """Switch language and restart the story.

        @raise Exception: when the central command has no such language
        """
        if language_code not in self.command.languages:
            raise Exception("Invalid language {}".format(language_code))

        self.logger.info(f"set language: {language_code}")
        self.language_code = language_code
        if self.google:
            # when there is no client yet, getStreamer() creates it with
            # the language_code that was just stored
            self.google.setLanguage(language_code)

        self.restart()

    def pause(self):
        """Pause voice client and story and put the tasks on hold."""
        self.logger.info('Pause')
        if self.google:
            self.google.pause()
        if self.story:
            self.story.pause()
        self.isRunning.clear()
        self.status = self.STATE_PAUSE

    def resume(self):
        """ Start playing without reset"""
        self.logger.info('Resume')
        if self.google:
            self.google.resume()
        if self.story:
            self.story.resume()
        self.isRunning.set()
        self.status = self.STATE_RUNNING

    def restart(self):
        """ Start playing with reset"""
        self.logger.info('Restart')
        if self.story:
            self.story.stop()
        # resume() also sets isRunning again
        self.resume()

    def gone(self):
        '''Status to 'gone' as in, shutdown/crashed/whatever
        '''
        self.pause()
        if self.story:
            self.story.stop()

        self.logger.info('Gone')
        self.status = self.STATE_GONE

    async def playStory(self):
        """Run a story whenever the hugvey is running; repeats when it ends."""
        while self.notShuttingDown:
            await self.isRunning.wait()

            # new story instance on each run
            self.story = Story(self)
            self.story.setStoryData(self.command.languages[self.language_code])
            await self.story.run()

    def getStreamer(self):
        """Return the audio streamer, creating it on first use.

        On creation the Google voice client is attached as consumer, and in
        debug mode also a local audio player.
        """
        if not self.streamer:
            voice_config = self.command.config['voice']
            self.streamer = AudioStreamer(
                voice_config['chunk'],
                self.ip,
                int(voice_config['port']))

            if self.command.debug:
                self.logger.warning("Debug on: Connecting Audio player")
                self.player = Player(
                    voice_config['src_rate'], voice_config['out_rate'])
                self.streamer.addConsumer(self.player)

            self.logger.info("Start Speech")
            self.google = GoogleVoiceClient(
                hugvey=self,
                src_rate=voice_config['src_rate'],
                credential_file=voice_config['google_credentials'],
                language_code=self.language_code
            )
            self.streamer.addConsumer(self.google)
        return self.streamer

    async def processAudio(self):
        '''
        Start the audio streamer service
        '''
        self.logger.info("Start audio stream")

        while self.notShuttingDown:
            await self.isRunning.wait()

            self.logger.info("Start audio stream")
            await self.getStreamer().run()
            self.logger.warning("stream has left the building")
            # if we end up here, the streamer finished, probably meaning the
            # hugvey shut down
            self.gone()
|