hugvey/hugvey/central_command.py

537 lines
19 KiB
Python
Raw Normal View History

"""
"Conscript reporting"
This server controls all hugveys and the processing of their narratives. It exposes itself for control to the panopticon server.
"""
2019-01-24 14:27:04 +01:00
import os
2019-01-18 19:39:35 +01:00
import time
import yaml
import zmq
2019-01-17 20:28:55 +01:00
from zmq.asyncio import Context
2019-01-24 14:27:04 +01:00
import asyncio
from hugvey.communication import getTopic, zmqSend, zmqReceive
2019-01-18 19:39:35 +01:00
from hugvey.panopticon import Panopticon
from hugvey.story import Story
from hugvey.speech.google import GoogleVoiceClient
from hugvey.speech.player import Player
from hugvey.speech.streamer import AudioStreamer
2019-01-24 14:27:04 +01:00
import json
import logging
import queue
2019-01-24 14:27:04 +01:00
import threading
2019-02-18 20:38:54 +01:00
from hugvey.voice import VoiceStorage
import multiprocessing
mainLogger = logging.getLogger("hugvey")
logger = mainLogger.getChild("command")
eventLogger = logging.getLogger("events")
eventLogger.setLevel(logging.DEBUG)
# def exceptionEmitter(a):
# print(a)
# def decorate(func):
# print('decorate')
# async def call(*args, **kwargs):
# print('call')
# # pre(func, *args, **kwargs)
# try:
# result = await func(*args, **kwargs)
# except Exception as e:
# logger.critical(e, "in", func)
# raise e
# # post(func, *args, **kwargs)
# return result
# return call
# return decorate
class CentralCommand(object):
"""docstring for CentralCommand."""
def __init__(self, args = {}, debug_mode=False):
self.debug = debug_mode
self.eventQueue = asyncio.Queue()
self.commandQueue = asyncio.Queue()
self.isRunning = threading.Event()
self.logQueue = multiprocessing.Queue()
self.hugveys = {}
self.ctx = Context.instance()
2019-01-17 20:28:55 +01:00
self.hugveyLock = asyncio.Lock()
self.start_time = time.time()
2019-01-23 22:38:27 +01:00
self.languageFiles = {}
self.args = args # cli args
eventLogger.addHandler(logging.handlers.QueueHandler(self.logQueue))
def loadConfig(self, filename):
2019-01-18 19:39:35 +01:00
if hasattr(self, 'config'):
raise Exception("Overriding config not supported yet")
with open(filename, 'r') as fp:
logger.debug('Load config from {}'.format(filename))
self.config = yaml.safe_load(fp)
for arg in vars(self.args):
if arg in self.config:
logger.debug("Override argument {}".format(arg))
self.config[arg] = getattr(self.args,arg)
self.hugvey_ids = [i + 1 for i in range(self.config['hugveys'])]
self.loadLanguages()
2019-02-18 20:38:54 +01:00
voice_dir = os.path.join(self.config['web']['files_dir'], 'voices')
self.voiceStorage = VoiceStorage(voice_dir, self.config['voice']['token'])
self.panopticon = Panopticon(self, self.config, self.voiceStorage)
def loadLanguages(self):
logger.debug('load language files')
2019-01-18 12:42:50 +01:00
self.languages = {}
2019-01-18 12:42:50 +01:00
for lang in self.config['languages']:
2019-01-23 15:26:44 +01:00
lang_filename = os.path.join(self.config['web']['files_dir'], lang['file'])
2019-01-23 22:38:27 +01:00
self.languageFiles[lang['code']] = lang['file']
2019-01-23 15:26:44 +01:00
with open(lang_filename, 'r') as fp:
2019-01-24 14:27:04 +01:00
self.languages[lang['code']] = json.load(fp)
def getHugveyStatus(self, hv_id, isSelected = False):
status = {'id': hv_id}
if not hv_id in self.hugveys:
status['status'] = 'off'
return status
hv = self.hugveys[hv_id]
2019-01-25 11:17:10 +01:00
if not hv.story:
status['status'] = 'off'
return status
status['status'] = hv.getStatus()
status['language'] = hv.language_code
status['msg'] = hv.story.currentMessage.id if hv.story.currentMessage else None
status['finished'] = hv.story.isFinished()
status['history'] = {} if isSelected is False else hv.story.getLogSummary()
# status['history'] = hv.story.getLogSummary() # disabled as it is a bit slow. We now have eventLog
status['counts'] = {t: len(a) for t, a in status['history'].items() if t != 'directions' }
return status
def getStatusSummary(self, selected_id = None):
status = {
'uptime': time.time() - self.start_time,
'languages': self.config['languages'],
'hugvey_ids': self.hugvey_ids,
'hugveys': [],
}
for hv_id in self.hugvey_ids:
status['hugveys'].append(self.getHugveyStatus(hv_id, selected_id == hv_id))
return status
def commandHugvey(self, hv_id, msg):
2019-01-18 12:42:50 +01:00
"""
prepare command to be picked up by the sender
"""
2019-02-18 20:38:54 +01:00
logging.debug(f"COmmand {hv_id}: {msg}")
if threading.current_thread().getName() != 'MainThread':
# Threading nightmares! Adding to queue from other thread/loop (not sure which is the isse)
# won't trigger asyncios queue.get() so we have to do this thread
# safe, in the right loop
self.loop.call_soon_threadsafe(self._queueCommand, hv_id, msg)
else:
self._queueCommand(hv_id, msg)
def _queueCommand(self, hv_id, msg):
self.commandQueue.put_nowait((hv_id, msg))
def commandAllHugveys(self, msg):
for hv_id in self.hugvey_ids:
self.commandHugvey(hv_id, msg)
def commandAllActiveHugveys(self, msg):
for hv_id in self.hugveys:
self.commandHugvey(hv_id, msg)
async def commandSender(self):
s = self.ctx.socket(zmq.PUB)
s.bind(self.config['events']['cmd_address'])
self.commandAllHugveys({'action': 'show_yourself'})
# sleep to allow pending connections to connect
await asyncio.sleep(1)
logger.info("Ready to publish commands on: {}".format(
self.config['events']['cmd_address']))
logger.debug('Already {} items in queue'.format(
self.commandQueue.qsize()))
while self.isRunning.is_set():
hv_id, cmd = await self.commandQueue.get()
logger.debug('Got command to send: {} {}'.format(hv_id, cmd))
zmqSend(s, hv_id, cmd)
logger.warn('Stopping command sender')
s.close()
2019-01-17 20:28:55 +01:00
async def instantiateHugvey(self, hugvey_id, msg):
'''
Start a HugveyState, according to a show_yourself reply
'event': 'connection',
'id': self.hugvey_id,
'host': socket.gethostname(),
'ip': self.getIp(),
'''
async with self.hugveyLock: # lock to prevent duplicates on creation
2019-01-17 20:28:55 +01:00
if not hugvey_id in self.hugveys:
logger.info(f'Instantiate hugvey #{hugvey_id}')
h = HugveyState(hugvey_id, self)
h.config(msg['host'], msg['ip'])
2019-01-17 20:28:55 +01:00
self.hugveys[hugvey_id] = h
thread = threading.Thread(
target=h.start, name=f"hugvey#{hugvey_id}")
2019-01-17 20:28:55 +01:00
thread.start()
else:
logger.info(f'Reconfigure hugvey #{hugvey_id}')
# (re)configure exisitng hugveys
h.config(msg['host'], msg['ip'])
async def eventListener(self):
s = self.ctx.socket(zmq.SUB)
s.bind(self.config['events']['listen_address'])
logger.debug("Listen for events on: {}".format(
self.config['events']['listen_address']))
for id in self.hugvey_ids:
s.subscribe(getTopic(id))
while self.isRunning.is_set():
try:
hugvey_id, msg = await zmqReceive(s)
if hugvey_id not in self.hugvey_ids:
logger.critical(
"Message from alien Hugvey: {}".format(hugvey_id))
continue
elif hugvey_id not in self.hugveys:
if msg['event'] == 'connection':
# Create a hugvey
await self.instantiateHugvey(hugvey_id, msg)
else:
logger.warning(
"Message from uninstantiated Hugvey {}".format(hugvey_id))
logger.debug("Message contains: {}".format(msg))
continue
else:
self.hugveys[hugvey_id].queueEvent(msg)
# await self.hugveys[hugvey_id].eventQueue.put(msg)
except Exception as e:
logger.critical(f"Exception while running event loop:")
logger.exception(e)
async def voiceListener(self, hugvey_id):
s = self.ctx.socket(zmq.REP) #: :type s: zmq.sugar.Socket
voiceAddr = f"ipc://voice{hugvey_id}"
s.bind(voiceAddr)
logger.debug("Listen for voice requests on: {}".format(voiceAddr))
while self.isRunning.is_set():
try:
r = await s.recv_json()
isVariable = bool(r['variable'])
text = r['text']
fn = await self.voiceStorage.requestFile(text, isVariable)
if fn is None:
eventLogger.getChild(f"{hugvey_id}").critical("error: No voice file fetched, check logs.")
fn = 'local/crash.wav'
# TODO: trigger a repeat/crash event.
await s.send_string(fn)
except Exception as e:
logger.critical(f"Exception while running voice loop:")
logger.exception(e)
def start(self):
self.isRunning.set()
self.loop = asyncio.get_event_loop()
2019-01-18 19:39:35 +01:00
# self.panopticon_loop = asyncio.new_event_loop()
self.tasks = {} # collect tasks so we can cancel in case of error
self.tasks['eventListener'] = self.loop.create_task(
2019-02-18 20:38:54 +01:00
self.catchException(self.eventListener()))
self.tasks['commandSender'] = self.loop.create_task(
2019-02-18 20:38:54 +01:00
self.catchException(self.commandSender()))
for hid in self.hugvey_ids:
self.tasks['voiceListener'] = self.loop.create_task(
self.catchException(self.voiceListener(hid)))
2019-01-18 19:39:35 +01:00
# we want the web interface in a separate thread
self.panopticon_thread = threading.Thread(
target=self.panopticon.start, name="Panopticon")
2019-01-18 19:39:35 +01:00
self.panopticon_thread.start()
self.loop.run_forever()
def stop(self):
self.isRunning.clear()
2019-02-18 20:38:54 +01:00
async def catchException(self, awaitable):
try:
print(awaitable)
await awaitable
except Exception as e:
logger.exception(e)
logger.critical(f"Hugvey restart might be required but not implemented yet")
class HugveyState(object):
"""Represents the state of a Hugvey client on the server.
Manages server connections & voice parsing etc.
"""
STATE_PAUSE = "paused"
STATE_GONE = "gone"
STATE_RUNNING = "running"
def __init__(self, id: int, command: CentralCommand):
self.id = id
self.command = command
self.logger = mainLogger.getChild(f"{self.id}").getChild("command")
self.loop = asyncio.new_event_loop()
2019-01-17 20:28:55 +01:00
self.isConfigured = False
2019-01-25 11:17:10 +01:00
self.isRunning = asyncio.Event(loop=self.loop)
2019-01-18 12:42:50 +01:00
self.eventQueue = None
self.language_code = 'en-GB'
2019-01-25 11:17:10 +01:00
self.story = None
self.streamer = None
self.status = self.STATE_PAUSE
self.google = None
self.notShuttingDown = True # TODO: allow shutdown of object
2019-02-14 11:15:09 +01:00
self.startMsgId = None
self.eventLogger = eventLogger.getChild(f"{self.id}")
def getStatus(self):
if self.story.isFinished():
return "finished"
return self.status
def config(self, hostname, ip):
self.ip = ip
self.hostname = hostname
self.logger.info(
f"Hugvey {self.id} at {self.ip}, host: {self.hostname}")
2019-01-17 20:28:55 +01:00
if self.isConfigured == True:
# a reconfiguration/reconnection
pass
2019-01-17 20:28:55 +01:00
self.isConfigured = True
2019-01-18 12:42:50 +01:00
def sendCommand(self, msg):
"""
Send message or command to hugvey
@param msg: The message to be sent. Probably a dict()
"""
self.command.commandHugvey(self.id, msg)
def start(self):
2019-01-18 12:42:50 +01:00
"""
Start the tasks
"""
self.isRunning.set()
self.status = self.STATE_RUNNING
2019-01-18 12:42:50 +01:00
tasks = asyncio.gather(
self.catchException(self.processAudio()),
self.catchException(self.handleEvents()),
self.catchException(self.playStory()),
loop=self.loop)
# self.pause()
self.loop.run_until_complete(tasks)
2019-01-18 12:42:50 +01:00
async def catchException(self, awaitable):
try:
2019-02-18 20:38:54 +01:00
print(awaitable)
2019-01-18 12:42:50 +01:00
await awaitable
except Exception as e:
self.logger.exception(e)
self.logger.critical(f"Hugvey restart required but not implemented yet")
self.eventLogger.critical(f"error: {e}")
2019-01-18 12:42:50 +01:00
# TODO: restart
def queueEvent(self, msg):
if 'time' not in msg:
# add time, so we can track time passed
msg['time'] = time.time()
if not self.eventQueue:
self.logger.critical("No event queue to put {}".format(msg))
else:
# Allow for both the Hugvey Command, or the Story handle the event.
2019-01-25 13:11:00 +01:00
self.loop.call_soon_threadsafe(self._queueEvent, msg)
def _queueEvent(self, msg):
self.logger.debug(f"Queue event in hugvey loop: {msg}")
self.eventQueue.put_nowait(msg)
self.story.events.append(msg)
2019-01-18 12:42:50 +01:00
async def handleEvents(self):
self.eventQueue = asyncio.Queue() # start event queue here, to avoid loop issues
2019-01-18 12:42:50 +01:00
while self.command.isRunning.is_set():
event = await self.eventQueue.get()
self.logger.debug("Received: {}".format(event))
2019-01-25 13:11:00 +01:00
if event['event'] == 'connection' and not self.isRunning.is_set():
self.restart()
if event['event'] == 'language':
2019-01-18 12:42:50 +01:00
self.setLanguage(event['code'])
if event['event'] == 'pause':
self.pause()
if event['event'] == 'restart':
self.restart()
if event['event'] == 'resume':
self.resume()
2019-01-25 11:59:03 +01:00
if event['event'] == 'change_language':
self.setLanguage(event['lang_code'])
2019-01-25 15:45:46 +01:00
if event['event'] == 'play_msg':
self.logger.info(f"Play given message {event['msg_id']}")
2019-01-25 15:45:46 +01:00
if not self.story:
self.logger.critical("No story to play message in")
else:
#restart first so that story loads the new json
# self.restart()
if self.story is None:
return
2019-02-14 11:15:09 +01:00
self.startMsgId = event['msg_id']
self.logger.debug(f"Restart from {self.startMsgId}")
self.restart()
# self.pause() # this doesn't reload story data!! Make sure we restart
# wait a tat for the restart loops to complete
2019-02-14 11:15:09 +01:00
# await asyncio.sleep(.1)
# self.logger.debug('restarted')
# msg = self.story.get(event['msg_id'])
# if not msg:
# self.logger.critical("Invalid ID to play: {}".format(event['msg_id']))
# else:
# self.story.setCurrentMessage(msg)
#
# self.resume()
2019-01-18 12:42:50 +01:00
self.eventQueue = None
2019-01-18 12:42:50 +01:00
def setLanguage(self, language_code):
if language_code not in self.command.languages:
raise Exception("Invalid language {}".format(language_code))
2019-01-25 11:59:03 +01:00
self.logger.info(f"set language: {language_code}")
2019-01-18 12:42:50 +01:00
self.language_code = language_code
self.google.setLanguage(language_code)
2019-01-25 11:59:03 +01:00
self.restart()
2019-01-25 11:59:03 +01:00
# self.story.reset()
# self.story.setStoryData(self.command.languages[language_code])
def pause(self):
self.logger.info('Pause')
if self.google:
self.google.pause()
2019-01-25 11:17:10 +01:00
if self.story:
self.story.pause()
self.isRunning.clear()
self.status = self.STATE_PAUSE
def resume(self):
2019-01-25 15:45:46 +01:00
""" Start playing without reset"""
self.logger.info('Resume')
if self.google:
self.google.resume()
2019-01-25 11:17:10 +01:00
if self.story:
self.story.resume()
self.isRunning.set()
self.status = self.STATE_RUNNING
def restart(self):
2019-01-25 15:45:46 +01:00
""" Start playing with reset"""
self.logger.info('Restart')
2019-01-25 11:17:10 +01:00
if self.story:
2019-01-25 11:59:03 +01:00
self.story.stop()
self.resume()
self.isRunning.set()
def gone(self):
'''Status to 'gone' as in, shutdown/crashed/whatever
'''
self.pause()
2019-01-25 11:17:10 +01:00
if self.story:
self.story.stop()
self.logger.info('Gone')
self.status = self.STATE_GONE
2019-01-18 12:42:50 +01:00
async def playStory(self):
2019-01-25 11:17:10 +01:00
while self.notShuttingDown:
await self.isRunning.wait()
# new story instance on each run
2019-02-18 20:38:54 +01:00
port = self.command.config['web']['port']
self.story = Story(self, port)
2019-02-14 11:15:09 +01:00
startMsgId = self.startMsgId
self.startMsgId = None # use only once, reset before 'run'
self.logger.warn(f"Starting from {startMsgId}")
2019-01-25 11:17:10 +01:00
self.story.setStoryData(self.command.languages[self.language_code])
2019-02-14 11:15:09 +01:00
await self.story.run(startMsgId)
2019-01-25 11:17:10 +01:00
# self.story = None
def getStreamer(self):
if not self.streamer:
self.streamer = AudioStreamer(
self.command.config['voice']['chunk'],
self.ip,
int(self.command.config['voice']['port']) + self.id)
if self.command.config['voyeur']:
self.logger.warn("Debug on: Connecting Audio player")
self.player = Player(
self.command.config['voice']['src_rate'], self.command.config['voice']['out_rate'])
self.streamer.addConsumer(self.player)
self.logger.debug("Start Speech")
self.google = GoogleVoiceClient(
hugvey=self,
src_rate=self.command.config['voice']['src_rate'],
credential_file=self.command.config['voice']['google_credentials'],
language_code=self.language_code
)
self.streamer.addConsumer(self.google)
return self.streamer
2019-01-18 12:42:50 +01:00
async def processAudio(self):
'''
Start the audio streamer service
'''
self.logger.debug("Start audio stream")
while self.notShuttingDown:
2019-01-25 13:11:00 +01:00
await self.isRunning.wait()
self.logger.debug("Start audio stream")
await self.getStreamer().run()
self.logger.critical(f"stream has left the building from {self.ip}")
self.eventLogger.critical(f"error: stream has left the building from {self.ip}")
# if we end up here, the streamer finished, probably meaning hte hugvey shutdown
self.gone()