import json
import time
import logging
import re
import asyncio
import urllib.parse
from .communication import LOG_BS
from tornado.httpclient import AsyncHTTPClient, HTTPRequest
import uuid
import shortuuid
import threading
import faulthandler
from zmq.asyncio import Context
import zmq
import wave
from pythonosc import udp_client
import random
# Application-wide root logger; other loggers in this file are derived from it.
mainLogger = logging.getLogger("hugvey")
# Module-level child logger ("hugvey.narrative").
logger = mainLogger.getChild("narrative")
class Utterance(object):
    """Part of a reply.

    Holds the recognised text of one stretch of speech, when it started,
    when its text was last updated and (once known) when it ended.
    """

    def __init__(self, startTime):
        self.startTime = startTime
        self.endTime = None  # stays None until setFinished() is called
        self.text = ""
        self.lastUpdateTime = startTime

    def __repr__(self) -> str:
        return (
            f"{type(self).__name__}(startTime={self.startTime!r}, "
            f"endTime={self.endTime!r}, text={self.text!r})"
        )

    def setText(self, text, now) -> None:
        """Replace the text and record `now` as the moment of the last update."""
        self.text = text
        self.lastUpdateTime = now

    def setFinished(self, endTime) -> None:
        """Mark the utterance as finished at `endTime`."""
        self.endTime = endTime

    def isFinished(self) -> bool:
        """Return True once an end time has been set (an end time of 0 counts)."""
        return self.endTime is not None
class Message(object):
    """A message of the story: a text (optionally with audio) played to the participant.

    The text may contain ``$name`` placeholders. Their values are kept in
    `variableValues` and substituted by `getText()`.
    """

    def __init__(self, id, text):
        self.id = id
        self.text = text
        self.isStart = False
        self.isStrandStart = False
        self.chapterStart = False
        self.reply = None
        # self.replyTime = None
        self.audioFile = None
        self.filenameFetchLock = asyncio.Lock()
        self.interruptCount = 0
        self.timeoutDiversionCount = 0
        self.afterrunTime = 0.  # the time after this message to allow for interrupts
        self.finishTime = None  # message can be finished without finished utterance (with instant replycontains)
        self.params = {}
        self.variables = []  # filled by parseForVariables()
        self.variableValues = {}
        self.uuid = None  # Have a unique id each time the message is played back.
        self.color = None

    def setStory(self, story):
        """Attach the owning story and derive this message's logger from it."""
        self.story = story
        self.logger = story.logger.getChild("message")

    @classmethod
    def initFromJson(message, data, story):
        """Build a message from its story-JSON element.

        Required keys: '@id' and 'text'. Optional keys: 'beginning', 'start',
        'chapterStart', 'afterrun', 'color', 'audio' and 'params'.
        """
        msg = message(data['@id'], data['text'])
        msg.isStart = data.get('beginning', False)
        msg.isStrandStart = data.get('start', False)
        msg.chapterStart = bool(data.get('chapterStart', False))
        msg.afterrunTime = data.get('afterrun', 0.)
        msg.color = data.get('color')
        # Without the story the message has no logger, which most methods use.
        msg.setStory(story)
        if data.get('audio') is not None:
            msg.audioFile = data['audio']['file']
        if 'params' in data:
            msg.params = data['params']
            if 'vol' not in msg.params:
                # prevent clipping on some Lyrebird tracks
                msg.params['vol'] = .8
        # hasVariables()/getText() rely on the parsed variable list.
        msg.parseForVariables()
        return msg

    def parseForVariables(self):
        """Find variables in text and reset their values to None."""
        # Raw string: '\$' and '\w' are invalid escapes in a normal literal.
        # dict.fromkeys drops repeated names while keeping first-seen order.
        self.variables = list(dict.fromkeys(re.findall(r'\$(\w+)', self.text)))
        for var in self.variables:
            self.variableValues[var] = None

    def hasVariables(self) -> bool:
        return len(self.variables) > 0

    def setVariable(self, name, value):
        """Store the value of a variable used in this message's text.

        Unknown names are logged and ignored; setting an unchanged value is a no-op.
        """
        if name not in self.variables:
            self.logger.critical("Set nonexisting variable")
            return
        if self.variableValues[name] == value:
            return
        self.variableValues[name] = value
        self.logger.warning(f"Set variable, fetch {name}")
        if None not in self.variableValues.values():
            self.logger.warning(f"now fetch {name}")
            # asyncio.get_event_loop().call_soon_threadsafe(self.getAudioFilePath)
            self.logger.warning(f"started {name}")

    def getText(self):
        """Return the text with every ``$variable`` replaced by its value.

        Variables without a value are rendered as "nothing".
        """
        text = self.text
        # Longest names first to avoid replacing the wrong variable
        # (e.g. $name inside $name_long).
        for var in sorted(self.variables, key=len, reverse=True):
            value = self.variableValues[var]
            self.logger.debug(f"try replacing ${var} with {value} in {text}")
            replacement = value if value is not None else "nothing"  # TODO: translate nothing to each language
            text = text.replace('$' + var, replacement)
        return text

    def setReply(self, reply):
        self.reply = reply

    def hasReply(self):
        return self.reply is not None

    def getReply(self):
        """Return the reply; raises Exception when no reply has been set."""
        if not self.hasReply():
            raise Exception(
                "Getting reply while there is none! {0}".format(self.id))
        return self.reply

    def isFinished(self):
        return self.finishTime is not None

    def setFinished(self, currentTime):
        self.finishTime = currentTime

    def getFinishedTime(self):
        return self.finishTime

    def getParams(self):
        return self.params

    def getLogSummary(self):
        """Return a dict describing this message and its reply (if any) for the log."""
        utterances = None if self.reply is None else self.reply.utterances
        return {
            'id': self.id,
            'time': None if utterances is None else [u.startTime for u in utterances],
            'text': self.getText(),
            'replyText': None if utterances is None else [u.text for u in utterances],
        }

    async def getAudioFilePath(self):
        """Return the audio file for this message.

        A preset `audioFile` is returned directly. Otherwise the current text
        is sent as a JSON request over a zmq REQ socket and the string reply
        is returned as the filename.
        """
        if self.audioFile is not None:
            return self.audioFile

        text = self.getText()
        self.logger.debug(f"Fetching audio for {text}")

        async with self.filenameFetchLock:
            info = {
                'text': text,
                'variable': self.hasVariables(),
            }
            s = Context.instance().socket(zmq.REQ)  #: :type s: zmq.sugar.Socket
            # NOTE(review): the expression inside the address was lost in this
            # copy of the file (it read f"ipc://voice{}"). The hugvey id is
            # assumed as the suffix -- confirm against the voice service.
            voiceAddr = f"ipc://voice{self.story.hugvey.id}"
            try:
                s.connect(voiceAddr)
                await s.send_json(info)
                filename = await s.recv_string()
            finally:
                # One socket per request: always release it.
                s.close()

        self.logger.debug(f"Fetched audio for {text}: {filename}")
        return filename
class Reply(object):
    """The participant's answer to a message, kept as a list of utterances."""

    def __init__(self, message: "Message"):
        self.forMessage = None
        self.utterances = []
        # The timing helpers below read forMessage.story, so the message
        # passed in must actually be stored.
        self.setForMessage(message)

    def setForMessage(self, message: "Message"):
        self.forMessage = message

    def getLastUtterance(self) -> "Utterance":
        """Return the most recent utterance, or None when there is none.

        An unfinished utterance that has not been updated for more than five
        seconds is marked finished before it is returned.
        """
        if not self.hasUtterances():
            return None

        u = self.utterances[-1]  #: :type u: Utterance
        if u.isFinished():
            return u

        # attempt to fix a glitch that google does not always send is_finished
        story = self.forMessage.story
        now = story.timer.getElapsed()
        diff = now - u.lastUpdateTime
        if diff > 5:  # time in seconds to force silence in utterance
            # useful for eg. 'hello', or 'no'
            story.logger.warning(
                f"Set finish time for utterance after {diff}s {u.text}")
            u.setFinished(now)
        return u

    def getFirstUtterance(self) -> "Utterance":
        if not self.hasUtterances():
            return None
        return self.utterances[0]

    def hasUtterances(self) -> bool:
        return len(self.utterances) > 0

    def addUtterance(self, utterance: "Utterance"):
        self.utterances.append(utterance)

    def getText(self) -> str:
        return ". ".join([u.text for u in self.utterances])

    def getActiveUtterance(self, currentTime) -> "Utterance":
        """
        If no utterance is active, create a new one. Otherwise return non-finished utterance for update
        """
        last = self.getLastUtterance()
        if last is None or last.isFinished():
            u = Utterance(currentTime)
            self.addUtterance(u)
            return u
        return last

    def isSpeaking(self):
        """Return True while the last utterance exists and is not finished."""
        u = self.getLastUtterance()
        return u is not None and not u.isFinished()

    def getTimeSinceLastUtterance(self):
        """Return the time since the last utterance was updated, or None without utterances."""
        if not self.hasUtterances():
            return None
        return self.forMessage.story.timer.getElapsed() - self.getLastUtterance().lastUpdateTime
class Condition(object):
    """
    A condition, basic conditions are built in, custom condition can be given by
    providing a custom method.
    """

    def __init__(self, id):
        self.id = id
        self.method = None
        self.type = None
        self.vars = {}
        self.logInfo = None
        self.originalJsonString = None
        self.usedContainsDuration = None

    @classmethod
    def initFromJson(conditionClass, data, story):
        """Build a condition from its story-JSON element ('@id', 'type', optional 'vars')."""
        condition = conditionClass(data['@id'])
        condition.type = data['type']
        # Serialised before any method runs: _hasMetReplyContains later stores
        # a compiled regex inside `vars`.
        condition.originalJsonString = json.dumps(data)

        # TODO: should Condition be subclassed?
        builtins = {
            "replyContains": condition._hasMetReplyContains,
            "timeout": condition._hasMetTimeout,
            "variable": condition._hasVariable,
            "diversion": condition._hasDiverged,
        }
        if data['type'] in builtins:
            condition.method = builtins[data['type']]

        if 'vars' in data:
            condition.vars = data['vars']

        return condition

    def isMet(self, story):
        """
        Validate if condition is met for the current story state
        """
        return self.method(story)

    def _hasMetTimeout(self, story):
        """True once `vars['seconds']` have passed since the last message finished playing."""
        now = story.timer.getElapsed()
        # check if the message already finished playing
        if not story.lastMsgFinishTime:
            return False

        if 'onlyIfNoReply' in self.vars and self.vars['onlyIfNoReply']:
            if story.currentReply and story.currentReply.hasUtterances():
                story.logger.log(LOG_BS, f'Only if no reply has text! {story.currentReply.getText()}')
                # 'onlyIfNoReply': only use this timeout if participants doesn't speak.
                return False

        hasMetTimeout = now - story.lastMsgFinishTime >= float(self.vars['seconds'])
        if not hasMetTimeout:
            return False

        # update stats:
        story.stats['timeouts'] += 1
        if 'needsReply' in self.vars and self.vars['needsReply'] is True:
            if story.currentReply is None or not story.currentReply.hasUtterances():
                story.stats['silentTimeouts'] += 1
                story.stats['consecutiveSilentTimeouts'] += 1

        self.logInfo = "{}s".format(self.vars['seconds'])
        return True

    def _hasVariable(self, story) -> bool:
        """True when `vars['variable']` is set in the story (inverted when `vars['notSet']`)."""
        if not story.lastMsgFinishTime:
            return False

        r = story.hasVariableSet(self.vars['variable'])
        if r:
            story.logger.debug(f"Variable {self.vars['variable']} is set.")

        inverse = 'notSet' in self.vars and self.vars['notSet']
        if inverse:
            r = not r

        self.logInfo = "Does {} have variable {}".format(
            'not' if inverse else '',
            self.vars['variable'])
        return r

    def _hasDiverged(self, story) -> bool:
        """True when diversion `vars['diversionId']` has been hit (inverted when `vars['inverseMatch']`)."""
        if not story.lastMsgFinishTime:
            return False

        d = story.get(self.vars['diversionId'])
        if not d:
            story.logger.critical(f"Condition on non-existing diversion: {self.vars['diversionId']}")
            # Nothing to inspect; never match on a diversion that does not exist.
            return False

        r = d.hasHit
        if r:
            story.logger.debug(f"Diversion {self.vars['diversionId']} has been hit.")

        inverse = 'inverseMatch' in self.vars and self.vars['inverseMatch']
        if inverse:
            r = not r

        self.logInfo = "Has {} diverged to {}".format(
            'not' if inverse else '',
            self.vars['diversionId'])
        return r

    def _setCapturedVariables(self, story, capturedVariables):
        """Copy every named regex group into the story's variables."""
        for captureGroup in capturedVariables:
            story.setVariableValue(captureGroup, capturedVariables[captureGroup])

    def _hasMetReplyContains(self, story) -> bool:
        """
        Check the reply for specific characteristics:
        - regex: regular expression. If empty, just wait for isFinished()
        - delays: an array of [{'minReplyDuration', 'waitTime'},...]
            - minReplyDuration: the nr of seconds the reply should take. Preferably have one with 0
            - waitTime: the time to wait after isFinished() before continuing
        """
        r = story.currentReply  # make sure we keep working with the same object
        if not r or not r.hasUtterances():
            return False

        capturedVariables = None
        instantMatch = 'instantMatch' in self.vars and self.vars['instantMatch']

        if 'regex' in self.vars and len(self.vars['regex']):
            if 'regexCompiled' not in self.vars:
                # Compile once, as we probably run it more than once
                self.vars['regexCompiled'] = re.compile(self.vars['regex'])

            t = r.getText().lower()
            story.logger.log(LOG_BS, 'attempt regex: {} on {}'.format(self.vars['regex'], t))
            result = self.vars['regexCompiled'].search(t)
            if result is None:
                # if there is something to match, but not found, it's never ok
                return False
            story.logger.debug('Got match on {}'.format(self.vars['regex']))

            capturedVariables = result.groupdict()

            if instantMatch or not r.isSpeaking():
                # try to avoid setting variables for intermediate strings
                self._setCapturedVariables(story, capturedVariables)

            if instantMatch:
                story.logger.info(f"Instant match on {self.vars['regex']}, {self.vars}")
                # NOTE(review): the format arguments were lost in this copy of
                # the file; regex and captured groups are assumed.
                self.logInfo = "Instant match of {}, captured {}".format(
                    self.vars['regex'], capturedVariables)
                return True

        # either there's a match, or nothing to match at all
        if 'delays' in self.vars:
            if story.lastMsgFinishTime is None:
                story.logger.debug("not finished playback yet")
                return False
            # time between finishing playback and ending of speaking:
            replyDuration = r.getLastUtterance().lastUpdateTime - story.lastMsgFinishTime  # using lastUpdateTime instead of endTime
            # Longest minReplyDuration first, so the most specific delay wins.
            delays = sorted(self.vars['delays'], key=lambda k: float(k['minReplyDuration']), reverse=True)
            for delay in delays:
                if replyDuration > float(delay['minReplyDuration']):
                    timeSinceReply = r.getTimeSinceLastUtterance()
                    story.logger.log(LOG_BS, f"check delay duration is now {replyDuration}, already waiting for {timeSinceReply}, have to wait {delay['waitTime']}")
                    if timeSinceReply > float(delay['waitTime']):
                        # if variables are captured, only set them the moment the condition matches
                        if capturedVariables is not None:
                            self._setCapturedVariables(story, capturedVariables)
                        # NOTE(review): format arguments lost in this copy of
                        # the file; regex, captures and waited time are assumed.
                        self.logInfo = "Match of {}, captured {} after, {}".format(
                            self.vars.get('regex'), capturedVariables, timeSinceReply)
                        self.usedContainsDuration = float(delay['waitTime'])
                        return True
                    break  # don't check other delays
            # wait for delay to match
            story.logger.log(LOG_BS, "Wait for it...")
            return False

        # If there is a delay, it takes precedence of isSpeaking, since google does not always give an is_finished on short utterances (eg. "hello" or "no")
        if r.isSpeaking():
            story.logger.log(LOG_BS, f"is speaking: {r.getLastUtterance().text} - {r.getLastUtterance().startTime}")
            return False

        # There is a match and no delay say, person finished speaking. Go ahead sir!
        self.logInfo = "Match"
        return True

    def getLogSummary(self):
        # NOTE(review): the original dict body was lost in this copy of the
        # file; the id is the minimum needed to identify the condition.
        return {
            'id': self.id,
        }
class Direction(object):
    """
    A condition based edge in the story graph
    """

    def __init__(self, id, msgFrom: "Message", msgTo: "Message"):
        self.id = id
        self.msgFrom = msgFrom
        self.msgTo = msgTo
        #: :type self.conditions: list(Condition)
        self.conditions = []
        self.conditionMet = None
        self.isDiversionReturn = False

    def addCondition(self, condition: "Condition"):
        self.conditions.append(condition)

    def setMetCondition(self, condition: "Condition"):
        self.conditionMet = condition

    @classmethod
    def initFromJson(direction, data, story):
        """Build a direction from its story-JSON element.

        'source', 'target' and every id in the optional 'conditions' list are
        resolved through `story.get`.
        """
        msgFrom = story.get(data['source'])
        msgTo = story.get(data['target'])
        direction = direction(data['@id'], msgFrom, msgTo)
        if 'conditions' in data:
            for conditionId in data['conditions']:
                c = story.get(conditionId)
                direction.addCondition(c)
        return direction

    def getLogSummary(self):
        """Return the direction id and the id of the condition that was met, if any."""
        return {
            'id': self.id,
            'condition': self.conditionMet.id if self.conditionMet else None
        }
class Diversion(object):
An Diversion. Used to catch events outside of story flow.
Not sure why I'm not using subclasses here o:)
def __init__(self, id, type: str, params: dict): = id
self.params = params
self.finaliseMethod = None
self.hasHit = False
self.disabled = False
self.type = type
self.counter = 0
if type == 'no_response':
self.method = self._divergeIfNoResponse
self.finaliseMethod = self._returnAfterNoResponse
self.counter = 0
if type == 'reply_contains':
self.method = self._divergeIfReplyContains
self.finaliseMethod = self._returnAfterReplyContains
if len(self.params['regex']) > 0:
self.regex = re.compile(self.params['regex'])
self.regex = None
if type == 'timeout':
self.method = self._divergeIfTimeout
self.finaliseMethod = self._returnAfterTimeout
if type == 'repeat':
self.method = self._divergeIfRepeatRequest
self.regex = re.compile(self.params['regex'])
if type == 'interrupt':
self.method = self._divergeIfInterrupted
if not self.method:
raise Exception("No valid type given for diversion")
def initFromJson(diversionClass, data, story):
diversion = diversionClass(data['@id'], data['type'], data['params'])
return diversion
def getLogSummary(self):
return {
async def divergeIfNeeded(self, story, direction = None):
Validate if condition is met for the current story state
Returns True when diverging
# For all diversion except repeat (which simply doesn't have the variable)
if 'notAfterMsgId' in self.params and self.params['notAfterMsgId']:
msg = story.get(self.params['notAfterMsgId'])
if msg is None:
story.logger.warn(f"Invalid message selected for diversion: {self.params['notAfterMsgId']} for {}")
elif story.logHasMsg(msg):
# story.logger.warn(f"Block diversion {} because of hit message {self.params['notAfterMsgId']}")
self.disabled = True # never run it and allow following timeouts/no_responses to run
return False
r = await self.method(story,
direction.msgFrom if direction else None,
direction.msgTo if direction else None,
direction if direction else None)
if r:
if self.type != 'repeat' and self.type !='interrupt':
# repeat diversion should be usable infinte times
self.hasHit = True
return r
def createReturnDirectionsTo(self, story, startMsg, returnMsg, originalDirection = None, inheritTiming = True, timeoutDuration = .5, replyContainsDurations = None):
The finishes of this diversion's strand should point to the return message
with the right timeout/timing. If hit, this direction should also notify
this diversion.
replyContainsDurations: list formatted as in JSON
"minReplyDuration": "0",
"waitTime": "3"
self.counter +=1
finishMessageIds = story.getFinishesForMsg(startMsg)
finalTimeoutDuration = timeoutDuration
finalContainsDurations = replyContainsDurations
#: :type story: Story
#: :type originalDirection: Direction
# story.directionsPerMsg[]
# take the timeouts that are on the current message, and apply it to our return
# as to have somewhat equal pace as to where we originate from
if inheritTiming:
for originalDirection in story.getCurrentDirections():
# if originalDirection:
for condition in originalDirection.conditions:
if condition.type == 'timeout':
finalTimeoutDuration = float(condition.vars['seconds'])
if condition.type == 'replyContains':
finalContainsDurations = json.loads(condition.originalJsonString)['vars']['delays']
i = 0
for msgId in finishMessageIds:
# Some very ugly hack to add a direction & condition
msg = story.get(msgId)
if not msg:
direction = Direction(f"{}-{i}-{self.counter}", msg, returnMsg)
data = json.loads(f"""
"@id": "{}-ct{i}-{self.counter}",
"@type": "Condition",
"type": "timeout",
"label": "Autogenerated Timeout",
"vars": {{
"seconds": "{finalTimeoutDuration}"
data['vars']['onlyIfNoReply'] = finalContainsDurations is not None
# TODO: also at replycontains if it exists, with the same timings
condition = Condition.initFromJson(data, story)
if finalContainsDurations is not None:
data2 = json.loads(f"""
"@id": "{}-cr{i}-{self.counter}",
"@type": "Condition",
"type": "replyContains",
"label": "Autogenerated Reply Contains",
"vars": {{
"regex": "",
"instantMatch": false
}} }}
data2['vars']['delays'] = finalContainsDurations
condition2 = Condition.initFromJson(data2, story)
direction.isDiversionReturn = True # will clear the currentDiversion on story"Created direction: {} {} with timeout {finalTimeoutDuration}s")
async def finalise(self, story):
Only used if the Diversion sets the story.currentDiversion
""""end of diversion")
if not self.finaliseMethod:"No finalisation for diversion {}")
story.currentDiversion = None
return False
await self.finaliseMethod(story)
story.currentDiversion = None
return True
async def _divergeIfNoResponse(self, story, msgFrom, msgTo, direction):
Participant doesn't speak for x consecutive replies (has had timeout)
':type story: Story'
if story.currentDiversion or not msgFrom or not msgTo:
return False
if story.stats['consecutiveSilentTimeouts'] >= int(self.params['consecutiveSilences']):
story.stats['diversions']['no_response'] += 1
msg = story.get(self.params['msgId'])
if msg is None:
story.logger.critical(f"Not a valid message id for diversion: {self.params['msgId']}")
return"Diverge: No response {} {story.stats}")
self.returnMessage = msgTo
if self.params['returnAfterStrand']:
self.createReturnDirectionsTo(story, msg, msgTo, direction)
await story.setCurrentMessage(msg)
story.currentDiversion = self
return True
async def _returnAfterNoResponse(self, story):"Finalise diversion: {}")
story.stats['consecutiveSilentTimeouts'] = 0 # reset counter after diverging
# if self.params['returnAfterStrand']:
# await story.setCurrentMessage(self.returnMessage)
async def _divergeIfReplyContains(self, story, msgFrom, msgTo, _):
Participant doesn't speak for x consecutive replies (has had timeout)
':type story: Story'
# use story.currentReply.getTimeSinceLastUtterance() > 2
if story.currentDiversion: # or not msgFrom or not msgTo:
# don't do nested diversions
return False
if self.hasHit:
# don't match twice
if story.currentReply is None or not self.regex:
direction = story.getDefaultDirectionForMsg(story.currentMessage)
if not direction:
# ignore the direction argument, and only check if the current message has a valid default
msgTo = direction.msgTo
if not direction:
waitTime = 1.8 if 'waitTime' not in self.params else float(self.params['waitTime'])
timeSince = story.currentReply.getTimeSinceLastUtterance()
if timeSince < waitTime:
story.logger.log(LOG_BS, f"Waiting for replyContains: {timeSince} (needs {waitTime})")
r =
if r is None:
if 'notForColor' in self.params and self.params['notForColor'] and story.currentMessage.color:
if story.currentMessage.color.lower() == self.params['notForColor'].lower():
story.logger.debug(f"Skip diversion {} because of section color")
return"Diverge: reply contains {}")
story.stats['diversions']['reply_contains'] += 1
msg = story.get(self.params['msgId'])
if msg is None:
story.logger.critical(f"Not a valid message id for diversion: {self.params['msgId']}")
# TODO: pick the direction with timeout as next Message.
self.returnMessage = msgTo
if self.params['returnAfterStrand']:
self.createReturnDirectionsTo(story, msg, msgTo, direction)
await story.setCurrentMessage(msg)
story.currentDiversion = self
return True
async def _returnAfterReplyContains(self, story):"Finalise diversion: {}")
# if self.params['returnAfterStrand']:
# await story.setCurrentMessage(self.returnMessage)
async def _divergeIfRepeatRequest(self, story, msgFrom, msgTo, direction):
Participant asks if message can be repeated.
# if not msgFrom or not msgTo:
# return
# TODO: how to handle this now we sometimes use different timings.
# Perhaps set isFinished when matching condition.
if story.currentReply is None or story.currentReply.getTimeSinceLastUtterance() > 1:
r =
print('repeat?', r)
if r is None:
return"Diverge: request repeat {}")
story.stats['diversions']['repeat'] += 1
await story.setCurrentMessage(story.currentMessage)
return True
async def _divergeIfTimeout(self, story, msgFrom, msgTo, direction):
(1) last spoken at all
(2) or duration for this last reply only
if story.currentDiversion:
if msgFrom or msgTo:
# not applicable a direction has been chosen
if not story.lastMsgFinishTime:
# not during play back
# not applicable when timeout is set
directions = story.getCurrentDirections()
for direction in directions:
for condition in direction.conditions:
if condition.type == 'timeout':
now = story.timer.getElapsed()
if now - story.lastMsgFinishTime < float(self.params['minTimeAfterMessage']):
# not less than x sec after it
interval = float(self.params['interval'])
if not self.params['fromLastMessage']:
# (1) last spoken at all
timeSince = story.timer.getElapsed('last_speech') if story.timer.hasMark('last_speech') else story.timer.getElapsed('start')
if story.timer.hasMark('last_diversion_timeout') and story.timer.getElapsed('last_diversion_timeout') > timeSince:
timeSince = story.timer.getElapsed('last_diversion_timeout')
if timeSince < interval:
story.stats['diversions']['timeout_total'] += 1
if story.currentMessage is None:
# if story.currentMessage.timeoutDiversionCount + 1
if story.currentReply is not None:
# still playing back
# or somebody has spoken already (timeout only works on silences)
if now - story.lastMsgFinishTime < interval:
story.currentMessage.timeoutDiversionCount += 1
story.stats['diversions']['timeout_last'] += 1
# if we're still here, there's a match!"Diverge: Timeout {} of {self.params['interval']}")
story.stats['diversions']['timeout'] += 1
msg = story.get(self.params['msgId'])
if msg is None:
story.logger.critical(f"Not a valid message id for diversion: {self.params['msgId']}")
# fall back to the currentMessage to return on.
# TODO: maybe, if not chapter is found, the diversion should be
# blocked alltogether?
self.returnMessage = story.getNextChapterForMsg(story.currentMessage, False) or story.currentMessage
if self.params['returnAfterStrand']:
# no direction is here, as this diversion triggers before a direction is taken
self.createReturnDirectionsTo(story, msg, self.returnMessage, inheritTiming=False)
await story.setCurrentMessage(msg, allowReplyInterrupt=True)
story.currentDiversion = self
return True
async def _returnAfterTimeout(self, story):"Finalise diversion: {}")
async def _divergeIfInterrupted(self, story, msgFrom, msgTo, direction):
This is here as a placeholder for the interruption diversion.
These will however be triggered differently
if story.currentDiversion or story.allowReplyInterrupt:
return False
msg = story.get(self.params['msgId'])
if msg is None:
story.logger.critical(f"Not a valid message id for diversion: {self.params['msgId']}")
self.returnMessage = story.currentMessage
# no direction is here, as this diversion triggers before a direction is taken
self.createReturnDirectionsTo(story, msg, self.returnMessage, inheritTiming=False, timeoutDuration=3, replyContainsDurations = [{
"minReplyDuration": "0",
"waitTime": "2"
await story.setCurrentMessage(msg)
story.currentDiversion = self
return True
# Maps the '@type' value of a story JSON element to the class that loads it.
storyClasses = {
    'Msg': Message,
    'Direction': Direction,
    'Condition': Condition,
    'Diversion': Diversion,
}
class Stopwatch(object):
    """
    Keep track of elapsed time. Use multiple markers, but a single pause/resume button
    """

    def __init__(self):
        self.isRunning = asyncio.Event()
        # Initialise marks/paused_at so getElapsed() works on a fresh instance.
        self.reset()

    def getElapsed(self, since_mark='start'):
        """Return the seconds since `since_mark`, not counting the current pause.

        Raises KeyError when the mark does not exist.
        """
        t = time.time()
        if self.paused_at != 0:
            pause_duration = t - self.paused_at
        else:
            pause_duration = 0
        return t - self.marks[since_mark] - pause_duration

    def pause(self):
        self.paused_at = time.time()
        self.isRunning.clear()

    def resume(self):
        """Resume after pause(); does nothing when not paused."""
        if self.paused_at == 0:
            return

        # Shift every mark forward so the paused period is not counted.
        pause_duration = time.time() - self.paused_at
        for m in self.marks:
            self.marks[m] += pause_duration

        self.paused_at = 0
        self.isRunning.set()

    def reset(self):
        """Drop all marks, start a fresh 'start' mark and leave the paused state."""
        self.marks = {}
        # getElapsed() defaults to the 'start' mark, so it must always exist.
        self.setMark('start')
        self.paused_at = 0  # 0 means "not paused"
        self.isRunning.set()

    def setMark(self, name):
        self.marks[name] = time.time()

    def hasMark(self, name):
        return name in self.marks

    def clearMark(self, name):
        """Remove a mark; unknown names are ignored."""
        if name in self.marks:
            self.marks.pop(name)
class Story(object):
"""Story represents and manages a story/narrative flow"""
# TODO should we separate 'narrative' (the graph) from the story (the
# current user flow)
def __init__(self, hugvey_state, panopticon_port):
super(Story, self).__init__()
self.hugvey = hugvey_state
self.panopticon_port = panopticon_port = [] # queue of received events
self.commands = [] # queue of commands to send
self.log = [] # all nodes/elements that are triggered
self.msgLog = [] # hit messages
self.logger = mainLogger.getChild(f"{}").getChild("story")
self.currentMessage = None
self.currentDiversion = None
self.currentReply = None
self.allowReplyInterrupt = False
self.timer = Stopwatch()
self.isRunning = False
self.diversions = []
self.interruptionDiversions = []
self.variables = {}
def pause(self):
self.logger.debug('pause hugvey')
def resume(self):
self.logger.debug('resume hugvey')
def getLogSummary(self):
    """Return the log summaries of all logged messages, directions and diversions.

    Each entry is a ``(summary, logged time)`` tuple.
    """
    def summaries(cls):
        # e[0]: the entity, e[1]: the logged time
        return [(e[0].getLogSummary(), e[1]) for e in self.log if isinstance(e[0], cls)]

    summary = {
        'messages': summaries(Message),
        'directions': summaries(Direction),
        'diversions': summaries(Diversion),
    }
    # print(self.log)
    return summary
def getLogCounts(self):
    """Return how many logged entries are messages and how many are diversions."""
    return {
        'messages': sum(1 for e in self.log if isinstance(e[0], Message)),
        'diversions': sum(1 for e in self.log if isinstance(e[0], Diversion)),
    }
def registerVariable(self, variableName, message):
    """Remember that `message` uses `variableName`.

    Several messages may use the same variable; all of them are kept so that
    setVariableValue() can update each one.
    """
    self.variables.setdefault(variableName, []).append(message)
def setVariableValue(self, name, value):
    """Store a captured variable value and pass it to every message that uses it.

    Values for variables that no message registered are still stored, but
    only logged with a warning.
    """
    known = name in self.variables
    if not known:
        self.logger.warning(f"Set variable that is not needed in the story: {name}")
    self.logger.debug(f"Set variable {name} to {value}")

    self.variableValues[name] = value
    if not known:
        # No message registered this variable, so there is nothing to update.
        return

    for message in self.variables[name]:
        message.setVariable(name, value)
def hasVariableSet(self, name) -> bool:
    """Return True when a non-None value has been stored for `name`."""
    # Test the stored value, not the dict itself (which is never None).
    return self.variableValues.get(name) is not None
def setStoryData(self, story_data):
Parse into a working story engine
""" = story_data
# keep to be able to reset it in the end
currentId = if self.currentMessage else None
self.elements = {}
self.strands = {}
self.diversions = []
self.interruptionDiversions = []
self.directionsPerMsg = {}
self.startMessage = None # The entrypoint to the graph
self.variables = {}
for el in
className = storyClasses[el['@type']]
obj = className.initFromJson(el, self)
except Exception as e:
self.logger.critical(f"Error loading story element: {el}")
raise e
# self.logger.debug(self.elements)
# self.logger.debug(self.directionsPerMsg)
self.diversions = [el for el in self.elements.values() if type(el) == Diversion]
self.interruptionDiversions = [el for el in self.elements.values() if type(el) == Diversion and el.type == 'interrupt']
if currentId:
self.currentMessage = self.get(currentId)
if self.currentMessage:
f"Reinstantiated current message: {}")
"Could not reinstatiate current message. Starting over")
# Register variables
for msg in self.getMessages():
# print(, msg.hasVariables())
if not msg.hasVariables():
for var in msg.variables:
self.registerVariable(var, msg)'has variables: {self.variables}')'has {len(self.strands)} strands: {self.strands}')
def reset(self):
# self.startTime = time.time()
# currently active message, determines active listeners etc.
self.currentMessage = None
self.currentDiversion = None
self.lastMsgTime = None
self.lastSpeechStartTime = None
self.lastSpeechEndTime = None
self.variableValues = {} # captured variables from replies
self.finish_time = False = [] # queue of received events
self.commands = [] # queue of commands to send
self.log = [] # all nodes/elements that are triggered
self.msgLog = []
self.currentReply = None
self.stats = {
'timeouts': 0,
'silentTimeouts': 0,
'consecutiveSilentTimeouts': 0,
'diversions': {
'no_response': 0,
'repeat': 0,
'reply_contains': 0,
'timeout': 0,
'timeout_total': 0,
'timeout_last': 0
for msg in self.getMessages():
def add(self, obj):
if in self.elements:
# print(obj)
raise Exception("Duplicate id for ''".format(
self.elements[] = obj
if type(obj) == Diversion:
if type(obj) == Message:
if obj.isStart:
#confusingly, isStart is 'beginning' in the story json file
self.startMessage = obj
if obj.isStrandStart:
self.strands[] = []
if type(obj) == Direction:
if not in self.directionsPerMsg:
self.directionsPerMsg[] = []
def get(self, id):
    """
    Get a story element by its id; returns None when the id is unknown.
    """
    return self.elements.get(id)
def getMessages(self):
return [el for el in self.elements.values() if type(el) == Message]
def stop(self):"Stop Story")
if self.isRunning:
self.isRunning = False
def shutdown(self):
self.hugvey = None
async def _processPendingEvents(self):
# Gather events:
nr = len(
for i in range(nr):
e =
self.logger.debug("handle '{}'".format(e))
if e['event'] == "exit":
if e['event'] == 'connect':
# a client connected. Should only happen in the beginning or in case of error
# that is, until we have a 'reset' or 'start' event.
# reinitiate current message
await self.setCurrentMessage(self.currentMessage)
if e['event'] == "playbackStart":
if e['msgId'] !=
self.lastMsgStartTime = self.timer.getElapsed()
self.logger.debug("Start playback")
if e['event'] == "playbackFinish":
if e['msgId'] ==
#TODO: migrate value to Messagage instead of Story
self.lastMsgFinishTime = self.timer.getElapsed()"message: {} {self.currentMessage.uuid} done")
# 2019-02-22 temporary disable listening while playing audio:
# if is not None:
# self.logger.warn("Temporary 'fix' -> resume recording?")
if not in self.directionsPerMsg:
# print(self.currentDiversion)
# if self.currentDiversion is not None:
# await self.currentDiversion.finalise(self)
# else:"THE END!")
if e['event'] == 'speech':
# participants speaks, reset counter
self.stats['consecutiveSilentTimeouts'] = 0
# if self.currentMessage and not self.lastMsgStartTime:
if self.currentMessage and not self.lastMsgFinishTime:
# Ignore incoming speech events until we receive a 'playbackStart' event.
# After that moment the mic will be muted, so nothing should come in _anyway_
# unless google is really slow on us. But by taking the start time we don't ignore
# messages that come in, in the case google is faster than our playbackFinish event.
# (if this setup doesn't work, try to test on self.lastMsgFinish time anyway)
# it keeps tricky with all these run conditions
# if len(self.interruptionDiversions) and not self.currentDiversion and not self.allowReplyInterrupt:
# self.logger.warn("diverge when speech during playing message")
# diversion = random.choice(self.interruptionDiversions)
# #: :type diversion: Diversion
# r = await diversion.divergeIfNeeded(self, None)
# print(r) # is always needed :-)
# else:"ignore speech during playing message")
# log if somebody starts speaking
if self.currentReply is None:"Start speaking")
self.currentReply= Reply(self.currentMessage)
now = self.timer.getElapsed()
utterance = self.currentReply.getActiveUtterance(now)
utterance.setText(e['transcript'], now)"speaking: content {} \"{}\"".format(id(utterance), e['transcript']))
if e['is_final']:
utterance.setFinished(self.timer.getElapsed())"speaking: stop {}".format(id(utterance)))
if self.hugvey.recorder:
async def _processDirections(self, directions):
':type directions: list(Direction)'
# Looks for a direction with a condition that is currently met, lets
# _processDiversions decide whether a diversion takes over, and otherwise
# switches to the target message of the chosen direction.
# Returns the chosen direction, or None when no condition was met.
chosenDirection = None
metCondition = None
for direction in directions:
for condition in direction.conditions:
# NOTE(review): the next line looks truncated in this view -- the logging
# calls that follow `if condition.isMet(self):` are incomplete. Restore it
# from version control before relying on this block.
if condition.isMet(self):"Condition is met: {0}, going to {1}".format(,"condition: {0}".format("direction: {0}".format(
metCondition = condition
chosenDirection = direction
# NOTE(review): chosenDirection is presumably still None here when nothing
# matched -- confirm against the original indentation.
isDiverging = await self._processDiversions(chosenDirection)
allowReplyInterrupt = False
# in some cases, conditions should be allowed to interrupt the reply
if metCondition:
# a 'timeout' condition may interrupt, unless its vars hold a truthy 'onlyIfNoReply'
if metCondition.type == 'timeout' and not ('onlyIfNoReply' in metCondition.vars and metCondition.vars['onlyIfNoReply']):
allowReplyInterrupt = True
# a set usedContainsDuration below 0.1 also allows interruption (unit not visible here)
if metCondition.usedContainsDuration is not None and metCondition.usedContainsDuration < 0.1:
allowReplyInterrupt = True
if not isDiverging and chosenDirection:
# a direction flagged as diversion return finalises the active diversion
if chosenDirection.isDiversionReturn and self.currentDiversion:
await self.currentDiversion.finalise(self)
await self.setCurrentMessage(chosenDirection.msgTo, allowReplyInterrupt=allowReplyInterrupt)
return chosenDirection
# NOTE(review): `direction: None` is an annotation, not a default value, so
# callers always have to pass `direction` explicitly.
async def _processDiversions(self, direction: None) -> bool:
# NOTE(review): the three prose lines below read like a docstring whose quote
# delimiters are missing in this view.
Process the diversions on stack. If diverging, return True, else False
msgFrom and msgTo contain the source and target of a headed direction if given
Else, they are None
diverge = False
activeDiversions = []
activeTimeoutDiv = None
activeTimeoutLastDiv = None
activeNoResponseDiv = None
for diversion in self.diversions:
#: :type diversion: Diversion
# NOTE(review): several `if` statements in this method have no visible body
# (this guard, the `timesOccured > 0` checks and the three `if active...Div:`
# checks further down); their bodies appear to be missing from this view.
if diversion.disabled or diversion.hasHit or diversion.type == 'interrupt':
# interruptions are triggered somewhere else.
if diversion.type == 'timeout':
if diversion.params['timesOccured'] > 0:
# presumably selects between the two timeout candidates below based on
# params['fromLastMessage'] -- the else-branch is not visible, TODO confirm
if not diversion.params['fromLastMessage']:
# perhaps neater if we collect them in a list, and then sort by key, but this works just as well
# a candidate is replaced when the current one has a higher params['timesOccured']
if not activeTimeoutDiv or activeTimeoutDiv.params['timesOccured'] > diversion.params['timesOccured']:
activeTimeoutDiv = diversion
if not activeTimeoutLastDiv or activeTimeoutLastDiv.params['timesOccured'] > diversion.params['timesOccured']:
activeTimeoutLastDiv = diversion
if diversion.type == 'no_response':
if diversion.params['timesOccured'] > 0:
# same selection rule as for the timeout candidates
if not activeNoResponseDiv or activeNoResponseDiv.params['timesOccured'] > diversion.params['timesOccured']:
activeNoResponseDiv = diversion
if activeTimeoutDiv:
if activeTimeoutLastDiv:
if activeNoResponseDiv:
for diversion in activeDiversions:
# TODO: collect diversions and order by times + timesOccured (for timeout & no_response)
d = await diversion.divergeIfNeeded(self, direction)
# a truthy result from divergeIfNeeded marks this call as diverging
if d:
diverge = True
return diverge
def addToLog(self, node):
# Records the node together with the story timer's elapsed time.
self.log.append((node, self.timer.getElapsed()))
# NOTE(review): the type-specific branches below have no visible bodies and the
# recorder call is cut off after `node.text,` -- this view of the method is
# incomplete; restore it from version control.
if isinstance(node, Message):
if self.hugvey.recorder:
if isinstance(node, Message):
self.hugvey.recorder.log('hugvey', node.text,
if isinstance(node, Diversion):
if isinstance(node, Condition):
def logHasMsg(self, node):
    """Report whether `node` is contained in `self.msgLog`."""
    wasLogged = node in self.msgLog
    return wasLogged
async def _renderer(self):
# NOTE(review): the line below reads like a docstring whose quotes are missing in this view.
every 1/10 sec. determine what needs to be done based on the current story state
loopDuration = 0.1 # Configure fps
lastTime = time.time()
self.logger.debug("Start renderer")
while self.isRunning:
# NOTE(review): the body of this check is not visible in this view.
if self.isRunning is False:
# pause on timer paused
await self.timer.isRunning.wait() # wait for un-pause
# NOTE(review): the `for` header below is cut off in this view.
for i in range(len(
await self._processPendingEvents()
# Test stability of Central Command with deliberate crash
# if self.timer.getElapsed() > 5:
# raise Exception('test')
# The finish is not here anymore, but only on the playbackFinish event.
directions = self.getCurrentDirections()
await self._processDirections(directions)
# TODO create timer event
# self.commands.append({'msg':'TEST!'})
# wait for next iteration to avoid too high CPU
t = time.time()
# sleep for what is left of loopDuration; max() keeps the delay non-negative
await asyncio.sleep(max(0, loopDuration - (t - lastTime)))
# lastTime holds the timestamp taken before the sleep
lastTime = t
self.logger.debug("Stop renderer")
async def setCurrentMessage(self, message, useReply = None, allowReplyInterrupt = False):
# NOTE(review): this method is heavily truncated in this view: the docstring
# quotes are missing, logging calls are fused into neighbouring statements, the
# dicts holding 'action': 'stop' / 'prepare' / 'play' have lost their
# surrounding call, and the `with` header is cut off. Restore from version
# control before editing the logic.
Use Reply allows to pre-initiate a reply to use with the message. This is used eg. when doing an interruption.
# a current message without a finish time is treated as still playing
if self.currentMessage and not self.lastMsgFinishTime:"Interrupt playback {}".format("interrupt")
# message is playing
'action': 'stop',
# a fresh uuid is assigned on every call
message.uuid = shortuuid.uuid()
self.currentMessage = message
self.lastMsgTime = time.time()
self.lastMsgFinishTime = None # to be filled in by the event
self.lastMsgStartTime = None # to be filled in by the event
self.allowReplyInterrupt = allowReplyInterrupt
# if not reset:
self.previousReply = self.currentReply # we can use this for interruptions
self.currentReply = useReply #self.currentMessage.reply
# send command to already mute mic
'action': 'prepare',
# else:
# # if we press 'save & play', it should not remember it's last reply to that msg
# self.previousReply = self.currentReply # we can use this for interrptions
# self.currentReply = self.currentMessage.reply"Current message: ({0}) \"{1}\"".format(, message.getText()))
self.addToLog(message)"message: {} {message.uuid} start \"{message.getText()}\"")
# TODO: prep events & timer etc.
fn = await message.getAudioFilePath()
# get duration of audio file, so the client can detect a hang of 'play'
# presumably opens `fn` with the `wave` module -- the call is cut off here, TODO confirm
with,'r') as fp:
frames = fp.getnframes()
rate = fp.getframerate()
# frame count divided by frame rate
duration = frames/float(rate)
# # pause STT to avoid text events while decision is made
'action': 'play',
'file': fn,
'params': message.getParams(),
'duration': duration
# 2019-02-22 temporary disable listening while playing audio:
# if is not None:
# self.logger.warn("Temporary 'fix' -> stop recording")
# builds a multi-line summary of the directions of the new current message
logmsg = "Pending directions:"
for direction in self.getCurrentDirections():
conditions = [ for c in direction.conditions]
logmsg += "\n- {0} -> {1} (when: {2}) ".format(,, conditions)
def getCurrentDirections(self):
# Returns the entry of self.directionsPerMsg for a key, or an empty list when
# that key is absent.
# NOTE(review): the lookup key is missing from both expressions in this view
# (presumably the id of the current message -- TODO confirm).
if not in self.directionsPerMsg:
return []
return self.directionsPerMsg[]
def getNextChapterForMsg(self, msg, canIncludeSelf = True, depth = 0):
# Recursively follows directions (via direction.msgTo) looking for a message
# with chapterStart set; returns that message, or None if none is found.
# `msg` itself only qualifies when canIncludeSelf is truthy.
# NOTE(review): the logging call fused into the next line and the
# directionsPerMsg lookup keys are truncated in this view.
if canIncludeSelf and msg.chapterStart:"Next chapter: {}")
return msg
if depth >= 70:
# protection against infinite loop?
return None
if not in self.directionsPerMsg:
return None
for direction in self.directionsPerMsg[]:
# targets may match themselves, hence canIncludeSelf=True
r = self.getNextChapterForMsg(direction.msgTo, True, depth+1)
if r:
return r
# none found
return None
# Marks the story as running, sets the start message as current message and
# then awaits the render loop.
# NOTE(review): the logging calls are fused into the `def` line in this view,
# and an `else:` between the two `startMsg` assignments appears to be missing;
# as shown, customStartMsgId would always be overridden by self.startMessage.
async def run(self, customStartMsgId = None):"Starting story")"story: start")
self.isRunning = True
if customStartMsgId is not None:
startMsg = self.get(customStartMsgId)
startMsg = self.startMessage
await self.setCurrentMessage(startMsg)
await self._renderer()
def isFinished(self):
    """Return the seconds elapsed since the story finished, or False.

    False is returned as long as no truthy `finish_time` attribute is set
    on this object.
    """
    finishedAt = getattr(self, 'finish_time', None)
    if not finishedAt:
        return False
    return time.time() - finishedAt
def _finish(self):
# NOTE(review): the line below reads like a docstring whose quotes are missing,
# and no statements of the body are visible in this view.
Finish story and set hugvey to the right state
#stop google etc:
def finish(self):
# NOTE(review): the docstring delimiters and the logging calls are garbled in
# this view (see the fused line below).
Finish only the story
""""Finished story for {}")"story: finished")
# wall-clock timestamp that isFinished() reads back
self.finish_time = time.time()
def calculateFinishesForMsg(self, msgId, depth = 0):
# Recursively collects the ids of messages that have no outgoing directions,
# starting from msgId. Returns a de-duplicated list in unspecified order.
if not msgId in self.directionsPerMsg or len(self.directionsPerMsg[msgId]) < 1:
# is finish
return [msgId]
# recursion guard: give up on this path beyond 40 levels
if depth > 40:
return []
finishes = []
for d in self.directionsPerMsg[msgId]:
# NOTE(review): the two lines below are truncated in this view -- the compared
# value and the recursion argument are missing (presumably the id of the
# direction's target message, with self-references skipped -- TODO confirm).
if == msgId:
finishes.extend(self.calculateFinishesForMsg(, depth+1))
# de-duplicate before returning
return list(set(finishes))
def calculateFinishesForStrands(self):
    """Pre-calculate the finishes of every strand in `self.strands`.

    For each strand start id (the keys of `self.strands`) the stored value is
    replaced by the result of `calculateFinishesForMsg`. A strand whose start
    message has `isStart` set is skipped and keeps its current value.
    """
    for startMsgId in self.strands:
        msg = self.get(startMsgId)  #: :type msg: Message
        if msg.isStart:
            # ignore for the beginning
            continue
        self.logger.log(LOG_BS, f"Get finishes for {startMsgId}")
        # Only values of existing keys are replaced, so the dict does not
        # change size while it is being iterated.
        self.strands[startMsgId] = self.calculateFinishesForMsg(startMsgId)
    self.logger.log(LOG_BS, f"Finishes: {self.strands}")
def getFinishesForMsg(self, msg):
# NOTE(review): the four prose lines below read like a docstring whose quotes
# are missing; the self.strands lookup keys and the final call's argument are
# truncated in this view (presumably the id of `msg` -- TODO confirm).
Find the end of strands
Most often they will be 'start's so to speed up these are pre-calculated
Others can be calculated on the spot
returns message ids
# use the value stored in self.strands when present, else calculate on the spot
if in self.strands:
return self.strands[]
return self.calculateFinishesForMsg(
def getDefaultDirectionForMsg(self, msg):
# NOTE(review): the two prose lines below read like a docstring whose quotes are
# missing; the directionsPerMsg lookup keys are truncated in this view
# (presumably the id of `msg` -- TODO confirm).
There is only a default direction (for reply contains diversion) if it has
one, and only one, direction to go. If there's more, it should do nothing.
if not in self.directionsPerMsg:
# is finish
return None
# more than one direction: no default
if len(self.directionsPerMsg[]) > 1:
return None
# TODO: should the direction have at least a timeout condition set, or not perse?
# NOTE(review): an empty direction list would raise IndexError on the line below.
return self.directionsPerMsg[][0]