Merge branch 'diversion'

This commit is contained in:
Ruben van de Ven 2019-05-01 13:09:23 +02:00
commit 1550bdeb80
4 changed files with 305 additions and 46 deletions

View file

@ -100,6 +100,14 @@ for i in {1..6}; do rsync -av ~/hugvey/ pi@hugvey$i.local:/home/pi/hugvey/ --exc
for i in {1..6}; do ssh pi@hugvey$i.local "sudo shutdown -h now"; done
```
```bash
for i in {1..6}; do ssh pi@hugvey$i.local "supervisorctl restart hugvey_client"; done
```
### Monitoring the server processes
To make sure it will not die with 'Too many files open'
```bash
lsof -p $(ps aux|grep "[h]ugvey_server.py" |awk '{print $2}')| awk '{print $9}'|sort -rn|uniq -c|sort -rn|head -20
@ -110,3 +118,81 @@ or
```bash
lsof | grep $(ps aux|grep "[h]ugvey_server.py" |awk '{print $2}')| awk '{print $11}'|sort -rn|uniq -c|sort -rn|head -20
```
## Branches
multichannel
: Experiment with using alsa+sox to stream multiple channels to google.
# Create Story
## Messages
Things that can/will be said by Hugvey
Text
: The text that will be said. Or just a description if custom audio is uploaded. Variables can be entered by predending them with a $dollar_sign.
Start
: Mark message as being the start of a strand/tree of messages. Used for eg. diversions and formatting of the editor
Beginning
: There can only be one beginning. This is the message that the Hugvey will start with when starting the story.
Chapter Start
: A chapter can be marked. This is used by the timeout diversions, as it only returns to the next chapter marker after diversion. Furthermore, it is used by diversions to prevent them from triggering if specific sections (chapter markers) have been played.
Audio
: Upload a custom audio file to override the auto generated file
Afterrun time
: _deprecated_ Was the time the microphone kept listening after triggering this message. It was used to have the Hugvey reconsider its direction. Not used anymore
Volume factor
: Parameter send to the `play` command to increase/decrease the playback volume
Tempo factor
: Parameter send to the `play` command to increase/decrease the playback speed
Pitch factor
: Parameter send to the `play` command to increase/decrease the playback pitch (minus values for lower pitch)
Color
: Color the message aids in finding it in the editor window. Also is used by _replycontains_ diversions to prevent from running in specific moments
## Directions
Directions connect messages from one to the other. Can be created in the editor by selecting a message, and shift+click on its follow-up message. Another way is to select a message and press 'create message' from the right bar. This new message will automatically be connected and inherit the same color.
## Condition
Messages only head to a specific following message when one of the direction's conditions is matched (OR-condition). First come, first served.
The Conditions Description field allows for giving some info on the condition's reasons.
Types of conditions:
- timeout: timing finishing the playback of the message's audio
+ seconds: the duration
+ Only if no reply: timeout will be disabled after the person has spoken anything
+ Reply needed: If checked, the timeout is counted when it is met. This counter is used by the consecutive-timeouts diversions.
- replyContains: Match the contents of the speech using a regex. Or just any speech. Used to capture variables.
+ regex: The regex to match on. Variables can be matched using the python syntax to give the variable a name (?P<variable_name>\w+)
+ three consecutive timings can be given:
+ delay reply duration: the duration of the reply since hugvey stopped speaking. If it is more than this value (but less than the larger) it will use the given timing
+ delay wait time: The time to wait after a person speaks. It doesn't wait for Google's `is_finished` parameter, but rather checks from Google's last response. This way, also short utterances sutch as 'hey' or 'ok' are also properly timed, as these often don't get an is_finished by Google.
+ instant match: don't use any timings. the moment the regex matches on the speech in progress, the condition is met.
- variable: returns True if variable is set
+ TODO
- diversion: returns True if diversion has ben taken.
+ TODO
## Diversions
TODO
times occured/only on n-th instance: determines the order of diversions of the same type (for Timeout and no_response). Starting at 1, as a diversion with value of 0 can occur always

View file

@ -110,7 +110,7 @@ class VoiceServer(object):
input_device_name = None
input_card_name = None
print(output_device_name, input_device_name)
logger.debug("Output: {}, Input: {}".format(output_device_name, input_device_name))
return {
@ -463,10 +463,10 @@ class Hugvey(object):
def start(self):
logger.debug('Hugvey {}, reporting'.format(self.id))
loop = asyncio.get_event_loop()
self.loop = asyncio.get_event_loop()
self.voice_server = VoiceServer(
loop=loop,
loop=self.loop,
hugvey=self,
config=self.config
)
@ -483,9 +483,18 @@ class Hugvey(object):
logger.info('start')
# self.voice_server.asyncStart(loop)
# loop.run_until_complete(self.voice_server.start())
asyncio.ensure_future(self.voice_server.start())
asyncio.ensure_future(self.cmd_server.command_listener())
asyncio.ensure_future(self.cmd_server.event_sender())
asyncio.ensure_future(self.cmd_server.heartbeat())
loop.run_forever()
asyncio.ensure_future(self.catchException(self.voice_server.start()))
asyncio.ensure_future(self.catchException(self.cmd_server.command_listener()))
asyncio.ensure_future(self.catchException(self.cmd_server.event_sender()))
asyncio.ensure_future(self.catchException(self.cmd_server.heartbeat()))
self.loop.run_forever()
logger.info('done')
async def catchException(self, awaitable):
try:
await awaitable
except Exception as e:
logger.exception(e)
logger.critical("Hugvey quiting")
# self.loop.stop() # not fully quits program for reboot
exit()

View file

@ -37,9 +37,10 @@ class Recorder:
self.currentTranscription = ""
self.currentLog = []
t = time.strftime("%Y%m%d-%H:%M:%S")
day = time.strftime("%Y%m%d")
t = time.strftime("%H:%M:%S")
self.out_folder = os.path.join(self.main_folder, f"{self.hv_id}", t)
self.out_folder = os.path.join(self.main_folder, day, f"{self.hv_id}", t)
if not os.path.exists(self.out_folder):
self.logger.debug(f"Create directory {self.out_folder}")
self.target_folder = os.makedirs(self.out_folder, exist_ok=True)

View file

@ -14,7 +14,6 @@ from zmq.asyncio import Context
import zmq
import wave
from pythonosc import udp_client
from builtins import isinstance
mainLogger = logging.getLogger("hugvey")
logger = mainLogger.getChild("narrative")
@ -43,6 +42,7 @@ class Message(object):
self.id = id
self.text = text
self.isStart = False
self.isStrandStart = False
self.chapterStart = False
self.reply = None
# self.replyTime = None
@ -66,6 +66,7 @@ class Message(object):
def initFromJson(message, data, story):
msg = message(data['@id'], data['text'])
msg.isStart = data['beginning'] if 'beginning' in data else False
msg.isStrandStart = data['start'] if 'start' in data else False
msg.chapterStart = bool(data['chapterStart']) if 'chapterStart' in data else False
msg.afterrunTime = data['afterrun'] if 'afterrun' in data else 0.
msg.color = data['color'] if 'color' in data else None
@ -447,6 +448,7 @@ class Direction(object):
#: :type self.conditions: list(Condition)
self.conditions = []
self.conditionMet = None
self.isDiversionReturn = False
def addCondition(self, condition: Condition):
self.conditions.append(condition)
@ -483,6 +485,8 @@ class Diversion(object):
self.params = params
self.finaliseMethod = None
self.hasHit = False
self.disabled = False
self.type = type
if type == 'no_response':
self.method = self._divergeIfNoResponse
self.finaliseMethod = self._returnAfterNoResponse
@ -517,7 +521,7 @@ class Diversion(object):
'id': self.id,
}
async def divergeIfNeeded(self, story, msgFrom, msgTo):
async def divergeIfNeeded(self, story, direction: None):
"""
Validate if condition is met for the current story state
Returns True when diverging
@ -530,25 +534,87 @@ class Diversion(object):
story.logger.warn(f"Invalid message selected for diversion: {self.params['notAfterMsgId']} for {self.id}")
elif story.logHasMsg(msg):
# story.logger.warn(f"Block diversion {self.id} because of hit message {self.params['notAfterMsgId']}")
self.disabled = True # never run it and allow following timeouts/no_responses to run
return False
r = await self.method(story, msgFrom, msgTo)
r = await self.method(story,
direction.msgFrom if direction else None,
direction.msgTo if direction else None,
direction if direction else None)
if r:
if self.type != 'repeat':
# repeat diversion should be usable infinte times
self.hasHit = True
story.addToLog(self)
return r
def createReturnDirectionsTo(self, story, startMsg, returnMsg, originalDirection = None, inheritTiming = True):
"""
The finishes of this diversion's strand should point to the return message
with the right timeout/timing. If hit, this direction should also notify
this diversion.
"""
finishMessageIds = story.getFinishesForMsg(startMsg)
finalTimeoutDuration = 0.5
#: :type story: Story
#: :type originalDirection: Direction
# story.directionsPerMsg[story.currentMessage.id]
# take the timeouts that are on the current message, and apply it to our return
# as to have somewhat equal pace as to where we originate from
if inheritTiming:
for originalDirection in story.getCurrentDirections():
# if originalDirection:
for condition in originalDirection.conditions:
if condition.type == 'timeout':
finalTimeoutDuration = float(condition.vars['seconds'])
break
i = 0
for msgId in finishMessageIds:
# Some very ugly hack to add a direction & condition
i+=1
msg = story.get(msgId)
if not msg:
continue
direction = Direction(f"{self.id}-{i}", msg, returnMsg)
data = json.loads(f"""
{{
"@id": "{self.id}-c{i}",
"@type": "Condition",
"type": "timeout",
"label": "",
"vars": {{
"seconds": "{finalTimeoutDuration}",
"onlyIfNoReply": false
}}
}}
""")
condition = Condition.initFromJson(data, story)
direction.addCondition(condition)
direction.isDiversionReturn = True # will clear the currentDiversion on story
story.logger.info(f"Created direction: {direction.id} {condition.id} with timeout {finalTimeoutDuration}s")
story.add(condition)
story.add(direction)
async def finalise(self, story):
""""
Only used if the Diversion sets the story.currentDiversion
"""
story.logger.info("end of diversion")
if not self.finaliseMethod:
story.logger.info(f"No finalisation for diversion {self.id}")
story.currentDiversion = None
return False
await self.finaliseMethod(story)
story.currentDiversion = None
return True
async def _divergeIfNoResponse(self, story, msgFrom, msgTo):
async def _divergeIfNoResponse(self, story, msgFrom, msgTo, direction):
"""
Participant doesn't speak for x consecutive replies (has had timeout)
"""
@ -556,7 +622,7 @@ class Diversion(object):
if story.currentDiversion or not msgFrom or not msgTo:
return False
if story.stats['diversions']['no_response'] + 1 == self.params['timesOccured'] and story.stats['consecutiveSilentTimeouts'] >= int(self.params['consecutiveSilences']):
if story.stats['consecutiveSilentTimeouts'] >= int(self.params['consecutiveSilences']):
story.stats['diversions']['no_response'] += 1
msg = story.get(self.params['msgId'])
if msg is None:
@ -567,6 +633,9 @@ class Diversion(object):
self.returnMessage = msgTo
if self.params['returnAfterStrand']:
self.createReturnDirectionsTo(story, msg, msgTo, direction)
await story.setCurrentMessage(msg)
story.currentDiversion = self
return True
@ -576,10 +645,10 @@ class Diversion(object):
async def _returnAfterNoResponse(self, story):
story.logger.info(f"Finalise diversion: {self.id}")
story.stats['consecutiveSilentTimeouts'] = 0 # reset counter after diverging
if self.params['returnAfterStrand']:
await story.setCurrentMessage(self.returnMessage)
# if self.params['returnAfterStrand']:
# await story.setCurrentMessage(self.returnMessage)
async def _divergeIfReplyContains(self, story, msgFrom, msgTo):
async def _divergeIfReplyContains(self, story, msgFrom, msgTo, direction):
"""
Participant doesn't speak for x consecutive replies (has had timeout)
"""
@ -614,16 +683,20 @@ class Diversion(object):
return
self.returnMessage = msgTo
if self.params['returnAfterStrand']:
self.createReturnDirectionsTo(story, msg, msgTo, direction)
await story.setCurrentMessage(msg)
story.currentDiversion = self
return True
async def _returnAfterReplyContains(self, story):
story.logger.info(f"Finalise diversion: {self.id}")
if self.params['returnAfterStrand']:
await story.setCurrentMessage(self.returnMessage)
# if self.params['returnAfterStrand']:
# await story.setCurrentMessage(self.returnMessage)
async def _divergeIfRepeatRequest(self, story, msgFrom, msgTo):
async def _divergeIfRepeatRequest(self, story, msgFrom, msgTo, direction):
"""
Participant asks if message can be repeated.
"""
@ -644,7 +717,7 @@ class Diversion(object):
await story.setCurrentMessage(msgFrom)
return True
async def _divergeIfTimeout(self, story, msgFrom, msgTo):
async def _divergeIfTimeout(self, story, msgFrom, msgTo, direction):
"""
(1) last spoken at all
(2) or duration for this last reply only
@ -673,8 +746,6 @@ class Diversion(object):
interval = float(self.params['interval'])
if not self.params['fromLastMessage']:
# (1) last spoken at all
if story.stats['diversions']['timeout_total'] + 1 != self.params['timesOccured']:
return
timeSince = story.timer.getElapsed('last_speech') if story.timer.hasMark('last_speech') else story.timer.getElapsed('start')
if story.timer.hasMark('last_diversion_timeout') and story.timer.getElapsed('last_diversion_timeout') > timeSince:
@ -688,8 +759,6 @@ class Diversion(object):
return
# if story.currentMessage.timeoutDiversionCount + 1
if story.stats['diversions']['timeout_last'] + 1 != self.params['timesOccured']:
return
if story.currentReply is not None:
# still playing back
@ -716,6 +785,11 @@ class Diversion(object):
# blocked alltogether?
self.returnMessage = story.getNextChapterForMsg(story.currentMessage, False) or story.currentMessage
if self.params['returnAfterStrand']:
# no direction is here, as this diversion triggers before a direction is taken
self.createReturnDirectionsTo(story, msg, self.returnMessage, inheritTiming=False)
await story.setCurrentMessage(msg)
story.currentDiversion = self
story.timer.setMark('last_diversion_timeout')
@ -723,8 +797,9 @@ class Diversion(object):
async def _returnAfterTimeout(self, story):
story.logger.info(f"Finalise diversion: {self.id}")
if self.params['returnAfterStrand']:
await story.setCurrentMessage(self.returnMessage)
# if self.params['returnAfterStrand']:
# await story.setCurrentMessage(self.returnMessage)
storyClasses = {
@ -862,6 +937,7 @@ class Story(object):
currentId = self.currentMessage.id if self.currentMessage else None
self.elements = {}
self.strands = {}
self.diversions = []
self.directionsPerMsg = {}
self.startMessage = None # The entrypoint to the graph
@ -898,6 +974,8 @@ class Story(object):
self.logger.info(f'has variables: {self.variables}')
self.logger.info(f'has {len(self.strands)} strands: {self.strands}')
self.calculateFinishesForStrands()
def reset(self):
self.timer.reset()
@ -939,14 +1017,18 @@ class Story(object):
# print(obj)
raise Exception("Duplicate id for ''".format(obj.id))
if type(obj) == Message and obj.isStart:
self.startMessage = obj
self.elements[obj.id] = obj
if type(obj) == Diversion:
self.diversions.append(obj)
if type(obj) == Message:
if obj.isStart:
#confusingly, isStart is 'beginning' in the story json file
self.startMessage = obj
if obj.isStrandStart:
self.strands[obj.id] = []
if type(obj) == Direction:
if obj.msgFrom.id not in self.directionsPerMsg:
self.directionsPerMsg[obj.msgFrom.id] = []
@ -1005,12 +1087,10 @@ class Story(object):
# self.hugvey.google.resume()
if self.currentMessage.id not in self.directionsPerMsg:
print(self.currentDiversion)
if self.currentDiversion is not None:
self.logger.info("end of diversion")
await self.currentDiversion.finalise(self)
self.currentDiversion = None
else:
# print(self.currentDiversion)
# if self.currentDiversion is not None:
# await self.currentDiversion.finalise(self)
# else:
self.logger.info("THE END!")
self._finish()
return
@ -1076,23 +1156,62 @@ class Story(object):
self.currentMessage.setFinished(self.timer.getElapsed())
chosenDirection = direction
isDiverging = await self._processDiversions(
chosenDirection.msgFrom if chosenDirection else None,
chosenDirection.msgTo if chosenDirection else None)
isDiverging = await self._processDiversions(chosenDirection)
if not isDiverging and chosenDirection:
if chosenDirection.isDiversionReturn and self.currentDiversion:
await self.currentDiversion.finalise(self)
await self.setCurrentMessage(chosenDirection.msgTo)
return chosenDirection
async def _processDiversions(self, msgFrom, msgTo) -> bool:
async def _processDiversions(self, direction: None) -> bool:
"""
Process the diversions on stack. If diverging, return True, else False
msgFrom and msgTo contain the source and target of a headed direction if given
Else, they are None
"""
diverge = False
activeDiversions = []
activeTimeoutDiv = None
activeTimeoutLastDiv = None
activeNoResponseDiv = None
for diversion in self.diversions:
d = await diversion.divergeIfNeeded(self, msgFrom, msgTo)
#: :type diversion: Diversion
if diversion.disabled or diversion.hasHit:
continue
if diversion.type == 'timeout':
if diversion.params['timesOccured'] > 0:
if not diversion.params['fromLastMessage']:
# perhaps neater if we collect them in a list, and then sort by key, but this works just as well
if not activeTimeoutDiv or activeTimeoutDiv.params['timesOccured'] > diversion.params['timesOccured']:
activeTimeoutDiv = diversion
else:
if not activeTimeoutLastDiv or activeTimeoutLastDiv.params['timesOccured'] > diversion.params['timesOccured']:
activeTimeoutLastDiv = diversion
continue
if diversion.type == 'no_response':
if diversion.params['timesOccured'] > 0:
if not activeNoResponseDiv or activeNoResponseDiv.params['timesOccured'] > diversion.params['timesOccured']:
activeNoResponseDiv = diversion
continue
activeDiversions.append(diversion)
if activeTimeoutDiv:
activeDiversions.append(activeTimeoutDiv)
if activeTimeoutLastDiv:
activeDiversions.append(activeTimeoutLastDiv)
if activeNoResponseDiv:
activeDiversions.append(activeNoResponseDiv)
for diversion in activeDiversions:
# TODO: collect diversions and order by times + timesOccured (for timeout & no_response)
d = await diversion.divergeIfNeeded(self, direction)
if d:
diverge = True
return diverge
@ -1281,3 +1400,47 @@ class Story(object):
self.stop()
self.finish_time = time.time()
self.timer.pause()
def calculateFinishesForMsg(self, msgId, depth = 0):
if not msgId in self.directionsPerMsg or len(self.directionsPerMsg[msgId]) < 1:
# is finish
return [msgId]
if depth > 40:
return []
finishes = []
for d in self.directionsPerMsg[msgId]:
if d.msgTo.id == msgId:
continue
finishes.extend(self.calculateFinishesForMsg(d.msgTo.id, depth+1))
# de-duplicate before returning
return list(set(finishes))
def calculateFinishesForStrands(self):
for startMsgId in self.strands:
msg = self.get(startMsgId) #: :type msg: Message
if msg.isStart:
# ignore for the beginning
continue
self.logger.log(LOG_BS, f"Get finishes for {startMsgId}")
self.strands[startMsgId] = self.calculateFinishesForMsg(startMsgId)
self.logger.log(LOG_BS, f"Finishes: {self.strands}")
def getFinishesForMsg(self, msg):
"""
Find the end of strands
Most often they will be 'start's so to speed up these are pre-calculated
Others can be calculated on the spot
returns message ids
"""
if msg.id in self.strands:
return self.strands[msg.id]
return self.calculateFinishesForMsg(msg.id)