hugvey/hugvey/story.py

2231 lines
82 KiB
Python

import asyncio
import json
import logging
import os
import pickle
import random
import re
import threading
import time
from tornado.httpclient import AsyncHTTPClient, HTTPRequest
import traceback
import urllib.parse
import uuid
import wave
import zmq
from zmq.asyncio import Context
from pythonosc import udp_client
import shortuuid
import sox
from .communication import LOG_BS
mainLogger = logging.getLogger("hugvey")
logger = mainLogger.getChild("narrative")
class Utterance(object):
"""Part of a reply"""
def __init__(self, startTime):
self.startTime = startTime
self.endTime = None
self.text = ""
self.lastUpdateTime = startTime
def setText(self, text, now):
self.text = text.lower() # always lowercase
self.lastUpdateTime = now
def hasText(self):
return len(self.text) > 0
def setFinished(self, endTime):
self.endTime = endTime
def isFinished(self):
return self.endTime is not None
def __getstate__(self):
# print(f'get utterance {self}')
state = self.__dict__.copy()
return state
class Message(object):
def __init__(self, id, text):
self.id = id
self.text = text
self.label = None
self.isStart = False
self.isStrandStart = False
self.chapterStart = False
self.reply = None
# self.replyTime = None
self.audioFile= None
self.filenameFetchLock = asyncio.Lock()
self.interruptCount = 0
self.timeoutDiversionCount = 0
self.afterrunTime = 0. # the time after this message to allow for interrupts
self.finishTime = None # message can be finished without finished utterance (with instant replycontains)
self.params = {}
self.variableValues = {}
self.parseForVariables()
self.uuid = None # Have a unique id each time the message is played back.
self.color = None
self.lightChange = None
self.didRepeat = False
self.fileError = False
# Used by diversions, autogenerated directions should link to next chapter mark instead of the given msgTo
self.generatedDirectionsJumpToChapter = False
# Used by diversions, no return directions should be autogenerated, so this message becomes an ending
self.dontGenerateDirections = False
def __getstate__(self):
# Copy the object's state from self.__dict__ which contains
# all our instance attributes. Always use the dict.copy()
# method to avoid modifying the original state.
# print(f'get msg {self.id}')
state = self.__dict__.copy()
# Remove the unpicklable entries.
del state['filenameFetchLock']
return state
def __setstate__(self, state):
self.__dict__.update(state)
self.filenameFetchLock = asyncio.Lock()
def setStory(self, story):
self.story = story
self.logger = story.logger.getChild("message")
@classmethod
def initFromJson(message, data, story):
msg = message(data['@id'], data['text'])
msg.isStart = data['beginning'] if 'beginning' in data else False
msg.isStrandStart = data['start'] if 'start' in data else False
msg.chapterStart = bool(data['chapterStart']) if 'chapterStart' in data else False
msg.afterrunTime = data['afterrun'] if 'afterrun' in data else 0. # TODO: investigate deprecation?
msg.color = data['color'] if 'color' in data else None
msg.label = data['label'] if 'label' in data else None
if 'audio' in data and data['audio'] is not None:
msg.audioFile = data['audio']['file']
msg.setStory(story)
if 'params' in data:
msg.params = data['params']
if not 'vol' in msg.params:
# prevent clipping on some Lyrebird tracks
msg.params['vol'] = .8
msg.lightChange = data['light'] if 'light' in data else None
msg.generatedDirectionsJumpToChapter = bool(data['generatedDirectionsJumpToChapter']) if 'generatedDirectionsJumpToChapter' in data else False
msg.dontGenerateDirections = bool(data['dontGenerateDirections']) if 'dontGenerateDirections' in data else False
msg.params['vol'] = float(msg.params['vol'])
return msg
def parseForVariables(self):
"""
Find variables in text
"""
self.variables = re.findall('\$(\w+)', self.text)
for var in self.variables:
self.variableValues[var] = None
def hasVariables(self) -> bool:
return len(self.variables) > 0
def setVariable(self, name, value):
if name not in self.variables:
self.logger.critical("Set nonexisting variable")
return
if self.variableValues[name] == value:
return
self.variableValues[name] = value
self.logger.warn(f"Set variable, fetch {name}")
# disable prefetching of audio files, to avoid a bunch of calls at once to the voice api's
# Other possiblity would be to wrap getAudioFilePath() into a method that delays execution incrementally
# with an asyncio.sleep(i)
# if not None in self.variableValues.values():
# self.logger.warn(f"now fetch {name} for {self.id}")
# asyncio.get_event_loop().create_task(self.getAudioFilePath())
def getVariableValue(self, var):
return self.variableValues[var] if (self.variableValues[var] is not None) else self.story.configuration.nothing_text #TODO: translate nothing to each language
def getText(self):
# sort reverse to avoid replacing the wrong variable
self.variables.sort(key=len, reverse=True)
text = self.text
# self.logger.debug(f"Getting text for {self.id}")
for var in self.variables:
self.logger.debug(f"try replacing ${var} with {self.variableValues[var]} in {text}")
replacement = self.getVariableValue(var)
text = text.replace('$'+var, replacement)
return text
def getLabel(self):
"""
When a label is set, return that, else the original text
"""
if self.label and len(self.label):
return self.label
return self.getText()
def getTextLabel(self):
"""
A combination of getText and getLabel for maximum verbosity
"""
l = f" ({self.label})" if self.label and len(self.label) else ""
return f"{self.getText()}{l}"
def setReply(self, reply):
self.reply = reply
def hasReply(self):
return self.reply is not None
def getReply(self):
if not self.hasReply():
raise Exception(
"Getting reply while there is none! {0}".format(self.id))
return self.reply
def isFinished(self):
return self.finishTime is not None
def setFinished(self, currentTime):
self.finishTime = currentTime
def getFinishedTime(self):
return self.finishTime
def getParams(self):
return self.params
def getLogSummary(self):
return {
'id': self.id,
'time': None if self.reply is None else [u.startTime for u in self.reply.utterances],
'text': self.getTextLabel(),
'replyText': None if self.reply is None else [u.text for u in self.reply.utterances]
}
async def getAudioFilePath(self):
if self.audioFile is not None:
return self.audioFile
text = self.getText()
textlabel = self.getTextLabel()
self.logger.debug(f"Fetching audio for {textlabel}")
# return "test";
async with self.filenameFetchLock:
# print(threading.enumerate())
info = {
'text': text,
'variable': True if self.hasVariables() else False
}
s = Context.instance().socket(zmq.REQ) #: :type s: zmq.sugar.Socket
voiceAddr = f"ipc://voice{self.story.hugvey.id}"
s.connect(voiceAddr)
await s.send_json(info)
filename = await s.recv_string()
s.close()
# TODO: should this go trough the event Queue? risking a too long delay though
if filename == 'local/crash.wav' or len(filename) < 1:
self.logger.warning("Noting crash")
self.fileError = True
# print(threading.enumerate())
self.logger.debug(f"Fetched audio for {textlabel}: {filename}")
return filename
class Reply(object):
def __init__(self, message: Message):
self.forMessage = None
self.utterances = []
self.setForMessage(message)
def __getstate__(self):
# print(f'get reply {self}')
state = self.__dict__.copy()
return state
def setForMessage(self, message: Message):
self.forMessage = message
message.setReply(self)
def getLastUtterance(self) -> Utterance:
if not self.hasUtterances():
return None
u = self.utterances[-1] #: :type u: Utterance
# attempt to fix a glitch that google does not always send is_finished
if u.isFinished():
return u
now = self.forMessage.story.timer.getElapsed()
diff = now - u.lastUpdateTime
if diff > 5: # time in seconds to force silence in utterance
# useful for eg. 'hello', or 'no'
self.forMessage.story.logger.warn(
f"Set finish time for utterance after {diff}s {u.text}"
)
u.setFinished(now)
return u
def getFirstUtterance(self) -> Utterance:
if not self.hasUtterances():
return None
return self.utterances[0]
def hasUtterances(self) -> bool:
return len(self.utterances) > 0
def addUtterance(self, utterance: Utterance):
self.utterances.append(utterance)
def getText(self) -> str:
# for some reason utterances are sometimes empty.Strip these out
utterances = filter(None, [u.text.strip() for u in self.utterances])
return ". ".join(utterances).strip()
def getActiveUtterance(self, currentTime) -> Utterance:
"""
If no utterance is active, create a new one. Otherwise return non-finished utterance for update
"""
if len(self.utterances) < 1 or self.getLastUtterance().isFinished():
u = Utterance(currentTime)
self.addUtterance(u)
else:
u = self.getLastUtterance()
return u
def isSpeaking(self):
u = self.getLastUtterance()
if u is not None and not u.isFinished():
return True
return False
def getTimeSinceLastUtterance(self):
if not self.hasUtterances():
return None
return self.forMessage.story.timer.getElapsed() - self.getLastUtterance().lastUpdateTime
class Condition(object):
"""
A condition, basic conditions are built in, custom condition can be given by
providing a custom method.
"""
def __init__(self, id):
self.id = id
self.method = None
self.type = None
self.vars = {}
self.logInfo = None
self.originalJsonString = None
self.usedContainsDuration = None
def __getstate__(self):
# print(f'get condition {self.id}')
state = self.__dict__.copy()
return state
@classmethod
def initFromJson(conditionClass, data, story):
condition = conditionClass(data['@id'])
condition.type = data['type']
condition.originalJsonString = json.dumps(data)
#: :type condition: Condition
# TODO: should Condition be subclassed?
if data['type'] == "replyContains":
condition.method = condition._hasMetReplyContains
if data['type'] == "timeout":
condition.method = condition._hasMetTimeout
if data['type'] == "variable":
condition.method = condition._hasVariable
if data['type'] == "diversion":
condition.method = condition._hasDiverged
if data['type'] == "audioError":
condition.method = condition._hasAudioError
if data['type'] == "messagePlayed":
condition.method = condition._hasPlayed
if data['type'] == "variableEquals":
condition.method = condition._variableEquals
if data['type'] == "loop_time":
condition.method = condition._hasTimer
if data['type'] == "variable_storage":
condition.method = condition._hasVariableStorage
condition.hasRan = False
if 'vars' in data:
condition.vars = data['vars']
if 'regex' in condition.vars:
condition.vars['regex'] = condition.vars['regex'].rstrip()
return condition
def isMet(self, story):
"""
Validate if condition is met for the current story state
"""
try:
r = self.method(story)
except Exception as e:
story.logger.critical("Exception condition: {self.id}, ignoring it")
story.logger.exception(e)
r = False
return r
def _hasMetTimeout(self, story):
now = story.timer.getElapsed()
# check if the message already finished playing
if not story.lastMsgFinishTime:
return False
if 'onlyIfNoReply' in self.vars and self.vars['onlyIfNoReply']:
if story.currentReply and story.currentReply is not None and story.currentReply.hasUtterances():
story.logger.log(LOG_BS, f'Only if no reply has text! {story.currentReply.getText()}')
# 'onlyIfNoReply': only use this timeout if participants doesn't speak.
return False
# else:
# story.logger.debug('Only if no reply has no text yet!')
hasMetTimeout = now - story.lastMsgFinishTime >= story.applyTimeFactor(self.vars['seconds'])
if not hasMetTimeout:
return False
# update stats:
story.stats['timeouts'] +=1
if 'needsReply' in self.vars and self.vars['needsReply'] is True:
if story.currentReply is None or not story.currentReply.hasUtterances():
story.stats['silentTimeouts'] +=1
story.stats['consecutiveSilentTimeouts'] += 1
self.logInfo = "{}s".format(self.vars['seconds'])
return True
def _hasVariable(self, story) -> bool:
if not story.lastMsgFinishTime:
return False
r = story.hasVariableSet(self.vars['variable'])
if r:
story.logger.debug(f"Variable {self.vars['variable']} is set.")
if 'notSet' in self.vars and self.vars['notSet']:
# inverse:
r = not r
self.logInfo = "Does {} have variable {}".format(
'not' if 'notSet' in self.vars and self.vars['notSet'] else '',
self.vars['variable']
)
return r
def _hasTimer(self, story) -> bool:
if not story.lastMsgFinishTime:
return False
loopTime = story.hugvey.command.timer.getElapsed() % 3600
ltTime = int(self.vars['less_than'])
gtTime = int(self.vars['more_than'])
if not ltTime and not gtTime:
# ignore invalid times
return
elif not gtTime and loopTime < ltTime:
r = True
elif not ltTime and loopTime > gtTime:
r = True
elif loopTime < ltTime and loopTime > gtTime:
r = True
else:
r = False
if 'inverseMatch' in self.vars and self.vars['inverseMatch']:
r = not r
self.logInfo = "Looptime is {} {} < {} < {}".format(
'' if r else 'not',
f'{gtTime}' if gtTime else '-',
loopTime,
f'{ltTime}' if ltTime else '-',
)
return r
def _variableEquals(self, story) -> bool:
v1 = story.variableValues[self.vars['variable1']] if story.hasVariableSet(self.vars['variable1']) else None
v2 = story.variableValues[self.vars['variable2']] if story.hasVariableSet(self.vars['variable2']) else None
if v1:
story.logger.debug(f"Variable {self.vars['variable1']}: {v1}")
if v2:
story.logger.debug(f"Variable {self.vars['variable2']}: {v2}")
if 'notEq' in self.vars and self.vars['notEq']:
# inverse:
r = (v1 != v2)
else:
r = (v1 == v2)
story.logger.info("'{}' {} '{}' ({})".format(v1, '==' if v1 == v2 else '!=', v2, r))
return r
def _hasDiverged(self, story) -> bool:
if not story.lastMsgFinishTime:
return False
d = story.get(self.vars['diversionId'])
if not d:
story.logger.warning(f"Condition on non-existing diversion: {self.vars['diversionId']}")
r = d.hasHit
if r:
story.logger.debug(f"Diversion {self.vars['diversionId']} has been hit.")
if 'inverseMatch' in self.vars and self.vars['inverseMatch']:
# inverse:
r = not r
self.logInfo = "Has {} diverged to {}".format(
'not' if 'inverseMatch' in self.vars and self.vars['inverseMatch'] else '',
self.vars['diversionId']
)
return r
def _hasAudioError(self, story) -> bool:
if not story.currentMessage or not story.currentMessage.fileError:
return False
self.logInfo = f"Has error loading audio file for {story.currentMessage.id}"
return True
def _hasPlayed(self, story) -> bool:
if not story.lastMsgFinishTime:
return False
msgId = self.vars['msgId'].strip()
msg = story.get(msgId)
if not msg:
if not self.logInfo:
# show error only once
story.logger.warning(f"Condition on non-existing message: {msgId}")
# assigning false to r, keeps 'inverseMatch' working, even when msgId is wrong
r = False
else:
#: :type msg: Message
r = msg.isFinished()
if r:
story.logger.debug(f"Msg {msgId} has been played.")
if 'inverseMatch' in self.vars and self.vars['inverseMatch']:
# inverse:
r = not r
self.logInfo = "Has {} played msg {}".format(
'not' if 'inverseMatch' in self.vars and self.vars['inverseMatch'] else '',
msgId
)
return r
def _hasVariableStorage(self, story) -> bool:
if not story.lastMsgFinishTime:
return False
if self.hasRan:
# Prevent multiple runs of the same query within eg. waiting for a timeout.
return False
number = int(self.vars['number'])
unique = bool(self.vars['unique']) if 'unique' in self.vars else False
varValues = story.hugvey.command.variableStore.getLastOfName(self.vars['var_name'], story.language_code, number, unique)
self.hasRan = True
if len(varValues) < number:
story.logger.warn(f"{self.id}: Too few instances of {self.vars['var_name']}, only {len(varValues)} in store")
return False
for i in range(number):
story.setVariableValue(
f"stored_{self.vars['var_name']}_{i+1}",
varValues[i],
store=False
)
return True
def _hasMetReplyContains(self, story) -> bool:
"""
Check the reply for specific characteristics:
- regex: regular expression. If empy, just way for isFinished()
- delays: an array of [{'minReplyDuration', 'waitTime'},...]
- minReplyDuration: the nr of seconds the reply should take. Preferably have one with 0
- waitTime: the time to wait after isFinished() before continuing
"""
r = story.currentReply # make sure we keep working with the same object
if not r or not r.hasUtterances():
return False
capturedVariables = None
t = None
result = None
if 'regex' in self.vars and len(self.vars['regex']):
if 'regexCompiled' not in self.vars:
# Compile once, as we probably run it more than once
self.vars['regexCompiled'] = re.compile(self.vars['regex'])
t = r.getText().lower()
story.logger.log(LOG_BS, 'attempt regex: {} on {}'.format(self.vars['regex'], t))
result = self.vars['regexCompiled'].search(t)
if result is None:
#if there is something to match, but not found, it's never ok
return False
story.logger.debug('Got match on {}'.format(self.vars['regex']))
capturedVariables = result.groupdict()
if 'instantMatch' in self.vars and self.vars['instantMatch']:
story.logger.info(f"Instant match on {self.vars['regex']}, {self.vars}")
self.logInfo = "Instant match of {}, captured {}".format(
self.vars['regex'],
capturedVariables
)
# Set variables only when direction returns true
if capturedVariables is not None:
for captureGroup in capturedVariables:
story.setVariableValue(captureGroup, capturedVariables[captureGroup])
return True
# TODO: implement 'instant match' -> don't wait for isFinished()
# print(self.vars)
# either there's a match, or nothing to match at all
if 'delays' in self.vars:
if story.lastMsgFinishTime is None:
story.logger.debug("not finished playback yet")
return False
# time between finishing playback and ending of speaking:
replyDuration = r.getLastUtterance().lastUpdateTime - story.lastMsgFinishTime # using lastUpdateTime instead of endTime
delays = sorted(self.vars['delays'], key=lambda k: float(k['minReplyDuration']), reverse=True)
for delay in delays:
if replyDuration > float(delay['minReplyDuration']):
timeSinceReply = r.getTimeSinceLastUtterance()
waitTime = story.applyTimeDelay(story.applyTimeFactor(delay['waitTime']))
story.logger.log(LOG_BS, f"check delay duration is now {replyDuration}, already waiting for {timeSinceReply}, have to wait {waitTime}")
if timeSinceReply > waitTime:
# if variables are captured, only set them the moment the condition matches
if capturedVariables is not None:
for captureGroup in capturedVariables:
story.setVariableValue(captureGroup, capturedVariables[captureGroup])
# self.logInfo = "Match of {}, captured {} after, {}".format(
# self.vars['regex'],
# capturedVariables,
# timeSinceReply
# )
matchestxt = "" if not result else result.group()
self.logInfo = f"{self.id} - search \"{self.vars['regex']}\" on \"{t}\" matches \"{matchestxt}\", captured {capturedVariables} - after {timeSinceReply}s"
self.usedContainsDuration = waitTime
return True
break # don't check other delays
# wait for delay to match
story.logger.log(LOG_BS, "Wait for it...")
return False
# If there is a delay, it takes precedence of isSpeaking, since google does not always give an is_finished on short utterances (eg. "hello" or "no")
if r.isSpeaking():
story.logger.log(LOG_BS, f"is speaking: {r.getLastUtterance().text} - {r.getLastUtterance().startTime}")
return False
# There is a match and no delay say, person finished speaking. Go ahead sir!
self.logInfo = "Match"
return True
def getLogSummary(self):
return {
'id': self.id
}
class Direction(object):
"""
A condition based edge in the story graph
"""
def __init__(self, id, msgFrom: Message, msgTo: Message):
self.id = id
self.msgFrom = msgFrom
self.msgTo = msgTo
#: :type self.conditions: list(Condition)
self.conditions = []
self.conditionMet = None
self.isDiversionReturn = False
self.diversionHasReturned = False # for isDiversionReturn.
def __getstate__(self):
# print(f'get direction {self.id}')
state = self.__dict__.copy()
return state
def addCondition(self, condition: Condition):
self.conditions.append(condition)
def setMetCondition(self, condition: Condition):
self.conditionMet = condition
@classmethod
def initFromJson(direction, data, story):
msgFrom = story.get(data['source'])
msgTo = story.get(data['target'])
direction = direction(data['@id'], msgFrom, msgTo)
if 'conditions' in data:
for conditionId in data['conditions']:
c = story.get(conditionId)
direction.addCondition(c)
return direction
def getLogSummary(self):
return {
'id': self.id,
'condition': self.conditionMet.id if self.conditionMet else None
}
class Diversion(object):
"""
An Diversion. Used to catch events outside of story flow.
Not sure why I'm not using subclasses here o:)
"""
def __init__(self, id, type: str, params: dict):
self.id = id
self.params = params
self.finaliseMethod = None
self.hasHit = False
self.disabled = False
self.type = type
self.counter = 0
self.logInfo = self.id # default info is merely ID
if type == 'no_response':
self.method = self._divergeIfNoResponse
self.finaliseMethod = self._returnAfterNoResponse
self.counter = 0
if type == 'reply_contains':
self.method = self._divergeIfReplyContains
self.finaliseMethod = self._returnAfterReplyContains
if len(self.params['regex']) > 0:
self.regex = re.compile(self.params['regex'].rstrip())
else:
self.regex = None
if type == 'timeout':
self.method = self._divergeIfTimeout
self.finaliseMethod = self._returnAfterTimeout
if type == 'collective_moment':
self.method = self._divergeIfCollectiveMoment
if type == 'repeat':
self.method = self._divergeIfRepeatRequest
self.regex = re.compile(self.params['regex'].rstrip())
if type == 'interrupt':
self.method = self._divergeIfInterrupted
if not self.method:
raise Exception("No valid type given for diversion")
def __getstate__(self):
# print(f'get diversion {self.id}')
state = self.__dict__.copy()
return state
@classmethod
def initFromJson(diversionClass, data, story):
diversion = diversionClass(data['@id'], data['type'], data['params'])
return diversion
def getLogSummary(self):
return {
'id': self.id,
}
async def divergeIfNeeded(self, story, direction = None):
"""
Validate if condition is met for the current story state
Returns True when diverging
"""
# For all diversion except repeat (which simply doesn't have the variable)
if 'notAfterMsgId' in self.params and self.params['notAfterMsgId']:
msg = story.get(self.params['notAfterMsgId'])
if msg is None:
story.logger.warn(f"Invalid message selected for diversion: {self.params['notAfterMsgId']} for {self.id}")
elif story.logHasMsg(msg):
# story.logger.warn(f"Block diversion {self.id} because of hit message {self.params['notAfterMsgId']}")
self.disabled = True # never run it and allow following timeouts/no_responses to run
return False
try:
r = await self.method(story,
direction.msgFrom if direction else None,
direction.msgTo if direction else None,
direction if direction else None)
if r:
if self.type != 'repeat' and self.type !='interrupt':
# repeat diversion should be usable infinte times
self.hasHit = True
story.addToLog(self)
story.hugvey.eventLogger.info(f"diverge {self.id}")
except Exception as e:
story.logger.critical("Exception when attempting diversion")
story.logger.exception(e)
return False
return r
def createReturnDirectionsTo(self, story, startMsg, returnMsg, originalDirection = None, inheritTiming = True, timeoutDuration = .5, replyContainsDurations = None):
"""
The finishes of this diversion's strand should point to the return message
with the right timeout/timing. If hit, this direction should also notify
this diversion.
replyContainsDurations: list formatted as in JSON
[{
"minReplyDuration": "0",
"waitTime": "3"
}]
"""
self.counter +=1
story.logger.info(f"Creating return directions for {startMsg.id}")
finishMessageIds = story.getFinishesForMsg(startMsg)
finalTimeoutDuration = timeoutDuration
finalContainsDurations = replyContainsDurations
#: :type story: Story
#: :type originalDirection: Direction
# story.directionsPerMsg[story.currentMessage.id]
# take the timeouts that are on the current message, and apply it to our return
# as to have somewhat equal pace as to where we originate from
if inheritTiming:
for originalDirection in story.getCurrentDirections():
# if originalDirection:
for condition in originalDirection.conditions:
if condition.type == 'timeout':
finalTimeoutDuration = float(condition.vars['seconds'])
if condition.type == 'replyContains':
finalContainsDurations = json.loads(condition.originalJsonString)['vars']['delays']
story.logger.debug(f"Finishes for {startMsg.id}: {finishMessageIds}")
i = 0
# story.logger.warn(f"FINISHES: {finishMessageIds}")
for msgId in finishMessageIds:
# Some very ugly hack to add a direction & condition
i+=1
msg = story.get(msgId)
if not msg:
continue
if msg.dontGenerateDirections:
story.logger.info(f"Diversion ending {msg.id} is story ending. Don't generate return direction.")
continue
usedReturnMessage = returnMsg
if msg.generatedDirectionsJumpToChapter:
usedReturnMessage = story.getNextChapterForMsg(returnMsg, canIncludeSelf=True)
if not usedReturnMessage:
# in case of a diversion in the last bit of the story, it can be there there is no return message.
raise Exception(f"No return message found for {msg.id}")
direction = Direction(f"{self.id}-{i}-{self.counter}", msg, usedReturnMessage)
data = json.loads(f"""
{{
"@id": "{self.id}-ct{i}-{self.counter}",
"@type": "Condition",
"type": "timeout",
"label": "Autogenerated Timeout",
"vars": {{
"seconds": "{finalTimeoutDuration}"
}}
}}
""")
data['vars']['onlyIfNoReply'] = finalContainsDurations is not None
# TODO: also at replycontains if it exists, with the same timings
condition = Condition.initFromJson(data, story)
direction.addCondition(condition)
if finalContainsDurations is not None:
data2 = json.loads(f"""
{{
"@id": "{self.id}-cr{i}-{self.counter}",
"@type": "Condition",
"type": "replyContains",
"label": "Autogenerated Reply Contains",
"vars": {{
"regex": "",
"instantMatch": false
}} }}
""")
data2['vars']['delays'] = finalContainsDurations
condition2 = Condition.initFromJson(data2, story)
direction.addCondition(condition2)
story.add(condition2)
direction.isDiversionReturn = True # will clear the currentDiversion on story
story.diversionDirections.append(direction)
story.logger.info(f"Created direction: {direction.id} ({msg.id} -> {usedReturnMessage.id}) {condition.id} with timeout {finalTimeoutDuration}s")
story.add(condition)
story.add(direction)
async def finalise(self, story):
""""
Only used if the Diversion sets the story.currentDiversion
"""
story.logger.info("end of diversion")
story.hugvey.eventLogger.info(f"return from {self.id}")
if not self.finaliseMethod:
story.logger.info(f"No finalisation for diversion {self.id}")
story.currentDiversion = None
return False
await self.finaliseMethod(story)
story.currentDiversion = None
return True
async def _divergeIfNoResponse(self, story, msgFrom, msgTo, direction):
"""
Participant doesn't speak for x consecutive replies (has had timeout)
"""
':type story: Story'
if story.currentDiversion or not msgFrom or not msgTo:
return False
if story.stats['consecutiveSilentTimeouts'] >= int(self.params['consecutiveSilences']):
story.stats['diversions']['no_response'] += 1
msg = story.get(self.params['msgId'])
if msg is None:
story.logger.critical(f"Not a valid message id for diversion: {self.params['msgId']}")
return
story.logger.info(f"Diverge: No response {self.id} {story.stats}")
self.returnMessage = msgTo
if self.params['returnAfterStrand']:
self.createReturnDirectionsTo(story, msg, msgTo, direction)
await story.setCurrentMessage(msg)
story.currentDiversion = self
return True
return
async def _returnAfterNoResponse(self, story):
story.logger.info(f"Finalise diversion: {self.id}")
story.stats['consecutiveSilentTimeouts'] = 0 # reset counter after diverging
# if self.params['returnAfterStrand']:
# await story.setCurrentMessage(self.returnMessage)
async def _divergeIfReplyContains(self, story, msgFrom, msgTo, _):
"""
Participant doesn't speak for x consecutive replies (has had timeout)
"""
':type story: Story'
# use story.currentReply.getTimeSinceLastUtterance() > 2
if story.currentDiversion: # or not msgFrom or not msgTo:
# don't do nested diversions
return False
if self.hasHit:
# don't match twice
return
if story.currentReply is None or not self.regex:
return
direction = story.getDefaultDirectionForMsg(story.currentMessage)
if not direction:
# ignore the direction argument, and only check if the current message has a valid default
return
waitTime = story.applyTimeDelay(story.applyTimeFactor(1.8 if 'waitTime' not in self.params else float(self.params['waitTime'])))
timeSince = story.currentReply.getTimeSinceLastUtterance()
if timeSince < waitTime:
story.logger.log(LOG_BS, f"Waiting for replyContains: {timeSince} (needs {waitTime})")
return
text = story.currentReply.getText()
r = self.regex.search(text)
if r is None:
return
if 'notForColor' in self.params and self.params['notForColor'] and story.currentMessage.color:
if story.currentMessage.color.lower() == self.params['notForColor'].lower():
story.logger.debug(f"Skip diversion {self.id} because of section color")
return
story.logger.info(f"Diverge: reply contains {self.id}")
story.stats['diversions']['reply_contains'] += 1
msg = story.get(self.params['msgId'])
if msg is None:
story.logger.critical(f"Not a valid message id for diversion: {self.params['msgId']}")
return
# valid diversion:
self.logInfo = f"{self.id} - search \"{r.re.pattern}\" on \"{text}\" matches \"{r.group()}\" after {timeSince}s"
if 'nextChapterOnReturn' in self.params and self.params['nextChapterOnReturn']:
msgTo = story.getNextChapterForMsg(story.currentMessage, False) or direction.msgTo
returnInheritTiming = False
else:
msgTo = direction.msgTo
returnInheritTiming = True
self.returnMessage = msgTo
if self.params['returnAfterStrand']:
self.createReturnDirectionsTo(story, msg, msgTo, direction, inheritTiming=returnInheritTiming)
await story.setCurrentMessage(msg)
story.currentDiversion = self
return True
async def _returnAfterReplyContains(self, story):
story.logger.info(f"Finalise diversion: {self.id}")
# if self.params['returnAfterStrand']:
# await story.setCurrentMessage(self.returnMessage)
async def _divergeIfCollectiveMoment(self, story, msgFrom, msgTo, direction):
"""
Central command timer times to a collective moment
"""
#: :var story: Story
if story.currentDiversion or not msgFrom or not msgTo:
return False
if not msgTo.chapterStart:
# only when changing chapter
return
window_open_second = float(self.params['start_second'])
window_close_second = window_open_second + float(self.params['window'])
# Only keep a 1h loop
now = story.hugvey.command.timer.getElapsed() % 3600
if now < window_open_second or now > window_close_second:
return
#open!
msg = story.get(self.params['msgId'])
if msg is None:
story.logger.critical(f"Not a valid message id for diversion: {self.id} {self.params['msgId']}")
return
self.returnMessage = msgTo
self.createReturnDirectionsTo(story, msg, msgTo, direction, inheritTiming=True)
await story.setCurrentMessage(msg)
story.currentDiversion = self
return True
async def _divergeIfRepeatRequest(self, story, msgFrom, msgTo, direction):
"""
Participant asks if message can be repeated.
"""
# if not msgFrom or not msgTo:
# return
# TODO: how to handle this now we sometimes use different timings.
# Perhaps set isFinished when matching condition.
if story.currentReply is None or story.currentReply.getTimeSinceLastUtterance() < story.applyTimeFactor(1.8):
return
if story.currentMessage.didRepeat:
# repeat only once
return
r = self.regex.search(story.currentReply.getText())
if r is None:
return
logger.info(f"Diverge: request repeat {self.id}")
story.stats['diversions']['repeat'] += 1
await story.setCurrentMessage(story.currentMessage)
story.currentMessage.didRepeat = True
return True
async def _divergeIfTimeout(self, story, msgFrom, msgTo, direction):
"""
(1) last spoken at all
(2) or duration for this last reply only
Only can kick in if there's no 'timeout' condition set.
"""
if story.currentDiversion:
return
if msgFrom or msgTo:
# not applicable a direction has been chosen
return
if not story.lastMsgFinishTime:
# not during play back
return
# not applicable when timeout is set
directions = story.getCurrentDirections()
for direction in directions:
for condition in direction.conditions:
if condition.type == 'timeout':
return
now = story.timer.getElapsed()
if now - story.lastMsgFinishTime < story.applyTimeFactor(self.params['minTimeAfterMessage']):
# not less than x sec after it
return
interval = story.applyTimeFactor(self.params['interval'])
if not self.params['fromLastMessage']:
# (1) last spoken at all
timeSince = story.timer.getElapsed('last_speech') if story.timer.hasMark('last_speech') else story.timer.getElapsed('start')
if story.timer.hasMark('last_diversion_timeout') and story.timer.getElapsed('last_diversion_timeout') > timeSince:
timeSince = story.timer.getElapsed('last_diversion_timeout')
if timeSince < interval:
return
story.stats['diversions']['timeout_total'] += 1
else:
if story.currentMessage is None:
return
# if story.currentMessage.timeoutDiversionCount + 1
if story.currentReply is not None:
# still playing back
# or somebody has spoken already (timeout only works on silences)
return
if now - story.lastMsgFinishTime < interval:
return
story.currentMessage.timeoutDiversionCount += 1
story.stats['diversions']['timeout_last'] += 1
# if we're still here, there's a match!
story.logger.info(f"Diverge: Timeout {self.id} of {self.params['interval']}")
story.stats['diversions']['timeout'] += 1
msg = story.get(self.params['msgId'])
if msg is None:
story.logger.critical(f"Not a valid message id for diversion: {self.params['msgId']}")
return
# fall back to the currentMessage to return on.
# TODO: maybe, if not chapter is found, the diversion should be
# blocked alltogether?
self.returnMessage = story.getNextChapterForMsg(story.currentMessage, False) or story.currentMessage
if self.params['returnAfterStrand']:
# no direction is here, as this diversion triggers before a direction is taken
self.createReturnDirectionsTo(story, msg, self.returnMessage, inheritTiming=False)
await story.setCurrentMessage(msg, allowReplyInterrupt=True)
story.currentDiversion = self
story.timer.setMark('last_diversion_timeout')
return True
async def _returnAfterTimeout(self, story):
story.logger.info(f"Finalise diversion: {self.id}")
async def _divergeIfInterrupted(self, story, msgFrom, msgTo, direction):
"""
This is here as a placeholder for the interruption diversion.
These will however be triggered differently
"""
if story.currentDiversion or story.allowReplyInterrupt:
return False
msg = story.get(self.params['msgId'])
if msg is None:
story.logger.critical(f"Not a valid message id for diversion: {self.params['msgId']}")
return
self.returnMessage = story.currentMessage
# no direction is here, as this diversion triggers before a direction is taken
self.createReturnDirectionsTo(story, msg, self.returnMessage, inheritTiming=False, timeoutDuration=3, replyContainsDurations = [{
"minReplyDuration": "0",
"waitTime": "2"
}])
await story.setCurrentMessage(msg)
story.currentDiversion = self
return True
class Configuration(object):
id = 'configuration'
volume = 1 # Volume multiplier for 'play' command
nothing_text = "nothing" # When variable is not set, but used in sentence, replace it with this word.
time_factor = 1 # time is multiplied to timeouts etc. (not playback)
time_extra_delay = 0 # time adder for reply contains diversion/condition (see applyTimeDelay())
tempo_factor = 1 # tempo is multiplied (playback)
pitch_modifier = 1 # pitch is added (playback)
light0_intensity = 0
light0_fade = 30. # fade duration in seconds
light0_isSophie = False
light1_intensity = 150
light1_fade = 10.
light1_isSophie = False
light2_intensity = 75
light2_fade = 10.
light2_isSophie = False
light3_intensity = 150
light3_fade = 10.
light3_isSophie = False
light4_intensity = 150
light4_fade = 10.
light4_isSophie = False
@classmethod
def initFromJson(configClass, data, story):
config = Configuration()
config.__dict__.update(data)
return config
def getLightPresets(self):
c = self.__dict__
l = []
for i in range(5):
l.append({
'intensity': int(c[f"light{i}_intensity"]),
'fade': float(c[f"light{i}_fade"]),
'isSophie': float(c[f"light{i}_isSophie"])
})
return l
storyClasses = {
'Msg': Message,
'Direction': Direction,
'Condition': Condition,
'Diversion': Diversion,
'Configuration': Configuration,
}
class Stopwatch(object):
"""
Keep track of elapsed time. Use multiple markers, but a single pause/resume button
"""
def __init__(self):
self.isRunning = asyncio.Event()
self.reset()
def getElapsed(self, since_mark='start'):
t = time.time()
if self.paused_at != 0:
pause_duration = t - self.paused_at
else:
pause_duration = 0
return t - self.marks[since_mark] - pause_duration
def pause(self):
self.paused_at = time.time()
self.isRunning.clear()
def resume(self):
if self.paused_at == 0:
return
pause_duration = time.time() - self.paused_at
for m in self.marks:
self.marks[m] += pause_duration
self.paused_at = 0
self.isRunning.set()
def reset(self):
self.marks = {}
self.setMark('start')
self.paused_at = 0
self.isRunning.set()
def setMark(self, name, overrideValue = None):
"""
Set a marker to current time. Or , if given, to any float one desires
"""
self.marks[name] = overrideValue if overrideValue else time.time()
def hasMark(self, name):
return name in self.marks
def clearMark(self, name):
if name in self.marks:
self.marks.pop(name)
def __getstate__(self):
# print(f'get stopwatch')
state = self.__dict__.copy()
state['isRunning'] = self.isRunning.is_set()
return state
def __setstate__(self, state):
self.__dict__.update(state)
self.isRunning = asyncio.Event()
if 'isRunning' in state and state['isRunning']:
self.isRunning.set()
else:
self.isRunning.clear()
class StoryState(object):
"""
Because Story not only contains state, but also logic/control variables, we need
a separate class to keep track of the state of things. This way, we can recreate
the exact state in which a story was before.
"""
msgLog = []
currentMessage = None
currentDiversion = None
currentReply = None
allowReplyInterrupt = False
timer = Stopwatch()
isRunning = False
lastMsgTime = None
lastSpeechStartTime = None
lastSpeechEndTime = None
variableValues = {} # captured variables from replies
finish_time = False
events = [] # queue of received events
commands = [] # queue of commands to send
log = [] # all nodes/elements that are triggered
msgLog = []
stats = {
'timeouts': 0,
'silentTimeouts': 0,
'consecutiveSilentTimeouts': 0,
'diversions': {
'no_response': 0,
'repeat': 0,
'reply_contains': 0,
'timeout': 0,
'timeout_total': 0,
'timeout_last': 0
}
}
def __init__(self):
pass
#
class Story(object):
"""Story represents and manages a story/narrative flow"""
# TODO should we separate 'narrative' (the graph) from the story (the
# current user flow)
def __init__(self, hugvey_state, panopticon_port):
super(Story, self).__init__()
self.hugvey = hugvey_state
self.panopticon_port = panopticon_port
self.events = [] # queue of received events
self.commands = [] # queue of commands to send
self.log = [] # all nodes/elements that are triggered
self.msgLog = [] # hit messages
self.logger = mainLogger.getChild(f"{self.hugvey.id}").getChild("story")
self.currentMessage = None
self.currentDiversion = None
self.currentReply = None
self.allowReplyInterrupt = False
self.timer = Stopwatch()
self.isRunning = False
self.diversions = []
self.interruptionDiversions = []
self.variables = {}
self.variableValues = {} # captured variables from replies
self.runId = uuid.uuid4().hex
self.currentLightPresetNr = None
def pause(self):
self.logger.debug('pause hugvey')
self.timer.pause()
def resume(self):
self.logger.debug('resume hugvey')
self.timer.resume()
def getLogSummary(self):
summary = {
# e[0]: the entity, e[1]: the logged time
'messages': [(e[0].getLogSummary(), e[1]) for e in self.log if isinstance(e[0], Message)],
'directions': [(e[0].getLogSummary(), e[1]) for e in self.log if isinstance(e[0], Direction)],
'diversions': [(e[0].getLogSummary(), e[1]) for e in self.log if isinstance(e[0], Diversion)],
}
# print(self.log)
return summary
def getLogCounts(self):
return {
'messages': len([1 for e in self.log if isinstance(e[0], Message)]),
'diversions': len([1 for e in self.log if isinstance(e[0], Diversion)]),
}
def registerVariable(self, variableName, message):
if variableName not in self.variables:
self.variables[variableName] = [message]
else:
self.variables[variableName].append(message)
def setVariableValue(self, name, value, store=True):
if name not in self.variables:
self.logger.warn(f"Set variable that is not needed in the story: {name}")
if name in self.variableValues and self.variableValues[name] == value:
self.logger.debug(f"Skip double setting of variable {name} to {value}")
return
self.logger.debug(f"Set variable {name} to {value}")
self.variableValues[name] = value
if store:
self.hugvey.command.variableStore.addVariable(self.runId, name, value, self.hugvey.id, self.language_code)
if name not in self.variables:
return
for message in self.variables[name]:
message.setVariable(name, value)
def hasVariableSet(self, name) -> bool:
return name in self.variableValues and self.variableValues is not None
def setStoryData(self, story_data, language_code):
"""
Parse self.data into a working story engine
"""
self.data = story_data
self.language_code = language_code
# keep to be able to reset it in the end
currentId = self.currentMessage.id if self.currentMessage else None
self.elements = {}
self.strands = {}
self.diversions = []
self.interruptionDiversions = []
self.directionsPerMsg = {}
self.startMessage = None # The entrypoint to the graph
self.variables = {}
self.reset()
for el in self.data:
try:
className = storyClasses[el['@type']]
obj = className.initFromJson(el, self)
self.add(obj)
except Exception as e:
self.logger.critical(f"Error loading story element: {el}")
self.logger.exception(e)
raise e
# self.logger.debug(self.elements)
# self.logger.debug(self.directionsPerMsg)
self.diversions = [el for el in self.elements.values() if type(el) == Diversion]
self.interruptionDiversions = [el for el in self.elements.values() if type(el) == Diversion and el.type == 'interrupt']
configurations = [el for el in self.elements.values() if type(el) == Configuration]
self.configuration = configurations[0] if len(configurations) else Configuration()
if currentId:
self.currentMessage = self.get(currentId)
if self.currentMessage:
self.logger.info(
f"Reinstantiated current message: {self.currentMessage.id}")
else:
self.logger.warn(
"Could not reinstatiate current message. Starting over")
# Register variables
for msg in self.getMessages():
# print(msg.id, msg.hasVariables())
if not msg.hasVariables():
continue
for var in msg.variables:
self.registerVariable(var, msg)
self.logger.debug(f'has variables: {self.variables}')
self.logger.debug(f'has {len(self.strands)} strands: {self.strands}')
# self.logger.info(f"Directions: {self.directionsPerMsg}")
self.calculateFinishesForStrands()
def reset(self):
self.timer.reset()
# self.startTime = time.time()
# currently active message, determines active listeners etc.
self.currentMessage = None
self.currentDiversion = None
self.lastMsgTime = None
self.lastSpeechStartTime = None
self.lastSpeechEndTime = None
self.variableValues = {} # captured variables from replies
self.finish_time = False
self.runId = uuid.uuid4().hex
self.diversionDirections = []
self.gaveErrorForNotContinuing = False
self.events = [] # queue of received events
self.commands = [] # queue of commands to send
self.log = [] # all nodes/elements that are triggered
self.msgLog = []
self.currentReply = None
self.stats = {
'timeouts': 0,
'silentTimeouts': 0,
'consecutiveSilentTimeouts': 0,
'diversions': {
'no_response': 0,
'repeat': 0,
'reply_contains': 0,
'timeout': 0,
'timeout_total': 0,
'timeout_last': 0
}
}
for msg in self.getMessages():
pass
def add(self, obj):
if obj.id in self.elements:
# print(obj)
raise Exception("Duplicate id for ''".format(obj.id))
self.elements[obj.id] = obj
if type(obj) == Diversion:
self.diversions.append(obj)
if type(obj) == Message:
if obj.isStart:
#confusingly, isStart is 'beginning' in the story json file
self.startMessage = obj
if obj.isStrandStart:
self.strands[obj.id] = []
if type(obj) == Direction:
if obj.msgFrom.id not in self.directionsPerMsg:
self.directionsPerMsg[obj.msgFrom.id] = []
self.directionsPerMsg[obj.msgFrom.id].append(obj)
def get(self, id):
"""
Get a story element by its id
"""
if id in self.elements:
return self.elements[id]
return None
def getMessages(self):
return [el for el in self.elements.values() if type(el) == Message]
def stop(self):
self.logger.info("Stop Story")
if self.isRunning:
self.isRunning = False
def shutdown(self):
self.stop()
self.hugvey = None
async def _processPendingEvents(self):
# Gather events:
nr = len(self.events)
for i in range(nr):
e = self.events.pop(0)
self.logger.debug("handle '{}'".format(e))
if e['event'] == "exit":
self.stop()
if e['event'] == 'connect':
# a client connected. Should only happen in the beginning or in case of error
# that is, until we have a 'reset' or 'start' event.
# reinitiate current message
await self.setCurrentMessage(self.currentMessage)
if e['event'] == "playbackStart":
if e['msgId'] != self.currentMessage.id:
continue
self.lastMsgStartTime = self.timer.getElapsed()
self.logger.debug("Start playback")
if e['event'] == "playbackFinish":
if e['msgId'] == self.currentMessage.id:
#TODO: migrate value to Messagage instead of Story
self.lastMsgFinishTime = self.timer.getElapsed()
self.hugvey.eventLogger.info(f"message: {self.currentMessage.id} {self.currentMessage.uuid} done")
# 2019-02-22 temporary disable listening while playing audio:
# if self.hugvey.google is not None:
# self.logger.warn("Temporary 'fix' -> resume recording?")
# self.hugvey.google.resume()
if self.currentMessage.id not in self.directionsPerMsg:
# print(self.currentDiversion)
# if self.currentDiversion is not None:
# await self.currentDiversion.finalise(self)
# else:
# TODO: check if direction that exists are diversion returns, and if they are already taken. Otherwise story blocks
self.logger.info("THE END!")
self._finish()
return
if e['event'] == 'speech':
# TODO if transcript is empty, ignore (happened sometimes in french)
if len(e['transcript'].strip()) < 1:
self.logger.warning(f'ignore empty transcription {e}')
continue
# participants speaks, reset counter
self.stats['consecutiveSilentTimeouts'] = 0
# if self.currentMessage and not self.lastMsgStartTime:
if self.currentMessage and not self.lastMsgFinishTime:
# Ignore incoming speech events until we receive a 'playbackStart' event.
# After that moment the mic will be muted, so nothing should come in _anyway_
# unless google is really slow on us. But by taking the start time we don't ignore
# messages that come in, in the case google is faster than our playbackFinish event.
# (if this setup doesn't work, try to test on self.lastMsgFinish time anyway)
# it keeps tricky with all these run conditions
# if len(self.interruptionDiversions) and not self.currentDiversion and not self.allowReplyInterrupt:
# self.logger.warn("diverge when speech during playing message")
# diversion = random.choice(self.interruptionDiversions)
# #: :type diversion: Diversion
# r = await diversion.divergeIfNeeded(self, None)
# print(r) # is always needed :-)
# else:
self.logger.info("ignore speech during playing message")
continue
# log if somebody starts speaking
if self.currentReply is None:
self.logger.info("Start speaking")
self.currentReply= Reply(self.currentMessage)
now = self.timer.getElapsed()
utterance = self.currentReply.getActiveUtterance(now)
# The 'is_final' from google sometimes comes 2 sec after finishing speaking
# therefore, we ignore the timing of this transcription if something has been said already
if e['is_final'] and utterance.hasText():
self.logger.debug(f'ignore timing: {now} use {utterance.lastUpdateTime}')
utterance.setText(e['transcript'], utterance.lastUpdateTime)
else:
utterance.setText(e['transcript'], now)
self.hugvey.eventLogger.debug("speaking: content {} \"{}\"".format(id(utterance), e['transcript']))
if not self.timer.hasMark('first_speech'):
self.timer.setMark('first_speech')
self.timer.setMark('last_speech')
if e['is_final']:
utterance.setFinished(self.timer.getElapsed())
self.hugvey.eventLogger.info("speaking: stop {}".format(id(utterance)))
if self.hugvey.recorder:
self.hugvey.recorder.updateTranscription(self.currentReply.getText())
def _processDirection(self, direction):
"""
return matching condition
"""
for condition in direction.conditions:
if condition.isMet(self):
self.logger.info("Condition is met: {0} ({2}), going to {1}".format(
condition.id, direction.msgTo.id, condition.type))
self.hugvey.eventLogger.info("condition: {0}".format(condition.id))
self.hugvey.eventLogger.info("direction: {0}".format(direction.id))
direction.setMetCondition(condition)
return condition
return None
async def _processDirections(self, directions):
':type directions: list(Direction)'
chosenDirection = None
metCondition = None
for direction in directions:
if direction.isDiversionReturn and direction.diversionHasReturned:
# Prevent that returns created from the same message send you
# back to a previous point in time.
# self.logger.warn("Skipping double direction for diversion")
continue
condition = self._processDirection(direction)
if not condition:
continue
self.addToLog(condition)
self.addToLog(direction)
self.currentMessage.setFinished(self.timer.getElapsed())
chosenDirection = direction
metCondition = condition
break
isDiverging = await self._processDiversions(chosenDirection)
allowReplyInterrupt = False
# in some cases, conditions should be allowed to interrupt the reply
if metCondition:
if metCondition.type == 'timeout' and not ('onlyIfNoReply' in metCondition.vars and metCondition.vars['onlyIfNoReply']):
allowReplyInterrupt = True
if metCondition.usedContainsDuration is not None and metCondition.usedContainsDuration < 0.1:
allowReplyInterrupt = True
if not isDiverging and chosenDirection:
if chosenDirection.isDiversionReturn and self.currentDiversion:
for direction in self.diversionDirections:
if direction.isDiversionReturn and not direction.diversionHasReturned:
self.logger.info(f"Mark diversion as returned for return direction {direction.id}")
direction.diversionHasReturned = True
# chosenDirection.diversionHasReturned = True
await self.currentDiversion.finalise(self)
await self.setCurrentMessage(chosenDirection.msgTo, allowReplyInterrupt=allowReplyInterrupt)
return chosenDirection
async def _processDiversions(self, direction: None) -> bool:
"""
Process the diversions on stack. If diverging, return True, else False
msgFrom and msgTo contain the source and target of a headed direction if given
Else, they are None
"""
diverge = False
activeDiversions = []
activeTimeoutDiv = None
activeTimeoutLastDiv = None
activeNoResponseDiv = None
for diversion in self.diversions:
#: :type diversion: Diversion
if diversion.disabled or diversion.hasHit or diversion.type == 'interrupt':
# interruptions are triggered somewhere else.
continue
if diversion.type == 'timeout':
if diversion.params['timesOccured'] > 0:
if not diversion.params['fromLastMessage']:
# perhaps neater if we collect them in a list, and then sort by key, but this works just as well
if not activeTimeoutDiv or activeTimeoutDiv.params['timesOccured'] > diversion.params['timesOccured']:
activeTimeoutDiv = diversion
else:
if not activeTimeoutLastDiv or activeTimeoutLastDiv.params['timesOccured'] > diversion.params['timesOccured']:
activeTimeoutLastDiv = diversion
continue
if diversion.type == 'no_response':
if diversion.params['timesOccured'] > 0:
if not activeNoResponseDiv or activeNoResponseDiv.params['timesOccured'] > diversion.params['timesOccured']:
activeNoResponseDiv = diversion
continue
activeDiversions.append(diversion)
if activeTimeoutDiv:
activeDiversions.append(activeTimeoutDiv)
if activeTimeoutLastDiv:
activeDiversions.append(activeTimeoutLastDiv)
if activeNoResponseDiv:
activeDiversions.append(activeNoResponseDiv)
for diversion in activeDiversions:
# TODO: collect diversions and order by times + timesOccured (for timeout & no_response)
d = await diversion.divergeIfNeeded(self, direction)
if d:
diverge = True
return diverge
def addToLog(self, node):
self.log.append((node, self.timer.getElapsed()))
if isinstance(node, Message):
self.msgLog.append(node)
if self.hugvey.recorder:
if isinstance(node, Message):
self.hugvey.recorder.log('hugvey', node.text, node.id)
if isinstance(node, Diversion):
self.hugvey.recorder.log('diversion',node.logInfo, node.id)
if isinstance(node, Condition):
self.hugvey.recorder.log('condition',node.logInfo, node.id)
def logHasMsg(self, node):
return node in self.msgLog
def checkIfGone(self):
'''
Make a guestimation if the audience has left... just really a simple timer check.
If we do think so, give an error and stop the conversation
'''
if not self.lastMsgFinishTime:
# don't do it when hugvey is speaking
return
if self.timer.hasMark('last_speech') and self.timer.getElapsed('last_speech') > 30*60:
self.hugvey.eventLogger.warning("Audience is quiet for too long...stopping")
self.logger.warning("Audience is quiet, force END!")
self._finish()
def checkIfHanging(self):
'''
Make a guestimation if the story is hanging at a message. Raise exception once.
'''
if not self.lastMsgFinishTime or self.gaveErrorForNotContinuing:
# don't do it when hugvey is speaking
# or when it already gave the error for this message
return
diff = self.timer.getElapsed() - self.lastMsgFinishTime
safeDiff = self.hugvey.command.config['story']['hugvey_critical_silence'] if 'hugvey_critical_silence' in self.hugvey.command.config['story'] else 90
if diff > safeDiff:
self.hugvey.eventLogger.warning("Hugvey is quiet for very long!")
self.logger.critical("Hugvey is quiet for very long!") # critical messages are forwarded to telegram
self.gaveErrorForNotContinuing = True
async def _renderer(self):
"""
every 1/10 sec. determine what needs to be done based on the current story state
"""
loopDuration = 0.1 # Configure fps
lastTime = time.time()
self.logger.debug("Start renderer")
while self.isRunning:
if self.isRunning is False:
break
# pause on timer paused
await self.timer.isRunning.wait() # wait for un-pause
for i in range(len(self.events)):
await self._processPendingEvents()
directions = self.getCurrentDirections()
await self._processDirections(directions)
self.checkIfGone()
self.checkIfHanging()
# TODO create timer event
# self.commands.append({'msg':'TEST!'})
# Test stability of Central Command with deliberate crash
# if self.timer.getElapsed() > 10:
# raise Exception("Test exception")
if not self.timer.hasMark('state_save') or self.timer.getElapsed('state_save') > 30:
self.storeState()
self.timer.setMark('state_save')
# wait for next iteration to avoid too high CPU
t = time.time()
await asyncio.sleep(max(0, loopDuration - (t - lastTime)))
lastTime = t
self.logger.debug("Stop renderer")
async def setCurrentMessage(self, message, useReply = None, allowReplyInterrupt = False):
"""
Use Reply allows to pre-initiate a reply to use with the message. This is used eg. when doing an interruption.
"""
if self.currentMessage and not self.lastMsgFinishTime:
self.logger.info("Interrupt playback {}".format(self.currentMessage.id))
self.hugvey.eventLogger.info("interrupt")
# message is playing
self.hugvey.sendCommand({
'action': 'stop',
'id': self.currentMessage.id,
})
message.uuid = shortuuid.uuid()
self.currentMessage = message
self.lastMsgTime = time.time()
self.lastMsgFinishTime = None # to be filled in by the event
self.lastMsgStartTime = None # to be filled in by the event
self.gaveErrorForNotContinuing = False
self.allowReplyInterrupt = allowReplyInterrupt
# if not reset:
self.previousReply = self.currentReply # we can use this for interrptions
self.currentReply = useReply #self.currentMessage.reply
# send command to already mute mic
self.hugvey.sendCommand({
'action': 'prepare',
'id': message.id
})
# else:
# # if we press 'save & play', it should not remember it's last reply to that msg
# self.previousReply = self.currentReply # we can use this for interrptions
# self.currentReply = self.currentMessage.reply
self.logger.info("Current message: ({0}) \"{1}\"".format(
message.id, message.getTextLabel()))
if message.id != self.startMessage.id:
self.addToLog(message)
self.hugvey.eventLogger.info(f"message: {message.id} {message.uuid} start \"{message.getLabel()}\"")
# TODO: prep events & timer etc.
fn = await message.getAudioFilePath()
# get duration of audio file, so the client can detect a hang of 'play'
try:
duration = sox.file_info.duration(fn)
except Exception as e:
self.hugvey.eventLogger.critical(f"error: crash when reading wave file: {fn}")
self.logger.critical(f"error: crash when reading wave file: {fn}")
self.logger.exception(e)
duration = 10 # some default duration to have something to fall back to
params = message.getParams().copy()
params['vol'] = params['vol'] * self.configuration.volume if 'vol' in params else self.configuration.volume
params['vol'] = "{:.4f}".format(params['vol'])
params['tempo'] = (float(params['tempo']) if 'tempo' in params else 1) * (float(self.configuration.tempo_factor) if hasattr(self.configuration, 'tempo_factor') else 1)
duration = float(duration) / params['tempo']
params['tempo'] = "{:.4f}".format(params['tempo'])
params['pitch'] = (float(params['pitch']) if 'pitch' in params else 0)\
+ (float(self.configuration.pitch_modifier) if hasattr(self.configuration, 'pitch_modifier') else 0)
params['pitch'] = "{:.4f}".format(params['pitch'])
# self.hugvey.google.pause() # pause STT to avoid text events while decision is made
self.hugvey.sendCommand({
'action': 'play',
'file': fn,
'id': message.id,
'params': params,
'duration': duration
})
if message.lightChange is not None:
self.fadeLightPreset(message.lightChange)
# self.hugvey.setLightStatus(message.lightChange)
# 2019-02-22 temporary disable listening while playing audio:
# if self.hugvey.google is not None:
# self.logger.warn("Temporary 'fix' -> stop recording")
# self.hugvey.google.pause()
logmsg = "Pending directions:"
for direction in self.getCurrentDirections():
conditions = [c.id for c in direction.conditions]
logmsg += "\n- {0} -> {1} (when: {2}) ".format(direction.msgFrom.id, direction.msgTo.id, conditions)
self.logger.log(LOG_BS,logmsg)
# if message.id != self.startMessage.id:
# self.storeState()
def fadeLightPreset(self, presetNr: int):
if presetNr < 0 or presetNr > 4:
self.logger.critical(f"Error parsing light fade preset code '{presetNr}'")
return
preset = self.configuration.getLightPresets()[presetNr]
self.currentLightPresetNr = presetNr
self.hugvey.transitionLight(preset['intensity'], preset['fade'], preset['isSophie'])
def getCurrentDirections(self):
if self.currentMessage.id not in self.directionsPerMsg:
return []
else:
return self.directionsPerMsg[self.currentMessage.id]
def getNextChapterForMsg(self, msg, canIncludeSelf = True, depth = 0):
if canIncludeSelf and msg.chapterStart:
self.logger.info(f"Next chapter: {msg.id}")
return msg
if depth >= 70:
# protection against infinite loop?
return None
if msg.id not in self.directionsPerMsg:
return None
for direction in self.directionsPerMsg[msg.id]:
r = self.getNextChapterForMsg(direction.msgTo, True, depth+1)
if r:
return r
# none found
return None
async def run(self, customStartMsgId = None, resuming = False):
self.logger.info("Starting story")
if not resuming:
self.hugvey.eventLogger.info("story: start")
self.timer.reset()
self.isRunning = True
if customStartMsgId is not None:
startMsg = self.get(customStartMsgId)
else:
startMsg = self.startMessage
await self.setCurrentMessage(startMsg)
else:
self.hugvey.eventLogger.info(f"story: resume from {self.currentMessage}")
self.isRunning = True
if not self.lastMsgFinishTime and self.currentMessage:
await self.setCurrentMessage(self.currentMessage)
await self._renderer()
def isFinished(self):
if hasattr(self, 'finish_time') and self.finish_time:
return time.time() - self.finish_time
return False
def _finish(self):
"""
Finish story and set hugvey to the right state
"""
self.finish()
#stop google etc:
self.hugvey.available(log=False)
def finish(self):
"""
Finish only the story
"""
self.logger.info(f"Finished story for {self.hugvey.id}")
self.hugvey.eventLogger.info("story: finished")
self.stop()
self.finish_time = time.time()
self.timer.pause()
if self.hugvey.google:
self.hugvey.google.stop()
def calculateFinishesForMsg(self, msgId, depth = 0, checked = []):
"""
BEWARE: checked = [] is evaluated at creation time of the method. Meaning that each call to this method
which doesn't explicitly specify the checked list, relies upon the list created at parse time. This means
subsequent call to the method make the list larger!! So the default should actually never be used. (found
out the hard way ;-) )
"""
# print(checked)
if msgId in checked:
# self.logger.log(LOG_BS, f"Finish for {msgId} already checked")
return []
checked.append(msgId)
if not msgId in self.directionsPerMsg or len(self.directionsPerMsg[msgId]) < 1:
# is finish
return [msgId]
if depth == 400:
self.logger.warn(f"Very deep hidden message to calculate finish for: msgId {msgId}")
# return []
finishes = []
for d in self.directionsPerMsg[msgId]:
if d.msgTo.id == msgId:
continue
finishes.extend(self.calculateFinishesForMsg(d.msgTo.id, depth+1, checked))
# de-duplicate before returning
return list(set(finishes))
def calculateFinishesForStrands(self):
for startMsgId in self.strands:
msg = self.get(startMsgId) #: :type msg: Message
if msg.isStart:
# ignore for the beginning
continue
self.logger.log(LOG_BS, f"Get finishes for {startMsgId}")
self.strands[startMsgId] = self.calculateFinishesForMsg(startMsgId, checked=[])
self.logger.log(LOG_BS, f"Finishes: {self.strands}")
def getFinishesForMsg(self, msg):
"""
Find the end of strands
Most often they will be 'start's so to speed up these are pre-calculated
Others can be calculated on the spot
returns message ids
"""
self.logger.debug(f"Get finishes for {msg.id} from {self.strands}")
if msg.id in self.strands:
return self.strands[msg.id]
return self.calculateFinishesForMsg(msg.id, checked=[])
def applyTimeFactor(self, time) -> float:
"""
Apply the particularities of the configuration.time_factor
"""
time = float(time)
if time < 2: # short timings are not influenced by this factor
return time
return time * self.configuration.time_factor
def applyTimeDelay(self, time) -> float:
"""
Since Moscow: apparently the interval at which Google returns interim results differs per language,
or we have anther cause of irregular results, either way, this screws up the short waitTimes that are
crucial for the replyContains condition/diversion. Therefore, have a story configuration option with which
we can extra delay to the timings (if non-zero)
"""
time = float(time)
if time > 0:
#if zero, it should always match instantly.
time += self.configuration.time_extra_delay
return time
def getDefaultDirectionForMsg(self, msg):
"""
There is only a default direction (for reply contains diversion) if it has
one, and only one, direction to go. If there's more, it should do nothing.
"""
if not msg.id in self.directionsPerMsg:
# is finish
return None
if len(self.directionsPerMsg[msg.id]) > 1:
return None
# TODO: should the direction have at least a timeout condition set, or not perse?
return self.directionsPerMsg[msg.id][0]
@classmethod
def getStateDir(self):
# return "/tmp"
return "./state"
# day = time.strftime("%Y%m%d")
# t = time.strftime("%H:%M:%S")
#
# self.out_folder = os.path.join(self.main_folder, day, f"{self.hv_id}", t)
# if not os.path.exists(self.out_folder):
# self.logger.debug(f"Create directory {self.out_folder}")
# self.target_folder = os.makedirs(self.out_folder, exist_ok=True)
@classmethod
def getStateFilename(cls, hv_id):
return os.path.join(cls.getStateDir(), f"state_hugvey{hv_id}")
def storeState(self):
# TODO: stop stopwatch
fn = self.getStateFilename(self.hugvey.lightId)
tmpfn = fn + '.tmp'
self.stateSave = time.time()
self.lightStateSave = self.hugvey.lightStatus
with open(tmpfn, 'wb') as fp:
pickle.dump(self, fp)
# write atomic to disk: flush, close, rename
fp.flush()
os.fsync(fp.fileno())
os.rename(tmpfn, fn)
duration = time.time() - self.stateSave
self.logger.debug(f"saved state to {fn} in {duration}s")
def hasSavedState(self):
return self.hugveyHasSavedState(self.hugvey.lightId)
@classmethod
def hugveyHasSavedState(cls, hv_id):
# print(os.path.exists(cls.getStateFilename(hv_id)), cls.getStateFilename(hv_id))
return os.path.exists(cls.getStateFilename(hv_id))
@classmethod
def loadStoryFromState(cls, hugvey_state):
# restart stopwatch
with open(cls.getStateFilename(hugvey_state.lightId), 'rb') as fp:
story = pickle.load(fp)
story.hugvey = hugvey_state
#: :type story: Story
story.logger = mainLogger.getChild(f"{story.hugvey.id}").getChild("story")
# TODO: this is not really working because it is overridden by the set-status later.
# story.hugvey.setLightStatus(story.lightStateSave)
story.logger.critical(f"Light preset {story.currentLightPresetNr}")
if story.currentLightPresetNr is not None:
story.fadeLightPreset(story.currentLightPresetNr)
return story
@classmethod
def clearSavedState(cls, hv_id):
fn = cls.getStateFilename(hv_id)
if os.path.exists(fn):
os.unlink(fn)
mainLogger.info(f"Removed state: {fn}")
#
def __getstate__(self):
# Copy the object's state from self.__dict__ which contains
# all our instance attributes. Always use the dict.copy()
# method to avoid modifying the original state.
state = self.__dict__.copy()
# Remove the unpicklable entries.
del state['hugvey']
del state['logger']
# del state['isRunning']
return state
def __setstate__(self, state):
self.__dict__.update(state)