import json
import time
import logging
import re
import asyncio

from .communication import LOG_BS

logger = logging.getLogger("narrative")


class Utterance(object):
    """Part of a reply: one stretch of recognised speech."""

    def __init__(self, startTime):
        self.startTime = startTime
        self.endTime = None  # stays None until setFinished() is called
        self.text = ""

    def setText(self, text):
        """Replace the transcript of this utterance."""
        self.text = text

    def setFinished(self, endTime):
        """Mark the utterance as finished at the given time."""
        self.endTime = endTime

    def isFinished(self):
        """Return True once an end time has been set."""
        return self.endTime is not None


class Message(object):
    """A message node of the story, played as text or as an audio file."""

    def __init__(self, id, text):
        self.id = id
        self.text = text
        self.isStart = False
        self.reply = None
        self.audioFile = None
        self.interruptCount = 0
        # the time after this message to allow for interrupts
        self.afterrunTime = 0.
        # message can be finished without finished utterance
        # (with instant replycontains)
        self.finishTime = None

    @classmethod
    def initFromJson(cls, data, story):
        """
        Build a Message from its JSON dict.

        Reads the required keys '@id' and 'text' and the optional keys
        'start', 'afterrun' and 'audio' (a dict with a 'file' entry).
        The story argument is not used here.
        """
        msg = cls(data['@id'], data['text'])
        msg.isStart = data.get('start', False)
        msg.afterrunTime = data.get('afterrun', 0.)
        if 'audio' in data:
            msg.audioFile = data['audio']['file']
        return msg

    def setReply(self, reply):
        self.reply = reply

    def hasReply(self):
        return self.reply is not None

    def getReply(self):
        """Return the reply; raises Exception if there is none."""
        if not self.hasReply():
            raise Exception(
                "Getting reply while there is none! {0}".format(self.id))
        return self.reply

    def isFinished(self):
        return self.finishTime is not None

    def setFinished(self, currentTime):
        self.finishTime = currentTime

    def getFinishedTime(self):
        """Return the finish time, or None if the message is not finished."""
        return self.finishTime

    def getLogSummary(self):
        """Return id plus utterance start times and texts (None without reply)."""
        if self.reply is None:
            times = None
            texts = None
        else:
            times = [u.startTime for u in self.reply.utterances]
            texts = [u.text for u in self.reply.utterances]
        return {
            'id': self.id,
            'time': times,
            'replyText': texts,
        }


class Reply(object):
    """The utterances given in response to a single Message."""

    def __init__(self, message: Message):
        self.forMessage = None
        self.utterances = []
        self.setForMessage(message)

    def setForMessage(self, message: Message):
        """Link this reply and the message to each other."""
        self.forMessage = message
        message.setReply(self)

    def getLastUtterance(self) -> Utterance:
        """Return the most recent utterance, or None if there are none."""
        if not self.hasUtterances():
            return None
        return self.utterances[-1]

    def getFirstUtterance(self) -> Utterance:
        """Return the first utterance, or None if there are none."""
        if not self.hasUtterances():
            return None
        return self.utterances[0]

    def hasUtterances(self) -> bool:
        return len(self.utterances) > 0

    def addUtterance(self, utterance: Utterance):
        self.utterances.append(utterance)

    def getText(self) -> str:
        """Return all utterance texts joined with '. '."""
        return ". ".join([u.text for u in self.utterances])

    def getActiveUtterance(self, currentTime) -> Utterance:
        """
        If no utterance is active, create a new one. Otherwise return
        non-finished utterance for update
        """
        last = self.getLastUtterance()
        if last is None or last.isFinished():
            last = Utterance(currentTime)
            self.addUtterance(last)
        return last

    def isSpeaking(self):
        """Return True while the last utterance exists and is unfinished."""
        u = self.getLastUtterance()
        return u is not None and not u.isFinished()


class Condition(object):
    """
    A condition, basic conditions are built in, custom condition can be
    given by providing a custom method.
    """

    def __init__(self, id):
        self.id = id
        self.method = None
        self.vars = {}

    @classmethod
    def initFromJson(cls, data, story):
        """
        Build a Condition from its JSON dict.

        data['type'] selects the built-in check ("replyContains" or
        "timeout"); any other type leaves `method` as None. The optional
        data['vars'] dict is stored by reference, not copied.
        """
        condition = cls(data['@id'])
        # TODO: should Condition be subclassed?
        if data['type'] == "replyContains":
            condition.method = condition._hasMetReplyContains
        elif data['type'] == "timeout":
            condition.method = condition._hasMetTimeout

        if 'vars' in data:
            condition.vars = data['vars']

        return condition

    def isMet(self, story):
        """
        Validate if condition is met for the current story state
        """
        return self.method(story)

    def _hasMetTimeout(self, story):
        """True once vars['seconds'] have passed since playback finished."""
        now = time.time()
        # check if the message already finished playing
        if not story.lastMsgFinishTime:
            return False

        return now - story.lastMsgFinishTime >= float(self.vars['seconds'])

    def _hasMetReplyContains(self, story) -> bool:
        """
        Check the reply for specific characteristics:
        - regex: regular expression. If empty, just wait for isFinished()
        - delays: an array of [{'minReplyDuration', 'waitTime'},...]
            - minReplyDuration: the nr of seconds the reply should take.
              Preferably have one with 0
            - waitTime: the time to wait after isFinished() before continuing

        Named groups captured by the regex are stored in story.variables.
        """
        r = story.currentReply  # make sure we keep working with the same object
        if not r or not r.hasUtterances():
            return False

        if 'regex' in self.vars and len(self.vars['regex']):
            if 'regexCompiled' not in self.vars:
                # Compile once, as we probably run it more than once
                # NOTE(review): this stores a compiled pattern inside the
                # vars dict that came from the story data.
                self.vars['regexCompiled'] = re.compile(self.vars['regex'])

            # use r, not story.currentReply, so the whole check runs against
            # one and the same reply object
            t = r.getText().lower()
            logger.log(LOG_BS, 'attempt regex: {} on {}'.format(self.vars['regex'], t))
            result = self.vars['regexCompiled'].search(t)
            if result is None:
                # if there is something to match, but not found, it's never ok
                return False
            logger.debug('Got match on {}'.format(self.vars['regex']))
            results = result.groupdict()
            for captureGroup in results:
                story.variables[captureGroup] = results[captureGroup]

        # TODO: implement 'instant match' -> don't wait for isFinished()

        if r.isSpeaking():
            logger.log(LOG_BS, "is speaking")
            return False

        # either there's a match, or nothing to match at all
        if 'delays' in self.vars:
            # NOTE(review): duration is measured from the END of the first
            # utterance; confirm that startTime was not intended.
            replyDuration = story.timer.getElapsed() - r.getFirstUtterance().endTime
            timeSinceReply = story.timer.getElapsed() - r.getLastUtterance().endTime
            # longest minReplyDuration first, so the first hit is the most
            # specific delay that applies
            delays = sorted(self.vars['delays'], key=lambda k: float(k['minReplyDuration']), reverse=True)
            for delay in delays:
                if replyDuration > float(delay['minReplyDuration']):
                    logger.log(LOG_BS, f"check delay duration is now {replyDuration}, already waiting for {timeSinceReply}, have to wait {delay['waitTime']}")
                    if timeSinceReply > float(delay['waitTime']):
                        return True
                    break  # don't check other delays
            # wait for delay to match
            return False

        # There is a match and no delay say, person finished speaking. Go ahead sir!
        return True

    def getLogSummary(self):
        return {
            'id': self.id
        }


class Direction(object):
    """
    A condition based edge in the story graph
    """

    def __init__(self, id, msgFrom: Message, msgTo: Message):
        self.id = id
        self.msgFrom = msgFrom
        self.msgTo = msgTo
        self.conditions = []
        self.conditionMet = None

    def addCondition(self, condition: Condition):
        self.conditions.append(condition)

    def setMetCondition(self, condition: Condition):
        """Remember which condition caused this direction to be taken."""
        self.conditionMet = condition

    @classmethod
    def initFromJson(cls, data, story):
        """
        Build a Direction from its JSON dict.

        'source', 'target' and the ids in the optional 'conditions' list are
        resolved with story.get(), which returns None for unknown ids.
        """
        msgFrom = story.get(data['source'])
        msgTo = story.get(data['target'])
        direction = cls(data['@id'], msgFrom, msgTo)
        if 'conditions' in data:
            for conditionId in data['conditions']:
                direction.addCondition(story.get(conditionId))
        return direction

    def getLogSummary(self):
        return {
            'id': self.id,
            'condition': self.conditionMet.id if self.conditionMet else None
        }


class Diversion(object):
    """
    An Diversion. Used to catch events outside of story flow.
    """

    def __init__(self, id):
        self.id = id
        self.conditions = []

    def addCondition(self, condition: Condition):
        self.conditions.append(condition)

    @classmethod
    def initFromJson(cls, data, story):
        """Build a Diversion; condition ids are resolved with story.get()."""
        diversion = cls(data['@id'])
        if 'conditions' in data:
            for conditionId in data['conditions']:
                diversion.addCondition(story.get(conditionId))
        return diversion

    def getLogSummary(self):
        return {
            'id': self.id,
        }


# maps the '@type' of a story data element to the class that parses it
storyClasses = {
    'Msg': Message,
    'Direction': Direction,
    'Condition': Condition,
    'Diversion': Diversion,
}


class Stopwatch(object):
    """
    Keep track of elapsed time. Use multiple markers, but a single
    pause/resume button
    """

    def __init__(self):
        self.isRunning = asyncio.Event()
        self.reset()

    def getElapsed(self, since_mark='start'):
        """
        Return the seconds since the given mark, not counting paused time.

        Raises KeyError when the mark does not exist.
        """
        t = time.time()
        if self.paused_at != 0:
            # the pause in progress is not yet folded into the marks
            pause_duration = t - self.paused_at
        else:
            pause_duration = 0
        return t - self.marks[since_mark] - pause_duration

    def pause(self):
        """Pause the stopwatch; pausing twice keeps the first pause moment."""
        if self.paused_at == 0:
            # overwriting an existing pause moment would silently count the
            # already-paused time as elapsed time
            self.paused_at = time.time()
        self.isRunning.clear()

    def resume(self):
        """Resume after a pause by shifting all marks by the paused duration."""
        if self.paused_at == 0:
            return

        pause_duration = time.time() - self.paused_at
        for m in self.marks:
            self.marks[m] += pause_duration

        self.paused_at = 0
        self.isRunning.set()

    def reset(self):
        """Drop all marks, un-pause and restart the 'start' mark."""
        # un-pause first: setMark() looks at paused_at
        self.paused_at = 0
        self.marks = {}
        self.setMark('start')
        self.isRunning.set()

    def setMark(self, name):
        """Set (or overwrite) a named mark at the current stopwatch time."""
        # While paused the stopwatch is frozen at paused_at. A wall-clock
        # mark would yield negative elapsed times once resume() shifts it.
        self.marks[name] = self.paused_at if self.paused_at != 0 else time.time()

    def clearMark(self, name):
        """Remove a mark; unknown names are ignored."""
        self.marks.pop(name, None)


class Story(object):
    """Story represents and manages a story/narrative flow"""
    # TODO should we separate 'narrative' (the graph) from the story (the
    # current user flow)

    def __init__(self, hugvey_state):
        super(Story, self).__init__()
        self.hugvey = hugvey_state

        self.events = []  # queue of received events
        self.commands = []  # queue of commands to send
        self.log = []  # all nodes/elements that are triggered
        self.currentMessage = None
        self.currentReply = None
        # Read by the event handling; defined here so an event that arrives
        # before the first setCurrentMessage() does not raise AttributeError.
        self.previousReply = None
        self.lastMsgFinishTime = None
        self.timer = Stopwatch()
        self.isRunning = False

    def pause(self):
        logger.debug('pause hugvey')
        self.timer.pause()

    def resume(self):
        logger.debug('resume hugvey')
        self.timer.resume()

    def getLogSummary(self):
        """Return the logged messages, directions and diversions with times."""
        summary = {
            # e[0]: the entity, e[1]: the logged time
            'messages': [(e[0].getLogSummary(), e[1]) for e in self.log if isinstance(e[0], Message)],
            'directions': [(e[0].getLogSummary(), e[1]) for e in self.log if isinstance(e[0], Direction)],
            'diversions': [(e[0].getLogSummary(), e[1]) for e in self.log if isinstance(e[0], Diversion)],
        }
        return summary

    def setStoryData(self, story_data):
        """
        Parse self.data into a working story engine

        Resets the story state. If a message was active, the message with
        the same id in the new data becomes the current message again.
        """
        self.data = story_data

        # keep to be able to reset it in the end
        currentId = self.currentMessage.id if self.currentMessage else None

        self.elements = {}
        self.diversions = []
        self.directionsPerMsg = {}
        self.startMessage = None  # The entrypoint to the graph
        self.reset()

        for el in self.data:
            className = storyClasses[el['@type']]
            obj = className.initFromJson(el, self)
            self.add(obj)

        logger.debug(self.elements)
        logger.debug(self.directionsPerMsg)

        if currentId:
            self.currentMessage = self.get(currentId)
            if self.currentMessage:
                logger.info(
                    f"Reinstantiated current message: {self.currentMessage.id}")
            else:
                logger.warning(
                    "Could not reinstatiate current message. Starting over")

    def reset(self):
        """Reset timer, queues, log, captured variables and current state."""
        self.timer.reset()
        # currently active message, determines active listeners etc.
        self.currentMessage = None
        self.lastMsgTime = None
        self.lastSpeechStartTime = None
        self.lastSpeechEndTime = None
        self.variables = {}  # captured variables from replies
        self.finish_time = False

        self.events = []  # queue of received events
        self.commands = []  # queue of commands to send
        self.log = []  # all nodes/elements that are triggered
        self.currentReply = None

    def add(self, obj):
        """
        Register a story element under its id.

        Raises Exception when the id is already in use.
        """
        if obj.id in self.elements:
            raise Exception("Duplicate id for '{}'".format(obj.id))

        if isinstance(obj, Message) and obj.isStart:
            self.startMessage = obj

        self.elements[obj.id] = obj

        if isinstance(obj, Diversion):
            self.diversions.append(obj)

        if isinstance(obj, Direction):
            # directions are looked up by the message they depart from
            self.directionsPerMsg.setdefault(obj.msgFrom.id, []).append(obj)

    def get(self, id):
        """
        Get a story element by its id
        """
        return self.elements.get(id)

    def stop(self):
        logger.info("Stop Story")
        if self.isRunning:
            self.isRunning = False

    def _processPendingEvents(self):
        """
        Handle the events that are queued at the moment of calling.

        Returns early, leaving later events queued, when a playbackFinish
        event ends the story.
        """
        for _ in range(len(self.events)):
            e = self.events.pop(0)
            logger.info("handle '{}'".format(e))
            eventType = e['event']

            if eventType == "exit":
                self.stop()

            elif eventType == 'connect':
                # a client connected. Should only happen in the beginning or
                # in case of error that is, until we have a 'reset' or
                # 'start' event.
                if self.currentMessage is not None:
                    # reinitiate current message
                    self.setCurrentMessage(self.currentMessage)

            elif eventType == "playbackFinish":
                if self.currentMessage is not None and e['msgId'] == self.currentMessage.id:
                    self.lastMsgFinishTime = time.time()

                    if self.currentMessage.id not in self.directionsPerMsg:
                        logger.info("THE END!")
                        self.stop()
                        return

            elif eventType == 'speech':
                if self.currentMessage is None:
                    # without a message there is nothing to attach a reply to
                    logger.warning("Speech event without current message, ignoring")
                    continue

                self._interruptIfInAfterrun()

                # log if somebody starts speaking
                # TODO: implement interrupt
                if self.currentReply is None:
                    self.currentReply = Reply(self.currentMessage)

                utterance = self.currentReply.getActiveUtterance(self.timer.getElapsed())
                utterance.setText(e['transcript'])
                if e['is_final']:
                    utterance.setFinished(self.timer.getElapsed())

    def _interruptIfInAfterrun(self):
        """Replay the previous message if speech continues within its afterrun."""
        previous = self.previousReply
        # only while the current message is still playing, and at most 4
        # interrupts per message
        if self.lastMsgFinishTime or not previous:
            return
        previousMsg = previous.forMessage
        if previousMsg.interruptCount >= 4:
            return
        finishedAt = previousMsg.getFinishedTime()
        if finishedAt is None:
            # the previous message was never marked finished, so there is no
            # afterrun interval to measure against
            return

        timeDiff = self.timer.getElapsed() - finishedAt
        if previousMsg.afterrunTime > timeDiff:
            # interrupt only in given interval:
            logger.warning("Interrupt message, replay {}".format(previousMsg.id))
            self.currentReply = previous
            previousMsg.interruptCount += 1
            # setCurrentMessage() assigns self.currentMessage itself and
            # returns nothing, so its result must not be assigned
            self.setCurrentMessage(previousMsg)

    def _processDirections(self, directions):
        """
        Follow the first direction that has a condition that is met.

        Returns that direction, or None when no condition is met.
        """
        for direction in directions:
            for condition in direction.conditions:
                if condition.isMet(self):
                    logger.info("Condition is met: {0}, going to {1}".format(
                        condition.id, direction.msgTo.id))
                    direction.setMetCondition(condition)
                    self.addToLog(condition)
                    self.addToLog(direction)
                    self.currentMessage.setFinished(self.timer.getElapsed())
                    self.setCurrentMessage(direction.msgTo)
                    return direction
        return None

    def addToLog(self, node):
        """Append the node with the elapsed stopwatch time to the log."""
        self.log.append((node, self.timer.getElapsed()))

    async def _renderer(self):
        """
        every 1/10 sec. determine what needs to be done based on the current
        story state
        """
        loopDuration = 0.1  # Configure fps
        logger.info("Start renderer")
        while self.isRunning:
            # pause on timer paused
            await self.timer.isRunning.wait()  # wait for un-pause
            # measured after the wait, so paused time is not counted as work
            iterationStart = time.time()

            # a call may return early when the story ends; keep going until
            # the queue is drained (each call pops at least one event)
            while self.events:
                self._processPendingEvents()

            if self.currentMessage.id not in self.directionsPerMsg:
                self.finish()

            directions = self.getCurrentDirections()
            self._processDirections(directions)

            # TODO create timer event

            # wait for next iteration to avoid too high CPU; sleep only for
            # what is left of this iteration's time slot
            spent = time.time() - iterationStart
            await asyncio.sleep(max(0, loopDuration - spent))

        logger.info("Stop renderer")

    def setCurrentMessage(self, message):
        """
        Make the message current and send its play command.

        Sends a stop command first if the previous current message has no
        finish time yet. The current reply becomes the previous reply.
        """
        if self.currentMessage and not self.lastMsgFinishTime:
            logger.info("Interrupt playback {}".format(self.currentMessage.id))
            # message is playing
            self.hugvey.sendCommand({
                'action': 'stop',
                'id': self.currentMessage.id,
            })

        self.currentMessage = message
        self.lastMsgTime = time.time()
        self.lastMsgFinishTime = None  # to be filled in by the event
        self.previousReply = self.currentReply  # we can use this for interruptions
        self.currentReply = self.currentMessage.reply

        logger.info("Current message: ({0}) \"{1}\"".format(
            message.id, message.text))
        self.addToLog(message)
        # TODO: prep events & timer etc.
        if message.audioFile:
            self.hugvey.sendCommand({
                'action': 'play',
                'file': message.audioFile,
                'id': message.id,
            })
        else:
            self.hugvey.sendCommand({
                'action': 'play',
                'msg': message.text,
                'id': message.id,
            })

        logger.debug("Pending directions: ")

        for direction in self.getCurrentDirections():
            conditions = [c.id for c in direction.conditions]
            logger.debug(
                "- {0} -> {1} (when: {2}) ".format(direction.msgFrom.id, direction.msgTo.id, conditions))

    def getCurrentDirections(self):
        """Return the directions leaving the current message (may be empty)."""
        return self.directionsPerMsg.get(self.currentMessage.id, [])

    async def run(self):
        """Start at the start message and run the renderer until stopped."""
        logger.info("Starting story")
        self.timer.reset()
        self.isRunning = True
        self.setCurrentMessage(self.startMessage)
        await self._renderer()

    def isFinished(self):
        """Return the seconds since the story finished, or False if it has not."""
        if hasattr(self, 'finish_time') and self.finish_time:
            return time.time() - self.finish_time

        return False

    def finish(self):
        """Record the finish time and pause both the hugvey and the timer."""
        logger.info(f"Finished story for {self.hugvey.id}")
        self.hugvey.pause()
        self.finish_time = time.time()
        self.timer.pause()