""" "Conscript reporting" This server controls all hugveys and the processing of their narratives. It exposes itself for control to the panopticon server. """ import os import time import yaml import zmq from zmq.asyncio import Context import asyncio from hugvey.communication import getTopic, zmqSend, zmqReceive from hugvey.panopticon import Panopticon from hugvey.story import Story from hugvey.speech.google import GoogleVoiceClient from hugvey.speech.player import Player from hugvey.speech.streamer import AudioStreamer import json import logging import queue import threading from hugvey.voice import VoiceStorage import multiprocessing from hugvey.speech.recorder import Recorder from pythonosc import udp_client mainLogger = logging.getLogger("hugvey") logger = mainLogger.getChild("command") eventLogger = logging.getLogger("events") eventLogger.setLevel(logging.DEBUG) # def exceptionEmitter(a): # print(a) # def decorate(func): # print('decorate') # async def call(*args, **kwargs): # print('call') # # pre(func, *args, **kwargs) # try: # result = await func(*args, **kwargs) # except Exception as e: # logger.critical(e, "in", func) # raise e # # post(func, *args, **kwargs) # return result # return call # return decorate class CentralCommand(object): """docstring for CentralCommand.""" def __init__(self, args = {}, debug_mode=False): self.debug = debug_mode self.eventQueue = asyncio.Queue() self.commandQueue = asyncio.Queue() self.isRunning = threading.Event() self.logQueue = multiprocessing.Queue() self.hugveys = {} self.ctx = Context.instance() self.hugveyLock = asyncio.Lock() self.start_time = time.time() self.languageFiles = {} self.languageConfig = {} self.args = args # cli args eventLogger.addHandler(logging.handlers.QueueHandler(self.logQueue)) def loadConfig(self, filename): if hasattr(self, 'config'): raise Exception("Overriding config not supported yet") with open(filename, 'r') as fp: logger.debug('Load config from {}'.format(filename)) self.config = yaml.safe_load(fp) for arg in vars(self.args): if arg in self.config: logger.debug("Override argument {}".format(arg)) self.config[arg] = getattr(self.args,arg) self.hugvey_ids = [i + 1 for i in range(self.config['hugveys'])] self.loadLanguages() voice_dir = os.path.join(self.config['web']['files_dir'], 'voices') self.voiceStorage = VoiceStorage(voice_dir, self.languageConfig) self.panopticon = Panopticon(self, self.config, self.voiceStorage) self.lightConn = udp_client.SimpleUDPClient( self.config['light']['ip'], self.config['light']['port']) # logger.info("Send light /general 1") # self.lightConn.send_message("/general", [1]) def loadLanguages(self): logger.debug('load language files') self.languages = {} for lang in self.config['languages']: lang_filename = os.path.join(self.config['web']['files_dir'], lang['file']) self.languageFiles[lang['code']] = lang['file'] self.languageConfig[lang['code']] = lang with open(lang_filename, 'r') as fp: self.languages[lang['code']] = json.load(fp) def getHugveyStatus(self, hv_id, isSelected = False): status = {'id': hv_id} if not hv_id in self.hugveys: status['status'] = 'off' return status hv = self.hugveys[hv_id] if not hv.story: status['status'] = 'off' return status status['status'] = hv.getStatus() status['language'] = hv.language_code status['msg'] = hv.story.currentMessage.id if hv.story.currentMessage else None status['finished'] = hv.story.isFinished() status['history'] = {} if isSelected is False else hv.story.getLogSummary() # status['history'] = hv.story.getLogSummary() # disabled as it is a bit slow. We now have eventLog # status['counts'] = {t: len(a) for t, a in status['history'].items() if t != 'directions' } status['counts'] = hv.story.getLogCounts() status['duration'] = hv.story.timer.getElapsed() return status def getStatusSummary(self, selected_id = None): status = { 'uptime': "-" if not self.start_time else (time.time() - self.start_time), 'languages': self.config['languages'], 'hugvey_ids': self.hugvey_ids, 'hugveys': [], } for hv_id in self.hugvey_ids: status['hugveys'].append(self.getHugveyStatus(hv_id, selected_id == hv_id)) return status def commandHugvey(self, hv_id, msg): """ prepare command to be picked up by the sender """ logging.debug(f"COmmand {hv_id}: {msg}") if threading.current_thread().getName() != 'MainThread': # Threading nightmares! Adding to queue from other thread/loop (not sure which is the isse) # won't trigger asyncios queue.get() so we have to do this thread # safe, in the right loop self.loop.call_soon_threadsafe(self._queueCommand, hv_id, msg) else: self._queueCommand(hv_id, msg) def _queueCommand(self, hv_id, msg): self.commandQueue.put_nowait((hv_id, msg)) def commandAllHugveys(self, msg): for hv_id in self.hugvey_ids: self.commandHugvey(hv_id, msg) def commandAllActiveHugveys(self, msg): for hv_id in self.hugveys: self.commandHugvey(hv_id, msg) async def commandSender(self): s = self.ctx.socket(zmq.PUB) s.bind(self.config['events']['cmd_address']) self.commandAllHugveys({'action': 'show_yourself'}) # sleep to allow pending connections to connect await asyncio.sleep(1) logger.info("Ready to publish commands on: {}".format( self.config['events']['cmd_address'])) logger.debug('Already {} items in queue'.format( self.commandQueue.qsize())) while self.isRunning.is_set(): hv_id, cmd = await self.commandQueue.get() logger.debug('Got command to send: {} {}'.format(hv_id, cmd)) zmqSend(s, hv_id, cmd) logger.warn('Stopping command sender') s.close() async def instantiateHugvey(self, hugvey_id, msg): ''' Start a HugveyState, according to a show_yourself reply 'event': 'connection', 'id': self.hugvey_id, 'host': socket.gethostname(), 'ip': self.getIp(), ''' async with self.hugveyLock: # lock to prevent duplicates on creation if not hugvey_id in self.hugveys: logger.info(f'Instantiate hugvey #{hugvey_id}') h = HugveyState(hugvey_id, self) h.config(msg['host'], msg['ip']) self.hugveys[hugvey_id] = h thread = threading.Thread( target=h.start, name=f"hugvey#{hugvey_id}") thread.start() else: logger.info(f'Reconfigure hugvey #{hugvey_id}') # (re)configure exisitng hugveys h.config(msg['host'], msg['ip']) async def timerEmitter(self): """ This is fixed: a one hour loop with a collective moment 10-15 minutes, 30-35 minutes and 50-55 minutes """ loop_duration = 60 * 60 # one hour loop intervals = [ { 'start_time': 10*60, 'duration': 5 * 60, }, { 'start_time': 30*60, 'duration': 5 * 60, }, { 'start_time': 50*60, 'duration': 5 * 60, } ] self.start_time = time.time() # TODO: emit start event while self.isRunning.is_set(): pass async def eventListener(self): s = self.ctx.socket(zmq.SUB) s.bind(self.config['events']['listen_address']) logger.debug("Listen for events on: {}".format( self.config['events']['listen_address'])) for id in self.hugvey_ids: s.subscribe(getTopic(id)) while self.isRunning.is_set(): try: hugvey_id, msg = await zmqReceive(s) if hugvey_id not in self.hugvey_ids: logger.critical( "Message from alien Hugvey: {}".format(hugvey_id)) continue elif hugvey_id not in self.hugveys: if msg['event'] == 'connection': # Create a hugvey await self.instantiateHugvey(hugvey_id, msg) else: logger.warning( "Message from uninstantiated Hugvey {}".format(hugvey_id)) logger.debug("Message contains: {}".format(msg)) continue else: self.hugveys[hugvey_id].queueEvent(msg) # await self.hugveys[hugvey_id].eventQueue.put(msg) except Exception as e: logger.critical(f"Exception while running event loop:") logger.exception(e) async def voiceListener(self, hugvey_id): s = self.ctx.socket(zmq.REP) #: :type s: zmq.sugar.Socket voiceAddr = f"ipc://voice{hugvey_id}" s.bind(voiceAddr) logger.debug("Listen for voice requests on: {}".format(voiceAddr)) while self.isRunning.is_set(): try: r = await s.recv_json() isVariable = bool(r['variable']) text = r['text'] hv = self.hugveys[hugvey_id] #: :type hv: HugveyState fn = await self.voiceStorage.requestFile(hv.language_code, text, isVariable) if fn is None: eventLogger.getChild(f"{hugvey_id}").critical("error: No voice file fetched, check logs.") fn = 'local/crash.wav' # TODO: trigger a repeat/crash event. await s.send_string(fn) except Exception as e: logger.critical(f"Exception while running voice loop:") logger.exception(e) def start(self): self.isRunning.set() self.loop = asyncio.get_event_loop() # self.panopticon_loop = asyncio.new_event_loop() self.tasks = {} # collect tasks so we can cancel in case of error self.tasks['eventListener'] = self.loop.create_task( self.catchException(self.eventListener())) self.tasks['commandSender'] = self.loop.create_task( self.catchException(self.commandSender())) for hid in self.hugvey_ids: self.tasks['voiceListener'] = self.loop.create_task( self.catchException(self.voiceListener(hid))) # we want the web interface in a separate thread self.panopticon_thread = threading.Thread( target=self.panopticon.start, name="Panopticon") self.panopticon_thread.start() self.loop.run_forever() def stop(self): self.isRunning.clear() async def catchException(self, awaitable): try: print(awaitable) await awaitable except Exception as e: logger.exception(e) logger.critical(f"Hugvey restart might be required but not implemented yet") class HugveyState(object): """Represents the state of a Hugvey client on the server. Manages server connections & voice parsing etc. """ STATE_PAUSE = "paused" STATE_GONE = "gone" STATE_RUNNING = "running" STATE_FINISHED = "finished" def __init__(self, id: int, command: CentralCommand): self.id = id self.command = command self.logger = mainLogger.getChild(f"{self.id}").getChild("command") self.loop = asyncio.new_event_loop() self.isConfigured = False self.isRunning = asyncio.Event(loop=self.loop) self.eventQueue = None self.language_code = 'en-GB' self.story = None self.streamer = None self.status = self.STATE_PAUSE self.google = None self.recorder = None self.notShuttingDown = True # TODO: allow shutdown of object self.startMsgId = None self.eventLogger = eventLogger.getChild(f"{self.id}") self.lightConn = udp_client.SimpleUDPClient( command.config['light']['ip'], command.config['light']['port']) def getStatus(self): if self.story.isFinished(): return self.STATE_FINISHED return self.status def config(self, hostname, ip): self.ip = ip self.hostname = hostname self.logger.info( f"Hugvey {self.id} at {self.ip}, host: {self.hostname}") if self.isConfigured == True: # a reconfiguration/reconnection pass self.isConfigured = True def sendCommand(self, msg): """ Send message or command to hugvey @param msg: The message to be sent. Probably a dict() """ self.command.commandHugvey(self.id, msg) def start(self): """ Start the tasks """ self.isRunning.set() self.status = self.STATE_RUNNING tasks = asyncio.gather( self.catchException(self.processAudio()), self.catchException(self.handleEvents()), self.catchException(self.playStory()), loop=self.loop) # self.pause() self.loop.run_until_complete(tasks) async def catchException(self, awaitable): try: # print(awaitable) await awaitable except Exception as e: self.logger.exception(e) self.logger.critical(f"Hugvey restart required but not implemented yet") self.eventLogger.critical(f"error: {e}") # TODO: restart def queueEvent(self, msg): if 'time' not in msg: # add time, so we can track time passed msg['time'] = time.time() if not self.eventQueue: self.logger.critical("No event queue to put {}".format(msg)) else: # Allow for both the Hugvey Command, or the Story handle the event. self.loop.call_soon_threadsafe(self._queueEvent, msg) def _queueEvent(self, msg): self.logger.debug(f"Queue event in hugvey loop: {msg}") self.eventQueue.put_nowait(msg) self.story.events.append(msg) async def handleEvents(self): self.eventQueue = asyncio.Queue() # start event queue here, to avoid loop issues while self.command.isRunning.is_set(): event = await self.eventQueue.get() self.logger.debug("Received: {}".format(event)) if event['event'] == 'connection' and not self.isRunning.is_set(): self.restart() if event['event'] == 'language': self.setLanguage(event['code']) if event['event'] == 'pause': self.pause() if event['event'] == 'restart': self.restart() if event['event'] == 'finish': self.story.finish() if event['event'] == 'resume': self.resume() if event['event'] == 'change_language': self.setLanguage(event['lang_code']) if event['event'] == 'play_msg': self.logger.info(f"Play given message {event['msg_id']}") if not self.story: self.logger.critical("No story to play message in") else: #restart first so that story loads the new json # self.restart() if self.story is None: return self.startMsgId = event['msg_id'] self.logger.debug(f"Restart from {self.startMsgId}") self.restart() # self.pause() # this doesn't reload story data!! Make sure we restart # wait a tat for the restart loops to complete # await asyncio.sleep(.1) # self.logger.debug('restarted') # msg = self.story.get(event['msg_id']) # if not msg: # self.logger.critical("Invalid ID to play: {}".format(event['msg_id'])) # else: # self.story.setCurrentMessage(msg) # # self.resume() self.eventQueue = None def setLanguage(self, language_code): if language_code not in self.command.languages: raise Exception("Invalid language {}".format(language_code)) self.logger.info(f"set language: {language_code}") self.language_code = language_code self.google.setLanguage(language_code) self.restart() # self.story.reset() # self.story.setStoryData(self.command.languages[language_code]) def pause(self): self.logger.info('Pause') if self.google: self.google.pause() if self.story: self.story.pause() self.isRunning.clear() self.status = self.STATE_PAUSE def resume(self): """ Start playing without reset""" self.logger.info('Resume') if self.google: self.google.resume() if self.story: self.story.resume() self.isRunning.set() self.status = self.STATE_RUNNING def restart(self): """ Start playing with reset""" self.logger.info('Restart') if self.story: self.story.stop() self.resume() self.isRunning.set() def finish(self): """Finish playback""" self.logger.info('Finish') self.pause() self.isRunning.clear() self.status = self.STATE_FINISHED self.setLightStatus(True) def setLightStatus(self, on): status = 1 if on else 0 self.logger.info(f"Send /hugvey {status}") self.lightConn.send_message("/hugvey", [self.id, status]) def gone(self): '''Status to 'gone' as in, shutdown/crashed/whatever ''' self.pause() if self.story: self.story.stop() self.logger.info('Gone') self.status = self.STATE_GONE async def playStory(self): while self.notShuttingDown: await self.isRunning.wait() # new story instance on each run port = self.command.config['web']['port'] self.story = Story(self, port) startMsgId = self.startMsgId self.startMsgId = None # use only once, reset before 'run' self.logger.warn(f"Starting from {startMsgId}") if not self.streamer: await asyncio.sleep(1) self.streamer.triggerStart() self.story.setStoryData(self.command.languages[self.language_code]) self.setLightStatus(False) await self.story.run(startMsgId) # self.story = None def getStreamer(self): if not self.streamer: self.streamer = AudioStreamer( self.command.config['voice']['chunk'], self.ip, int(self.command.config['voice']['port']) + self.id, self.id) if self.command.config['voyeur']: self.logger.warn("Debug on: Connecting Audio player") self.player = Player( self.command.config['voice']['src_rate'], self.command.config['voice']['out_rate']) self.streamer.addConsumer(self.player) if self.command.config['voice']['record_dir']: self.logger.warn("Record Audio of conversation") self.recorder = Recorder( self.id, self.command.config['voice']['src_rate'], self.command.config['voice']['record_dir']) self.streamer.addConsumer(self.recorder) self.logger.debug("Start Speech") self.google = GoogleVoiceClient( hugvey=self, src_rate=self.command.config['voice']['src_rate'], credential_file=self.command.config['voice']['google_credentials'], language_code=self.language_code ) self.streamer.addConsumer(self.google) return self.streamer async def processAudio(self): ''' Start the audio streamer service ''' self.logger.debug("Start audio stream") while self.notShuttingDown: await self.isRunning.wait() self.logger.debug("Start audio stream") await self.getStreamer().run() self.logger.critical(f"stream has left the building from {self.ip}") self.eventLogger.critical(f"error: stream has left the building from {self.ip}") # if we end up here, the streamer finished, probably meaning hte hugvey shutdown self.gone()