diff --git a/hugvey/central_command.py b/hugvey/central_command.py index 65d2da3..63a151b 100644 --- a/hugvey/central_command.py +++ b/hugvey/central_command.py @@ -10,7 +10,7 @@ import zmq from zmq.asyncio import Context import asyncio -from hugvey.communication import getTopic, zmqSend, zmqReceive +from hugvey.communication import getTopic, zmqSend, zmqReceive, LOG_BS from hugvey.panopticon import Panopticon from hugvey.story import Story from hugvey.speech.google import GoogleVoiceClient @@ -57,6 +57,7 @@ class CentralCommand(object): self.debug = debug_mode self.eventQueue = asyncio.Queue() self.commandQueue = asyncio.Queue() + self.lightQueue = asyncio.Queue() self.isRunning = threading.Event() self.logQueue = multiprocessing.Queue() self.hugveys = {} @@ -91,13 +92,6 @@ class CentralCommand(object): self.panopticon = Panopticon(self, self.config, self.voiceStorage) - self.lightConn = udp_client.SimpleUDPClient( - self.config['light']['ip'], - self.config['light']['port']) - -# logger.info("Send light /general 1") -# self.lightConn.send_message("/general", [1]) - def loadLanguages(self): logger.debug('load language files') @@ -118,19 +112,19 @@ class CentralCommand(object): return status hv = self.hugveys[hv_id] - if not hv.story: - status['status'] = 'off' - return status +# if not hv.story: +# status['status'] = 'off' +# return status status['status'] = hv.getStatus() status['language'] = hv.language_code - status['msg'] = hv.story.currentMessage.id if hv.story.currentMessage else None - status['finished'] = hv.story.isFinished() - status['history'] = {} if isSelected is False else hv.story.getLogSummary() + status['msg'] = hv.story.currentMessage.id if hv.story and hv.story.currentMessage else None +# status['finished'] = hv.story.isFinished() + status['history'] = {} if isSelected is False or not hv.story else hv.story.getLogSummary() # status['history'] = hv.story.getLogSummary() # disabled as it is a bit slow. We now have eventLog # status['counts'] = {t: len(a) for t, a in status['history'].items() if t != 'directions' } - status['counts'] = hv.story.getLogCounts() - status['duration'] = hv.story.timer.getElapsed() + status['counts'] = {} if not hv.story else hv.story.getLogCounts() + status['duration'] = 0 if not hv.story else hv.story.timer.getElapsed() return status @@ -141,6 +135,10 @@ class CentralCommand(object): 'hugvey_ids': self.hugvey_ids, 'hugveys': [], } + + #use this to test if any threads stay open + # eg. after killing/dying of a hugvey +# print(threading.enumerate()) for hv_id in self.hugvey_ids: status['hugveys'].append(self.getHugveyStatus(hv_id, selected_id == hv_id)) @@ -162,6 +160,23 @@ class CentralCommand(object): def _queueCommand(self, hv_id, msg): self.commandQueue.put_nowait((hv_id, msg)) + + + def commandLight(self, route, data): + """ + Buffer light commands + """ + logging.debug(f"Light: {route} {data}") + if threading.current_thread().getName() != 'MainThread': + # Threading nightmares! Adding to queue from other thread/loop (not sure which is the isse) + # won't trigger asyncios queue.get() so we have to do this thread + # safe, in the right loop + self.loop.call_soon_threadsafe(self._queueLightCommand, route, data) + else: + self._queueLightCommand(route, data) + + def _queueLightCommand(self, route, data): + self.lightQueue.put_nowait((route, data)) def commandAllHugveys(self, msg): for hv_id in self.hugvey_ids: @@ -191,6 +206,23 @@ class CentralCommand(object): logger.warn('Stopping command sender') s.close() + + + async def lightSender(self): + lightConn = udp_client.SimpleUDPClient( + self.config['light']['ip'], + self.config['light']['port']) + + logger.info(f"Ready to send light commands to: {self.config['light']['ip']}:{self.config['light']['port']}") + + while self.isRunning.is_set(): + route, data = await self.lightQueue.get() + logger.debug('Got light to send: {} {}'.format(route, data)) + lightConn.send_message(route, data) + await asyncio.sleep(.06) + + logger.warn('Stopping light sender') + lightConn._sock.close() async def instantiateHugvey(self, hugvey_id, msg): ''' @@ -203,17 +235,28 @@ class CentralCommand(object): ''' async with self.hugveyLock: # lock to prevent duplicates on creation if not hugvey_id in self.hugveys: - logger.info(f'Instantiate hugvey #{hugvey_id}') - h = HugveyState(hugvey_id, self) - h.config(msg['host'], msg['ip']) - self.hugveys[hugvey_id] = h thread = threading.Thread( - target=h.start, name=f"hugvey#{hugvey_id}") + target=self.hugveyStateRunner, args=(hugvey_id, msg), name=f"hugvey#{hugvey_id}") thread.start() else: logger.info(f'Reconfigure hugvey #{hugvey_id}') # (re)configure exisitng hugveys - h.config(msg['host'], msg['ip']) + self.hugveys[hugvey_id].config(msg['host'], msg['ip']) + + def hugveyStateRunner(self, hugvey_id, msg): + while self.isRunning.is_set(): + logger.info(f'Instantiate hugvey #{hugvey_id}') + h = HugveyState(hugvey_id, self) + h.config(msg['host'], msg['ip']) + self.hugveys[hugvey_id] = h + r = h.run() + print(self.hugveys.keys()) + self.hugveys.pop(hugvey_id) + if not r: + # stop if False, ie. when stream has gone + return + logger.critical(f'Hugvey stopped (crashed?). Reinstantiate after 5 sec') + time.sleep(5) async def timerEmitter(self): """ @@ -308,6 +351,8 @@ class CentralCommand(object): self.catchException(self.eventListener())) self.tasks['commandSender'] = self.loop.create_task( self.catchException(self.commandSender())) + self.tasks['lightSender'] = self.loop.create_task( + self.catchException(self.lightSender())) for hid in self.hugvey_ids: self.tasks['voiceListener'] = self.loop.create_task( self.catchException(self.voiceListener(hid))) @@ -336,10 +381,17 @@ class HugveyState(object): Manages server connections & voice parsing etc. """ + # all statusses can only go up or down, except for gone, which is an error state: + # off <-> blocked <-> awaiting <-> running <-> paused + STATE_OFF = "off" + STATE_BLOCKED = "blocked" + STATE_AWAITING = "awaiting" + STATE_RUNNING = "running" STATE_PAUSE = "paused" STATE_GONE = "gone" - STATE_RUNNING = "running" - STATE_FINISHED = "finished" + + + def __init__(self, id: int, command: CentralCommand): self.id = id @@ -352,23 +404,29 @@ class HugveyState(object): self.language_code = 'en-GB' self.story = None self.streamer = None - self.status = self.STATE_PAUSE self.google = None + self.player = None self.recorder = None self.notShuttingDown = True # TODO: allow shutdown of object self.startMsgId = None self.eventLogger = eventLogger.getChild(f"{self.id}") - self.lightConn = udp_client.SimpleUDPClient( - command.config['light']['ip'], - command.config['light']['port']) - + self.setStatus(self.STATE_BLOCKED) + + self.requireRestartAfterStop = None + def __del__(self): + self.logger.warn("Destroying hugvey object") + def getStatus(self): - if self.story.isFinished(): - return self.STATE_FINISHED return self.status + def setStatus(self, status): + self.status = status + lightOn = status in [self.STATE_AWAITING, self.STATE_PAUSE, self.STATE_GONE] + self.setLightStatus(lightOn) + + def config(self, hostname, ip): self.ip = ip self.hostname = hostname @@ -388,21 +446,15 @@ class HugveyState(object): """ self.command.commandHugvey(self.id, msg) - def start(self): - """ - Start the tasks - """ - self.isRunning.set() - self.status = self.STATE_RUNNING - + def run(self): tasks = asyncio.gather( self.catchException(self.processAudio()), self.catchException(self.handleEvents()), self.catchException(self.playStory()), loop=self.loop) -# self.pause() self.loop.run_until_complete(tasks) - + self.logger.warning(f"FINISHED RUNNING {self.id}") + return self.requireRestartAfterStop async def catchException(self, awaitable): try: @@ -410,10 +462,12 @@ class HugveyState(object): await awaitable except Exception as e: self.logger.exception(e) - self.logger.critical(f"Hugvey restart required but not implemented yet") + self.logger.critical(f"Hugvey crash") self.eventLogger.critical(f"error: {e}") - - # TODO: restart + + # restart + # TODO: test proper functioning + self.shutdown() def queueEvent(self, msg): if 'time' not in msg: @@ -428,26 +482,34 @@ class HugveyState(object): def _queueEvent(self, msg): self.logger.debug(f"Queue event in hugvey loop: {msg}") self.eventQueue.put_nowait(msg) - self.story.events.append(msg) + if self.story: + self.story.events.append(msg) + else: + self.logger.critical("Cannot queue event, as no story is present.") async def handleEvents(self): self.eventQueue = asyncio.Queue() # start event queue here, to avoid loop issues - while self.command.isRunning.is_set(): - event = await self.eventQueue.get() - self.logger.debug("Received: {}".format(event)) + while self.notShuttingDown: + try: + event = await asyncio.wait_for(self.eventQueue.get(), 2) + except asyncio.futures.TimeoutError as e: + continue - if event['event'] == 'connection' and not self.isRunning.is_set(): - self.restart() + self.logger.debug("Received: {}".format(event)) if event['event'] == 'language': self.setLanguage(event['code']) if event['event'] == 'pause': self.pause() + if event['event'] == 'block': + self.block() + if event['event'] == 'unblock': + self.awaiting() if event['event'] == 'restart': self.restart() if event['event'] == 'finish': - self.story.finish() + self.story._finish() # finish story AND hugvey state if event['event'] == 'resume': self.resume() @@ -466,17 +528,6 @@ class HugveyState(object): self.startMsgId = event['msg_id'] self.logger.debug(f"Restart from {self.startMsgId}") self.restart() -# self.pause() # this doesn't reload story data!! Make sure we restart - # wait a tat for the restart loops to complete -# await asyncio.sleep(.1) -# self.logger.debug('restarted') -# msg = self.story.get(event['msg_id']) -# if not msg: -# self.logger.critical("Invalid ID to play: {}".format(event['msg_id'])) -# else: -# self.story.setCurrentMessage(msg) -# -# self.resume() self.eventQueue = None @@ -488,8 +539,8 @@ class HugveyState(object): self.language_code = language_code self.google.setLanguage(language_code) - self.restart() - + if self.isRunning.is_set(): + self.restart() # self.story.reset() # self.story.setStoryData(self.command.languages[language_code]) @@ -500,7 +551,7 @@ class HugveyState(object): if self.story: self.story.pause() self.isRunning.clear() - self.status = self.STATE_PAUSE + self.setStatus(self.STATE_PAUSE) def resume(self): """ Start playing without reset""" @@ -510,28 +561,36 @@ class HugveyState(object): if self.story: self.story.resume() self.isRunning.set() - self.status = self.STATE_RUNNING + self.setStatus(self.STATE_RUNNING) def restart(self): - """ Start playing with reset""" + """Start playing with reset""" self.logger.info('Restart') if self.story: self.story.stop() self.resume() - self.isRunning.set() - - def finish(self): - """Finish playback""" - self.logger.info('Finish') - self.pause() + + def block(self): + """Block a hugvey""" + self.logger.info('block') + if self.google: + self.google.pause() + if self.story: + self.story.finish() self.isRunning.clear() - self.status = self.STATE_FINISHED - self.setLightStatus(True) + self.setStatus(self.STATE_BLOCKED) + + def awaiting(self): + """Put in awaiting mode""" + self.logger.info('Finish/Await') + self.pause() + self.setStatus(self.STATE_AWAITING) def setLightStatus(self, on): status = 1 if on else 0 - self.logger.info(f"Send /hugvey {status}") - self.lightConn.send_message("/hugvey", [self.id, status]) + self.logger.log(LOG_BS, f"Send /hugvey {status}") + + self.command.commandLight('/hugvey', [self.id, status]) def gone(self): '''Status to 'gone' as in, shutdown/crashed/whatever @@ -541,26 +600,54 @@ class HugveyState(object): self.story.stop() self.logger.info('Gone') - self.status = self.STATE_GONE + self.setStatus(self.STATE_GONE) + + + def shutdown(self, definitive = False): + self.logger.info(f"Start shutdown sequence {definitive}") + if self.streamer: + self.streamer.stop() + if self.story: + self.story.shutdown() + self.story = None + + # shutdown for stream consumers already ran. Only clear references + if self.google: + self.google = None + if self.player: + self.player = None + if self.recorder: + self.recorder = None + + if self.requireRestartAfterStop is None: + # prevent double setting of the same variable + # first call sometimes triggers second + self.requireRestartAfterStop = not definitive + + self.notShuttingDown = False async def playStory(self): while self.notShuttingDown: - await self.isRunning.wait() - - # new story instance on each run - port = self.command.config['web']['port'] - self.story = Story(self, port) - startMsgId = self.startMsgId - self.startMsgId = None # use only once, reset before 'run' - self.logger.warn(f"Starting from {startMsgId}") - if not self.streamer: - await asyncio.sleep(1) - - self.streamer.triggerStart() - self.story.setStoryData(self.command.languages[self.language_code]) - self.setLightStatus(False) - await self.story.run(startMsgId) + try: + await asyncio.wait_for(self.isRunning.wait(), 1) + except asyncio.futures.TimeoutError as e: + # timeout + catch so we can shutdown if needed without infinite await + continue + else: + # new story instance on each run + port = self.command.config['web']['port'] + self.story = Story(self, port) + startMsgId = self.startMsgId + self.startMsgId = None # use only once, reset before 'run' + self.logger.warn(f"Starting from {startMsgId}") + if not self.streamer: + await asyncio.sleep(1) + + self.streamer.triggerStart() + self.story.setStoryData(self.command.languages[self.language_code]) + self.setLightStatus(False) + await self.story.run(startMsgId) # self.story = None def getStreamer(self): @@ -598,14 +685,18 @@ class HugveyState(object): Start the audio streamer service ''' - self.logger.debug("Start audio stream") + self.logger.debug("Start audio loop") while self.notShuttingDown: - await self.isRunning.wait() - - self.logger.debug("Start audio stream") - await self.getStreamer().run() - self.logger.critical(f"stream has left the building from {self.ip}") - self.eventLogger.critical(f"error: stream has left the building from {self.ip}") - # if we end up here, the streamer finished, probably meaning hte hugvey shutdown - self.gone() + try: + await asyncio.wait_for(self.isRunning.wait(), 1) + except asyncio.futures.TimeoutError as e: + # timeout + catch so we can shutdown if needed without infinite await + continue + else: + self.logger.debug("Start audio stream") + await self.getStreamer().run() + self.logger.critical(f"stream has left the building from {self.ip}") + self.eventLogger.critical(f"error: stream has left the building from {self.ip}") + # if we end up here, the streamer finished, probably meaning hte hugvey shutdown + self.shutdown(True) diff --git a/hugvey/panopticon.py b/hugvey/panopticon.py index 9e358bd..15942c7 100644 --- a/hugvey/panopticon.py +++ b/hugvey/panopticon.py @@ -52,6 +52,10 @@ def getWebSocketHandler(central_command): self.msgInit() elif msg['action'] == 'get_status': self.msgStatus(msg['selected_id']) + elif msg['action'] == 'block': + self.msgBlock(msg['hugvey']) + elif msg['action'] == 'unblock': + self.msgUnblock(msg['hugvey']) elif msg['action'] == 'resume': self.msgResume(msg['hugvey']) elif msg['action'] == 'pause': @@ -95,6 +99,12 @@ def getWebSocketHandler(central_command): msg = self.getStatusMsg() self.send(msg) + def msgBlock(self, hv_id): + central_command.hugveys[hv_id].eventQueue.put_nowait({'event': 'block'}) + + def msgUnblock(self, hv_id): + central_command.hugveys[hv_id].eventQueue.put_nowait({'event': 'unblock'}) + def msgResume(self, hv_id): central_command.hugveys[hv_id].eventQueue.put_nowait({'event': 'resume'}) diff --git a/hugvey/speech/google.py b/hugvey/speech/google.py index cb36103..ccf099c 100644 --- a/hugvey/speech/google.py +++ b/hugvey/speech/google.py @@ -95,7 +95,8 @@ class GoogleVoiceClient(object): while not self.toBeShutdown: try: self.logger.info("wait for Google Voice") - self.isRunning.wait() + if not self.isRunning.wait(timeout=1): + continue # re-ceck toBeShutdown self.logger.info("Starting Google Voice") @@ -175,6 +176,7 @@ class GoogleVoiceClient(object): "transcript": self.lastNonFinalTranscript.strip(), } self.hugvey.queueEvent(msg) + self.logger.warn("Stop google run()") # finish means wrapping of hugvey#3v thread def receive(self, chunk): @@ -210,9 +212,11 @@ class GoogleVoiceClient(object): def shutdown(self): self.toBeShutdown = True + self.hugvey = None def triggerStart(self): pass - + def __del__(self): + self.logger.warn("Destroyed google object") diff --git a/hugvey/speech/player.py b/hugvey/speech/player.py index 0f7e252..b376fdc 100644 --- a/hugvey/speech/player.py +++ b/hugvey/speech/player.py @@ -92,4 +92,8 @@ class Player: self.p.terminate() def triggerStart(self): - pass \ No newline at end of file + pass + + + def __del__(self): + self.logger.warn("Destroyed player object") \ No newline at end of file diff --git a/hugvey/speech/recorder.py b/hugvey/speech/recorder.py index c1ca0b4..8427ed8 100644 --- a/hugvey/speech/recorder.py +++ b/hugvey/speech/recorder.py @@ -26,6 +26,7 @@ class Recorder: self.src_rate = src_rate self.main_folder = out_folder # unfortunately not every device plays 16kHz audio streams self.running = False + self.data = array('h') def start(self): self.subsequentMutedFrames = 0 @@ -107,4 +108,7 @@ class Recorder: def log(self, origin, msg): with open(os.path.join(self.out_folder, "log.txt"), "a") as fp: fp.write(f"{origin}: {msg}\n") - \ No newline at end of file + + + def __del__(self): + self.logger.warn("Destroyed recorder object") \ No newline at end of file diff --git a/hugvey/speech/streamer.py b/hugvey/speech/streamer.py index 9fd84f2..986afed 100644 --- a/hugvey/speech/streamer.py +++ b/hugvey/speech/streamer.py @@ -55,6 +55,8 @@ class AudioStreamer(object): for consumer in self.consumers: consumer.shutdown() + + self.consumers = [] def process(self, chunk): @@ -65,4 +67,8 @@ class AudioStreamer(object): def triggerStart(self): # start a (new) run on the hugvey. Send it to the consumers that need it for consumer in self.consumers: - consumer.triggerStart() \ No newline at end of file + consumer.triggerStart() + + + def __del__(self): + self.logger.warn("Destroyed streamer object") \ No newline at end of file diff --git a/hugvey/story.py b/hugvey/story.py index be30742..1d44f5c 100644 --- a/hugvey/story.py +++ b/hugvey/story.py @@ -907,6 +907,10 @@ class Story(object): self.logger.info("Stop Story") if self.isRunning: self.isRunning = False + + def shutdown(self): + self.stop() + self.hugvey = None async def _processPendingEvents(self): # Gather events: @@ -948,7 +952,7 @@ class Story(object): self.currentDiversion = None else: self.logger.info("THE END!") - self.finish() + self._finish() return if e['event'] == 'speech': @@ -1057,7 +1061,7 @@ class Story(object): directions = self.getCurrentDirections() await self._processDirections(directions) - + # TODO create timer event # self.commands.append({'msg':'TEST!'}) @@ -1165,14 +1169,20 @@ class Story(object): return False + def _finish(self): + """ + Finish story and set hugvey to the right state + """ + self.finish() + #stop google etc: + self.hugvey.awaiting() + def finish(self): + """ + Finish only the story + """ self.logger.info(f"Finished story for {self.hugvey.id}") self.hugvey.eventLogger.info("story: finished") self.stop() self.finish_time = time.time() self.timer.pause() - #stop google etc: - self.hugvey.finish() - - - diff --git a/www/css/styles.css b/www/css/styles.css index 0063d29..5eb8f25 100644 --- a/www/css/styles.css +++ b/www/css/styles.css @@ -103,12 +103,14 @@ img.icon { text-align: center; } #status .hugvey.hugvey--gone { background-image: linear-gradient(to top, orange, #ce5c00); } - #status .hugvey.hugvey--paused { + #status .hugvey.hugvey--blocked { background-image: linear-gradient(to top, #888a85, #555753); } - #status .hugvey.hugvey--finished { - background-image: linear-gradient(to top, #888a85, #35a589); } - #status .hugvey.hugvey--finished .status { + #status .hugvey.hugvey--awaiting { + background-image: linear-gradient(to top, #888a85, #e2f04a); } + #status .hugvey.hugvey--awaiting .status { color: darkgreen; } + #status .hugvey.hugvey--paused { + background-image: linear-gradient(to top, #587457, #e2f04a); } #story { position: relative; diff --git a/www/index.html b/www/index.html index 668c95c..4af8a01 100644 --- a/www/index.html +++ b/www/index.html @@ -48,8 +48,8 @@ {{ hv.language }} -
{{timer(hv, - 'finished')}}
+
@@ -64,10 +64,12 @@ {{ timer(hv, 'duration') }}
+
Unblock
+
Block
+
Start
+
Finish
Pause
Resume
-
Restart
-
Finish
diff --git a/www/js/hugvey_console.js b/www/js/hugvey_console.js index 7e24df5..e352b28 100644 --- a/www/js/hugvey_console.js +++ b/www/js/hugvey_console.js @@ -24,6 +24,14 @@ class Panopticon { panopticon.hugveys.selectedId = null; return panopticon.loadNarrative( code, file ); }, + block: function(hv) { + hv.status = "loading"; + return panopticon.block(hv.id); + }, + unblock: function(hv) { + hv.status = "loading"; + return panopticon.unblock(hv.id); + }, pause: function(hv) { hv.status = "loading"; return panopticon.pause(hv.id); @@ -285,7 +293,12 @@ class Panopticon { req.open( "GET", "/local/" + file ); req.send(); } - + block( hv_id ) { + this.send( { action: 'block', hugvey: hv_id } ) + } + unblock( hv_id ) { + this.send( { action: 'unblock', hugvey: hv_id } ) + } resume( hv_id ) { this.send( { action: 'resume', hugvey: hv_id } ) } diff --git a/www/scss/styles.scss b/www/scss/styles.scss index 397b05b..ca62a1d 100644 --- a/www/scss/styles.scss +++ b/www/scss/styles.scss @@ -163,30 +163,21 @@ img.icon{ // } } - &.hugvey--paused{ + &.hugvey--blocked{ background-image: linear-gradient(to top, rgb(136, 138, 133), rgb(85, 87, 83)); -// &::after{ -// content: 'disconnected'; -// font-style: italic; -// color: gray; -// text-align:center; -// // font-size: 30pt; -// } } - &.hugvey--finished{ - background-image: linear-gradient(to top, rgb(136, 138, 133), #35a589); + &.hugvey--awaiting{ + background-image: linear-gradient(to top, rgb(136, 138, 133), #e2f04a); .status{ color: darkgreen; } -// &::after{ -// content: 'disconnected'; -// font-style: italic; -// color: gray; -// text-align:center; -// // font-size: 30pt; -// } } + + &.hugvey--paused{ + background-image: linear-gradient(to top, #587457, #e2f04a); + } + } }