""" "Conscript reporting" This server controls all hugveys and the processing of their narratives. It exposes itself for control to the panopticon server. """ import os import time import yaml import zmq from zmq.asyncio import Context import asyncio from hugvey.communication import getTopic, zmqSend, zmqReceive from hugvey.panopticon import Panopticon from hugvey.story import Story from hugvey.voice.google import GoogleVoiceClient from hugvey.voice.player import Player from hugvey.voice.streamer import AudioStreamer import json import logging import queue import threading logger = logging.getLogger("command") # def exceptionEmitter(a): # print(a) # def decorate(func): # print('decorate') # async def call(*args, **kwargs): # print('call') # # pre(func, *args, **kwargs) # try: # result = await func(*args, **kwargs) # except Exception as e: # logger.critical(e, "in", func) # raise e # # post(func, *args, **kwargs) # return result # return call # return decorate class CentralCommand(object): """docstring for CentralCommand.""" def __init__(self, debug_mode=False): self.debug = debug_mode self.eventQueue = asyncio.Queue() self.commandQueue = asyncio.Queue() self.isRunning = threading.Event() self.hugveys = {} self.ctx = Context.instance() self.hugveyLock = asyncio.Lock() self.start_time = time.time() self.languageFiles = {} def loadConfig(self, filename): if hasattr(self, 'config'): raise Exception("Overriding config not supported yet") with open(filename, 'r') as fp: logger.debug('Load config from {}'.format(filename)) self.config = yaml.safe_load(fp) self.hugvey_ids = [i + 1 for i in range(self.config['hugveys'])] self.loadLanguages() self.panopticon = Panopticon(self, self.config) def loadLanguages(self): logger.debug('load language files') self.languages = {} for lang in self.config['languages']: lang_filename = os.path.join(self.config['web']['files_dir'], lang['file']) self.languageFiles[lang['code']] = lang['file'] with open(lang_filename, 'r') as fp: self.languages[lang['code']] = json.load(fp) def getHugveyStatus(self, hv_id): status = {'id': hv_id} if not hv_id in self.hugveys: status['status'] = 'off' return status hv = self.hugveys[hv_id] if not hv.story: status['status'] = 'off' return status status['status'] = hv.getStatus() status['language'] = hv.language_code status['msg'] = hv.story.currentMessage.id if hv.story.currentMessage else None status['finished'] = hv.story.isFinished() status['history'] = hv.story.getLogSummary() status['counts'] = {t: len(a) for t, a in status['history'].items() if t != 'directions' } return status def getStatusSummary(self): status = { 'uptime': time.time() - self.start_time, 'languages': self.config['languages'], 'hugveys': [], } for hv_id in self.hugvey_ids: status['hugveys'].append(self.getHugveyStatus(hv_id)) return status def commandHugvey(self, hv_id, msg): """ prepare command to be picked up by the sender """ if threading.current_thread().getName() != 'MainThread': # Threading nightmares! Adding to queue from other thread/loop (not sure which is the isse) # won't trigger asyncios queue.get() so we have to do this thread # safe, in the right loop self.loop.call_soon_threadsafe(self._queueCommand, hv_id, msg) else: self._queueCommand(hv_id, msg) def _queueCommand(self, hv_id, msg): self.commandQueue.put_nowait((hv_id, msg)) # if msg['action'] == 'play': # self.commandQueue.put_nowait((hv_id, { # 'action': 'play', # 'msg': "This is an interrption", # 'id': 'test', # })) def commandAllHugveys(self, msg): for hv_id in self.hugvey_ids: self.commandHugvey(hv_id, msg) def commandAllActiveHugveys(self, msg): for hv_id in self.hugveys: self.commandHugvey(hv_id, msg) async def commandSender(self): s = self.ctx.socket(zmq.PUB) s.bind(self.config['events']['cmd_address']) self.commandAllHugveys({'action': 'show_yourself'}) # sleep to allow pending connections to connect await asyncio.sleep(1) logger.info("Ready to publish commands on: {}".format( self.config['events']['cmd_address'])) logger.debug('Already {} items in queue'.format( self.commandQueue.qsize())) while self.isRunning.is_set(): hv_id, cmd = await self.commandQueue.get() logger.info('Got command to send: {} {}'.format(hv_id, cmd)) zmqSend(s, hv_id, cmd) logger.warn('Stopping command sender') s.close() async def instantiateHugvey(self, hugvey_id, msg): ''' Start a HugveyState, according to a show_yourself reply 'event': 'connection', 'id': self.hugvey_id, 'host': socket.gethostname(), 'ip': self.getIp(), ''' async with self.hugveyLock: # lock to prevent duplicates on creation if not hugvey_id in self.hugveys: logger.info(f'Instantiate hugvey #{hugvey_id}') h = HugveyState(hugvey_id, self) h.config(msg['host'], msg['ip']) self.hugveys[hugvey_id] = h thread = threading.Thread( target=h.start, name=f"hugvey#{hugvey_id}") thread.start() else: logger.info(f'Reconfigure hugvey #{hugvey_id}') # (re)configure exisitng hugveys h.config(msg['host'], msg['ip']) async def eventListener(self): s = self.ctx.socket(zmq.SUB) s.bind(self.config['events']['listen_address']) logger.info("Listen for events on: {}".format( self.config['events']['listen_address'])) for id in self.hugvey_ids: s.subscribe(getTopic(id)) while self.isRunning.is_set(): try: hugvey_id, msg = await zmqReceive(s) if hugvey_id not in self.hugvey_ids: logger.critical( "Message from alien Hugvey: {}".format(hugvey_id)) continue elif hugvey_id not in self.hugveys: if msg['event'] == 'connection': # Create a hugvey await self.instantiateHugvey(hugvey_id, msg) else: logger.warning( "Message from uninstantiated Hugvey {}".format(hugvey_id)) logger.debug("Message contains: {}".format(msg)) continue else: self.hugveys[hugvey_id].queueEvent(msg) # await self.hugveys[hugvey_id].eventQueue.put(msg) except Exception as e: logger.critical(f"Exception while running event loop:") logger.exception(e) def start(self): self.isRunning.set() self.loop = asyncio.get_event_loop() # self.panopticon_loop = asyncio.new_event_loop() self.tasks = {} # collect tasks so we can cancel in case of error self.tasks['eventListener'] = self.loop.create_task( self.eventListener()) self.tasks['commandSender'] = self.loop.create_task( self.commandSender()) # we want the web interface in a separate thread self.panopticon_thread = threading.Thread( target=self.panopticon.start, name="Panopticon") self.panopticon_thread.start() self.loop.run_forever() def stop(self): self.isRunning.clear() class HugveyState(object): """Represents the state of a Hugvey client on the server. Manages server connections & voice parsing etc. """ STATE_PAUSE = "paused" STATE_GONE = "gone" STATE_RUNNING = "running" def __init__(self, id: int, command: CentralCommand): self.id = id self.command = command self.logger = logging.getLogger(f"hugvey{self.id}") self.loop = asyncio.new_event_loop() self.isConfigured = False self.isRunning = asyncio.Event(loop=self.loop) self.eventQueue = None self.language_code = 'en-GB' self.story = None self.streamer = None self.status = self.STATE_PAUSE self.google = None self.notShuttingDown = True # TODO: allow shutdown of object def getStatus(self): if self.story.isFinished(): return "finished" return self.status def config(self, hostname, ip): self.ip = ip self.hostname = hostname self.logger.info( f"Hugvey {self.id} at {self.ip}, host: {self.hostname}") if self.isConfigured == True: # a reconfiguration/reconnection pass self.isConfigured = True def sendCommand(self, msg): """ Send message or command to hugvey @param msg: The message to be sent. Probably a dict() """ self.command.commandHugvey(self.id, msg) def start(self): """ Start the tasks """ self.isRunning.set() self.status = self.STATE_RUNNING tasks = asyncio.gather( self.catchException(self.processAudio()), self.catchException(self.handleEvents()), self.catchException(self.playStory()), loop=self.loop) # self.pause() self.loop.run_until_complete(tasks) async def catchException(self, awaitable): try: await awaitable except Exception as e: logger.exception(e) logger.critical(f"Hugvey restart required but not implemented yet") # TODO: restart def queueEvent(self, msg): if 'time' not in msg: # add time, so we can track time passed msg['time'] = time.time() if not self.eventQueue: self.logger.critical("No event queue to put {}".format(msg)) else: # Allow for both the Hugvey Command, or the Story handle the event. self.loop.call_soon_threadsafe(self._queueEvent, msg) def _queueEvent(self, msg): self.logger.debug(f"Queue event in hugvey loop: {msg}") self.eventQueue.put_nowait(msg) self.story.events.append(msg) async def handleEvents(self): self.eventQueue = asyncio.Queue() # start event queue here, to avoid loop issues while self.command.isRunning.is_set(): event = await self.eventQueue.get() self.logger.info("Received: {}".format(event)) if event['event'] == 'connection' and not self.isRunning.is_set(): self.restart() if event['event'] == 'language': self.setLanguage(event['code']) if event['event'] == 'pause': self.pause() if event['event'] == 'restart': self.restart() if event['event'] == 'resume': self.resume() if event['event'] == 'change_language': self.setLanguage(event['lang_code']) if event['event'] == 'play_msg': self.logger.info("DO PLAY :-)") if not self.story: self.logger.critical("No story to play message in") else: #restart first so that story loads the new json self.restart() # wait a tat for the restart loops to complete await asyncio.sleep(.1) msg = self.story.get(event['msg_id']) if not msg: self.logger.critical("Invalid ID to play: {}".format(event['msg_id'])) else: self.story.setCurrentMessage(msg) self.eventQueue = None def setLanguage(self, language_code): if language_code not in self.command.languages: raise Exception("Invalid language {}".format(language_code)) self.logger.info(f"set language: {language_code}") self.language_code = language_code self.google.setLanguage(language_code) self.restart() # self.story.reset() # self.story.setStoryData(self.command.languages[language_code]) def pause(self): self.logger.info('Pause') if self.google: self.google.pause() if self.story: self.story.pause() self.isRunning.clear() self.status = self.STATE_PAUSE def resume(self): """ Start playing without reset""" self.logger.info('Resume') if self.google: self.google.resume() if self.story: self.story.resume() self.isRunning.set() self.status = self.STATE_RUNNING def restart(self): """ Start playing with reset""" self.logger.info('Restart') if self.story: self.story.stop() self.resume() self.isRunning.set() def gone(self): '''Status to 'gone' as in, shutdown/crashed/whatever ''' self.pause() if self.story: self.story.stop() self.logger.info('Gone') self.status = self.STATE_GONE async def playStory(self): while self.notShuttingDown: await self.isRunning.wait() # new story instance on each run self.story = Story(self) self.story.setStoryData(self.command.languages[self.language_code]) await self.story.run() # self.story = None def getStreamer(self): if not self.streamer: self.streamer = AudioStreamer( self.command.config['voice']['chunk'], self.ip, int(self.command.config['voice']['port'])) if self.command.debug: self.logger.warn("Debug on: Connecting Audio player") self.player = Player( self.command.config['voice']['src_rate'], self.command.config['voice']['out_rate']) self.streamer.addConsumer(self.player) self.logger.info("Start Speech") self.google = GoogleVoiceClient( hugvey=self, src_rate=self.command.config['voice']['src_rate'], credential_file=self.command.config['voice']['google_credentials'], language_code=self.language_code ) self.streamer.addConsumer(self.google) return self.streamer async def processAudio(self): ''' Start the audio streamer service ''' self.logger.info("Start audio stream") while self.notShuttingDown: await self.isRunning.wait() self.logger.info("Start audio stream") await self.getStreamer().run() self.logger.warn("stream has left the building") # if we end up here, the streamer finished, probably meaning hte hugvey shutdown self.gone()