""" "Conscript reporting" This server controls all hugveys and the processing of their narratives. It exposes itself for control to the panopticon server. """ import asyncio import logging import threading import time import yaml import zmq from zmq.asyncio import Context from hugvey.communication import getTopic, zmqSend, zmqReceive from hugvey.panopticon import Panopticon from hugvey.story import Story from hugvey.voice.google import GoogleVoiceClient from hugvey.voice.player import Player from hugvey.voice.streamer import AudioStreamer import queue import os logger = logging.getLogger("command") # def exceptionEmitter(a): # print(a) # def decorate(func): # print('decorate') # async def call(*args, **kwargs): # print('call') # # pre(func, *args, **kwargs) # try: # result = await func(*args, **kwargs) # except Exception as e: # logger.critical(e, "in", func) # raise e # # post(func, *args, **kwargs) # return result # return call # return decorate class CentralCommand(object): """docstring for CentralCommand.""" def __init__(self, debug_mode=False): self.debug = debug_mode self.eventQueue = asyncio.Queue() self.commandQueue = asyncio.Queue() self.isRunning = threading.Event() self.hugveys = {} self.ctx = Context.instance() self.hugveyLock = asyncio.Lock() self.start_time = time.time() def loadConfig(self, filename): if hasattr(self, 'config'): raise Exception("Overriding config not supported yet") with open(filename, 'r') as fp: logger.debug('Load config from {}'.format(filename)) self.config = yaml.safe_load(fp) self.hugvey_ids = [i + 1 for i in range(self.config['hugveys'])] # load languages: self.languages = {} for lang in self.config['languages']: lang_filename = os.path.join(self.config['web']['files_dir'], lang['file']) with open(lang_filename, 'r') as fp: self.languages[lang['code']] = yaml.load(fp) self.panopticon = Panopticon(self, self.config) def getHugveyStatus(self, hv_id): status = {'id': hv_id} if not hv_id in self.hugveys: status['status'] = 'off' return status hv = self.hugveys[hv_id] status['status'] = 'running' if hv.isRunning.is_set() else 'paused' status['language'] = hv.language_code status['msg'] = hv.story.currentMessage.id status['counts'] = hv.story.getStoryCounts() status['finished'] = hv.story.isFinished() return status def getStatusSummary(self): status = { 'uptime': time.time() - self.start_time, 'languages': self.config['languages'], 'hugveys': [], } for hv_id in self.hugvey_ids: status['hugveys'].append(self.getHugveyStatus(hv_id)) return status def commandHugvey(self, hv_id, msg): """ prepare command to be picked up by the sender """ if threading.current_thread().getName() != 'MainThread': # Threading nightmares! Adding to queue from other thread/loop (not sure which is the isse) # won't trigger asyncios queue.get() so we have to do this thread # safe, in the right loop self.loop.call_soon_threadsafe(self._queueCommand, hv_id, msg) else: self._queueCommand(hv_id, msg) def _queueCommand(self, hv_id, msg): self.commandQueue.put_nowait((hv_id, msg)) # if msg['action'] == 'play': # self.commandQueue.put_nowait((hv_id, { # 'action': 'play', # 'msg': "This is an interrption", # 'id': 'test', # })) def commandAllHugveys(self, msg): for hv_id in self.hugvey_ids: self.commandHugvey(hv_id, msg) def commandAllActiveHugveys(self, msg): for hv_id in self.hugveys: self.commandHugvey(hv_id, msg) async def commandSender(self): s = self.ctx.socket(zmq.PUB) s.bind(self.config['events']['cmd_address']) self.commandAllHugveys({'action': 'show_yourself'}) # sleep to allow pending connections to connect await asyncio.sleep(1) logger.info("Ready to publish commands on: {}".format( self.config['events']['cmd_address'])) logger.debug('Already {} items in queue'.format( self.commandQueue.qsize())) while self.isRunning.is_set(): hv_id, cmd = await self.commandQueue.get() logger.info('Got command to send: {} {}'.format(hv_id, cmd)) zmqSend(s, hv_id, cmd) logger.warn('Stopping command sender') s.close() async def instantiateHugvey(self, hugvey_id, msg): ''' Start a HugveyState, according to a show_yourself reply 'event': 'connection', 'id': self.hugvey_id, 'host': socket.gethostname(), 'ip': self.getIp(), ''' async with self.hugveyLock: # lock to prevent duplicates on creation if not hugvey_id in self.hugveys: logger.info(f'Instantiate hugvey #{hugvey_id}') print('a') h = HugveyState(hugvey_id, self) print('a') h.config(msg['host'], msg['ip']) print('b') self.hugveys[hugvey_id] = h thread = threading.Thread( target=h.start, name=f"hugvey#{hugvey_id}") thread.start() print('c') else: logger.info(f'Reconfigure hugvey #{hugvey_id}') # (re)configure exisitng hugveys h.config(msg['host'], msg['ip']) async def eventListener(self): s = self.ctx.socket(zmq.SUB) s.bind(self.config['events']['listen_address']) logger.info("Listen for events on: {}".format( self.config['events']['listen_address'])) for id in self.hugvey_ids: s.subscribe(getTopic(id)) while self.isRunning.is_set(): try: hugvey_id, msg = await zmqReceive(s) if hugvey_id not in self.hugvey_ids: logger.critical( "Message from alien Hugvey: {}".format(hugvey_id)) continue elif hugvey_id not in self.hugveys: if msg['event'] == 'connection': # Create a hugvey await self.instantiateHugvey(hugvey_id, msg) else: logger.warning( "Message from uninstantiated Hugvey {}".format(hugvey_id)) logger.debug("Message contains: {}".format(msg)) continue else: await self.hugveys[hugvey_id].eventQueue.put(msg) except Exception as e: logger.critical(f"Exception while running event loop:") logger.exception(e) def start(self): self.isRunning.set() self.loop = asyncio.get_event_loop() # self.panopticon_loop = asyncio.new_event_loop() self.tasks = {} # collect tasks so we can cancel in case of error self.tasks['eventListener'] = self.loop.create_task( self.eventListener()) self.tasks['commandSender'] = self.loop.create_task( self.commandSender()) # we want the web interface in a separate thread self.panopticon_thread = threading.Thread( target=self.panopticon.start, name="Panopticon") self.panopticon_thread.start() self.loop.run_forever() def stop(self): self.isRunning.clear() class HugveyState(object): """Represents the state of a Hugvey client on the server. Manages server connections & voice parsing etc. """ def __init__(self, id: int, command: CentralCommand): self.id = id self.command = command self.logger = logging.getLogger(f"hugvey{self.id}") self.loop = asyncio.new_event_loop() self.isConfigured = False self.isRunning = threading.Event() self.eventQueue = None self.language_code = 'en-GB' self.story = Story(self) self.story.setStoryData(self.command.languages[self.language_code]) def config(self, hostname, ip): self.ip = ip self.hostname = hostname self.logger.info( f"Hugvey {self.id} at {self.ip}, host: {self.hostname}") if self.isConfigured == True: # a reconfiguration/reconnection pass self.isConfigured = True def sendCommand(self, msg): """ Send message or command to hugvey @param msg: The message to be sent. Probably a dict() """ self.command.commandHugvey(self.id, msg) def start(self): """ Start the tasks """ tasks = asyncio.gather( self.catchException(self.processAudio()), self.catchException(self.handleEvents()), self.catchException(self.playStory()), loop=self.loop) self.loop.run_until_complete(tasks) self.isRunning.set() async def catchException(self, awaitable): try: await awaitable except Exception as e: logger.exception(e) logger.critical(f"Hugvey restart required but not implemented yet") # TODO: restart def queueEvent(self, msg): if 'time' not in msg: # add time, so we can track time passed msg['time'] = time.time() if not self.eventQueue: self.logger.critical("No event queue to put {}".format(msg)) else: # Allow for both the Hugvey Command, or the Story handle the event. self.eventQueue.put_nowait(msg) self.story.events.append(msg) async def handleEvents(self): self.eventQueue = asyncio.Queue() # start event queue here, to avoid loop issues while self.command.isRunning.is_set(): event = await self.eventQueue.get() self.logger.info("Received: {}".format(event)) if event['event'] == 'language': self.setLanguage(event['code']) if event['event'] == 'pause': self.pause() if event['event'] == 'restart': self.restart() if event['event'] == 'resume': self.resume() self.eventQueue = None def setLanguage(self, language_code): if language_code not in self.command.languages: raise Exception("Invalid language {}".format(language_code)) self.language_code = language_code self.google.setLanguage(language_code) self.story.reset() self.story.setStoryData(self.command.languages[language_code]) def pause(self): self.google.pause() self.story.pause() self.isRunning.clear() def resume(self): self.google.resume() self.story.resume() self.isRunning.set() def restart(self): self.story.reset() self.resume() self.isRunning.set() async def playStory(self): await self.story.start() async def processAudio(self): ''' Start the audio streamer service ''' self.logger.info("Start audio stream") streamer = AudioStreamer( self.command.config['voice']['chunk'], self.ip, int(self.command.config['voice']['port'])) if self.command.debug: self.logger.warn("Debug on: Connecting Audio player") self.player = Player( self.command.config['voice']['src_rate'], self.command.config['voice']['out_rate']) streamer.addConsumer(self.player) self.logger.info("Start Speech") self.google = GoogleVoiceClient( hugvey=self, src_rate=self.command.config['voice']['src_rate'], credential_file=self.command.config['voice']['google_credentials'], language_code=self.language_code ) streamer.addConsumer(self.google) await streamer.run()