diff --git a/hugvey/central_command.py b/hugvey/central_command.py index 63569e6..e623cef 100644 --- a/hugvey/central_command.py +++ b/hugvey/central_command.py @@ -224,7 +224,7 @@ class CentralCommand(object): logger.warn('Stopping light sender') lightConn._sock.close() - async def instantiateHugvey(self, hugvey_id, msg): + def instantiateHugvey(self, hugvey_id): ''' Start a HugveyState, according to a show_yourself reply @@ -233,24 +233,19 @@ class CentralCommand(object): 'host': socket.gethostname(), 'ip': self.getIp(), ''' - async with self.hugveyLock: # lock to prevent duplicates on creation - if not hugvey_id in self.hugveys: - thread = threading.Thread( - target=self.hugveyStateRunner, args=(hugvey_id, msg), name=f"hugvey#{hugvey_id}") - thread.start() - else: - logger.info(f'Reconfigure hugvey #{hugvey_id}') - # (re)configure exisitng hugveys - self.hugveys[hugvey_id].config(msg['host'], msg['ip']) +# async with self.hugveyLock: # lock to prevent duplicates on creation + if not hugvey_id in self.hugveys: + thread = threading.Thread( + target=self.hugveyStateRunner, args=(hugvey_id,), name=f"hugvey#{hugvey_id}") + thread.start() - def hugveyStateRunner(self, hugvey_id, msg): + def hugveyStateRunner(self, hugvey_id): while self.isRunning.is_set(): logger.info(f'Instantiate hugvey #{hugvey_id}') h = HugveyState(hugvey_id, self) - h.config(msg['host'], msg['ip']) +# h.config(msg['host'], msg['ip']) self.hugveys[hugvey_id] = h r = h.run() - print(self.hugveys.keys()) self.hugveys.pop(hugvey_id) if not r: # stop if False, ie. when stream has gone @@ -304,13 +299,13 @@ class CentralCommand(object): "Message from alien Hugvey: {}".format(hugvey_id)) continue elif hugvey_id not in self.hugveys: - if msg['event'] == 'connection': - # Create a hugvey - await self.instantiateHugvey(hugvey_id, msg) - else: - logger.warning( - "Message from uninstantiated Hugvey {}".format(hugvey_id)) - logger.debug("Message contains: {}".format(msg)) +# if msg['event'] == 'connection': +# # Create a hugvey +# self.instantiateHugvey(hugvey_id, msg) +# else: + logger.warning( + "Message from uninstantiated Hugvey {}".format(hugvey_id)) + logger.debug("Message contains: {}".format(msg)) continue else: self.hugveys[hugvey_id].queueEvent(msg) @@ -356,6 +351,7 @@ class CentralCommand(object): for hid in self.hugvey_ids: self.tasks['voiceListener'] = self.loop.create_task( self.catchException(self.voiceListener(hid))) + self.instantiateHugvey(hid) # we want the web interface in a separate thread self.panopticon_thread = threading.Thread( @@ -369,7 +365,7 @@ class CentralCommand(object): async def catchException(self, awaitable): try: - print(awaitable) +# print(awaitable) await awaitable except Exception as e: logger.exception(e) @@ -382,24 +378,23 @@ class HugveyState(object): """ # all statusses can only go up or down, except for gone, which is an error state: - # off <-> blocked <-> awaiting <-> running <-> paused + # off <-> blocked <-> available <-> running <-> paused STATE_OFF = "off" STATE_BLOCKED = "blocked" - STATE_AWAITING = "awaiting" + STATE_AVAILABLE = "available" STATE_RUNNING = "running" STATE_PAUSE = "paused" STATE_GONE = "gone" - - - def __init__(self, id: int, command: CentralCommand): self.id = id self.command = command self.logger = mainLogger.getChild(f"{self.id}").getChild("command") self.loop = asyncio.new_event_loop() - self.isConfigured = False + self.isConfigured = None self.isRunning = asyncio.Event(loop=self.loop) + self.isRunning.clear() + self.eventQueue = None self.language_code = 'en-GB' self.story = None @@ -411,7 +406,7 @@ class HugveyState(object): self.startMsgId = None self.eventLogger = eventLogger.getChild(f"{self.id}") - self.setStatus(self.STATE_BLOCKED) + self.setStatus(self.STATE_GONE) self.requireRestartAfterStop = None @@ -423,7 +418,7 @@ class HugveyState(object): def setStatus(self, status): self.status = status - lightOn = status in [self.STATE_AWAITING, self.STATE_PAUSE, self.STATE_GONE] + lightOn = status in [self.STATE_AVAILABLE, self.STATE_PAUSE] self.setLightStatus(lightOn) self.eventLogger.info(f"status: {self.status}") @@ -431,14 +426,19 @@ class HugveyState(object): def config(self, hostname, ip): self.ip = ip self.hostname = hostname - self.logger.info( - f"Hugvey {self.id} at {self.ip}, host: {self.hostname}") - if self.isConfigured == True: + if self.isConfigured is not None: # a reconfiguration/reconnection pass + else: + self.logger.info( + f"Hugvey {self.id} at {self.ip}, host: {self.hostname}") + + if self.status == self.STATE_GONE: + # turn on :-) + self.setStatus(self.STATE_BLOCKED) - self.isConfigured = True + self.isConfigured = time.time() def sendCommand(self, msg): """ @@ -448,6 +448,7 @@ class HugveyState(object): self.command.commandHugvey(self.id, msg) def run(self): + self.logger.info(f"Await hugvey #{self.id}") tasks = asyncio.gather( self.catchException(self.processAudio()), self.catchException(self.handleEvents()), @@ -481,8 +482,16 @@ class HugveyState(object): self.loop.call_soon_threadsafe(self._queueEvent, msg) def _queueEvent(self, msg): + """ + Put event in both the event loop for the story as well as the Hugvey State handler + """ self.logger.debug(f"Queue event in hugvey loop: {msg}") self.eventQueue.put_nowait(msg) + + # connection events don't need to go to the story + if msg['event'] == 'connection': + return + if self.story: self.story.events.append(msg) else: @@ -494,10 +503,20 @@ class HugveyState(object): try: event = await asyncio.wait_for(self.eventQueue.get(), 2) except asyncio.futures.TimeoutError as e: + # detect missing heartbeat: + if self.isConfigured and time.time() - self.isConfigured > 5: + self.gone() continue self.logger.debug("Received: {}".format(event)) - + if event['event'] == 'connection': +# 'event': 'connection', +# 'id': self.hugvey_id, +# 'host': socket.gethostname(), +# 'ip': self.getIp(), + self.config(event['host'], event['ip']) + + if event['event'] == 'language': self.setLanguage(event['code']) @@ -506,7 +525,7 @@ class HugveyState(object): if event['event'] == 'block': self.block() if event['event'] == 'unblock': - self.awaiting() + self.available() if event['event'] == 'restart': self.restart() if event['event'] == 'finish': @@ -581,11 +600,11 @@ class HugveyState(object): self.isRunning.clear() self.setStatus(self.STATE_BLOCKED) - def awaiting(self): - """Put in awaiting mode""" + def available(self): + """Put in available mode""" self.logger.info('Finish/Await') self.pause() - self.setStatus(self.STATE_AWAITING) + self.setStatus(self.STATE_AVAILABLE) def setLightStatus(self, on): status = 1 if on else 0 @@ -601,6 +620,7 @@ class HugveyState(object): self.story.stop() self.logger.info('Gone') + self.isConfigured = None self.setStatus(self.STATE_GONE) @@ -701,4 +721,4 @@ class HugveyState(object): self.logger.critical(f"stream has left the building from {self.ip}") self.eventLogger.critical(f"error: stream has left the building from {self.ip}") # if we end up here, the streamer finished, probably meaning hte hugvey shutdown - self.shutdown(True) + self.gone() diff --git a/hugvey/client.py b/hugvey/client.py index 2be30af..43508a7 100644 --- a/hugvey/client.py +++ b/hugvey/client.py @@ -385,6 +385,7 @@ class CommandHandler(object): @staticmethod def getIp(): s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + # TODO: make it a local ip, eg. 192.168.1.1 s.connect(("185.66.250.60", 80)) return s.getsockname()[0] @@ -425,6 +426,11 @@ class CommandHandler(object): await asyncio.sleep(0.05) s.close() + + async def heartbeat(self): + while True: + self.showMyself() + await asyncio.sleep(3) class Hugvey(object): @@ -480,6 +486,6 @@ class Hugvey(object): asyncio.ensure_future(self.voice_server.start()) asyncio.ensure_future(self.cmd_server.command_listener()) asyncio.ensure_future(self.cmd_server.event_sender()) - self.cmd_server.showMyself() + asyncio.ensure_future(self.cmd_server.heartbeat()) loop.run_forever() logger.info('done') diff --git a/hugvey/speech/streamer.py b/hugvey/speech/streamer.py index 986afed..facef95 100644 --- a/hugvey/speech/streamer.py +++ b/hugvey/speech/streamer.py @@ -30,7 +30,7 @@ class AudioStreamer(object): address = "tcp://{}:{}".format(self.address, self.port) self.ctx = Context.instance() self.socket = self.ctx.socket(zmq.SUB) - self.socket.setsockopt(zmq.RCVTIMEO, 4000) # timeout: 8 sec + self.socket.setsockopt(zmq.RCVTIMEO, 6000) # timeout: 8 sec self.socket.subscribe('') # self.socket.setsockopt(zmq.CONFLATE, 1) self.socket.connect(address) diff --git a/hugvey/story.py b/hugvey/story.py index 156ee85..5e487cd 100644 --- a/hugvey/story.py +++ b/hugvey/story.py @@ -1084,6 +1084,10 @@ class Story(object): for i in range(len(self.events)): await self._processPendingEvents() + +# Test stability of Central Command with deliberate crash +# if self.timer.getElapsed() > 5: +# raise Exception('test') # The finish is not here anymore, but only on the playbackFinish event. diff --git a/www/css/styles.css b/www/css/styles.css index ce49e52..baaf91a 100644 --- a/www/css/styles.css +++ b/www/css/styles.css @@ -103,11 +103,15 @@ img.icon { text-align: center; } #status .hugvey.hugvey--gone { background-image: linear-gradient(to top, orange, #ce5c00); } + #status .hugvey.hugvey--loading { + background-image: linear-gradient(to top, #576074, #3581a5); } + #status .hugvey.hugvey--loading .status { + color: white; } #status .hugvey.hugvey--blocked { background-image: linear-gradient(to top, #888a85, #555753); } - #status .hugvey.hugvey--awaiting { + #status .hugvey.hugvey--available { background-image: linear-gradient(to top, #888a85, #e2f04a); } - #status .hugvey.hugvey--awaiting .status { + #status .hugvey.hugvey--available .status { color: darkgreen; } #status .hugvey.hugvey--paused { background-image: linear-gradient(to top, #587457, #e2f04a); } diff --git a/www/index.html b/www/index.html index 9290429..1388dbc 100644 --- a/www/index.html +++ b/www/index.html @@ -63,9 +63,9 @@