From 0861215794a24e9e1cb37e9ff9002c635f3b554d Mon Sep 17 00:00:00 2001 From: Ruben van de Ven Date: Wed, 1 May 2019 18:27:10 +0200 Subject: [PATCH] Interruption diversions for specific timeouts etc. --- hugvey/central_command.py | 4 +- hugvey/client.py | 4 ++ hugvey/panopticon.py | 2 +- hugvey/speech/streamer.py | 20 +++--- hugvey/story.py | 140 ++++++++++++++++++++++++++++++-------- www/js/hugvey_console.js | 50 +++++++++++++- 6 files changed, 177 insertions(+), 43 deletions(-) diff --git a/hugvey/central_command.py b/hugvey/central_command.py index cdb20ca..76b35f8 100644 --- a/hugvey/central_command.py +++ b/hugvey/central_command.py @@ -96,6 +96,7 @@ class CentralCommand(object): def loadLanguages(self): logger.debug('load language files') self.languages = {} + self.languageCache = {} for lang in self.config['languages']: lang_filename = os.path.join(self.config['web']['files_dir'], lang['file']) @@ -511,7 +512,8 @@ class HugveyState(object): event = await asyncio.wait_for(self.eventQueue.get(), 2) except asyncio.futures.TimeoutError as e: # detect missing heartbeat: - if self.isConfigured and time.time() - self.isConfigured > 5: + if self.isConfigured and time.time() - self.isConfigured > 15: + self.logger.error("Hugvey did not send heartbeat.") self.gone() continue diff --git a/hugvey/client.py b/hugvey/client.py index 066f406..f96f16e 100644 --- a/hugvey/client.py +++ b/hugvey/client.py @@ -284,6 +284,10 @@ class CommandHandler(object): # use duration for timing the popen duration (and redo it if needed) duration = cmd['duration'] if 'duration' in cmd else None self.playingMsgId = msgId + + if self.playPopen: + logger.info("Interrupting playback of {}".format(self.playingMsgId)) + self.playPopen.terminate() err = None if file is None and text is None: diff --git a/hugvey/panopticon.py b/hugvey/panopticon.py index 0ebf21c..b4bb489 100644 --- a/hugvey/panopticon.py +++ b/hugvey/panopticon.py @@ -270,6 +270,6 @@ class Panopticon(object): msg['args'] = items[2] j = json.dumps(msg) - print(j) + logger.debug(j) self.loop.add_callback(wsHandler.write_to_clients, j) \ No newline at end of file diff --git a/hugvey/speech/streamer.py b/hugvey/speech/streamer.py index facef95..0581d77 100644 --- a/hugvey/speech/streamer.py +++ b/hugvey/speech/streamer.py @@ -30,25 +30,25 @@ class AudioStreamer(object): address = "tcp://{}:{}".format(self.address, self.port) self.ctx = Context.instance() self.socket = self.ctx.socket(zmq.SUB) - self.socket.setsockopt(zmq.RCVTIMEO, 6000) # timeout: 8 sec + self.socket.setsockopt(zmq.RCVTIMEO, 8000) # timeout: 8 sec self.socket.subscribe('') # self.socket.setsockopt(zmq.CONFLATE, 1) self.socket.connect(address) # s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.logger.info("Attempt connection on {}:{}".format(self.address, self.port)) + self.logger.info("0mq SUBSCRIBE for audio stream on {}:{}".format(self.address, self.port)) # s.connect((self.address, self.port)) # - try: - while self.isRunning: + while self.isRunning: + try: data = await self.socket.recv() - # self.logger.debug('chunk received') +# self.logger.debug('chunk received') self.process(data) - except zmq.error.Again as timeout_e: - self.logger.warn("Timeout of audiostream. Hugvey shutdown?") - finally: - self.logger.info("Close socket on {}:{}".format(self.address, self.port)) - self.socket.close() + except zmq.error.Again as timeout_e: + self.logger.warn("Timeout of audiostream. Hugvey shutdown?") + + self.logger.info("Close socket on {}:{}".format(self.address, self.port)) + self.socket.close() def stop(self): self.isRunning = False diff --git a/hugvey/story.py b/hugvey/story.py index 355ec50..9b20197 100644 --- a/hugvey/story.py +++ b/hugvey/story.py @@ -14,6 +14,7 @@ from zmq.asyncio import Context import zmq import wave from pythonosc import udp_client +import random mainLogger = logging.getLogger("hugvey") logger = mainLogger.getChild("narrative") @@ -254,11 +255,14 @@ class Condition(object): self.type = None self.vars = {} self.logInfo = None + self.originalJsonString = None + self.usedContainsDuration = None @classmethod def initFromJson(conditionClass, data, story): condition = conditionClass(data['@id']) condition.type = data['type'] + condition.originalJsonString = json.dumps(data) # TODO: should Condition be subclassed? if data['type'] == "replyContains": @@ -416,6 +420,7 @@ class Condition(object): capturedVariables, timeSinceReply ) + self.usedContainsDuration = float(delay['waitTime']) return True break # don't check other delays # wait for delay to match @@ -487,6 +492,7 @@ class Diversion(object): self.hasHit = False self.disabled = False self.type = type + self.counter = 0 if type == 'no_response': self.method = self._divergeIfNoResponse self.finaliseMethod = self._returnAfterNoResponse @@ -505,6 +511,8 @@ class Diversion(object): if type == 'repeat': self.method = self._divergeIfRepeatRequest self.regex = re.compile(self.params['regex']) + if type == 'interrupt': + self.method = self._divergeIfInterrupted if not self.method: raise Exception("No valid type given for diversion") @@ -521,7 +529,7 @@ class Diversion(object): 'id': self.id, } - async def divergeIfNeeded(self, story, direction: None): + async def divergeIfNeeded(self, story, direction = None): """ Validate if condition is met for the current story state Returns True when diverging @@ -542,21 +550,30 @@ class Diversion(object): direction.msgTo if direction else None, direction if direction else None) if r: - if self.type != 'repeat': + if self.type != 'repeat' and self.type !='interrupt': # repeat diversion should be usable infinte times self.hasHit = True story.addToLog(self) return r - def createReturnDirectionsTo(self, story, startMsg, returnMsg, originalDirection = None, inheritTiming = True): + def createReturnDirectionsTo(self, story, startMsg, returnMsg, originalDirection = None, inheritTiming = True, timeoutDuration = .5, replyContainsDurations = None): """ The finishes of this diversion's strand should point to the return message with the right timeout/timing. If hit, this direction should also notify this diversion. + + replyContainsDurations: list formatted as in JSON + [{ + "minReplyDuration": "0", + "waitTime": "3" + }] """ + self.counter +=1 + finishMessageIds = story.getFinishesForMsg(startMsg) - finalTimeoutDuration = 0.5 + finalTimeoutDuration = timeoutDuration + finalContainsDurations = replyContainsDurations #: :type story: Story #: :type originalDirection: Direction # story.directionsPerMsg[story.currentMessage.id] @@ -568,7 +585,8 @@ class Diversion(object): for condition in originalDirection.conditions: if condition.type == 'timeout': finalTimeoutDuration = float(condition.vars['seconds']) - break + if condition.type == 'replyContains': + finalContainsDurations = json.loads(condition.originalJsonString)['vars']['delays'] i = 0 for msgId in finishMessageIds: @@ -578,21 +596,41 @@ class Diversion(object): if not msg: continue - direction = Direction(f"{self.id}-{i}", msg, returnMsg) + direction = Direction(f"{self.id}-{i}-{self.counter}", msg, returnMsg) data = json.loads(f""" {{ - "@id": "{self.id}-c{i}", + "@id": "{self.id}-ct{i}-{self.counter}", "@type": "Condition", "type": "timeout", - "label": "", + "label": "Autogenerated Timeout", "vars": {{ - "seconds": "{finalTimeoutDuration}", - "onlyIfNoReply": false + "seconds": "{finalTimeoutDuration}" }} }} """) + data['vars']['onlyIfNoReply'] = finalContainsDurations is not None + + # TODO: also at replycontains if it exists, with the same timings condition = Condition.initFromJson(data, story) direction.addCondition(condition) + + if finalContainsDurations is not None: + data2 = json.loads(f""" + {{ + "@id": "{self.id}-cr{i}-{self.counter}", + "@type": "Condition", + "type": "replyContains", + "label": "Autogenerated Reply Contains", + "vars": {{ + "regex": "", + "instantMatch": false + }} }} + """) + data2['vars']['delays'] = finalContainsDurations + condition2 = Condition.initFromJson(data2, story) + direction.addCondition(condition2) + story.add(condition2) + direction.isDiversionReturn = True # will clear the currentDiversion on story story.logger.info(f"Created direction: {direction.id} {condition.id} with timeout {finalTimeoutDuration}s") story.add(condition) @@ -790,7 +828,7 @@ class Diversion(object): # no direction is here, as this diversion triggers before a direction is taken self.createReturnDirectionsTo(story, msg, self.returnMessage, inheritTiming=False) - await story.setCurrentMessage(msg) + await story.setCurrentMessage(msg, allowReplyInterrupt=True) story.currentDiversion = self story.timer.setMark('last_diversion_timeout') return True @@ -798,8 +836,31 @@ class Diversion(object): async def _returnAfterTimeout(self, story): story.logger.info(f"Finalise diversion: {self.id}") -# if self.params['returnAfterStrand']: -# await story.setCurrentMessage(self.returnMessage) + + async def _divergeIfInterrupted(self, story, msgFrom, msgTo, direction): + """ + This is here as a placeholder for the interruption diversion. + These will however be triggered differently + """ + + if story.currentDiversion or story.allowReplyInterrupt: + return False + + msg = story.get(self.params['msgId']) + if msg is None: + story.logger.critical(f"Not a valid message id for diversion: {self.params['msgId']}") + return + + self.returnMessage = story.currentMessage + # no direction is here, as this diversion triggers before a direction is taken + self.createReturnDirectionsTo(story, msg, self.returnMessage, inheritTiming=False, timeoutDuration=3, replyContainsDurations = [{ + "minReplyDuration": "0", + "waitTime": "2" + }]) + await story.setCurrentMessage(msg) + story.currentDiversion = self + + return True storyClasses = { @@ -875,9 +936,11 @@ class Story(object): self.currentMessage = None self.currentDiversion = None self.currentReply = None + self.allowReplyInterrupt = False self.timer = Stopwatch() self.isRunning = False self.diversions = [] + self.interruptionDiversions = [] self.variables = {} def pause(self): @@ -939,6 +1002,7 @@ class Story(object): self.elements = {} self.strands = {} self.diversions = [] + self.interruptionDiversions = [] self.directionsPerMsg = {} self.startMessage = None # The entrypoint to the graph self.variables = {} @@ -953,6 +1017,7 @@ class Story(object): # self.logger.debug(self.directionsPerMsg) self.diversions = [el for el in self.elements.values() if type(el) == Diversion] + self.interruptionDiversions = [el for el in self.elements.values() if type(el) == Diversion and el.type == 'interrupt'] if currentId: self.currentMessage = self.get(currentId) @@ -1107,21 +1172,28 @@ class Story(object): # messages that come in, in the case google is faster than our playbackFinish event. # (if this setup doesn't work, try to test on self.lastMsgFinish time anyway) # it keeps tricky with all these run conditions - self.logger.info("ignore speech while playing message") - continue + if len(self.interruptionDiversions) and not self.currentDiversion and not self.allowReplyInterrupt: + self.logger.warn("diverge when speech during playing message") + diversion = random.choice(self.interruptionDiversions) + #: :type diversion: Diversion + r = await diversion.divergeIfNeeded(self, None) + print(r) # is always needed :-) + else: + self.logger.info("ignore speech during playing message") + continue +# DEPRECATED WAY OF DOING IT: # message is still playing: - if self.currentMessage and not self.lastMsgFinishTime and self.previousReply and self.previousReply.forMessage.interruptCount < 4: - timeDiff = self.timer.getElapsed() - self.previousReply.forMessage.getFinishedTime() - if self.previousReply.forMessage.afterrunTime > timeDiff: - #interrupt only in given interval: - self.logger.warn("Interrupt message, replay {}".format(self.previousReply.forMessage.id)) - self.currentReply = self.previousReply - self.previousReply.forMessage.interruptCount += 1 - self.currentMessage = await self.setCurrentMessage(self.previousReply.forMessage, self.previousReply) +# if self.currentMessage and not self.lastMsgFinishTime and self.previousReply and self.previousReply.forMessage.interruptCount < 4: +# timeDiff = self.timer.getElapsed() - self.previousReply.forMessage.getFinishedTime() +# if self.previousReply.forMessage.afterrunTime > timeDiff: +# #interrupt only in given interval: +# self.logger.warn("Interrupt message, replay {}".format(self.previousReply.forMessage.id)) +# self.currentReply = self.previousReply +# self.previousReply.forMessage.interruptCount += 1 +# self.currentMessage = await self.setCurrentMessage(self.previousReply.forMessage, self.previousReply) # log if somebody starts speaking - # TODO: implement interrupt if self.currentReply is None: self.logger.info("Start speaking") self.currentReply= Reply(self.currentMessage) @@ -1143,13 +1215,15 @@ class Story(object): async def _processDirections(self, directions): ':type directions: list(Direction)' chosenDirection = None + metCondition = None for direction in directions: for condition in direction.conditions: if condition.isMet(self): self.logger.info("Condition is met: {0}, going to {1}".format( condition.id, direction.msgTo.id)) self.hugvey.eventLogger.info("condition: {0}".format(condition.id)) - self.hugvey.eventLogger.info("direction: {0}".format(direction.id)) + self.hugvey.eventLogger.info("direction: {0}".format(direction.id)) + metCondition = condition direction.setMetCondition(condition) self.addToLog(condition) self.addToLog(direction) @@ -1158,11 +1232,19 @@ class Story(object): isDiverging = await self._processDiversions(chosenDirection) + allowReplyInterrupt = False + # in some cases, conditions should be allowed to interrupt the reply + if metCondition: + if metCondition.type == 'timeout' and not ('onlyIfNoReply' in metCondition.vars and metCondition.vars['onlyIfNoReply']): + allowReplyInterrupt = True + if metCondition.usedContainsDuration is not None and metCondition.usedContainsDuration < 0.1: + allowReplyInterrupt = True + if not isDiverging and chosenDirection: if chosenDirection.isDiversionReturn and self.currentDiversion: await self.currentDiversion.finalise(self) - await self.setCurrentMessage(chosenDirection.msgTo) + await self.setCurrentMessage(chosenDirection.msgTo, allowReplyInterrupt=allowReplyInterrupt) return chosenDirection @@ -1180,7 +1262,8 @@ class Story(object): activeNoResponseDiv = None for diversion in self.diversions: #: :type diversion: Diversion - if diversion.disabled or diversion.hasHit: + if diversion.disabled or diversion.hasHit or diversion.type == 'interrupt': + # interruptions are triggered somewhere else. continue if diversion.type == 'timeout': @@ -1269,7 +1352,7 @@ class Story(object): self.logger.debug("Stop renderer") - async def setCurrentMessage(self, message, useReply = None): + async def setCurrentMessage(self, message, useReply = None, allowReplyInterrupt = False): """ Use Reply allows to pre-initiate a reply to use with the message. This is used eg. when doing an interruption. """ @@ -1288,6 +1371,7 @@ class Story(object): self.lastMsgTime = time.time() self.lastMsgFinishTime = None # to be filled in by the event self.lastMsgStartTime = None # to be filled in by the event + self.allowReplyInterrupt = allowReplyInterrupt # if not reset: self.previousReply = self.currentReply # we can use this for interrptions diff --git a/www/js/hugvey_console.js b/www/js/hugvey_console.js index ae09262..d514112 100644 --- a/www/js/hugvey_console.js +++ b/www/js/hugvey_console.js @@ -367,6 +367,9 @@ class Graph { div['params']['msgId'] = ""; div['params']['notForColor'] = ""; } + else if(type == 'interrupt') { + div['params']['msgId'] = ""; + } else if(type == 'timeout') { div['params']['interval'] = 20; div['params']['timesOccured'] = 0; @@ -382,7 +385,7 @@ class Graph { alert('invalid type for diversion'); } - if(type != 'repeat') { + if(type != 'repeat' && type != 'interrupt') { div['params']['notAfterMsgId'] = ""; } @@ -403,7 +406,7 @@ class Graph { let msgEl = document.getElementById( 'msg' ); msgEl.innerHTML = ""; - let divsNoResponse =[], divsRepeat = [], divsReplyContains = [], divsTimeouts = []; + let divsNoResponse =[], divsRepeat = [], divsReplyContains = [], divsTimeouts = [], divsInterrupts = []; for(let div of this.diversions) { let notAfterMsgIdEl = ""; @@ -691,9 +694,37 @@ class Graph { ) )); } + if(div['type'] == 'interrupt'){ + let msgOptions = [crel('option',"")]; + let starts = this.messages.filter( m => m.hasOwnProperty('start') && m['start'] == true); + for(let startMsg of starts) { + let optionParams = {}; + if(div['params']['msgId'] == startMsg['@id']) { + optionParams['selected'] = 'selected'; + } + msgOptions.push(crel('option', optionParams , startMsg['@id'])); + } + + divsInterrupts.push(crel( + 'div', {'class': 'diversion'}, + crel('h3', div['@id']), + crel( + 'div', { + 'class':'btn btn--delete', + 'on': { + 'click': (e) => this.deleteDiversion(div) + } + }, 'Delete diversion'), + crel('label', 'Go to (start message)', + crel('select', {'on': { + 'change': (e) => div['params']['msgId'] = e.target.value + }}, ...msgOptions) + ) + )); + } } - console.log(divsReplyContains, divsNoResponse, divsRepeat, divsTimeouts); + console.log(divsReplyContains, divsNoResponse, divsRepeat, divsTimeouts, divsInterrupts); let divEl = crel( 'div', @@ -752,6 +783,19 @@ class Graph { }, 'New case for timeout' ) + ), + crel('div', + crel('h2', 'Interruptions (random pick)'), + ...divsInterrupts, + crel('div', + { + 'class': 'btn', + 'on': { + 'click': (e) => this.createDiversion('interrupt') + } + }, + 'New case for Interrupt' + ) ) );