1795 lines
64 KiB
Python
1795 lines
64 KiB
Python
import json
|
|
import time
|
|
import logging
|
|
import re
|
|
import asyncio
|
|
import urllib.parse
|
|
from .communication import LOG_BS
|
|
from tornado.httpclient import AsyncHTTPClient, HTTPRequest
|
|
import uuid
|
|
import shortuuid
|
|
import threading
|
|
import faulthandler
|
|
from zmq.asyncio import Context
|
|
import zmq
|
|
import wave
|
|
import sox
|
|
from pythonosc import udp_client
|
|
import random
|
|
import pickle
|
|
import os
|
|
import traceback
|
|
|
|
# Root logger of the hugvey package; this module logs under its
# "narrative" child.
mainLogger = logging.getLogger("hugvey")
logger = logging.getLogger(mainLogger.name + ".narrative")
|
|
|
|
class Utterance(object):
    """One stretch of speech that forms part of a Reply."""

    def __init__(self, startTime):
        # A new utterance is empty and open-ended: it has no end time yet
        # and its last update coincides with its start.
        self.startTime, self.endTime = startTime, None
        self.text, self.lastUpdateTime = "", startTime

    def setText(self, text, now):
        """Store *text* and remember *now* as the moment of the last update."""
        self.lastUpdateTime = now
        self.text = text

    def hasText(self):
        """Tell whether any text has been stored."""
        return len(self.text) != 0

    def setFinished(self, endTime):
        """Close the utterance by giving it an end time."""
        self.endTime = endTime

    def isFinished(self):
        """An utterance counts as finished once it has an end time."""
        return not (self.endTime is None)

    def __getstate__(self):
        """Pickle support: hand out a shallow copy of the instance dict."""
        # print(f'get utterance {self}')
        return dict(self.__dict__)
|
|
|
|
|
|
class Message(object):
    """
    A line spoken to the participant; a node in the story graph.

    The text may contain ``$name`` placeholders. These are filled in with
    values set through setVariable() before the audio for the message is
    requested from the voice service.
    """

    def __init__(self, id, text):
        self.id = id
        self.text = text
        self.isStart = False
        self.isStrandStart = False
        self.chapterStart = False
        self.reply = None
        # self.replyTime = None
        self.audioFile = None
        # Serialises concurrent audio requests for this message.
        self.filenameFetchLock = asyncio.Lock()
        self.interruptCount = 0
        self.timeoutDiversionCount = 0
        self.afterrunTime = 0.  # the time after this message to allow for interrupts
        self.finishTime = None  # message can be finished without finished utterance (with instant replycontains)
        self.params = {}
        self.variableValues = {}
        self.parseForVariables()
        self.uuid = None  # Have a unique id each time the message is played back.
        self.color = None

    def __getstate__(self):
        """Return the picklable state: everything except the asyncio lock."""
        # Work on a copy so the live object keeps its lock.
        # print(f'get msg {self.id}')
        state = self.__dict__.copy()
        # Remove the unpicklable entries.
        state.pop('filenameFetchLock', None)
        return state

    def __setstate__(self, state):
        """Restore pickled state and create a fresh lock."""
        self.__dict__.update(state)
        self.filenameFetchLock = asyncio.Lock()

    def setStory(self, story):
        """Attach the owning story and derive this message's logger from it."""
        self.story = story
        self.logger = story.logger.getChild("message")

    @classmethod
    def initFromJson(message, data, story):
        """
        Build a Message from its JSON description.

        Required keys are '@id' and 'text'. Optional keys: 'beginning',
        'start', 'chapterStart', 'afterrun', 'color', 'audio' and 'params'.
        """
        msg = message(data['@id'], data['text'])
        msg.isStart = data.get('beginning', False)
        msg.isStrandStart = data.get('start', False)
        msg.chapterStart = bool(data.get('chapterStart', False))
        msg.afterrunTime = data.get('afterrun', 0.)
        msg.color = data.get('color')
        if data.get('audio') is not None:
            msg.audioFile = data['audio']['file']
        msg.setStory(story)
        if data.get('params') is not None:
            msg.params = data['params']

        # Always leave a numeric 'vol' in params, also when the JSON has no
        # params at all. The default prevents clipping on some Lyrebird tracks.
        msg.params.setdefault('vol', .8)
        msg.params['vol'] = float(msg.params['vol'])

        return msg

    def parseForVariables(self):
        """
        Find the $variables in the text and register each as not yet set.
        """
        # Raw string: '\$' and '\w' are invalid escapes in a normal literal.
        self.variables = re.findall(r'\$(\w+)', self.text)
        for var in self.variables:
            self.variableValues[var] = None

    def hasVariables(self) -> bool:
        """Tell whether the text contains any $variables."""
        return len(self.variables) > 0

    def setVariable(self, name, value):
        """
        Set the value for one of this message's variables.

        Unknown names are logged and ignored, and setting an unchanged
        value is a no-op. Once every variable has a value, fetching the
        audio is scheduled on the event loop.
        """
        if name not in self.variables:
            self.logger.critical("Set nonexisting variable")
            return

        if self.variableValues[name] == value:
            return

        self.variableValues[name] = value

        self.logger.warning(f"Set variable, fetch {name}")
        if None not in self.variableValues.values():
            self.logger.warning(f"now fetch {name}")
            asyncio.get_event_loop().create_task(self.getAudioFilePath())
            # asyncio.get_event_loop().call_soon_threadsafe(self.getAudioFilePath)
            self.logger.warning(f"started {name}")

    def getText(self):
        """
        Return the text with every $variable replaced by its value.

        Variables without a value are replaced by the story configuration's
        ``nothing_text``.
        """
        text = self.text
        # Longest names first, so that $name is not clobbered by a shorter
        # $n. A sorted copy is used to leave self.variables untouched.
        for var in sorted(self.variables, key=len, reverse=True):
            value = self.variableValues[var]
            self.logger.debug(f"try replacing ${var} with {value} in {text}")
            # TODO: translate nothing to each language
            replacement = value if value is not None else self.story.configuration.nothing_text
            text = text.replace('$' + var, replacement)
        return text

    def setReply(self, reply):
        self.reply = reply

    def hasReply(self):
        return self.reply is not None

    def getReply(self):
        """Return the reply. Raises Exception when there is none."""
        if not self.hasReply():
            raise Exception(
                "Getting reply while there is none! {0}".format(self.id))

        return self.reply

    def isFinished(self):
        return self.finishTime is not None

    def setFinished(self, currentTime):
        self.finishTime = currentTime

    def getFinishedTime(self):
        return self.finishTime

    def getParams(self):
        return self.params

    def getLogSummary(self):
        """Return a dict with id, text and (if any) the reply's utterances."""
        utterances = None if self.reply is None else self.reply.utterances
        return {
            'id': self.id,
            'time': None if utterances is None else [u.startTime for u in utterances],
            'text': self.getText(),
            'replyText': None if utterances is None else [u.text for u in utterances]
        }

    async def getAudioFilePath(self):
        """
        Return the audio file for this message.

        A preset ``audioFile`` is returned as is. Otherwise the filled-in
        text is sent to the ``ipc://voice<hugvey id>`` socket and the
        string that comes back is returned.
        """
        if self.audioFile is not None:
            return self.audioFile

        text = self.getText()
        self.logger.debug(f"Fetching audio for {text}")

        async with self.filenameFetchLock:
            info = {
                'text': text,
                'variable': self.hasVariables()
            }
            s = Context.instance().socket(zmq.REQ)  #: :type s: zmq.sugar.Socket
            try:
                s.connect(f"ipc://voice{self.story.hugvey.id}")
                await s.send_json(info)
                filename = await s.recv_string()
            finally:
                # Close the socket even if the request fails or is cancelled.
                s.close()

        self.logger.debug(f"Fetched audio for {text}: {filename}")
        return filename
|
|
|
|
|
|
|
|
class Reply(object):
    """
    What the participant said in response to one Message, kept as a list
    of utterances in the order they were added.
    """

    # Seconds without an update after which a still-open utterance is
    # closed by getLastUtterance().
    FORCED_FINISH_AFTER = 5

    def __init__(self, message: "Message"):
        self.forMessage = None
        self.utterances = []
        self.setForMessage(message)

    def __getstate__(self):
        """Pickle support: a shallow copy of the instance dict."""
        # print(f'get reply {self}')
        state = self.__dict__.copy()
        return state

    def setForMessage(self, message: "Message"):
        """Link this reply and *message* to each other."""
        self.forMessage = message
        message.setReply(self)

    def getLastUtterance(self) -> "Utterance":
        """
        Return the most recent utterance, or None if there is none.

        Side effect: an unfinished utterance that has not been updated for
        more than FORCED_FINISH_AFTER seconds is marked as finished.
        """
        if not self.hasUtterances():
            return None
        u = self.utterances[-1]  #: :type u: Utterance

        # attempt to fix a glitch that google does not always send is_finished
        if u.isFinished():
            return u

        now = self.forMessage.story.timer.getElapsed()
        diff = now - u.lastUpdateTime
        if diff > self.FORCED_FINISH_AFTER:  # time in seconds to force silence in utterance
            # useful for eg. 'hello', or 'no'
            self.forMessage.story.logger.warning(
                f"Set finish time for utterance after {diff}s {u.text}"
            )
            u.setFinished(now)

        return u

    def getFirstUtterance(self) -> "Utterance":
        """Return the first utterance, or None if there is none."""
        if not self.hasUtterances():
            return None
        return self.utterances[0]

    def hasUtterances(self) -> bool:
        return len(self.utterances) > 0

    def addUtterance(self, utterance: "Utterance"):
        self.utterances.append(utterance)

    def getText(self) -> str:
        """Return the text of all utterances, joined with '. '."""
        return ". ".join([u.text for u in self.utterances])

    def getActiveUtterance(self, currentTime) -> "Utterance":
        """
        If no utterance is active, create a new one. Otherwise return non-finished utterance for update
        """
        # Fetch once: getLastUtterance() may force-finish the utterance.
        u = self.getLastUtterance()
        if u is None or u.isFinished():
            u = Utterance(currentTime)
            self.addUtterance(u)
        return u

    def isSpeaking(self):
        """Tell whether the last utterance is still unfinished."""
        u = self.getLastUtterance()
        return u is not None and not u.isFinished()

    def getTimeSinceLastUtterance(self):
        """
        Return the story time elapsed since the last utterance was
        updated, or None when there are no utterances.
        """
        if not self.hasUtterances():
            return None

        return self.forMessage.story.timer.getElapsed() - self.getLastUtterance().lastUpdateTime
|
|
|
|
class Condition(object):
    """
    A condition, basic conditions are built in, custom condition can be given by
    providing a custom method.
    """

    # Maps the JSON 'type' of a condition to the method that evaluates it.
    _METHOD_NAMES = {
        'replyContains': '_hasMetReplyContains',
        'timeout': '_hasMetTimeout',
        'variable': '_hasVariable',
        'diversion': '_hasDiverged',
    }

    def __init__(self, id):
        self.id = id
        self.method = None
        self.type = None
        self.vars = {}
        self.logInfo = None
        self.originalJsonString = None
        self.usedContainsDuration = None

    def __getstate__(self):
        """Pickle support: a shallow copy of the instance dict."""
        # print(f'get condition {self.id}')
        state = self.__dict__.copy()
        return state

    @classmethod
    def initFromJson(conditionClass, data, story):
        """
        Build a Condition from its JSON description.

        '@id' and 'type' are required, 'vars' is optional. The JSON is also
        kept as a string in ``originalJsonString``. An unknown type leaves
        ``method`` at None.
        """
        condition = conditionClass(data['@id'])
        condition.type = data['type']
        condition.originalJsonString = json.dumps(data)

        # TODO: should Condition be subclassed?
        methodName = conditionClass._METHOD_NAMES.get(data['type'])
        if methodName is not None:
            condition.method = getattr(condition, methodName)

        if 'vars' in data:
            condition.vars = data['vars']

        return condition

    def isMet(self, story):
        """
        Validate if condition is met for the current story state

        Raises TypeError when the condition has no evaluation method
        (unknown type and no custom method assigned).
        """
        if self.method is None:
            raise TypeError(
                f"Condition {self.id} has no method for type '{self.type}'")
        return self.method(story)

    def _hasMetTimeout(self, story):
        """
        Met when at least vars['seconds'] have passed since the last
        message finished. With 'onlyIfNoReply' the timeout is ignored once
        the participant has said something. Updates the timeout counters
        in story.stats when met.
        """
        now = story.timer.getElapsed()
        # check if the message already finished playing
        if not story.lastMsgFinishTime:
            return False

        if self.vars.get('onlyIfNoReply'):
            if story.currentReply and story.currentReply.hasUtterances():
                story.logger.log(LOG_BS, f'Only if no reply has text! {story.currentReply.getText()}')
                # 'onlyIfNoReply': only use this timeout if participants doesn't speak.
                return False
            # else:
            #     story.logger.debug('Only if no reply has no text yet!')

        hasMetTimeout = now - story.lastMsgFinishTime >= float(self.vars['seconds'])
        if not hasMetTimeout:
            return False

        # update stats:
        story.stats['timeouts'] += 1
        if self.vars.get('needsReply') is True:
            if story.currentReply is None or not story.currentReply.hasUtterances():
                story.stats['silentTimeouts'] += 1
                story.stats['consecutiveSilentTimeouts'] += 1

        self.logInfo = "{}s".format(self.vars['seconds'])
        return True

    def _hasVariable(self, story) -> bool:
        """
        Met when vars['variable'] is set on the story, or when it is not
        set if vars['notSet'] is truthy. Never met before the last message
        has finished.
        """
        if not story.lastMsgFinishTime:
            return False

        r = story.hasVariableSet(self.vars['variable'])
        if r:
            story.logger.debug(f"Variable {self.vars['variable']} is set.")

        inverse = bool(self.vars.get('notSet'))
        if inverse:
            r = not r

        self.logInfo = "Does {} have variable {}".format(
            'not' if inverse else '',
            self.vars['variable']
        )
        return r

    def _hasDiverged(self, story) -> bool:
        """
        Met when the diversion vars['diversionId'] has been hit, or has not
        been hit if vars['inverseMatch'] is truthy. Never met before the
        last message has finished, nor when the diversion does not exist.
        """
        if not story.lastMsgFinishTime:
            return False

        d = story.get(self.vars['diversionId'])
        if not d:
            story.logger.critical(f"Condition on non-existing diversion: {self.vars['diversionId']}")
            # Without a diversion there is nothing to evaluate; do not go
            # on to dereference None.
            return False

        r = d.hasHit
        if r:
            story.logger.debug(f"Diversion {self.vars['diversionId']} has been hit.")

        inverse = bool(self.vars.get('inverseMatch'))
        if inverse:
            r = not r

        self.logInfo = "Has {} diverged to {}".format(
            'not' if inverse else '',
            self.vars['diversionId']
        )

        return r

    def _hasMetReplyContains(self, story) -> bool:
        """
        Check the reply for specific characteristics:
        - regex: regular expression. If empty, just wait for isFinished()
        - delays: an array of [{'minReplyDuration', 'waitTime'},...]
            - minReplyDuration: the nr of seconds the reply should take. Preferably have one with 0
            - waitTime: the time to wait after isFinished() before continuing
        - instantMatch: if truthy, a regex match is accepted right away
        """
        r = story.currentReply  # make sure we keep working with the same object
        if not r or not r.hasUtterances():
            return False

        capturedVariables = None

        pattern = self.vars.get('regex')
        if pattern:
            if 'regexCompiled' not in self.vars:
                # Compile once, as we probably run it more than once
                self.vars['regexCompiled'] = re.compile(pattern)

            t = r.getText().lower()
            story.logger.log(LOG_BS, 'attempt regex: {} on {}'.format(pattern, t))
            result = self.vars['regexCompiled'].search(t)
            if result is None:
                # if there is something to match, but not found, it's never ok
                return False
            story.logger.debug('Got match on {}'.format(pattern))

            capturedVariables = result.groupdict()

            instantMatch = bool(self.vars.get('instantMatch'))
            if instantMatch or not r.isSpeaking():
                # try to avoid setting variables for intermediate strings
                for captureGroup in capturedVariables:
                    story.setVariableValue(captureGroup, capturedVariables[captureGroup])

            if instantMatch:
                story.logger.info(f"Instant match on {self.vars['regex']}, {self.vars}")
                self.logInfo = "Instant match of {}, captured {}".format(
                    pattern,
                    capturedVariables
                )
                return True

        # either there's a match, or nothing to match at all
        if 'delays' in self.vars:
            if story.lastMsgFinishTime is None:
                story.logger.debug("not finished playback yet")
                return False
            # time between finishing playback and ending of speaking:
            # using lastUpdateTime instead of endTime
            replyDuration = r.getLastUtterance().lastUpdateTime - story.lastMsgFinishTime
            # Longest minimum duration first: the first delay that the
            # reply duration exceeds is the one that applies.
            delays = sorted(self.vars['delays'], key=lambda k: float(k['minReplyDuration']), reverse=True)
            for delay in delays:
                if replyDuration > float(delay['minReplyDuration']):
                    timeSinceReply = r.getTimeSinceLastUtterance()
                    story.logger.log(LOG_BS, f"check delay duration is now {replyDuration}, already waiting for {timeSinceReply}, have to wait {delay['waitTime']}")
                    if timeSinceReply > float(delay['waitTime']):
                        # if variables are captured, only set them the moment the condition matches
                        if capturedVariables is not None:
                            for captureGroup in capturedVariables:
                                story.setVariableValue(captureGroup, capturedVariables[captureGroup])
                        self.logInfo = "Match of {}, captured {} after, {}".format(
                            self.vars.get('regex'),  # the regex is optional here
                            capturedVariables,
                            timeSinceReply
                        )
                        self.usedContainsDuration = float(delay['waitTime'])
                        return True
                    break  # don't check other delays
            # wait for delay to match
            story.logger.log(LOG_BS, "Wait for it...")
            return False

        # If there is a delay, it takes precedence of isSpeaking, since google does not always give an is_finished on short utterances (eg. "hello" or "no")
        if r.isSpeaking():
            story.logger.log(LOG_BS, f"is speaking: {r.getLastUtterance().text} - {r.getLastUtterance().startTime}")
            return False

        # There is a match and no delay say, person finished speaking. Go ahead sir!
        self.logInfo = "Match"
        return True

    def getLogSummary(self):
        return {
            'id': self.id
        }
|
|
|
|
class Direction(object):
    """
    An edge in the story graph: it leads from one message to another and
    carries the conditions under which it may be followed.
    """

    def __init__(self, id, msgFrom: Message, msgTo: Message):
        self.id = id
        self.msgFrom, self.msgTo = msgFrom, msgTo
        #: :type self.conditions: list(Condition)
        self.conditions = []
        self.conditionMet = None
        self.isDiversionReturn = False

    def __getstate__(self):
        """Pickle support: hand out a shallow copy of the instance dict."""
        # print(f'get direction {self.id}')
        return dict(self.__dict__)

    def addCondition(self, condition: Condition):
        """Append *condition* to the conditions of this direction."""
        self.conditions.append(condition)

    def setMetCondition(self, condition: Condition):
        """Remember which condition was met."""
        self.conditionMet = condition

    @classmethod
    def initFromJson(direction, data, story):
        """
        Build a Direction from JSON. Source, target and every listed
        condition id are looked up through ``story.get``.
        """
        source = story.get(data['source'])
        target = story.get(data['target'])
        edge = direction(data['@id'], source, target)
        for conditionId in (data['conditions'] if 'conditions' in data else ()):
            edge.addCondition(story.get(conditionId))
        return edge

    def getLogSummary(self):
        """Return the id plus the id of the met condition (None if unset)."""
        metId = self.conditionMet.id if self.conditionMet else None
        return {'id': self.id, 'condition': metId}
|
|
|
|
|
|
class Diversion(object):
    """
    A Diversion. Used to catch events outside of story flow.
    Not sure why I'm not using subclasses here o:)

    The ``type`` selects the trigger method ('no_response',
    'reply_contains', 'timeout', 'repeat' or 'interrupt'); ``params`` is
    the configuration dict for that type.
    """

    def __init__(self, id, type: str, params: dict):
        """Raises Exception when *type* is not one of the known types."""
        self.id = id
        self.params = params
        # Initialise everything up front, so that an unknown type reaches
        # the explicit check below and every type has the same attributes.
        self.method = None
        self.finaliseMethod = None
        self.regex = None
        self.returnMessage = None
        self.hasHit = False
        self.disabled = False
        self.type = type
        self.counter = 0

        if type == 'no_response':
            self.method = self._divergeIfNoResponse
            self.finaliseMethod = self._returnAfterNoResponse
        elif type == 'reply_contains':
            self.method = self._divergeIfReplyContains
            self.finaliseMethod = self._returnAfterReplyContains
            pattern = self.params.get('regex')
            # An empty pattern means there is nothing to match on.
            self.regex = re.compile(pattern) if pattern else None
        elif type == 'timeout':
            self.method = self._divergeIfTimeout
            self.finaliseMethod = self._returnAfterTimeout
        elif type == 'repeat':
            self.method = self._divergeIfRepeatRequest
            self.regex = re.compile(self.params['regex'])
        elif type == 'interrupt':
            self.method = self._divergeIfInterrupted

        if not self.method:
            raise Exception("No valid type given for diversion")

    def __getstate__(self):
        """Pickle support: a shallow copy of the instance dict."""
        # print(f'get diversion {self.id}')
        state = self.__dict__.copy()
        return state

    @classmethod
    def initFromJson(diversionClass, data, story):
        """Build a Diversion from JSON keys '@id', 'type' and 'params'."""
        diversion = diversionClass(data['@id'], data['type'], data['params'])

        return diversion

    def getLogSummary(self):
        return {
            'id': self.id,
        }

    async def divergeIfNeeded(self, story, direction=None):
        """
        Validate if condition is met for the current story state
        Returns True when diverging
        """
        # For all diversion except repeat (which simply doesn't have the variable)
        if self.params.get('notAfterMsgId'):
            msg = story.get(self.params['notAfterMsgId'])
            if msg is None:
                story.logger.warning(f"Invalid message selected for diversion: {self.params['notAfterMsgId']} for {self.id}")
            elif story.logHasMsg(msg):
                # story.logger.warn(f"Block diversion {self.id} because of hit message {self.params['notAfterMsgId']}")
                self.disabled = True  # never run it and allow following timeouts/no_responses to run
                return False

        r = await self.method(story,
                              direction.msgFrom if direction else None,
                              direction.msgTo if direction else None,
                              direction if direction else None)
        if r:
            if self.type != 'repeat' and self.type != 'interrupt':
                # repeat diversion should be usable infinite times
                self.hasHit = True

            story.addToLog(self)
        return r

    def createReturnDirectionsTo(self, story, startMsg, returnMsg, originalDirection=None, inheritTiming=True, timeoutDuration=.5, replyContainsDurations=None):
        """
        The finishes of this diversion's strand should point to the return message
        with the right timeout/timing. If hit, this direction should also notify
        this diversion.

        replyContainsDurations: list formatted as in JSON
        [{
            "minReplyDuration": "0",
            "waitTime": "3"
        }]

        originalDirection is accepted for compatibility but not used: with
        inheritTiming the timings are taken from the story's current
        directions.
        """
        self.counter += 1
        # story.logger.warn(f"CREATING DIRECTIONS FOR {startMsg.id}")
        finishMessageIds = story.getFinishesForMsg(startMsg)
        finalTimeoutDuration = timeoutDuration
        finalContainsDurations = replyContainsDurations
        #: :type story: Story
        # take the timeouts that are on the current message, and apply it to our return
        # as to have somewhat equal pace as to where we originate from
        if inheritTiming:
            for currentDirection in story.getCurrentDirections():
                for condition in currentDirection.conditions:
                    if condition.type == 'timeout':
                        finalTimeoutDuration = float(condition.vars['seconds'])
                    if condition.type == 'replyContains':
                        # Read from the original JSON rather than from
                        # condition.vars. A condition without delays has
                        # no timing to inherit.
                        inherited = json.loads(condition.originalJsonString).get('vars', {}).get('delays')
                        if inherited is not None:
                            finalContainsDurations = inherited

        # story.logger.warn(f"FINISHES: {finishMessageIds}")
        for i, msgId in enumerate(finishMessageIds, start=1):
            # Some very ugly hack to add a direction & condition
            msg = story.get(msgId)
            if not msg:
                continue

            direction = Direction(f"{self.id}-{i}-{self.counter}", msg, returnMsg)
            # Build the description as a dict, so that ids containing
            # quotes or backslashes cannot break it.
            data = {
                "@id": f"{self.id}-ct{i}-{self.counter}",
                "@type": "Condition",
                "type": "timeout",
                "label": "Autogenerated Timeout",
                "vars": {
                    "seconds": str(finalTimeoutDuration),
                    "onlyIfNoReply": finalContainsDurations is not None,
                },
            }

            # TODO: also at replycontains if it exists, with the same timings
            condition = Condition.initFromJson(data, story)
            direction.addCondition(condition)

            if finalContainsDurations is not None:
                data2 = {
                    "@id": f"{self.id}-cr{i}-{self.counter}",
                    "@type": "Condition",
                    "type": "replyContains",
                    "label": "Autogenerated Reply Contains",
                    "vars": {
                        "regex": "",
                        "instantMatch": False,
                        "delays": finalContainsDurations,
                    },
                }
                condition2 = Condition.initFromJson(data2, story)
                direction.addCondition(condition2)
                story.add(condition2)

            direction.isDiversionReturn = True  # will clear the currentDiversion on story
            story.logger.info(f"Created direction: {direction.id} {condition.id} with timeout {finalTimeoutDuration}s")
            story.add(condition)
            story.add(direction)
            # story.logger.warn(f"ADDED DIRECTION {direction.id}")

    async def finalise(self, story):
        """
        Only used if the Diversion sets the story.currentDiversion

        Runs the type-specific finalise method if there is one and clears
        story.currentDiversion. Returns True when a finalise method ran.
        """
        story.logger.info("end of diversion")
        if not self.finaliseMethod:
            story.logger.info(f"No finalisation for diversion {self.id}")
            story.currentDiversion = None
            return False

        await self.finaliseMethod(story)
        story.currentDiversion = None
        return True

    async def _divergeIfNoResponse(self, story, msgFrom, msgTo, direction):
        """
        Participant doesn't speak for x consecutive replies (has had timeout)
        """
        #: :type story: Story
        if story.currentDiversion or not msgFrom or not msgTo:
            return False

        if story.stats['consecutiveSilentTimeouts'] >= int(self.params['consecutiveSilences']):
            story.stats['diversions']['no_response'] += 1
            msg = story.get(self.params['msgId'])
            if msg is None:
                story.logger.critical(f"Not a valid message id for diversion: {self.params['msgId']}")
                return

            story.logger.info(f"Diverge: No response {self.id} {story.stats}")

            self.returnMessage = msgTo

            if self.params['returnAfterStrand']:
                self.createReturnDirectionsTo(story, msg, msgTo, direction)

            await story.setCurrentMessage(msg)
            story.currentDiversion = self
            return True

        return

    async def _returnAfterNoResponse(self, story):
        story.logger.info(f"Finalise diversion: {self.id}")
        story.stats['consecutiveSilentTimeouts'] = 0  # reset counter after diverging
        # if self.params['returnAfterStrand']:
        #     await story.setCurrentMessage(self.returnMessage)

    async def _divergeIfReplyContains(self, story, msgFrom, msgTo, _):
        """
        Participant says something that matches this diversion's regex.

        The msgFrom/msgTo arguments are ignored: the return message is the
        target of the current message's default direction.
        """
        #: :type story: Story
        # use story.currentReply.getTimeSinceLastUtterance() > 2
        if story.currentDiversion:  # or not msgFrom or not msgTo:
            # don't do nested diversions
            return False

        if self.hasHit:
            # don't match twice
            return

        if story.currentReply is None or not self.regex:
            return

        direction = story.getDefaultDirectionForMsg(story.currentMessage)
        if not direction:
            # ignore the direction argument, and only check if the current message has a valid default
            return

        msgTo = direction.msgTo

        waitTime = 1.8 if 'waitTime' not in self.params else float(self.params['waitTime'])
        timeSince = story.currentReply.getTimeSinceLastUtterance()
        # timeSince is None while the reply has no utterances yet.
        if timeSince is None or timeSince < waitTime:
            story.logger.log(LOG_BS, f"Waiting for replyContains: {timeSince} (needs {waitTime})")
            return

        r = self.regex.search(story.currentReply.getText())
        if r is None:
            return

        if self.params.get('notForColor') and story.currentMessage.color:
            if story.currentMessage.color.lower() == self.params['notForColor'].lower():
                story.logger.debug(f"Skip diversion {self.id} because of section color")
                return

        story.logger.info(f"Diverge: reply contains {self.id}")
        story.stats['diversions']['reply_contains'] += 1

        msg = story.get(self.params['msgId'])
        if msg is None:
            story.logger.critical(f"Not a valid message id for diversion: {self.params['msgId']}")
            return

        # TODO: pick the direction with timeout as next Message.
        self.returnMessage = msgTo

        if self.params['returnAfterStrand']:
            self.createReturnDirectionsTo(story, msg, msgTo, direction)

        await story.setCurrentMessage(msg)
        story.currentDiversion = self
        return True

    async def _returnAfterReplyContains(self, story):
        story.logger.info(f"Finalise diversion: {self.id}")
        # if self.params['returnAfterStrand']:
        #     await story.setCurrentMessage(self.returnMessage)

    async def _divergeIfRepeatRequest(self, story, msgFrom, msgTo, direction):
        """
        Participant asks if message can be repeated.
        """
        # if not msgFrom or not msgTo:
        #     return

        # TODO: how to handle this now we sometimes use different timings.
        # Perhaps set isFinished when matching condition.
        if story.currentReply is None:
            return
        timeSince = story.currentReply.getTimeSinceLastUtterance()
        # timeSince is None while the reply has no utterances yet.
        if timeSince is None or timeSince > 1:
            return

        r = self.regex.search(story.currentReply.getText())
        if r is None:
            return

        story.logger.info(f"Diverge: request repeat {self.id}")
        story.stats['diversions']['repeat'] += 1
        await story.setCurrentMessage(story.currentMessage)
        return True

    async def _divergeIfTimeout(self, story, msgFrom, msgTo, direction):
        """
        (1) last spoken at all
        (2) or duration for this last reply only
        """
        if story.currentDiversion:
            return

        if msgFrom or msgTo:
            # not applicable a direction has been chosen
            return

        if not story.lastMsgFinishTime:
            # not during play back
            return

        # not applicable when timeout is set
        for currentDirection in story.getCurrentDirections():
            for condition in currentDirection.conditions:
                if condition.type == 'timeout':
                    return

        now = story.timer.getElapsed()
        if now - story.lastMsgFinishTime < float(self.params['minTimeAfterMessage']):
            # not less than x sec after it
            return

        interval = float(self.params['interval'])
        if not self.params['fromLastMessage']:
            # (1) last spoken at all
            timeSince = story.timer.getElapsed('last_speech') if story.timer.hasMark('last_speech') else story.timer.getElapsed('start')
            # NOTE(review): this takes the LARGER elapsed time, i.e. the
            # older of the two events - confirm that this is intended.
            if story.timer.hasMark('last_diversion_timeout') and story.timer.getElapsed('last_diversion_timeout') > timeSince:
                timeSince = story.timer.getElapsed('last_diversion_timeout')
            if timeSince < interval:
                return

            story.stats['diversions']['timeout_total'] += 1
        else:
            # (2) duration for this last reply only
            if story.currentMessage is None:
                return

            # if story.currentMessage.timeoutDiversionCount + 1

            if story.currentReply is not None:
                # still playing back
                # or somebody has spoken already (timeout only works on silences)
                return

            if now - story.lastMsgFinishTime < interval:
                return

            story.currentMessage.timeoutDiversionCount += 1
            story.stats['diversions']['timeout_last'] += 1

        # if we're still here, there's a match!
        story.logger.info(f"Diverge: Timeout {self.id} of {self.params['interval']}")
        story.stats['diversions']['timeout'] += 1

        msg = story.get(self.params['msgId'])
        if msg is None:
            story.logger.critical(f"Not a valid message id for diversion: {self.params['msgId']}")
            return

        # fall back to the currentMessage to return on.
        # TODO: maybe, if not chapter is found, the diversion should be
        # blocked altogether?
        self.returnMessage = story.getNextChapterForMsg(story.currentMessage, False) or story.currentMessage

        if self.params['returnAfterStrand']:
            # no direction is here, as this diversion triggers before a direction is taken
            self.createReturnDirectionsTo(story, msg, self.returnMessage, inheritTiming=False)

        await story.setCurrentMessage(msg, allowReplyInterrupt=True)
        story.currentDiversion = self
        story.timer.setMark('last_diversion_timeout')
        return True

    async def _returnAfterTimeout(self, story):
        story.logger.info(f"Finalise diversion: {self.id}")

    async def _divergeIfInterrupted(self, story, msgFrom, msgTo, direction):
        """
        This is here as a placeholder for the interruption diversion.
        These will however be triggered differently
        """
        if story.currentDiversion or story.allowReplyInterrupt:
            return False

        msg = story.get(self.params['msgId'])
        if msg is None:
            story.logger.critical(f"Not a valid message id for diversion: {self.params['msgId']}")
            return

        self.returnMessage = story.currentMessage
        # no direction is here, as this diversion triggers before a direction is taken
        self.createReturnDirectionsTo(story, msg, self.returnMessage, inheritTiming=False, timeoutDuration=3, replyContainsDurations=[{
            "minReplyDuration": "0",
            "waitTime": "2"
        }])
        await story.setCurrentMessage(msg)
        story.currentDiversion = self

        return True
|
|
|
|
class Configuration(object):
    """Story-wide settings; the class attributes are the defaults."""

    id = 'configuration'
    volume = 1  # Volume multiplier for 'play' command
    nothing_text = "nothing"  # When variable is not set, but used in sentence, replace it with this word.

    @classmethod
    def initFromJson(configClass, data, story):
        """
        Return a new configuration with every key of *data* set as an
        attribute, overriding the class defaults. *story* is not used.
        """
        # Instantiate the class this was called on, so subclasses work too.
        config = configClass()
        config.__dict__.update(data)
        return config
|
|
|
|
|
|
# Lookup table from a type name (string) to the class that implements it.
storyClasses = dict(
    Msg=Message,
    Direction=Direction,
    Condition=Condition,
    Diversion=Diversion,
    Configuration=Configuration,
)
|
|
|
|
|
|
class Stopwatch(object):
    """
    Keep track of elapsed time. Use multiple markers, but a single pause/resume button

    While paused the stopwatch time stands still. ``isRunning`` is an
    asyncio.Event that is set while running and cleared while paused.
    """

    def __init__(self):
        self.isRunning = asyncio.Event()
        self.reset()

    def _now(self):
        """Return the stopwatch's notion of 'now': frozen while paused."""
        return time.time() if self.paused_at == 0 else self.paused_at

    def getElapsed(self, since_mark='start'):
        """
        Return the seconds elapsed since *since_mark*, not counting paused
        time. Raises KeyError for an unknown mark.
        """
        return self._now() - self.marks[since_mark]

    def pause(self):
        """Stop the clock. Pausing again while paused changes nothing."""
        if self.paused_at == 0:
            # Keep the first pause moment: overwriting it on a second call
            # would count the time paused so far as running time.
            self.paused_at = time.time()
        self.isRunning.clear()

    def resume(self):
        """Restart the clock after a pause; a no-op when not paused."""
        if self.paused_at == 0:
            return

        # Shift every mark by the paused duration, so that elapsed times
        # continue from where they stood.
        pause_duration = time.time() - self.paused_at
        for m in self.marks:
            self.marks[m] += pause_duration

        self.paused_at = 0
        self.isRunning.set()

    def reset(self):
        """Drop all marks, restart from a fresh 'start' mark and run."""
        self.paused_at = 0  # 0 means: not paused
        self.marks = {}
        self.setMark('start')
        self.isRunning.set()

    def setMark(self, name):
        """Set (or move) the mark *name* to the current stopwatch time."""
        # While paused use the frozen time. A wall-clock mark would lie
        # ahead of the stopwatch and give negative elapsed times.
        self.marks[name] = self._now()

    def hasMark(self, name):
        return name in self.marks

    def clearMark(self, name):
        """Remove the mark *name* if it exists."""
        self.marks.pop(name, None)

    def __getstate__(self):
        """Pickle support: store the event as a plain boolean."""
        # print(f'get stopwatch')
        state = self.__dict__.copy()
        state['isRunning'] = self.isRunning.is_set()
        return state

    def __setstate__(self, state):
        """Restore pickled state and rebuild the asyncio.Event from it."""
        self.__dict__.update(state)

        self.isRunning = asyncio.Event()
        if 'isRunning' in state and state['isRunning']:
            self.isRunning.set()
        else:
            self.isRunning.clear()
|
|
|
|
|
|
class StoryState(object):
    """
    Because Story not only contains state, but also logic/control variables, we need
    a separate class to keep track of the state of things. This way, we can recreate
    the exact state in which a story was before.

    The class attributes below are the default values. Every mutable default
    (lists, dicts, the timer) is replaced by a fresh per-instance object in
    __init__, so two states never write into the same container.
    """
    currentMessage = None
    currentDiversion = None
    currentReply = None
    allowReplyInterrupt = False
    timer = Stopwatch()
    isRunning = False

    lastMsgTime = None
    lastSpeechStartTime = None
    lastSpeechEndTime = None
    variableValues = {}  # captured variables from replies
    finish_time = False

    events = []  # queue of received events
    commands = []  # queue of commands to send
    log = []  # all nodes/elements that are triggered
    msgLog = []

    stats = {
        'timeouts': 0,
        'silentTimeouts': 0,
        'consecutiveSilentTimeouts': 0,
        'diversions': {
            'no_response': 0,
            'repeat': 0,
            'reply_contains': 0,
            'timeout': 0,
            'timeout_total': 0,
            'timeout_last': 0
        }
    }

    def __init__(self):
        # give this instance its own copies of the mutable class defaults
        self.timer = Stopwatch()
        self.variableValues = {}
        self.events = []
        self.commands = []
        self.log = []
        self.msgLog = []
        # stats is nested one level deep ('diversions'), copy that level too
        self.stats = {
            key: dict(value) if isinstance(value, dict) else value
            for key, value in type(self).stats.items()
        }
|
|
#
|
|
|
|
class Story(object):
|
|
"""Story represents and manages a story/narrative flow"""
|
|
# TODO should we separate 'narrative' (the graph) from the story (the
|
|
# current user flow)
|
|
|
|
def __init__(self, hugvey_state, panopticon_port):
    """
    :param hugvey_state: the hugvey this story is played on; its `id` names
                         the logger, and it is stored as self.hugvey
    :param panopticon_port: stored as-is on the story
    """
    super().__init__()
    self.hugvey = hugvey_state
    self.panopticon_port = panopticon_port

    self.events = []  # queue of received events
    self.commands = []  # queue of commands to send
    self.log = []  # all nodes/elements that are triggered
    self.msgLog = []  # hit messages
    self.logger = mainLogger.getChild(f"{self.hugvey.id}").getChild("story")
    self.currentMessage = None
    self.currentDiversion = None
    self.currentReply = None
    self.allowReplyInterrupt = False
    self.timer = Stopwatch()
    self.isRunning = False
    self.diversions = []
    self.interruptionDiversions = []
    self.variables = {}

    # These are read by other methods, but were only assigned by reset()
    # or setCurrentMessage(). Give them a defined value from the start.
    self.variableValues = {}  # captured variables from replies
    self.lastMsgTime = None
    self.lastMsgStartTime = None
    self.lastMsgFinishTime = None
    self.finish_time = False
|
|
|
|
def pause(self):
    """Pause the story by pausing its timer."""
    self.logger.debug('pause hugvey')
    stopwatch = self.timer
    stopwatch.pause()
|
|
|
|
def resume(self):
    """Resume the story by resuming its timer."""
    self.logger.debug('resume hugvey')
    stopwatch = self.timer
    stopwatch.resume()
|
|
|
|
def getLogSummary(self):
    """
    Summarise the log, grouped by the kind of element that was logged.

    Returns a dict with the keys 'messages', 'directions' and 'diversions';
    each value is a list of (element summary, logged time) tuples.
    """
    def summariesOf(elementClass):
        # entry[0]: the entity, entry[1]: the logged time
        return [
            (entry[0].getLogSummary(), entry[1])
            for entry in self.log
            if isinstance(entry[0], elementClass)
        ]

    return {
        'messages': summariesOf(Message),
        'directions': summariesOf(Direction),
        'diversions': summariesOf(Diversion),
    }
|
|
|
|
def getLogCounts(self):
    """Count the logged messages and diversions."""
    def countOf(elementClass):
        return sum(1 for entry in self.log if isinstance(entry[0], elementClass))

    messageCount = countOf(Message)
    diversionCount = countOf(Diversion)
    return {
        'messages': messageCount,
        'diversions': diversionCount,
    }
|
|
|
|
def registerVariable(self, variableName, message):
    """Remember that `message` uses the variable `variableName`."""
    users = self.variables.setdefault(variableName, [])
    users.append(message)
|
|
|
|
def setVariableValue(self, name, value):
    """
    Store the value of a variable and pass it on to every message that
    registered the variable.

    A variable that no message uses is still stored, but logged as a warning.
    """
    isUsed = name in self.variables
    if isUsed:
        self.logger.debug(f"Set variable {name} to {value}")
    else:
        # Logger.warn() is a deprecated alias, use warning()
        self.logger.warning(f"Set variable that is not needed in the story: {name}")

    self.variableValues[name] = value

    if not isUsed:
        return

    for message in self.variables[name]:
        message.setVariable(name, value)
|
|
|
|
def hasVariableSet(self, name) -> bool:
    """Return True if a value other than None is stored for the variable."""
    # The None test has to look at the stored value; testing the
    # variableValues dict itself is always true.
    return self.variableValues.get(name) is not None
|
|
|
|
def setStoryData(self, story_data, language_code):
    """
    Parse story_data into a working story engine

    All elements are (re)built from the json data and the story is reset.
    If a message was current before the call, it is looked up again by id
    in the new data, so the story can continue from there.

    :param story_data: list of story json elements, each with an '@type'
                       key that selects the class in storyClasses
    :param language_code: stored on the story as self.language_code
    :raises: re-raises any exception that occurs while loading an element
    """
    self.data = story_data
    self.language_code = language_code

    # keep to be able to reset it in the end
    currentId = self.currentMessage.id if self.currentMessage else None

    self.elements = {}
    self.strands = {}
    self.diversions = []
    self.interruptionDiversions = []
    self.directionsPerMsg = {}
    self.startMessage = None  # The entrypoint to the graph
    self.variables = {}
    self.reset()

    for el in self.data:
        try:
            className = storyClasses[el['@type']]
            obj = className.initFromJson(el, self)
            self.add(obj)
        except Exception as e:
            self.logger.critical(f"Error loading story element: {el}")
            self.logger.exception(e)
            raise

    elements = list(self.elements.values())
    self.diversions = [el for el in elements if type(el) is Diversion]
    self.interruptionDiversions = [el for el in self.diversions if el.type == 'interrupt']
    configurations = [el for el in elements if type(el) is Configuration]
    self.configuration = configurations[0] if configurations else Configuration()

    if currentId:
        self.currentMessage = self.get(currentId)
        if self.currentMessage:
            self.logger.info(
                f"Reinstantiated current message: {self.currentMessage.id}")
        else:
            # Logger.warn() is a deprecated alias, use warning()
            self.logger.warning(
                "Could not reinstatiate current message. Starting over")

    # Register variables
    for msg in self.getMessages():
        if not msg.hasVariables():
            continue

        for var in msg.variables:
            self.registerVariable(var, msg)

    self.logger.info(f'has variables: {self.variables}')
    self.logger.info(f'has {len(self.strands)} strands: {self.strands}')
    self.calculateFinishesForStrands()
|
|
|
|
def reset(self):
    """
    Bring the story back to its initial playback state: timer, current
    message/diversion/reply, captured variables, queues, logs and stats.

    The story elements themselves (messages, directions, ...) are untouched.
    """
    self.timer.reset()
    # currently active message, determines active listeners etc.
    self.currentMessage = None
    self.currentDiversion = None
    self.lastMsgTime = None
    self.lastSpeechStartTime = None
    self.lastSpeechEndTime = None
    self.variableValues = {}  # captured variables from replies
    self.finish_time = False

    self.events = []  # queue of received events
    self.commands = []  # queue of commands to send
    self.log = []  # all nodes/elements that are triggered
    self.msgLog = []
    self.currentReply = None

    self.stats = {
        'timeouts': 0,
        'silentTimeouts': 0,
        'consecutiveSilentTimeouts': 0,
        'diversions': {
            'no_response': 0,
            'repeat': 0,
            'reply_contains': 0,
            'timeout': 0,
            'timeout_total': 0,
            'timeout_last': 0
        }
    }
    # The empty `for msg in self.getMessages(): pass` loop that used to end
    # this method did nothing, and made reset() fail with an AttributeError
    # when no story data (self.elements) was loaded yet.
|
|
|
|
def add(self, obj):
    """
    Add a story element and index it by what it is.

    - Diversions are appended to self.diversions
    - a Message marked isStart becomes self.startMessage, one marked
      isStrandStart gets an (empty) entry in self.strands
    - Directions are grouped in self.directionsPerMsg by the id of the
      message they depart from

    :raises Exception: when an element with the same id was already added
    """
    if obj.id in self.elements:
        # the format string lacked a placeholder, so the id was never shown
        raise Exception("Duplicate id for '{}'".format(obj.id))

    self.elements[obj.id] = obj

    kind = type(obj)
    if kind is Diversion:
        self.diversions.append(obj)

    if kind is Message:
        if obj.isStart:
            # confusingly, isStart is 'beginning' in the story json file
            self.startMessage = obj
        if obj.isStrandStart:
            self.strands[obj.id] = []

    if kind is Direction:
        self.directionsPerMsg.setdefault(obj.msgFrom.id, []).append(obj)
|
|
|
|
def get(self, id):
    """
    Look up a story element by its id; None when there is no such element
    """
    return self.elements.get(id)
|
|
|
|
def getMessages(self):
    """Return all story elements that are exactly of type Message."""
    messages = []
    for element in self.elements.values():
        if type(element) is Message:
            messages.append(element)
    return messages
|
|
|
|
def stop(self):
    """Clear the isRunning flag, which ends the renderer loop."""
    self.logger.info("Stop Story")
    if not self.isRunning:
        return
    self.isRunning = False
|
|
|
|
def shutdown(self):
    """Stop the story and let go of the reference to the hugvey."""
    self.stop()
    # drop the reference only after stopping
    self.hugvey = None
|
|
|
|
async def _processPendingEvents(self):
    """
    Handle the events that are in the queue at the moment of calling.

    Handled events: 'exit', 'connect', 'playbackStart', 'playbackFinish'
    and 'speech'. Playback events for a message other than the current one
    are ignored. When playback finishes for a message without outgoing
    directions, the story is finished and processing stops right away
    (events that are still queued stay in the queue).
    """
    # Only handle what is queued now, not what gets appended while awaiting
    pending = len(self.events)
    for _ in range(pending):
        e = self.events.pop(0)
        kind = e['event']
        self.logger.debug("handle '{}'".format(e))
        if kind == "exit":
            self.stop()
        if kind == 'connect':
            # a client connected. Should only happen in the beginning or in case of error
            # that is, until we have a 'reset' or 'start' event.
            # reinitiate current message (if there is one to re-initiate)
            if self.currentMessage is not None:
                await self.setCurrentMessage(self.currentMessage)

        if kind == "playbackStart":
            # without a current message, the event cannot belong to it
            if self.currentMessage is None or e['msgId'] != self.currentMessage.id:
                continue
            self.lastMsgStartTime = self.timer.getElapsed()
            self.logger.debug("Start playback")

        if kind == "playbackFinish":
            if self.currentMessage is not None and e['msgId'] == self.currentMessage.id:
                # TODO: migrate value to Message instead of Story
                self.lastMsgFinishTime = self.timer.getElapsed()
                self.hugvey.eventLogger.info(f"message: {self.currentMessage.id} {self.currentMessage.uuid} done")

                if self.currentMessage.id not in self.directionsPerMsg:
                    # nowhere to go from here
                    self.logger.info("THE END!")
                    self._finish()
                    return

        if kind == 'speech':
            # participants speaks, reset counter
            self.stats['consecutiveSilentTimeouts'] = 0

            if self.currentMessage and not self.lastMsgFinishTime:
                # Speech that comes in before the 'playbackFinish' event of
                # the current message is ignored.
                self.logger.info("ignore speech during playing message")
                continue

            # log if somebody starts speaking
            if self.currentReply is None:
                self.logger.info("Start speaking")
                self.currentReply = Reply(self.currentMessage)

            now = self.timer.getElapsed()
            utterance = self.currentReply.getActiveUtterance(now)
            # The 'is_final' from google sometimes comes 2 sec after finishing speaking
            # therefore, we ignore the timing of this transcription if something has been said already
            if e['is_final'] and utterance.hasText():
                self.logger.debug(f'ignore timing: {now} use {utterance.lastUpdateTime}')
                utterance.setText(e['transcript'], utterance.lastUpdateTime)
            else:
                utterance.setText(e['transcript'], now)

            self.hugvey.eventLogger.info("speaking: content {} \"{}\"".format(id(utterance), e['transcript']))
            self.timer.setMark('last_speech')

            if e['is_final']:
                utterance.setFinished(self.timer.getElapsed())
                self.hugvey.eventLogger.info("speaking: stop {}".format(id(utterance)))

            if self.hugvey.recorder:
                self.hugvey.recorder.updateTranscription(self.currentReply.getText())
|
|
|
|
|
|
async def _processDirections(self, directions):
    """
    Evaluate the conditions of the given directions and follow the direction
    whose condition is met, unless a diversion takes over.

    Every met condition is logged; when more than one is met, the last one
    decides. Returns the chosen direction, or None if no condition was met.

    :type directions: list(Direction)
    """
    selected = None
    fulfilled = None
    for direction in directions:
        for condition in direction.conditions:
            if not condition.isMet(self):
                continue
            self.logger.info("Condition is met: {0} ({2}), going to {1}".format(
                condition.id, direction.msgTo.id, condition.type))
            self.hugvey.eventLogger.info("condition: {0}".format(condition.id))
            self.hugvey.eventLogger.info("direction: {0}".format(direction.id))
            fulfilled = condition
            direction.setMetCondition(condition)
            self.addToLog(condition)
            self.addToLog(direction)
            self.currentMessage.setFinished(self.timer.getElapsed())
            selected = direction

    # diversions get their turn first, also when no direction was chosen
    isDiverging = await self._processDiversions(selected)

    # in some cases, conditions should be allowed to interrupt the reply
    allowReplyInterrupt = False
    if fulfilled:
        if fulfilled.type == 'timeout' and not ('onlyIfNoReply' in fulfilled.vars and fulfilled.vars['onlyIfNoReply']):
            allowReplyInterrupt = True
        if fulfilled.usedContainsDuration is not None and fulfilled.usedContainsDuration < 0.1:
            allowReplyInterrupt = True

    if isDiverging or not selected:
        return selected

    if selected.isDiversionReturn and self.currentDiversion:
        await self.currentDiversion.finalise(self)

    await self.setCurrentMessage(selected.msgTo, allowReplyInterrupt=allowReplyInterrupt)
    return selected
|
|
|
|
async def _processDiversions(self, direction: None) -> bool:
    """
    Offer the chosen direction (or None) to every active diversion.
    Returns True if at least one of them diverges, else False.

    Disabled diversions, diversions that already hit, and 'interrupt'
    diversions are skipped. Of the 'timeout' diversions (separately for
    those with and without 'fromLastMessage') and of the 'no_response'
    diversions that have a positive 'timesOccured', only the one with the
    lowest 'timesOccured' is used.
    """
    def lowest(current, candidate):
        # keep whichever of the two has the smaller 'timesOccured'
        if not current or current.params['timesOccured'] > candidate.params['timesOccured']:
            return candidate
        return current

    candidates = []
    timeoutDiv = None
    timeoutLastDiv = None
    noResponseDiv = None
    for diversion in self.diversions:
        #: :type diversion: Diversion
        if diversion.disabled or diversion.hasHit or diversion.type == 'interrupt':
            # interruptions are triggered somewhere else.
            continue

        if diversion.type == 'timeout' and diversion.params['timesOccured'] > 0:
            if diversion.params['fromLastMessage']:
                timeoutLastDiv = lowest(timeoutLastDiv, diversion)
            else:
                timeoutDiv = lowest(timeoutDiv, diversion)
            continue

        if diversion.type == 'no_response' and diversion.params['timesOccured'] > 0:
            noResponseDiv = lowest(noResponseDiv, diversion)
            continue

        candidates.append(diversion)

    # the selected special diversions come after the regular ones
    for special in (timeoutDiv, timeoutLastDiv, noResponseDiv):
        if special:
            candidates.append(special)

    diverge = False
    for diversion in candidates:
        # TODO: collect diversions and order by times + timesOccured (for timeout & no_response)
        if await diversion.divergeIfNeeded(self, direction):
            diverge = True
    return diverge
|
|
|
|
def addToLog(self, node):
    """
    Log that a story element was triggered, together with the elapsed time.

    Messages are also kept in self.msgLog. If the hugvey has a recorder,
    messages, diversions and conditions are passed on to it as well.
    """
    self.log.append((node, self.timer.getElapsed()))

    isMessage = isinstance(node, Message)
    if isMessage:
        self.msgLog.append(node)

    if not self.hugvey.recorder:
        return

    if isMessage:
        self.hugvey.recorder.log('hugvey', node.text, node.id)
    if isinstance(node, Diversion):
        self.hugvey.recorder.log('diversion',node.id)
    if isinstance(node, Condition):
        self.hugvey.recorder.log('condition',node.logInfo, node.id)
|
|
|
|
def logHasMsg(self, node):
    """Return True if the given message is in the message log."""
    alreadyLogged = node in self.msgLog
    return alreadyLogged
|
|
|
|
async def _renderer(self):
    """
    The main loop of the story: every 1/10 sec. handle the pending events,
    evaluate the directions of the current message, and every 5 seconds of
    story time save the state. Runs until isRunning is cleared.
    """
    loopDuration = 0.1  # Configure fps
    previousTick = time.time()
    self.logger.debug("Start renderer")
    while self.isRunning:
        # pause on timer paused
        await self.timer.isRunning.wait()  # wait for un-pause

        for _ in range(len(self.events)):
            await self._processPendingEvents()

        await self._processDirections(self.getCurrentDirections())

        # TODO create timer event

        saveIsDue = not self.timer.hasMark('state_save') or self.timer.getElapsed('state_save') > 5
        if saveIsDue:
            self.storeState()
            self.timer.setMark('state_save')

        # wait for next iteration to avoid too high CPU
        tick = time.time()
        await asyncio.sleep(max(0, loopDuration - (tick - previousTick)))
        previousTick = tick

    self.logger.debug("Stop renderer")
|
|
|
|
async def setCurrentMessage(self, message, useReply = None, allowReplyInterrupt = False):
    """
    Make `message` the current message and have the client play it.

    If the previous message has no finish time yet, a 'stop' command is
    sent for it first. Then a 'prepare' command and, once the audio file is
    known, a 'play' command are sent.

    Use Reply allows to pre-initiate a reply to use with the message. This is used eg. when doing an interruption.
    """
    previous = self.currentMessage
    if previous and not self.lastMsgFinishTime:
        # message is playing
        self.logger.info("Interrupt playback {}".format(previous.id))
        self.hugvey.eventLogger.info("interrupt")
        self.hugvey.sendCommand({
            'action': 'stop',
            'id': previous.id,
        })

    message.uuid = shortuuid.uuid()
    self.currentMessage = message
    self.lastMsgTime = time.time()
    self.lastMsgFinishTime = None  # to be filled in by the event
    self.lastMsgStartTime = None  # to be filled in by the event
    self.allowReplyInterrupt = allowReplyInterrupt

    self.previousReply = self.currentReply  # we can use this for interruptions
    self.currentReply = useReply

    # send command to already mute mic
    self.hugvey.sendCommand({
        'action': 'prepare',
        'id': message.id
    })

    self.logger.info("Current message: ({0}) \"{1}\"".format(
        message.id, message.getText()))
    self.addToLog(message)
    self.hugvey.eventLogger.info(f"message: {message.id} {message.uuid} start \"{message.getText()}\"")

    # TODO: prep events & timer etc.
    fn = await message.getAudioFilePath()

    # get duration of audio file, so the client can detect a hang of 'play'
    try:
        duration = sox.file_info.duration(fn)
    except Exception as e:
        self.hugvey.eventLogger.critical(f"error: crash when reading wave file: {fn}")
        self.logger.critical(f"error: crash when reading wave file: {fn}")
        self.logger.exception(e)
        duration = 10  # some default duration to have something to fall back to

    # the story-wide volume is a multiplier for the volume of the message
    params = message.getParams().copy()
    if 'vol' in params:
        params['vol'] = params['vol'] * self.configuration.volume
    else:
        params['vol'] = self.configuration.volume

    self.hugvey.sendCommand({
        'action': 'play',
        'file': fn,
        'id': message.id,
        'params': params,
        'duration': duration
    })

    pending = [
        "\n- {0} -> {1} (when: {2}) ".format(
            direction.msgFrom.id, direction.msgTo.id, [c.id for c in direction.conditions])
        for direction in self.getCurrentDirections()
    ]
    logmsg = "Pending directions:" + "".join(pending)

    self.logger.log(LOG_BS,logmsg)
    self.storeState()
|
|
|
|
def getCurrentDirections(self):
    """Return the directions that depart from the current message (may be empty)."""
    currentId = self.currentMessage.id
    return self.directionsPerMsg.get(currentId, [])
|
|
|
|
def getNextChapterForMsg(self, msg, canIncludeSelf = True, depth = 0):
    """
    Follow the directions from `msg` (depth first) until a message marked
    as chapterStart is found.

    `msg` itself only counts when canIncludeSelf is set. Returns the
    message found, or None when there is none within 70 steps.
    """
    if canIncludeSelf and msg.chapterStart:
        self.logger.info(f"Next chapter: {msg.id}")
        return msg

    # protection against infinite loop?
    tooDeep = depth >= 70
    if tooDeep or msg.id not in self.directionsPerMsg:
        return None

    for direction in self.directionsPerMsg[msg.id]:
        found = self.getNextChapterForMsg(direction.msgTo, True, depth + 1)
        if found:
            return found

    # none found
    return None
|
|
|
|
async def run(self, customStartMsgId = None, resuming = False):
    """
    Start the story, or resume it, and run the renderer loop until it ends.

    A fresh start resets the timer and begins at the message with id
    customStartMsgId, or at the start message of the story if no id is
    given. When resuming, the current message is played again if it has no
    finish time yet.
    """
    self.logger.info("Starting story")
    if resuming:
        self.hugvey.eventLogger.info(f"story: resume from {self.currentMessage}")
        self.isRunning = True
        if not self.lastMsgFinishTime and self.currentMessage:
            await self.setCurrentMessage(self.currentMessage)
    else:
        self.hugvey.eventLogger.info("story: start")
        self.timer.reset()
        self.isRunning = True
        firstMsg = self.startMessage if customStartMsgId is None else self.get(customStartMsgId)
        await self.setCurrentMessage(firstMsg)

    await self._renderer()
|
|
|
|
def isFinished(self):
    """
    Return the number of seconds since the story finished, or False when
    it has not finished.
    """
    finishedAt = getattr(self, 'finish_time', False)
    if not finishedAt:
        return False
    return time.time() - finishedAt
|
|
|
|
def _finish(self):
    """
    Finish the story, then mark the hugvey as available again
    """
    self.finish()
    # stop google etc:
    hugvey = self.hugvey
    hugvey.available()
|
|
|
|
def finish(self):
    """
    Finish only the story: stop it, record the finish time and pause the
    timer. The state of the hugvey is left as it is.
    """
    hugvey = self.hugvey
    self.logger.info(f"Finished story for {hugvey.id}")
    hugvey.eventLogger.info("story: finished")
    self.stop()
    self.finish_time = time.time()
    self.timer.pause()
|
|
|
|
def calculateFinishesForMsg(self, msgId, depth = 0, checked = None):
    """
    Find the messages at which the story can end when starting from msgId.

    A message without outgoing directions is a finish. Directions that
    point back to the message itself are not followed.

    :param msgId: id of the message to start from
    :param depth: recursion depth, used internally; the search gives up
                  below depth 100
    :param checked: ids of the messages on the path that is being
                    explored, used internally to avoid walking in circles
    :return: list of unique message ids (in no particular order)
    """
    # A mutable default would be shared between all calls
    if checked is None:
        checked = []

    directions = self.directionsPerMsg.get(msgId)
    if not directions:
        # is finish
        return [msgId]

    # Already on the current path: going round the cycle again cannot
    # reveal a finish that the first visit does not find as well. Without
    # this check, a story with loops is walked again and again up to the
    # depth limit, which is exponential in the number of loops.
    if depth > 100 or msgId in checked:
        return []

    checked.append(msgId)
    try:
        finishes = set()
        for d in directions:
            if d.msgTo.id == msgId:
                continue
            finishes.update(self.calculateFinishesForMsg(d.msgTo.id, depth + 1, checked))
    finally:
        # only the current path is tracked, so siblings can revisit msgId
        checked.pop()

    # the set already de-duplicated the ids
    return list(finishes)
|
|
|
|
def calculateFinishesForStrands(self):
    """
    Pre-calculate, for every strand, the message ids at which it can end.
    The strand of the start message of the story is left as it is.
    """
    for strandId in self.strands:
        strandStart = self.get(strandId)  #: :type strandStart: Message
        if strandStart.isStart:
            # ignore for the beginning
            continue

        self.logger.log(LOG_BS, f"Get finishes for {strandId}")
        finishes = self.calculateFinishesForMsg(strandId)
        self.strands[strandId] = finishes

    self.logger.log(LOG_BS, f"Finishes: {self.strands}")
|
|
|
|
def getFinishesForMsg(self, msg):
    """
    Find the end of strands

    Most often they will be 'start's so to speed up these are pre-calculated
    Others can be calculated on the spot

    returns message ids
    """
    # debug output belongs in the log, not on stdout
    self.logger.debug(f"get finishes for {msg.id} in {self.strands}")
    if msg.id in self.strands:
        return self.strands[msg.id]

    return self.calculateFinishesForMsg(msg.id)
|
|
|
|
def getDefaultDirectionForMsg(self, msg):
    """
    There is only a default direction (for reply contains diversion) if it has
    one, and only one, direction to go. If there's more, it should do nothing.

    Returns that direction, or None when the message has no direction or
    more than one.
    """
    directions = self.directionsPerMsg.get(msg.id)
    # no entry or an empty list: is finish. More than one: ambiguous.
    if not directions or len(directions) > 1:
        return None

    # TODO: should the direction have at least a timeout condition set, or not perse?
    return directions[0]
|
|
|
|
@classmethod
def getStateDir(self):
    """Return the directory in which the saved story states are kept."""
    stateDir = "/tmp"
    return stateDir
|
|
# day = time.strftime("%Y%m%d")
|
|
# t = time.strftime("%H:%M:%S")
|
|
#
|
|
# self.out_folder = os.path.join(self.main_folder, day, f"{self.hv_id}", t)
|
|
# if not os.path.exists(self.out_folder):
|
|
# self.logger.debug(f"Create directory {self.out_folder}")
|
|
# self.target_folder = os.makedirs(self.out_folder, exist_ok=True)
|
|
|
|
@classmethod
def getStateFilename(cls, hv_id):
    """Return the path of the state file for the hugvey with the given id."""
    basename = f"hugvey{hv_id}"
    return os.path.join(cls.getStateDir(), basename)
|
|
|
|
def storeState(self):
    """
    Pickle the story to its state file.

    The data is first written to a '.tmp' file next to the target, which
    is renamed over the target once it is flushed to disk.
    """
    # TODO: stop stopwatch
    target = self.getStateFilename(self.hugvey.id)
    scratch = target + '.tmp'
    self.stateSave = time.time()
    with open(scratch, 'wb') as fp:
        pickle.dump(self, fp)
        # write atomic to disk: flush, close, rename
        fp.flush()
        os.fsync(fp.fileno())

    os.rename(scratch, target)
    self.logger.debug(f"saved state to {target}")
|
|
|
|
def hasSavedState(self):
    """Return whether a state file exists for the hugvey of this story."""
    hv_id = self.hugvey.id
    return self.hugveyHasSavedState(hv_id)
|
|
|
|
@classmethod
def hugveyHasSavedState(cls, hv_id):
    """Return whether a state file exists for the hugvey with the given id."""
    stateFile = cls.getStateFilename(hv_id)
    return os.path.exists(stateFile)
|
|
|
|
@classmethod
def loadStoryFromState(cls, hugvey_state):
    """
    Load the pickled story of the given hugvey from its state file, and
    re-attach the hugvey and the logger, which are not part of the pickle.
    """
    # restart stopwatch
    stateFile = cls.getStateFilename(hugvey_state.id)
    # NOTE(review): unpickling executes whatever the file dictates, and
    # getStateDir() points at the shared /tmp -- confirm that only this
    # application can write there.
    with open(stateFile, 'rb') as fp:
        story = pickle.load(fp)

    story.hugvey = hugvey_state
    story.logger = mainLogger.getChild(f"{story.hugvey.id}").getChild("story")
    return story
|
|
# TODO: take running state etc.
|
|
|
|
@classmethod
def clearSavedState(cls, hv_id):
    """Remove the state file of the hugvey with the given id, if it exists."""
    stateFile = cls.getStateFilename(hv_id)
    if not os.path.exists(stateFile):
        return

    os.unlink(stateFile)
    mainLogger.info(f"Removed state: {stateFile}")
|
|
#
|
|
def __getstate__(self):
    """
    Return the state to pickle: a copy of the instance attributes, without
    the entries that cannot be pickled.
    """
    # Work on a copy, so the attributes of the live object are untouched
    state = dict(self.__dict__)

    # Remove the unpicklable entries.
    for unpicklable in ('hugvey', 'logger'):
        del state[unpicklable]

    return state
|
|
|
|
def __setstate__(self, state):
|
|
self.__dict__.update(state)
|