From 062f056e9677e8616010d4232695d20e1449d01e Mon Sep 17 00:00:00 2001 From: Ruben van de Ven Date: Wed, 1 May 2019 12:37:35 +0200 Subject: [PATCH 1/3] Fix #36 - Timing for returning after diversion --- README.md | 83 +++++++++++++++++ hugvey/client.py | 2 +- hugvey/speech/recorder.py | 5 +- hugvey/story.py | 185 +++++++++++++++++++++++++++++++------- 4 files changed, 242 insertions(+), 33 deletions(-) diff --git a/README.md b/README.md index be3119b..4616f88 100644 --- a/README.md +++ b/README.md @@ -100,6 +100,14 @@ for i in {1..6}; do rsync -av ~/hugvey/ pi@hugvey$i.local:/home/pi/hugvey/ --exc for i in {1..6}; do ssh pi@hugvey$i.local "sudo shutdown -h now"; done ``` +```bash +for i in {1..6}; do ssh pi@hugvey$i.local "supervisorctl restart hugvey_client"; done +``` + + +### Monitoring the server processes + +To make sure it will not die with 'Too many files open' ```bash lsof -p $(ps aux|grep "[h]ugvey_server.py" |awk '{print $2}')| awk '{print $9}'|sort -rn|uniq -c|sort -rn|head -20 @@ -110,3 +118,78 @@ or ```bash lsof | grep $(ps aux|grep "[h]ugvey_server.py" |awk '{print $2}')| awk '{print $11}'|sort -rn|uniq -c|sort -rn|head -20 ``` + + +## Branches + +multichannel +: Experiment with using alsa+sox to stream multiple channels to google. + + + +# Create Story + +## Messages + +Things that can/will be said by Hugvey + +Text +: The text that will be said. Or just a description if custom audio is uploaded. Variables can be entered by predending them with a $dollar_sign. + +Start +: Mark message as being the start of a strand/tree of messages. Used for eg. diversions and formatting of the editor + +Beginning +: There can only be one beginning. This is the message that the Hugvey will start with when starting the story. + +Chapter Start +: A chapter can be marked. This is used by the timeout diversions, as it only returns to the next chapter marker after diversion. Furthermore, it is used by diversions to prevent them from triggering if specific sections (chapter markers) have been played. + +Audio +: Upload a custom audio file to override the auto generated file + +Afterrun time +: _deprecated_ Was the time the microphone kept listening after triggering this message. It was used to have the Hugvey reconsider its direction. Not used anymore + +Volume factor +: Parameter send to the `play` command to increase/decrease the playback volume + +Tempo factor +: Parameter send to the `play` command to increase/decrease the playback speed + +Pitch factor +: Parameter send to the `play` command to increase/decrease the playback pitch (minus values for lower pitch) + +Color +: Color the message aids in finding it in the editor window. Also is used by _replycontains_ diversions to prevent from running in specific moments + +## Directions + +Directions connect messages from one to the other. Can be created in the editor by selecting a message, and shift+click on its follow-up message. Another way is to select a message and press 'create message' from the right bar. This new message will automatically be connected and inherit the same color. + +## Condition + +Messages only head to a specific following message when one of the direction's conditions is matched (OR-condition). First come, first served. + +The Conditions Description field allows for giving some info on the condition's reasons. + +Types of conditions: + +- timeout: timing finishing the playback of the message's audio + + seconds: the duration + + Only if no reply: timeout will be disabled after the person has spoken anything + + Reply needed: If checked, the timeout is counted when it is met. This counter is used by the consecutive-timeouts diversions. +- replyContains: Match the contents of the speech using a regex. Or just any speech. Used to capture variables. + + regex: The regex to match on. Variables can be matched using the python syntax to give the variable a name (?P\w+) + + three consecutive timings can be given: + + delay reply duration: the duration of the reply since hugvey stopped speaking. If it is more than this value (but less than the larger) it will use the given timing + + delay wait time: The time to wait after a person speaks. It doesn't wait for Google's `is_finished` parameter, but rather checks from Google's last response. This way, also short utterances sutch as 'hey' or 'ok' are also properly timed, as these often don't get an is_finished by Google. + + instant match: don't use any timings. the moment the regex matches on the speech in progress, the condition is met. +- variable: returns True if variable is set + + TODO +- diversion: returns True if diversion has ben taken. + + TODO + +## Diversions + +TODO \ No newline at end of file diff --git a/hugvey/client.py b/hugvey/client.py index 43508a7..9880248 100644 --- a/hugvey/client.py +++ b/hugvey/client.py @@ -110,7 +110,7 @@ class VoiceServer(object): input_device_name = None input_card_name = None - print(output_device_name, input_device_name) + logger.debug("Output: {}, Input: {}".format(output_device_name, input_device_name)) return { diff --git a/hugvey/speech/recorder.py b/hugvey/speech/recorder.py index e0eed63..9619dd5 100644 --- a/hugvey/speech/recorder.py +++ b/hugvey/speech/recorder.py @@ -37,9 +37,10 @@ class Recorder: self.currentTranscription = "" self.currentLog = [] - t = time.strftime("%Y%m%d-%H:%M:%S") + day = time.strftime("%Y%m%d") + t = time.strftime("%H:%M:%S") - self.out_folder = os.path.join(self.main_folder, f"{self.hv_id}", t) + self.out_folder = os.path.join(self.main_folder, day, f"{self.hv_id}", t) if not os.path.exists(self.out_folder): self.logger.debug(f"Create directory {self.out_folder}") self.target_folder = os.makedirs(self.out_folder, exist_ok=True) diff --git a/hugvey/story.py b/hugvey/story.py index 903f058..3595f77 100644 --- a/hugvey/story.py +++ b/hugvey/story.py @@ -14,7 +14,6 @@ from zmq.asyncio import Context import zmq import wave from pythonosc import udp_client -from builtins import isinstance mainLogger = logging.getLogger("hugvey") logger = mainLogger.getChild("narrative") @@ -43,6 +42,7 @@ class Message(object): self.id = id self.text = text self.isStart = False + self.isStrandStart = False self.chapterStart = False self.reply = None # self.replyTime = None @@ -66,6 +66,7 @@ class Message(object): def initFromJson(message, data, story): msg = message(data['@id'], data['text']) msg.isStart = data['beginning'] if 'beginning' in data else False + msg.isStrandStart = data['start'] if 'start' in data else False msg.chapterStart = bool(data['chapterStart']) if 'chapterStart' in data else False msg.afterrunTime = data['afterrun'] if 'afterrun' in data else 0. msg.color = data['color'] if 'color' in data else None @@ -447,6 +448,7 @@ class Direction(object): #: :type self.conditions: list(Condition) self.conditions = [] self.conditionMet = None + self.isDiversionReturn = False def addCondition(self, condition: Condition): self.conditions.append(condition) @@ -517,7 +519,7 @@ class Diversion(object): 'id': self.id, } - async def divergeIfNeeded(self, story, msgFrom, msgTo): + async def divergeIfNeeded(self, story, direction: None): """ Validate if condition is met for the current story state Returns True when diverging @@ -532,23 +534,81 @@ class Diversion(object): # story.logger.warn(f"Block diversion {self.id} because of hit message {self.params['notAfterMsgId']}") return False - r = await self.method(story, msgFrom, msgTo) + r = await self.method(story, + direction.msgFrom if direction else None, + direction.msgTo if direction else None, + direction if direction else None) if r: self.hasHit = True story.addToLog(self) return r + def createReturnDirectionsTo(self, story, startMsg, returnMsg, originalDirection = None, inheritTiming = True): + """ + The finishes of this diversion's strand should point to the return message + with the right timeout/timing. If hit, this direction should also notify + this diversion. + """ + finishMessageIds = story.getFinishesForMsg(startMsg) + finalTimeoutDuration = 0.5 + #: :type story: Story + #: :type originalDirection: Direction +# story.directionsPerMsg[story.currentMessage.id] + # take the timeouts that are on the current message, and apply it to our return + # as to have somewhat equal pace as to where we originate from + if inheritTiming: + for originalDirection in story.getCurrentDirections(): + # if originalDirection: + for condition in originalDirection.conditions: + if condition.type == 'timeout': + finalTimeoutDuration = float(condition.vars['seconds']) + break + + i = 0 + for msgId in finishMessageIds: + # Some very ugly hack to add a direction & condition + i+=1 + msg = story.get(msgId) + if not msg: + continue + + direction = Direction(f"{self.id}-{i}", msg, returnMsg) + data = json.loads(f""" + {{ + "@id": "{self.id}-c{i}", + "@type": "Condition", + "type": "timeout", + "label": "", + "vars": {{ + "seconds": "{finalTimeoutDuration}", + "onlyIfNoReply": false + }} + }} + """) + condition = Condition.initFromJson(data, story) + direction.addCondition(condition) + direction.isDiversionReturn = True # will clear the currentDiversion on story + story.logger.info(f"Created direction: {direction.id} {condition.id} with timeout {finalTimeoutDuration}s") + story.add(condition) + story.add(direction) + + + async def finalise(self, story): """" Only used if the Diversion sets the story.currentDiversion """ + story.logger.info("end of diversion") if not self.finaliseMethod: story.logger.info(f"No finalisation for diversion {self.id}") + story.currentDiversion = None return False + await self.finaliseMethod(story) + story.currentDiversion = None return True - async def _divergeIfNoResponse(self, story, msgFrom, msgTo): + async def _divergeIfNoResponse(self, story, msgFrom, msgTo, direction): """ Participant doesn't speak for x consecutive replies (has had timeout) """ @@ -567,6 +627,9 @@ class Diversion(object): self.returnMessage = msgTo + if self.params['returnAfterStrand']: + self.createReturnDirectionsTo(story, msg, msgTo, direction) + await story.setCurrentMessage(msg) story.currentDiversion = self return True @@ -576,10 +639,10 @@ class Diversion(object): async def _returnAfterNoResponse(self, story): story.logger.info(f"Finalise diversion: {self.id}") story.stats['consecutiveSilentTimeouts'] = 0 # reset counter after diverging - if self.params['returnAfterStrand']: - await story.setCurrentMessage(self.returnMessage) +# if self.params['returnAfterStrand']: +# await story.setCurrentMessage(self.returnMessage) - async def _divergeIfReplyContains(self, story, msgFrom, msgTo): + async def _divergeIfReplyContains(self, story, msgFrom, msgTo, direction): """ Participant doesn't speak for x consecutive replies (has had timeout) """ @@ -614,16 +677,20 @@ class Diversion(object): return self.returnMessage = msgTo + + if self.params['returnAfterStrand']: + self.createReturnDirectionsTo(story, msg, msgTo, direction) + await story.setCurrentMessage(msg) story.currentDiversion = self return True async def _returnAfterReplyContains(self, story): story.logger.info(f"Finalise diversion: {self.id}") - if self.params['returnAfterStrand']: - await story.setCurrentMessage(self.returnMessage) +# if self.params['returnAfterStrand']: +# await story.setCurrentMessage(self.returnMessage) - async def _divergeIfRepeatRequest(self, story, msgFrom, msgTo): + async def _divergeIfRepeatRequest(self, story, msgFrom, msgTo, direction): """ Participant asks if message can be repeated. """ @@ -644,7 +711,7 @@ class Diversion(object): await story.setCurrentMessage(msgFrom) return True - async def _divergeIfTimeout(self, story, msgFrom, msgTo): + async def _divergeIfTimeout(self, story, msgFrom, msgTo, direction): """ (1) last spoken at all (2) or duration for this last reply only @@ -716,6 +783,11 @@ class Diversion(object): # blocked alltogether? self.returnMessage = story.getNextChapterForMsg(story.currentMessage, False) or story.currentMessage + + if self.params['returnAfterStrand']: + # no direction is here, as this diversion triggers before a direction is taken + self.createReturnDirectionsTo(story, msg, self.returnMessage, inheritTiming=False) + await story.setCurrentMessage(msg) story.currentDiversion = self story.timer.setMark('last_diversion_timeout') @@ -723,8 +795,9 @@ class Diversion(object): async def _returnAfterTimeout(self, story): story.logger.info(f"Finalise diversion: {self.id}") - if self.params['returnAfterStrand']: - await story.setCurrentMessage(self.returnMessage) + +# if self.params['returnAfterStrand']: +# await story.setCurrentMessage(self.returnMessage) storyClasses = { @@ -862,6 +935,7 @@ class Story(object): currentId = self.currentMessage.id if self.currentMessage else None self.elements = {} + self.strands = {} self.diversions = [] self.directionsPerMsg = {} self.startMessage = None # The entrypoint to the graph @@ -898,6 +972,9 @@ class Story(object): self.logger.info(f'has variables: {self.variables}') + self.logger.info(f'has {len(self.strands)} strands: {self.strands}') + self.calculateFinishesForStrands() + self.logger.warn("Calculated strands!") def reset(self): self.timer.reset() @@ -939,14 +1016,18 @@ class Story(object): # print(obj) raise Exception("Duplicate id for ''".format(obj.id)) - if type(obj) == Message and obj.isStart: - self.startMessage = obj - self.elements[obj.id] = obj if type(obj) == Diversion: self.diversions.append(obj) + if type(obj) == Message: + if obj.isStart: + #confusingly, isStart is 'beginning' in the story json file + self.startMessage = obj + if obj.isStrandStart: + self.strands[obj.id] = [] + if type(obj) == Direction: if obj.msgFrom.id not in self.directionsPerMsg: self.directionsPerMsg[obj.msgFrom.id] = [] @@ -1005,15 +1086,13 @@ class Story(object): # self.hugvey.google.resume() if self.currentMessage.id not in self.directionsPerMsg: - print(self.currentDiversion) - if self.currentDiversion is not None: - self.logger.info("end of diversion") - await self.currentDiversion.finalise(self) - self.currentDiversion = None - else: - self.logger.info("THE END!") - self._finish() - return +# print(self.currentDiversion) +# if self.currentDiversion is not None: +# await self.currentDiversion.finalise(self) +# else: + self.logger.info("THE END!") + self._finish() + return if e['event'] == 'speech': # participants speaks, reset counter @@ -1076,15 +1155,17 @@ class Story(object): self.currentMessage.setFinished(self.timer.getElapsed()) chosenDirection = direction - isDiverging = await self._processDiversions( - chosenDirection.msgFrom if chosenDirection else None, - chosenDirection.msgTo if chosenDirection else None) + isDiverging = await self._processDiversions(chosenDirection) + if not isDiverging and chosenDirection: + if chosenDirection.isDiversionReturn and self.currentDiversion: + await self.currentDiversion.finalise(self) + await self.setCurrentMessage(chosenDirection.msgTo) return chosenDirection - async def _processDiversions(self, msgFrom, msgTo) -> bool: + async def _processDiversions(self, direction: None) -> bool: """ Process the diversions on stack. If diverging, return True, else False msgFrom and msgTo contain the source and target of a headed direction if given @@ -1092,7 +1173,7 @@ class Story(object): """ diverge = False for diversion in self.diversions: - d = await diversion.divergeIfNeeded(self, msgFrom, msgTo) + d = await diversion.divergeIfNeeded(self, direction) if d: diverge = True return diverge @@ -1281,3 +1362,47 @@ class Story(object): self.stop() self.finish_time = time.time() self.timer.pause() + + def calculateFinishesForMsg(self, msgId, depth = 0): + if not msgId in self.directionsPerMsg or len(self.directionsPerMsg[msgId]) < 1: + # is finish + return [msgId] + + if depth > 40: + return [] + + finishes = [] + for d in self.directionsPerMsg[msgId]: + if d.msgTo.id == msgId: + continue + finishes.extend(self.calculateFinishesForMsg(d.msgTo.id, depth+1)) + + # de-duplicate before returning + return list(set(finishes)) + + def calculateFinishesForStrands(self): + for startMsgId in self.strands: + msg = self.get(startMsgId) #: :type msg: Message + if msg.isStart: + # ignore for the beginning + continue + + self.logger.log(LOG_BS, f"Get finishes for {startMsgId}") + self.strands[startMsgId] = self.calculateFinishesForMsg(startMsgId) + + self.logger.log(LOG_BS, f"Finishes: {self.strands}") + + def getFinishesForMsg(self, msg): + """ + Find the end of strands + + Most often they will be 'start's so to speed up these are pre-calculated + Others can be calculated on the spot + + returns message ids + """ + if msg.id in self.strands: + return self.strands[msg.id] + + return self.calculateFinishesForMsg(msg.id) + \ No newline at end of file From ef7eee80724fd651c79395e42c575ecdc9fe89a2 Mon Sep 17 00:00:00 2001 From: Ruben van de Ven Date: Wed, 1 May 2019 13:08:41 +0200 Subject: [PATCH 2/3] Crash hugvey client on exception so it can be restarted by supervisor --- hugvey/client.py | 23 ++++++++++++++------- hugvey/story.py | 52 +++++++++++++++++++++++++++++++++++++++++------- 2 files changed, 61 insertions(+), 14 deletions(-) diff --git a/hugvey/client.py b/hugvey/client.py index 9880248..066f406 100644 --- a/hugvey/client.py +++ b/hugvey/client.py @@ -463,10 +463,10 @@ class Hugvey(object): def start(self): logger.debug('Hugvey {}, reporting'.format(self.id)) - loop = asyncio.get_event_loop() + self.loop = asyncio.get_event_loop() self.voice_server = VoiceServer( - loop=loop, + loop=self.loop, hugvey=self, config=self.config ) @@ -483,9 +483,18 @@ class Hugvey(object): logger.info('start') # self.voice_server.asyncStart(loop) # loop.run_until_complete(self.voice_server.start()) - asyncio.ensure_future(self.voice_server.start()) - asyncio.ensure_future(self.cmd_server.command_listener()) - asyncio.ensure_future(self.cmd_server.event_sender()) - asyncio.ensure_future(self.cmd_server.heartbeat()) - loop.run_forever() + asyncio.ensure_future(self.catchException(self.voice_server.start())) + asyncio.ensure_future(self.catchException(self.cmd_server.command_listener())) + asyncio.ensure_future(self.catchException(self.cmd_server.event_sender())) + asyncio.ensure_future(self.catchException(self.cmd_server.heartbeat())) + self.loop.run_forever() logger.info('done') + + async def catchException(self, awaitable): + try: + await awaitable + except Exception as e: + logger.exception(e) + logger.critical("Hugvey quiting") +# self.loop.stop() # not fully quits program for reboot + exit() diff --git a/hugvey/story.py b/hugvey/story.py index 3595f77..355ec50 100644 --- a/hugvey/story.py +++ b/hugvey/story.py @@ -485,6 +485,8 @@ class Diversion(object): self.params = params self.finaliseMethod = None self.hasHit = False + self.disabled = False + self.type = type if type == 'no_response': self.method = self._divergeIfNoResponse self.finaliseMethod = self._returnAfterNoResponse @@ -532,6 +534,7 @@ class Diversion(object): story.logger.warn(f"Invalid message selected for diversion: {self.params['notAfterMsgId']} for {self.id}") elif story.logHasMsg(msg): # story.logger.warn(f"Block diversion {self.id} because of hit message {self.params['notAfterMsgId']}") + self.disabled = True # never run it and allow following timeouts/no_responses to run return False r = await self.method(story, @@ -539,7 +542,10 @@ class Diversion(object): direction.msgTo if direction else None, direction if direction else None) if r: - self.hasHit = True + if self.type != 'repeat': + # repeat diversion should be usable infinte times + self.hasHit = True + story.addToLog(self) return r @@ -616,7 +622,7 @@ class Diversion(object): if story.currentDiversion or not msgFrom or not msgTo: return False - if story.stats['diversions']['no_response'] + 1 == self.params['timesOccured'] and story.stats['consecutiveSilentTimeouts'] >= int(self.params['consecutiveSilences']): + if story.stats['consecutiveSilentTimeouts'] >= int(self.params['consecutiveSilences']): story.stats['diversions']['no_response'] += 1 msg = story.get(self.params['msgId']) if msg is None: @@ -740,8 +746,6 @@ class Diversion(object): interval = float(self.params['interval']) if not self.params['fromLastMessage']: # (1) last spoken at all - if story.stats['diversions']['timeout_total'] + 1 != self.params['timesOccured']: - return timeSince = story.timer.getElapsed('last_speech') if story.timer.hasMark('last_speech') else story.timer.getElapsed('start') if story.timer.hasMark('last_diversion_timeout') and story.timer.getElapsed('last_diversion_timeout') > timeSince: @@ -755,8 +759,6 @@ class Diversion(object): return # if story.currentMessage.timeoutDiversionCount + 1 - if story.stats['diversions']['timeout_last'] + 1 != self.params['timesOccured']: - return if story.currentReply is not None: # still playing back @@ -974,7 +976,6 @@ class Story(object): self.logger.info(f'has variables: {self.variables}') self.logger.info(f'has {len(self.strands)} strands: {self.strands}') self.calculateFinishesForStrands() - self.logger.warn("Calculated strands!") def reset(self): self.timer.reset() @@ -1172,7 +1173,44 @@ class Story(object): Else, they are None """ diverge = False + + activeDiversions = [] + activeTimeoutDiv = None + activeTimeoutLastDiv = None + activeNoResponseDiv = None for diversion in self.diversions: + #: :type diversion: Diversion + if diversion.disabled or diversion.hasHit: + continue + + if diversion.type == 'timeout': + if diversion.params['timesOccured'] > 0: + if not diversion.params['fromLastMessage']: + # perhaps neater if we collect them in a list, and then sort by key, but this works just as well + if not activeTimeoutDiv or activeTimeoutDiv.params['timesOccured'] > diversion.params['timesOccured']: + activeTimeoutDiv = diversion + else: + if not activeTimeoutLastDiv or activeTimeoutLastDiv.params['timesOccured'] > diversion.params['timesOccured']: + activeTimeoutLastDiv = diversion + continue + + if diversion.type == 'no_response': + if diversion.params['timesOccured'] > 0: + if not activeNoResponseDiv or activeNoResponseDiv.params['timesOccured'] > diversion.params['timesOccured']: + activeNoResponseDiv = diversion + continue + + activeDiversions.append(diversion) + + if activeTimeoutDiv: + activeDiversions.append(activeTimeoutDiv) + if activeTimeoutLastDiv: + activeDiversions.append(activeTimeoutLastDiv) + if activeNoResponseDiv: + activeDiversions.append(activeNoResponseDiv) + + for diversion in activeDiversions: + # TODO: collect diversions and order by times + timesOccured (for timeout & no_response) d = await diversion.divergeIfNeeded(self, direction) if d: diverge = True From b16724990d22e078ed49fe312855fe96f4770e6d Mon Sep 17 00:00:00 2001 From: Ruben van de Ven Date: Wed, 1 May 2019 13:09:13 +0200 Subject: [PATCH 3/3] readme --- README.md | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 4616f88..947f964 100644 --- a/README.md +++ b/README.md @@ -192,4 +192,7 @@ Types of conditions: ## Diversions -TODO \ No newline at end of file +TODO + + +times occured/only on n-th instance: determines the order of diversions of the same type (for Timeout and no_response). Starting at 1, as a diversion with value of 0 can occur always \ No newline at end of file