686 lines
23 KiB
Python
686 lines
23 KiB
Python
import json
|
|
import time
|
|
import logging
|
|
import re
|
|
import asyncio
|
|
import urllib.parse
|
|
from .communication import LOG_BS
|
|
from tornado.httpclient import AsyncHTTPClient, HTTPRequest
|
|
|
|
|
|
logger = logging.getLogger("narrative")
|
|
|
|
class Utterance(object):
|
|
"""Part of a reply"""
|
|
def __init__(self, startTime):
|
|
self.startTime = startTime
|
|
self.endTime = None
|
|
self.text = ""
|
|
|
|
def setText(self, text):
|
|
self.text = text
|
|
|
|
def setFinished(self, endTime):
|
|
self.endTime = endTime
|
|
|
|
def isFinished(self):
|
|
return self.endTime is not None
|
|
|
|
|
|
|
|
class Message(object):
|
|
def __init__(self, id, text):
|
|
self.id = id
|
|
self.text = text
|
|
self.isStart = False
|
|
self.reply = None
|
|
# self.replyTime = None
|
|
self.audioFile= None
|
|
self.interruptCount = 0
|
|
self.afterrunTime = 0. # the time after this message to allow for interrupts
|
|
self.finishTime = None # message can be finished without finished utterance (with instant replycontains)
|
|
self.parseForVariables()
|
|
|
|
@classmethod
|
|
def initFromJson(message, data, story):
|
|
msg = message(data['@id'], data['text'])
|
|
msg.isStart = data['start'] if 'start' in data else False
|
|
msg.afterrunTime = data['afterrun'] if 'afterrun' in data else 0.
|
|
if 'audio' in data:
|
|
msg.audioFile = data['audio']['file']
|
|
return msg
|
|
|
|
def parseForVariables(self):
|
|
"""
|
|
Find variables in text
|
|
"""
|
|
self.variables = re.findall('\$(\w+)', self.text)
|
|
|
|
def hasVariables(self) -> bool:
|
|
return len(self.variables) > 0
|
|
|
|
def setReply(self, reply):
|
|
self.reply = reply
|
|
|
|
def hasReply(self):
|
|
return self.reply is not None
|
|
|
|
def getReply(self):
|
|
if not self.hasReply():
|
|
raise Exception(
|
|
"Getting reply while there is none! {0}".format(self.id))
|
|
|
|
return self.reply
|
|
|
|
def isFinished(self):
|
|
return self.finishTime is not None
|
|
|
|
def setFinished(self, currentTime):
|
|
self.finishTime = currentTime
|
|
|
|
def getFinishedTime(self):
|
|
return self.finishTime
|
|
|
|
def getLogSummary(self):
|
|
return {
|
|
'id': self.id,
|
|
'time': None if self.reply is None else [u.startTime for u in self.reply.utterances],
|
|
'replyText': None if self.reply is None else [u.text for u in self.reply.utterances]
|
|
}
|
|
|
|
async def getAudioFilePath(self,story):
|
|
client = AsyncHTTPClient()
|
|
queryString = urllib.parse.urlencode({
|
|
'text': self.text,
|
|
'filename': 1,
|
|
'variable': 1 if self.hasVariables() else 0
|
|
})
|
|
request = HTTPRequest(
|
|
url = f"http://localhost:{story.panopticon_port}/voice?{queryString}",
|
|
method="GET"
|
|
)
|
|
logger.log(LOG_BS, request.url)
|
|
response = await client.fetch(request)
|
|
if response.code != 200:
|
|
logger.critical(f"Error when fetching filename: {response.code} for {queryString}")
|
|
return None
|
|
|
|
return response.body.decode().strip()
|
|
|
|
|
|
class Reply(object):
|
|
def __init__(self, message: Message):
|
|
self.forMessage = None
|
|
self.utterances = []
|
|
self.setForMessage(message)
|
|
|
|
def setForMessage(self, message: Message):
|
|
self.forMessage = message
|
|
message.setReply(self)
|
|
|
|
def getLastUtterance(self) -> Utterance:
|
|
if not self.hasUtterances():
|
|
return None
|
|
return self.utterances[-1]
|
|
|
|
def getFirstUtterance(self) -> Utterance:
|
|
if not self.hasUtterances():
|
|
return None
|
|
return self.utterances[0]
|
|
|
|
def hasUtterances(self) -> bool:
|
|
return len(self.utterances) > 0
|
|
|
|
def addUtterance(self, utterance: Utterance):
|
|
self.utterances.append(utterance)
|
|
|
|
def getText(self) -> str:
|
|
return ". ".join([u.text for u in self.utterances])
|
|
|
|
def getActiveUtterance(self, currentTime) -> Utterance:
|
|
"""
|
|
If no utterance is active, create a new one. Otherwise return non-finished utterance for update
|
|
"""
|
|
if len(self.utterances) < 1 or self.getLastUtterance().isFinished():
|
|
u = Utterance(currentTime)
|
|
self.addUtterance(u)
|
|
else:
|
|
u = self.getLastUtterance()
|
|
return u
|
|
|
|
def isSpeaking(self):
|
|
u = self.getLastUtterance()
|
|
if u is not None and not u.isFinished():
|
|
return True
|
|
return False
|
|
|
|
class Condition(object):
|
|
"""
|
|
A condition, basic conditions are built in, custom condition can be given by
|
|
providing a custom method.
|
|
"""
|
|
|
|
def __init__(self, id):
|
|
self.id = id
|
|
self.method = None
|
|
self.vars = {}
|
|
|
|
@classmethod
|
|
def initFromJson(conditionClass, data, story):
|
|
condition = conditionClass(data['@id'])
|
|
# TODO: should Condition be subclassed?
|
|
if data['type'] == "replyContains":
|
|
condition.method = condition._hasMetReplyContains
|
|
if data['type'] == "timeout":
|
|
condition.method = condition._hasMetTimeout
|
|
|
|
if 'vars' in data:
|
|
condition.vars = data['vars']
|
|
|
|
return condition
|
|
|
|
def isMet(self, story):
|
|
"""
|
|
Validate if condition is met for the current story state
|
|
"""
|
|
return self.method(story)
|
|
|
|
def _hasMetTimeout(self, story):
|
|
now = story.timer.getElapsed()
|
|
# check if the message already finished playing
|
|
if not story.lastMsgFinishTime:
|
|
return False
|
|
|
|
if 'onlyIfNoReply' in self.vars and self.vars['onlyIfNoReply']:
|
|
if story.currentReply and story.currentReply.hasUtterances():
|
|
logger.log(LOG_BS, f'Only if no reply has text! {story.currentReply.getText()}')
|
|
# 'onlyIfNoReply': only use this timeout if participants doesn't speak.
|
|
return False
|
|
# else:
|
|
# logger.debug('Only if no reply has no text yet!')
|
|
|
|
return now - story.lastMsgFinishTime >= float(self.vars['seconds'])
|
|
|
|
def _hasMetReplyContains(self, story) -> bool:
|
|
"""
|
|
Check the reply for specific characteristics:
|
|
- regex: regular expression. If empy, just way for isFinished()
|
|
- delays: an array of [{'minReplyDuration', 'waitTime'},...]
|
|
- minReplyDuration: the nr of seconds the reply should take. Preferably have one with 0
|
|
- waitTime: the time to wait after isFinished() before continuing
|
|
"""
|
|
r = story.currentReply # make sure we keep working with the same object
|
|
if not r or not r.hasUtterances():
|
|
return False
|
|
|
|
if 'regex' in self.vars and len(self.vars['regex']):
|
|
if 'regexCompiled' not in self.vars:
|
|
# Compile once, as we probably run it more than once
|
|
self.vars['regexCompiled'] = re.compile(self.vars['regex'])
|
|
|
|
t = story.currentReply.getText().lower()
|
|
logger.log(LOG_BS, 'attempt regex: {} on {}'.format(self.vars['regex'], t))
|
|
result = self.vars['regexCompiled'].search(t)
|
|
if result is None:
|
|
#if there is something to match, but not found, it's never ok
|
|
return False
|
|
logger.debug('Got match on {}'.format(self.vars['regex']))
|
|
results = result.groupdict()
|
|
for captureGroup in results:
|
|
story.variableValues[captureGroup] = results[captureGroup]
|
|
|
|
if 'instantMatch' in self.vars and self.vars['instantMatch']:
|
|
logger.info(f"Instant match on {self.vars['regex']}, {self.vars}")
|
|
return True
|
|
# TODO: implement 'instant match' -> don't wait for isFinished()
|
|
|
|
if r.isSpeaking():
|
|
logger.log(LOG_BS, "is speaking")
|
|
return False
|
|
|
|
# print(self.vars)
|
|
# either there's a match, or nothing to match at all
|
|
if 'delays' in self.vars:
|
|
if story.lastMsgFinishTime is None:
|
|
return False
|
|
# time between finishing playback and ending of speaking:
|
|
replyDuration = r.getLastUtterance().endTime - story.lastMsgFinishTime
|
|
delays = sorted(self.vars['delays'], key=lambda k: float(k['minReplyDuration']), reverse=True)
|
|
for delay in delays:
|
|
if replyDuration > float(delay['minReplyDuration']):
|
|
timeSinceReply = story.timer.getElapsed() - r.getLastUtterance().endTime
|
|
logger.log(LOG_BS, f"check delay duration is now {replyDuration}, already waiting for {timeSinceReply}, have to wait {delay['waitTime']}")
|
|
if timeSinceReply > float(delay['waitTime']):
|
|
return True
|
|
break # don't check other delays
|
|
# wait for delay to match
|
|
return False
|
|
|
|
# There is a match and no delay say, person finished speaking. Go ahead sir!
|
|
return True
|
|
|
|
def getLogSummary(self):
|
|
return {
|
|
'id': self.id
|
|
}
|
|
|
|
class Direction(object):
|
|
"""
|
|
A condition based edge in the story graph
|
|
"""
|
|
|
|
def __init__(self, id, msgFrom: Message, msgTo: Message):
|
|
self.id = id
|
|
self.msgFrom = msgFrom
|
|
self.msgTo = msgTo
|
|
self.conditions = []
|
|
self.conditionMet = None
|
|
|
|
def addCondition(self, condition: Condition):
|
|
self.conditions.append(condition)
|
|
|
|
def setMetCondition(self, condition: Condition):
|
|
self.conditionMet = condition
|
|
|
|
@classmethod
|
|
def initFromJson(direction, data, story):
|
|
msgFrom = story.get(data['source'])
|
|
msgTo = story.get(data['target'])
|
|
direction = direction(data['@id'], msgFrom, msgTo)
|
|
if 'conditions' in data:
|
|
for conditionId in data['conditions']:
|
|
c = story.get(conditionId)
|
|
direction.addCondition(c)
|
|
return direction
|
|
|
|
def getLogSummary(self):
|
|
return {
|
|
'id': self.id,
|
|
'condition': self.conditionMet.id if self.conditionMet else None
|
|
}
|
|
|
|
|
|
class Diversion(object):
|
|
"""
|
|
An Diversion. Used to catch events outside of story flow.
|
|
"""
|
|
|
|
def __init__(self, id):
|
|
self.id = id
|
|
self.conditions = []
|
|
|
|
def addCondition(self, condition: Condition):
|
|
self.conditions.append(condition)
|
|
|
|
@classmethod
|
|
def initFromJson(diversionClass, data, story):
|
|
diversion = diversionClass(data['@id'])
|
|
if 'conditions' in data:
|
|
for conditionId in data['conditions']:
|
|
c = story.get(conditionId)
|
|
diversion.addCondition(c)
|
|
return diversion
|
|
|
|
|
|
def getLogSummary(self):
|
|
return {
|
|
'id': self.id,
|
|
# 'time': self.replyTime
|
|
}
|
|
|
|
|
|
storyClasses = {
|
|
'Msg': Message,
|
|
'Direction': Direction,
|
|
'Condition': Condition,
|
|
'Diversion': Diversion,
|
|
}
|
|
|
|
|
|
class Stopwatch(object):
|
|
"""
|
|
Keep track of elapsed time. Use multiple markers, but a single pause/resume button
|
|
"""
|
|
def __init__(self):
|
|
self.isRunning = asyncio.Event()
|
|
self.reset()
|
|
|
|
def getElapsed(self, since_mark='start'):
|
|
t = time.time()
|
|
if self.paused_at != 0:
|
|
pause_duration = t - self.paused_at
|
|
else:
|
|
pause_duration = 0
|
|
return t - self.marks[since_mark] - pause_duration
|
|
|
|
def pause(self):
|
|
self.paused_at = time.time()
|
|
self.isRunning.clear()
|
|
|
|
def resume(self):
|
|
if self.paused_at == 0:
|
|
return
|
|
|
|
pause_duration = time.time() - self.paused_at
|
|
for m in self.marks:
|
|
self.marks[m] += pause_duration
|
|
|
|
self.paused_at = 0
|
|
self.isRunning.set()
|
|
|
|
def reset(self):
|
|
self.marks = {}
|
|
self.setMark('start')
|
|
self.paused_at = 0
|
|
self.isRunning.set()
|
|
|
|
def setMark(self, name):
|
|
self.marks[name] = time.time()
|
|
|
|
|
|
def clearMark(self, name):
|
|
if name in self.marks:
|
|
self.marks.pop(name)
|
|
|
|
class Story(object):
|
|
"""Story represents and manages a story/narrative flow"""
|
|
# TODO should we separate 'narrative' (the graph) from the story (the
|
|
# current user flow)
|
|
|
|
def __init__(self, hugvey_state, panopticon_port):
|
|
super(Story, self).__init__()
|
|
self.hugvey = hugvey_state
|
|
self.panopticon_port = panopticon_port
|
|
|
|
self.events = [] # queue of received events
|
|
self.commands = [] # queue of commands to send
|
|
self.log = [] # all nodes/elements that are triggered
|
|
self.currentMessage = None
|
|
self.currentReply = None
|
|
self.timer = Stopwatch()
|
|
self.isRunning = False
|
|
self.variables = {}
|
|
|
|
def pause(self):
|
|
logger.debug('pause hugvey')
|
|
self.timer.pause()
|
|
|
|
def resume(self):
|
|
logger.debug('resume hugvey')
|
|
self.timer.resume()
|
|
|
|
def getLogSummary(self):
|
|
summary = {
|
|
# e[0]: the entity, e[1]: the logged time
|
|
'messages': [(e[0].getLogSummary(), e[1]) for e in self.log if isinstance(e[0], Message)],
|
|
'directions': [(e[0].getLogSummary(), e[1]) for e in self.log if isinstance(e[0], Direction)],
|
|
'diversions': [(e[0].getLogSummary(), e[1]) for e in self.log if isinstance(e[0], Diversion)],
|
|
}
|
|
# print(self.log)
|
|
return summary
|
|
|
|
def registerVariable(self, variableName, message):
|
|
if variableName not in self.variables:
|
|
self.variables[variableName] = [message]
|
|
else:
|
|
self.variables[variableName].append(message)
|
|
|
|
def setStoryData(self, story_data):
|
|
"""
|
|
Parse self.data into a working story engine
|
|
"""
|
|
self.data = story_data
|
|
|
|
# keep to be able to reset it in the end
|
|
currentId = self.currentMessage.id if self.currentMessage else None
|
|
|
|
self.elements = {}
|
|
self.diversions = []
|
|
self.directionsPerMsg = {}
|
|
self.startMessage = None # The entrypoint to the graph
|
|
self.variables = {}
|
|
self.reset()
|
|
|
|
for el in self.data:
|
|
className = storyClasses[el['@type']]
|
|
obj = className.initFromJson(el, self)
|
|
self.add(obj)
|
|
|
|
logger.debug(self.elements)
|
|
logger.debug(self.directionsPerMsg)
|
|
|
|
if currentId:
|
|
self.currentMessage = self.get(currentId)
|
|
if self.currentMessage:
|
|
logger.info(
|
|
f"Reinstantiated current message: {self.currentMessage.id}")
|
|
else:
|
|
logger.warn(
|
|
"Could not reinstatiate current message. Starting over")
|
|
|
|
def reset(self):
|
|
self.timer.reset()
|
|
# self.startTime = time.time()
|
|
# currently active message, determines active listeners etc.
|
|
self.currentMessage = None
|
|
self.lastMsgTime = None
|
|
self.lastSpeechStartTime = None
|
|
self.lastSpeechEndTime = None
|
|
self.variableValues = {} # captured variables from replies
|
|
self.finish_time = False
|
|
|
|
self.events = [] # queue of received events
|
|
self.commands = [] # queue of commands to send
|
|
self.log = [] # all nodes/elements that are triggered
|
|
self.currentReply = None
|
|
|
|
for msg in self.getMessages():
|
|
pass
|
|
|
|
def add(self, obj):
|
|
if obj.id in self.elements:
|
|
# print(obj)
|
|
raise Exception("Duplicate id for ''".format(obj.id))
|
|
|
|
if type(obj) == Message and obj.isStart:
|
|
self.startMessage = obj
|
|
|
|
self.elements[obj.id] = obj
|
|
|
|
if type(obj) == Diversion:
|
|
self.diversions.append(obj)
|
|
|
|
if type(obj) == Direction:
|
|
if obj.msgFrom.id not in self.directionsPerMsg:
|
|
self.directionsPerMsg[obj.msgFrom.id] = []
|
|
self.directionsPerMsg[obj.msgFrom.id].append(obj)
|
|
|
|
def get(self, id):
|
|
"""
|
|
Get a story element by its id
|
|
"""
|
|
if id in self.elements:
|
|
return self.elements[id]
|
|
return None
|
|
def getMessages(self):
|
|
return [el for el in self.elements if type(el) == Message]
|
|
|
|
def stop(self):
|
|
logger.info("Stop Story")
|
|
if self.isRunning:
|
|
self.isRunning = False
|
|
|
|
async def _processPendingEvents(self):
|
|
# Gather events:
|
|
nr = len(self.events)
|
|
for i in range(nr):
|
|
e = self.events.pop(0)
|
|
logger.info("handle '{}'".format(e))
|
|
if e['event'] == "exit":
|
|
self.stop()
|
|
if e['event'] == 'connect':
|
|
# a client connected. Should only happen in the beginning or in case of error
|
|
# that is, until we have a 'reset' or 'start' event.
|
|
# reinitiate current message
|
|
await self.setCurrentMessage(self.currentMessage)
|
|
|
|
if e['event'] == "playbackFinish":
|
|
if e['msgId'] == self.currentMessage.id:
|
|
#TODO: migrate value to Messagage instead of Story
|
|
self.lastMsgFinishTime = self.timer.getElapsed()
|
|
|
|
# 2019-02-22 temporary disable listening while playing audio:
|
|
if self.hugvey.google is not None:
|
|
logger.warn("Temporary 'fix' -> resume recording?")
|
|
self.hugvey.google.resume()
|
|
|
|
if self.currentMessage.id not in self.directionsPerMsg:
|
|
logger.info("THE END!")
|
|
self.stop()
|
|
return
|
|
|
|
if e['event'] == 'speech':
|
|
# message is still playing:
|
|
if self.currentMessage and not self.lastMsgFinishTime and self.previousReply and self.previousReply.forMessage.interruptCount < 4:
|
|
timeDiff = self.timer.getElapsed() - self.previousReply.forMessage.getFinishedTime()
|
|
if self.previousReply.forMessage.afterrunTime > timeDiff:
|
|
#interrupt only in given interval:
|
|
logger.warn("Interrupt message, replay {}".format(self.previousReply.forMessage.id))
|
|
self.currentReply = self.previousReply
|
|
self.previousReply.forMessage.interruptCount += 1
|
|
self.currentMessage = await self.setCurrentMessage(self.previousReply.forMessage)
|
|
|
|
# log if somebody starts speaking
|
|
# TODO: implement interrupt
|
|
if self.currentReply is None:
|
|
self.currentReply= Reply(self.currentMessage)
|
|
|
|
utterance = self.currentReply.getActiveUtterance(self.timer.getElapsed())
|
|
utterance.setText(e['transcript'])
|
|
|
|
if e['is_final']:
|
|
utterance.setFinished(self.timer.getElapsed())
|
|
|
|
|
|
async def _processDirections(self, directions):
|
|
for direction in directions:
|
|
for condition in direction.conditions:
|
|
if condition.isMet(self):
|
|
logger.info("Condition is met: {0}, going to {1}".format(
|
|
condition.id, direction.msgTo.id))
|
|
direction.setMetCondition(condition)
|
|
self.addToLog(condition)
|
|
self.addToLog(direction)
|
|
self.currentMessage.setFinished(self.timer.getElapsed())
|
|
await self.setCurrentMessage(direction.msgTo)
|
|
return direction
|
|
|
|
def addToLog(self, node):
|
|
self.log.append((node, self.timer.getElapsed()))
|
|
|
|
async def _renderer(self):
|
|
"""
|
|
every 1/10 sec. determine what needs to be done based on the current story state
|
|
"""
|
|
loopDuration = 0.1 # Configure fps
|
|
lastTime = time.time()
|
|
logger.info("Start renderer")
|
|
while self.isRunning:
|
|
if self.isRunning is False:
|
|
break
|
|
|
|
# pause on timer paused
|
|
await self.timer.isRunning.wait() # wait for un-pause
|
|
|
|
for i in range(len(self.events)):
|
|
await self._processPendingEvents()
|
|
|
|
if self.currentMessage.id not in self.directionsPerMsg:
|
|
self.finish()
|
|
|
|
directions = self.getCurrentDirections()
|
|
await self._processDirections(directions)
|
|
|
|
# TODO create timer event
|
|
# self.commands.append({'msg':'TEST!'})
|
|
|
|
# wait for next iteration to avoid too high CPU
|
|
t = time.time()
|
|
await asyncio.sleep(max(0, loopDuration - (t - lastTime)))
|
|
lastTime = t
|
|
|
|
logger.info("Stop renderer")
|
|
|
|
async def setCurrentMessage(self, message):
|
|
if self.currentMessage and not self.lastMsgFinishTime:
|
|
logger.info("Interrupt playback {}".format(self.currentMessage.id))
|
|
# message is playing
|
|
self.hugvey.sendCommand({
|
|
'action': 'stop',
|
|
'id': self.currentMessage.id,
|
|
})
|
|
|
|
self.currentMessage = message
|
|
self.lastMsgTime = time.time()
|
|
self.lastMsgFinishTime = None # to be filled in by the event
|
|
|
|
# if not reset:
|
|
self.previousReply = self.currentReply # we can use this for interrptions
|
|
self.currentReply = self.currentMessage.reply
|
|
# else:
|
|
# # if we press 'save & play', it should not remember it's last reply to that msg
|
|
# self.previousReply = self.currentReply # we can use this for interrptions
|
|
# self.currentReply = self.currentMessage.reply
|
|
|
|
logger.info("Current message: ({0}) \"{1}\"".format(
|
|
message.id, message.text))
|
|
self.addToLog(message)
|
|
# TODO: prep events & timer etc.
|
|
# TODO: preload file paths if no variables are set, or once these are loaded
|
|
self.hugvey.sendCommand({
|
|
'action': 'play',
|
|
'file': await message.getAudioFilePath(self),
|
|
'id': message.id,
|
|
})
|
|
|
|
# 2019-02-22 temporary disable listening while playing audio:
|
|
if self.hugvey.google is not None:
|
|
logger.warn("Temporary 'fix' -> stop recording")
|
|
self.hugvey.google.pause()
|
|
|
|
logger.debug("Pending directions: ")
|
|
|
|
for direction in self.getCurrentDirections():
|
|
conditions = [c.id for c in direction.conditions]
|
|
logger.debug(
|
|
"- {0} -> {1} (when: {2}) ".format(direction.msgFrom.id, direction.msgTo.id, conditions))
|
|
|
|
def getCurrentDirections(self):
|
|
if self.currentMessage.id not in self.directionsPerMsg:
|
|
return []
|
|
else:
|
|
return self.directionsPerMsg[self.currentMessage.id]
|
|
|
|
async def run(self, customStartMsgId = None):
|
|
logger.info("Starting story")
|
|
self.timer.reset()
|
|
self.isRunning = True
|
|
if customStartMsgId is not None:
|
|
startMsg = self.get(customStartMsgId)
|
|
else:
|
|
startMsg = self.startMessage
|
|
await self.setCurrentMessage(startMsg)
|
|
await self._renderer()
|
|
|
|
def isFinished(self):
|
|
if hasattr(self, 'finish_time') and self.finish_time:
|
|
return time.time() - self.finish_time
|
|
|
|
return False
|
|
|
|
def finish(self):
|
|
logger.info(f"Finished story for {self.hugvey.id}")
|
|
self.hugvey.pause()
|
|
self.finish_time = time.time()
|
|
self.timer.pause()
|
|
|