import json import time import logging import re import asyncio import urllib.parse from .communication import LOG_BS from tornado.httpclient import AsyncHTTPClient, HTTPRequest import uuid import shortuuid import threading import faulthandler from zmq.asyncio import Context import zmq import wave from pythonosc import udp_client mainLogger = logging.getLogger("hugvey") logger = mainLogger.getChild("narrative") class Utterance(object): """Part of a reply""" def __init__(self, startTime): self.startTime = startTime self.endTime = None self.text = "" self.lastUpdateTime = startTime def setText(self, text, now): self.text = text self.lastUpdateTime = now def setFinished(self, endTime): self.endTime = endTime def isFinished(self): return self.endTime is not None class Message(object): def __init__(self, id, text): self.id = id self.text = text self.isStart = False self.chapterStart = False self.reply = None # self.replyTime = None self.audioFile= None self.filenameFetchLock = asyncio.Lock() self.interruptCount = 0 self.timeoutDiversionCount = 0 self.afterrunTime = 0. # the time after this message to allow for interrupts self.finishTime = None # message can be finished without finished utterance (with instant replycontains) self.params = {} self.variableValues = {} self.parseForVariables() self.uuid = None # Have a unique id each time the message is played back. self.color = None def setStory(self, story): self.story = story self.logger = story.logger.getChild("message") @classmethod def initFromJson(message, data, story): msg = message(data['@id'], data['text']) msg.isStart = data['beginning'] if 'beginning' in data else False msg.chapterStart = bool(data['chapterStart']) if 'chapterStart' in data else False msg.afterrunTime = data['afterrun'] if 'afterrun' in data else 0. msg.color = data['color'] if 'color' in data else None if 'audio' in data and data['audio'] is not None: msg.audioFile = data['audio']['file'] msg.setStory(story) if 'params' in data: msg.params = data['params'] if not 'vol' in msg.params: # prevent clipping on some Lyrebird tracks msg.params['vol'] = .8 return msg def parseForVariables(self): """ Find variables in text """ self.variables = re.findall('\$(\w+)', self.text) for var in self.variables: self.variableValues[var] = None def hasVariables(self) -> bool: return len(self.variables) > 0 def setVariable(self, name, value): if name not in self.variables: self.logger.critical("Set nonexisting variable") return if self.variableValues[name] == value: return self.variableValues[name] = value self.logger.warn(f"Set variable, fetch {name}") if not None in self.variableValues.values(): self.logger.warn(f"now fetch {name}") asyncio.get_event_loop().create_task(self.getAudioFilePath()) # asyncio.get_event_loop().call_soon_threadsafe(self.getAudioFilePath) self.logger.warn(f"started {name}") def getText(self): # sort reverse to avoid replacing the wrong variable self.variables.sort(key=len, reverse=True) text = self.text # self.logger.debug(f"Getting text for {self.id}") for var in self.variables: self.logger.debug(f"try replacing ${var} with {self.variableValues[var]} in {text}") replacement = self.variableValues[var] if (self.variableValues[var] is not None) else "nothing" #TODO: translate nothing to each language text = text.replace('$'+var, replacement) return text def setReply(self, reply): self.reply = reply def hasReply(self): return self.reply is not None def getReply(self): if not self.hasReply(): raise Exception( "Getting reply while there is none! {0}".format(self.id)) return self.reply def isFinished(self): return self.finishTime is not None def setFinished(self, currentTime): self.finishTime = currentTime def getFinishedTime(self): return self.finishTime def getParams(self): return self.params def getLogSummary(self): return { 'id': self.id, 'time': None if self.reply is None else [u.startTime for u in self.reply.utterances], 'text': self.getText(), 'replyText': None if self.reply is None else [u.text for u in self.reply.utterances] } async def getAudioFilePath(self): if self.audioFile is not None: return self.audioFile text = self.getText() self.logger.debug(f"Fetching audio for {text}") # return "test"; async with self.filenameFetchLock: # print(threading.enumerate()) info = { 'text': text, 'variable': True if self.hasVariables() else False } s = Context.instance().socket(zmq.REQ) #: :type s: zmq.sugar.Socket voiceAddr = f"ipc://voice{self.story.hugvey.id}" s.connect(voiceAddr) await s.send_json(info) filename = await s.recv_string() s.close() # print(threading.enumerate()) self.logger.debug(f"Fetched audio for {text}: {filename}") return filename class Reply(object): def __init__(self, message: Message): self.forMessage = None self.utterances = [] self.setForMessage(message) def setForMessage(self, message: Message): self.forMessage = message message.setReply(self) def getLastUtterance(self) -> Utterance: if not self.hasUtterances(): return None u = self.utterances[-1] #: :type u: Utterance # attempt to fix a glitch that google does not always send is_finished if u.isFinished(): return u now = self.forMessage.story.timer.getElapsed() diff = now - u.lastUpdateTime if diff > 5: # time in seconds to force silence in utterance # useful for eg. 'hello', or 'no' self.forMessage.story.logger.warn( f"Set finish time for utterance after {diff}s {u.text}" ) u.setFinished(now) return u def getFirstUtterance(self) -> Utterance: if not self.hasUtterances(): return None return self.utterances[0] def hasUtterances(self) -> bool: return len(self.utterances) > 0 def addUtterance(self, utterance: Utterance): self.utterances.append(utterance) def getText(self) -> str: return ". ".join([u.text for u in self.utterances]) def getActiveUtterance(self, currentTime) -> Utterance: """ If no utterance is active, create a new one. Otherwise return non-finished utterance for update """ if len(self.utterances) < 1 or self.getLastUtterance().isFinished(): u = Utterance(currentTime) self.addUtterance(u) else: u = self.getLastUtterance() return u def isSpeaking(self): u = self.getLastUtterance() if u is not None and not u.isFinished(): return True return False class Condition(object): """ A condition, basic conditions are built in, custom condition can be given by providing a custom method. """ def __init__(self, id): self.id = id self.method = None self.type = None self.vars = {} self.logInfo = None @classmethod def initFromJson(conditionClass, data, story): condition = conditionClass(data['@id']) condition.type = data['type'] # TODO: should Condition be subclassed? if data['type'] == "replyContains": condition.method = condition._hasMetReplyContains if data['type'] == "timeout": condition.method = condition._hasMetTimeout if data['type'] == "variable": condition.method = condition._hasVariable if data['type'] == "diversion": condition.method = condition._hasDiverged if 'vars' in data: condition.vars = data['vars'] return condition def isMet(self, story): """ Validate if condition is met for the current story state """ return self.method(story) def _hasMetTimeout(self, story): now = story.timer.getElapsed() # check if the message already finished playing if not story.lastMsgFinishTime: return False if 'onlyIfNoReply' in self.vars and self.vars['onlyIfNoReply']: if story.currentReply and story.currentReply is not None and story.currentReply.hasUtterances(): story.logger.log(LOG_BS, f'Only if no reply has text! {story.currentReply.getText()}') # 'onlyIfNoReply': only use this timeout if participants doesn't speak. return False # else: # story.logger.debug('Only if no reply has no text yet!') hasMetTimeout = now - story.lastMsgFinishTime >= float(self.vars['seconds']) if not hasMetTimeout: return False # update stats: story.stats['timeouts'] +=1 if 'needsReply' in self.vars and self.vars['needsReply'] is True: if story.currentReply is None or not story.currentReply.hasUtterances(): story.stats['silentTimeouts'] +=1 story.stats['consecutiveSilentTimeouts'] += 1 self.logInfo = "{}s".format(self.vars['seconds']) return True def _hasVariable(self, story) -> bool: if not story.lastMsgFinishTime: return False r = story.hasVariableSet(self.vars['variable']) if r: story.logger.debug(f"Variable {self.vars['variable']} is set.") if 'notSet' in self.vars and self.vars['notSet']: # inverse: r = not r self.logInfo = "Does {} have variable {}".format( 'not' if 'notSet' in self.vars and self.vars['notSet'] else '', self.vars['variable'] ) return r def _hasDiverged(self, story) -> bool: if not story.lastMsgFinishTime: return False d = story.get(self.vars['diversionId']) if not d: story.logger.critical(f"Condition on non-existing diversion: {self.vars['diversionId']}") r = d.hasHit if r: story.logger.debug(f"Diversion {self.vars['diversionId']} has been hit.") if 'inverseMatch' in self.vars and self.vars['inverseMatch']: # inverse: r = not r self.logInfo = "Has {} diverged to {}".format( 'not' if 'inverseMatch' in self.vars and self.vars['inverseMatch'] else '', self.vars['diversionId'] ) return r def _hasMetReplyContains(self, story) -> bool: """ Check the reply for specific characteristics: - regex: regular expression. If empy, just way for isFinished() - delays: an array of [{'minReplyDuration', 'waitTime'},...] - minReplyDuration: the nr of seconds the reply should take. Preferably have one with 0 - waitTime: the time to wait after isFinished() before continuing """ r = story.currentReply # make sure we keep working with the same object if not r or not r.hasUtterances(): return False capturedVariables = None if 'regex' in self.vars and len(self.vars['regex']): if 'regexCompiled' not in self.vars: # Compile once, as we probably run it more than once self.vars['regexCompiled'] = re.compile(self.vars['regex']) t = r.getText().lower() story.logger.log(LOG_BS, 'attempt regex: {} on {}'.format(self.vars['regex'], t)) result = self.vars['regexCompiled'].search(t) if result is None: #if there is something to match, but not found, it's never ok return False story.logger.debug('Got match on {}'.format(self.vars['regex'])) capturedVariables = result.groupdict() if ('instantMatch' in self.vars and self.vars['instantMatch']) or not r.isSpeaking(): # try to avoid setting variables for intermediate strings for captureGroup in capturedVariables: story.setVariableValue(captureGroup, capturedVariables[captureGroup]) if 'instantMatch' in self.vars and self.vars['instantMatch']: story.logger.info(f"Instant match on {self.vars['regex']}, {self.vars}") self.logInfo = "Instant match of {}, captured {}".format( self.vars['regex'], capturedVariables ) return True # TODO: implement 'instant match' -> don't wait for isFinished() # print(self.vars) # either there's a match, or nothing to match at all if 'delays' in self.vars: if story.lastMsgFinishTime is None: story.logger.debug("not finished playback yet") return False # time between finishing playback and ending of speaking: replyDuration = r.getLastUtterance().lastUpdateTime - story.lastMsgFinishTime # using lastUpdateTime instead of endTime delays = sorted(self.vars['delays'], key=lambda k: float(k['minReplyDuration']), reverse=True) for delay in delays: if replyDuration > float(delay['minReplyDuration']): timeSinceReply = story.timer.getElapsed() - r.getLastUtterance().lastUpdateTime story.logger.log(LOG_BS, f"check delay duration is now {replyDuration}, already waiting for {timeSinceReply}, have to wait {delay['waitTime']}") if timeSinceReply > float(delay['waitTime']): # if variables are captured, only set them the moment the condition matches if capturedVariables is not None: for captureGroup in capturedVariables: story.setVariableValue(captureGroup, capturedVariables[captureGroup]) self.logInfo = "Match of {}, captured {} after, {}".format( self.vars['regex'], capturedVariables, timeSinceReply ) return True break # don't check other delays # wait for delay to match story.logger.log(LOG_BS, "Wait for it...") return False # If there is a delay, it takes precedence of isSpeaking, since google does not always give an is_finished on short utterances (eg. "hello" or "no") if r.isSpeaking(): story.logger.log(LOG_BS, f"is speaking: {r.getLastUtterance().text} - {r.getLastUtterance().startTime}") return False # There is a match and no delay say, person finished speaking. Go ahead sir! self.logInfo = "Match" return True def getLogSummary(self): return { 'id': self.id } class Direction(object): """ A condition based edge in the story graph """ def __init__(self, id, msgFrom: Message, msgTo: Message): self.id = id self.msgFrom = msgFrom self.msgTo = msgTo #: :type self.conditions: list(Condition) self.conditions = [] self.conditionMet = None def addCondition(self, condition: Condition): self.conditions.append(condition) def setMetCondition(self, condition: Condition): self.conditionMet = condition @classmethod def initFromJson(direction, data, story): msgFrom = story.get(data['source']) msgTo = story.get(data['target']) direction = direction(data['@id'], msgFrom, msgTo) if 'conditions' in data: for conditionId in data['conditions']: c = story.get(conditionId) direction.addCondition(c) return direction def getLogSummary(self): return { 'id': self.id, 'condition': self.conditionMet.id if self.conditionMet else None } class Diversion(object): """ An Diversion. Used to catch events outside of story flow. Not sure why I'm not using subclasses here o:) """ def __init__(self, id, type: str, params: dict): self.id = id self.params = params self.finaliseMethod = None self.hasHit = False if type == 'no_response': self.method = self._divergeIfNoResponse self.finaliseMethod = self._returnAfterNoResponse self.counter = 0 if type == 'reply_contains': self.method = self._divergeIfReplyContains self.finaliseMethod = self._returnAfterReplyContains if len(self.params['regex']) > 0: self.regex = re.compile(self.params['regex']) else: self.regex = None if type == 'timeout': self.method = self._divergeIfTimeout self.finaliseMethod = self._returnAfterTimeout if type == 'repeat': self.method = self._divergeIfRepeatRequest self.regex = re.compile(self.params['regex']) if not self.method: raise Exception("No valid type given for diversion") @classmethod def initFromJson(diversionClass, data, story): diversion = diversionClass(data['@id'], data['type'], data['params']) return diversion def getLogSummary(self): return { 'id': self.id, } async def divergeIfNeeded(self, story, msgFrom, msgTo): """ Validate if condition is met for the current story state Returns True when diverging """ r = await self.method(story, msgFrom, msgTo) if r: self.hasHit = True story.addToLog(self) return r async def finalise(self, story): """" Only used if the Diversion sets the story.currentDiversion """ if not self.finaliseMethod: story.logger.info(f"No finalisation for diversion {self.id}") return False await self.finaliseMethod(story) return True async def _divergeIfNoResponse(self, story, msgFrom, msgTo): """ Participant doesn't speak for x consecutive replies (has had timeout) """ ':type story: Story' if story.currentDiversion or not msgFrom or not msgTo: return False if story.stats['diversions']['no_response'] + 1 == self.params['timesOccured'] and story.stats['consecutiveSilentTimeouts'] >= int(self.params['consecutiveSilences']): story.stats['diversions']['no_response'] += 1 msg = story.get(self.params['msgId']) if msg is None: story.logger.critical(f"Not a valid message id for diversion: {self.params['msgId']}") return story.logger.info(f"Diverge: No response {self.id} {story.stats}") self.returnMessage = msgTo await story.setCurrentMessage(msg) story.currentDiversion = self return True return async def _returnAfterNoResponse(self, story): story.logger.info(f"Finalise diversion: {self.id}") story.stats['consecutiveSilentTimeouts'] = 0 # reset counter after diverging if self.params['returnAfterStrand']: await story.setCurrentMessage(self.returnMessage) async def _divergeIfReplyContains(self, story, msgFrom, msgTo): """ Participant doesn't speak for x consecutive replies (has had timeout) """ ':type story: Story' if story.currentDiversion or not msgFrom or not msgTo: # don't do nested diversions # if we remove this, don't forget to double check 'returnMessage' return False if self.hasHit: # don't match twice return if story.currentReply is None or not self.regex: return r = self.regex.search(story.currentReply.getText()) if r is None: return if 'notForColor' in self.params and self.params['notForColor'] and story.currentMessage.color: if story.currentMessage.color.lower() == self.params['notForColor'].lower(): story.logger.debug(f"Skip diversion {self.id} because of section color") return story.logger.info(f"Diverge: reply contains {self.id}") story.stats['diversions']['reply_contains'] += 1 msg = story.get(self.params['msgId']) if msg is None: story.logger.critical(f"Not a valid message id for diversion: {self.params['msgId']}") return self.returnMessage = msgTo await story.setCurrentMessage(msg) story.currentDiversion = self return True async def _returnAfterReplyContains(self, story): story.logger.info(f"Finalise diversion: {self.id}") if self.params['returnAfterStrand']: await story.setCurrentMessage(self.returnMessage) async def _divergeIfRepeatRequest(self, story, msgFrom, msgTo): """ Participant asks if message can be repeated. """ if not msgFrom or not msgTo: return # TODO: how to handle this now we sometimes use different timings. # Perhaps set isFinished when matching condition. if story.currentReply is None or story.currentReply.isSpeaking(): return r = self.regex.search(story.currentReply.getText()) if r is None: return logger.info(f"Diverge: request repeat {self.id}") story.stats['diversions']['repeat'] += 1 await story.setCurrentMessage(msgFrom) return True async def _divergeIfTimeout(self, story, msgFrom, msgTo): """ (1) last spoken at all (2) or duration for this last reply only """ if msgFrom or msgTo: # not applicable a direction has been chosen return if not story.lastMsgFinishTime: # not during play back return # not applicable when timeout is set directions = story.getCurrentDirections() for direction in directions: for condition in direction.conditions: if condition.type == 'timeout': return now = story.timer.getElapsed() if now - story.lastMsgFinishTime < float(self.params['minTimeAfterMessage']): # not less than x sec after it return interval = float(self.params['interval']) if not self.params['fromLastMessage']: # (1) last spoken at all if story.stats['diversions']['timeout_total'] + 1 != self.params['timesOccured']: return timeSince = story.timer.getElapsed('last_speech') if story.timer.hasMark('last_speech') else story.timer.getElapsed('start') if story.timer.hasMark('last_diversion_timeout') and story.timer.getElapsed('last_diversion_timeout') > timeSince: timeSince = story.timer.getElapsed('last_diversion_timeout') if timeSince < interval: return story.stats['diversions']['timeout_total'] += 1 else: if story.currentMessage is None: return # if story.currentMessage.timeoutDiversionCount + 1 if story.stats['diversions']['timeout_last'] + 1 != self.params['timesOccured']: return if story.currentReply is not None: # still playing back # or somebody has spoken already (timeout only works on silences) return if now - story.lastMsgFinishTime < interval: return story.currentMessage.timeoutDiversionCount += 1 story.stats['diversions']['timeout_last'] += 1 # if we're still here, there's a match! story.logger.info(f"Diverge: Timeout {self.id} of {self.params['interval']}") story.stats['diversions']['timeout'] += 1 msg = story.get(self.params['msgId']) if msg is None: story.logger.critical(f"Not a valid message id for diversion: {self.params['msgId']}") return # fall back to the currentMessage to return on. # TODO: maybe, if not chapter is found, the diversion should be # blocked alltogether? self.returnMessage = story.getNextChapterForMsg(story.currentMessage, False) or story.currentMessage await story.setCurrentMessage(msg) story.currentDiversion = self story.timer.setMark('last_diversion_timeout') return True async def _returnAfterTimeout(self, story): story.logger.info(f"Finalise diversion: {self.id}") if self.params['returnAfterStrand']: await story.setCurrentMessage(self.returnMessage) storyClasses = { 'Msg': Message, 'Direction': Direction, 'Condition': Condition, 'Diversion': Diversion, } class Stopwatch(object): """ Keep track of elapsed time. Use multiple markers, but a single pause/resume button """ def __init__(self): self.isRunning = asyncio.Event() self.reset() def getElapsed(self, since_mark='start'): t = time.time() if self.paused_at != 0: pause_duration = t - self.paused_at else: pause_duration = 0 return t - self.marks[since_mark] - pause_duration def pause(self): self.paused_at = time.time() self.isRunning.clear() def resume(self): if self.paused_at == 0: return pause_duration = time.time() - self.paused_at for m in self.marks: self.marks[m] += pause_duration self.paused_at = 0 self.isRunning.set() def reset(self): self.marks = {} self.setMark('start') self.paused_at = 0 self.isRunning.set() def setMark(self, name): self.marks[name] = time.time() def hasMark(self, name): return name in self.marks def clearMark(self, name): if name in self.marks: self.marks.pop(name) class Story(object): """Story represents and manages a story/narrative flow""" # TODO should we separate 'narrative' (the graph) from the story (the # current user flow) def __init__(self, hugvey_state, panopticon_port): super(Story, self).__init__() self.hugvey = hugvey_state self.panopticon_port = panopticon_port self.events = [] # queue of received events self.commands = [] # queue of commands to send self.log = [] # all nodes/elements that are triggered self.logger = mainLogger.getChild(f"{self.hugvey.id}").getChild("story") self.currentMessage = None self.currentDiversion = None self.currentReply = None self.timer = Stopwatch() self.isRunning = False self.diversions = [] self.variables = {} def pause(self): self.logger.debug('pause hugvey') self.timer.pause() def resume(self): self.logger.debug('resume hugvey') self.timer.resume() def getLogSummary(self): summary = { # e[0]: the entity, e[1]: the logged time 'messages': [(e[0].getLogSummary(), e[1]) for e in self.log if isinstance(e[0], Message)], 'directions': [(e[0].getLogSummary(), e[1]) for e in self.log if isinstance(e[0], Direction)], 'diversions': [(e[0].getLogSummary(), e[1]) for e in self.log if isinstance(e[0], Diversion)], } # print(self.log) return summary def getLogCounts(self): return { 'messages': len([1 for e in self.log if isinstance(e[0], Message)]), 'diversions': len([1 for e in self.log if isinstance(e[0], Diversion)]), } def registerVariable(self, variableName, message): if variableName not in self.variables: self.variables[variableName] = [message] else: self.variables[variableName].append(message) def setVariableValue(self, name, value): if name not in self.variables: self.logger.warn(f"Set variable that is not needed in the story: {name}") else: self.logger.debug(f"Set variable {name} to {value}") self.variableValues[name] = value if name not in self.variables: return for message in self.variables[name]: message.setVariable(name, value) def hasVariableSet(self, name) -> bool: return name in self.variableValues and self.variableValues is not None def setStoryData(self, story_data): """ Parse self.data into a working story engine """ self.data = story_data # keep to be able to reset it in the end currentId = self.currentMessage.id if self.currentMessage else None self.elements = {} self.diversions = [] self.directionsPerMsg = {} self.startMessage = None # The entrypoint to the graph self.variables = {} self.reset() for el in self.data: className = storyClasses[el['@type']] obj = className.initFromJson(el, self) self.add(obj) # self.logger.debug(self.elements) # self.logger.debug(self.directionsPerMsg) self.diversions = [el for el in self.elements.values() if type(el) == Diversion] if currentId: self.currentMessage = self.get(currentId) if self.currentMessage: self.logger.info( f"Reinstantiated current message: {self.currentMessage.id}") else: self.logger.warn( "Could not reinstatiate current message. Starting over") # Register variables for msg in self.getMessages(): # print(msg.id, msg.hasVariables()) if not msg.hasVariables(): continue for var in msg.variables: self.registerVariable(var, msg) self.logger.info(f'has variables: {self.variables}') def reset(self): self.timer.reset() # self.startTime = time.time() # currently active message, determines active listeners etc. self.currentMessage = None self.currentDiversion = None self.lastMsgTime = None self.lastSpeechStartTime = None self.lastSpeechEndTime = None self.variableValues = {} # captured variables from replies self.finish_time = False self.events = [] # queue of received events self.commands = [] # queue of commands to send self.log = [] # all nodes/elements that are triggered self.currentReply = None self.stats = { 'timeouts': 0, 'silentTimeouts': 0, 'consecutiveSilentTimeouts': 0, 'diversions': { 'no_response': 0, 'repeat': 0, 'reply_contains': 0, 'timeout': 0, 'timeout_total': 0, 'timeout_last': 0 } } for msg in self.getMessages(): pass def add(self, obj): if obj.id in self.elements: # print(obj) raise Exception("Duplicate id for ''".format(obj.id)) if type(obj) == Message and obj.isStart: self.startMessage = obj self.elements[obj.id] = obj if type(obj) == Diversion: self.diversions.append(obj) if type(obj) == Direction: if obj.msgFrom.id not in self.directionsPerMsg: self.directionsPerMsg[obj.msgFrom.id] = [] self.directionsPerMsg[obj.msgFrom.id].append(obj) def get(self, id): """ Get a story element by its id """ if id in self.elements: return self.elements[id] return None def getMessages(self): return [el for el in self.elements.values() if type(el) == Message] def stop(self): self.logger.info("Stop Story") if self.isRunning: self.isRunning = False def shutdown(self): self.stop() self.hugvey = None async def _processPendingEvents(self): # Gather events: nr = len(self.events) for i in range(nr): e = self.events.pop(0) self.logger.debug("handle '{}'".format(e)) if e['event'] == "exit": self.stop() if e['event'] == 'connect': # a client connected. Should only happen in the beginning or in case of error # that is, until we have a 'reset' or 'start' event. # reinitiate current message await self.setCurrentMessage(self.currentMessage) if e['event'] == "playbackStart": if e['msgId'] != self.currentMessage.id: continue self.lastMsgStartTime = self.timer.getElapsed() self.logger.debug("Start playback") if e['event'] == "playbackFinish": if e['msgId'] == self.currentMessage.id: #TODO: migrate value to Messagage instead of Story self.lastMsgFinishTime = self.timer.getElapsed() self.hugvey.eventLogger.info(f"message: {self.currentMessage.id} {self.currentMessage.uuid} done") # 2019-02-22 temporary disable listening while playing audio: # if self.hugvey.google is not None: # self.logger.warn("Temporary 'fix' -> resume recording?") # self.hugvey.google.resume() if self.currentMessage.id not in self.directionsPerMsg: print(self.currentDiversion) if self.currentDiversion is not None: self.logger.info("end of diversion") await self.currentDiversion.finalise(self) self.currentDiversion = None else: self.logger.info("THE END!") self._finish() return if e['event'] == 'speech': # participants speaks, reset counter self.stats['consecutiveSilentTimeouts'] = 0 # if self.currentMessage and not self.lastMsgStartTime: if self.currentMessage and not self.lastMsgFinishTime: # Ignore incoming speech events until we receive a 'playbackStart' event. # After that moment the mic will be muted, so nothing should come in _anyway_ # unless google is really slow on us. But by taking the start time we don't ignore # messages that come in, in the case google is faster than our playbackFinish event. # (if this setup doesn't work, try to test on self.lastMsgFinish time anyway) # it keeps tricky with all these run conditions self.logger.info("ignore speech while playing message") continue # message is still playing: if self.currentMessage and not self.lastMsgFinishTime and self.previousReply and self.previousReply.forMessage.interruptCount < 4: timeDiff = self.timer.getElapsed() - self.previousReply.forMessage.getFinishedTime() if self.previousReply.forMessage.afterrunTime > timeDiff: #interrupt only in given interval: self.logger.warn("Interrupt message, replay {}".format(self.previousReply.forMessage.id)) self.currentReply = self.previousReply self.previousReply.forMessage.interruptCount += 1 self.currentMessage = await self.setCurrentMessage(self.previousReply.forMessage, self.previousReply) # log if somebody starts speaking # TODO: implement interrupt if self.currentReply is None: self.logger.info("Start speaking") self.currentReply= Reply(self.currentMessage) now = self.timer.getElapsed() utterance = self.currentReply.getActiveUtterance(now) utterance.setText(e['transcript'], now) self.hugvey.eventLogger.info("speaking: content {} \"{}\"".format(id(utterance), e['transcript'])) self.timer.setMark('last_speech') if e['is_final']: utterance.setFinished(self.timer.getElapsed()) self.hugvey.eventLogger.info("speaking: stop {}".format(id(utterance))) if self.hugvey.recorder: self.hugvey.recorder.updateTranscription(self.currentReply.getText()) async def _processDirections(self, directions): ':type directions: list(Direction)' chosenDirection = None for direction in directions: for condition in direction.conditions: if condition.isMet(self): self.logger.info("Condition is met: {0}, going to {1}".format( condition.id, direction.msgTo.id)) self.hugvey.eventLogger.info("condition: {0}".format(condition.id)) self.hugvey.eventLogger.info("direction: {0}".format(direction.id)) direction.setMetCondition(condition) self.addToLog(condition) self.addToLog(direction) self.currentMessage.setFinished(self.timer.getElapsed()) chosenDirection = direction isDiverging = await self._processDiversions( chosenDirection.msgFrom if chosenDirection else None, chosenDirection.msgTo if chosenDirection else None) if not isDiverging and chosenDirection: await self.setCurrentMessage(chosenDirection.msgTo) return chosenDirection async def _processDiversions(self, msgFrom, msgTo) -> bool: """ Process the diversions on stack. If diverging, return True, else False msgFrom and msgTo contain the source and target of a headed direction if given Else, they are None """ diverge = False for diversion in self.diversions: d = await diversion.divergeIfNeeded(self, msgFrom, msgTo) if d: diverge = True return diverge def addToLog(self, node): self.log.append((node, self.timer.getElapsed())) if self.hugvey.recorder: if isinstance(node, Message): self.hugvey.recorder.log('hugvey', node.text, node.id) if isinstance(node, Diversion): self.hugvey.recorder.log('diversion',node.id) if isinstance(node, Condition): self.hugvey.recorder.log('condition',node.logInfo, node.id) async def _renderer(self): """ every 1/10 sec. determine what needs to be done based on the current story state """ loopDuration = 0.1 # Configure fps lastTime = time.time() self.logger.debug("Start renderer") while self.isRunning: if self.isRunning is False: break # pause on timer paused await self.timer.isRunning.wait() # wait for un-pause for i in range(len(self.events)): await self._processPendingEvents() # Test stability of Central Command with deliberate crash # if self.timer.getElapsed() > 5: # raise Exception('test') # The finish is not here anymore, but only on the playbackFinish event. directions = self.getCurrentDirections() await self._processDirections(directions) # TODO create timer event # self.commands.append({'msg':'TEST!'}) # wait for next iteration to avoid too high CPU t = time.time() await asyncio.sleep(max(0, loopDuration - (t - lastTime))) lastTime = t self.logger.debug("Stop renderer") async def setCurrentMessage(self, message, useReply = None): """ Use Reply allows to pre-initiate a reply to use with the message. This is used eg. when doing an interruption. """ if self.currentMessage and not self.lastMsgFinishTime: self.logger.info("Interrupt playback {}".format(self.currentMessage.id)) self.hugvey.eventLogger.info("interrupt") # message is playing self.hugvey.sendCommand({ 'action': 'stop', 'id': self.currentMessage.id, }) message.uuid = shortuuid.uuid() self.currentMessage = message self.lastMsgTime = time.time() self.lastMsgFinishTime = None # to be filled in by the event self.lastMsgStartTime = None # to be filled in by the event # if not reset: self.previousReply = self.currentReply # we can use this for interrptions self.currentReply = useReply #self.currentMessage.reply # send command to already mute mic self.hugvey.sendCommand({ 'action': 'prepare', 'id': message.id }) # else: # # if we press 'save & play', it should not remember it's last reply to that msg # self.previousReply = self.currentReply # we can use this for interrptions # self.currentReply = self.currentMessage.reply self.logger.info("Current message: ({0}) \"{1}\"".format( message.id, message.text)) self.addToLog(message) self.hugvey.eventLogger.info(f"message: {message.id} {message.uuid} start \"{message.text}\"") # TODO: prep events & timer etc. fn = await message.getAudioFilePath() # get duration of audio file, so the client can detect a hang of 'play' with wave.open(fn,'r') as fp: frames = fp.getnframes() rate = fp.getframerate() duration = frames/float(rate) # self.hugvey.google.pause() # pause STT to avoid text events while decision is made self.hugvey.sendCommand({ 'action': 'play', 'file': fn, 'id': message.id, 'params': message.getParams(), 'duration': duration }) # 2019-02-22 temporary disable listening while playing audio: # if self.hugvey.google is not None: # self.logger.warn("Temporary 'fix' -> stop recording") # self.hugvey.google.pause() logmsg = "Pending directions:" for direction in self.getCurrentDirections(): conditions = [c.id for c in direction.conditions] logmsg += "\n- {0} -> {1} (when: {2}) ".format(direction.msgFrom.id, direction.msgTo.id, conditions) self.logger.log(LOG_BS,logmsg) def getCurrentDirections(self): if self.currentMessage.id not in self.directionsPerMsg: return [] else: return self.directionsPerMsg[self.currentMessage.id] def getNextChapterForMsg(self, msg, canIncludeSelf = True, depth = 0): if canIncludeSelf and msg.chapterStart: self.logger.info(f"Next chapter: {msg.id}") return msg if depth >= 70: # protection against infinite loop? return None if msg.id not in self.directionsPerMsg: return None for direction in self.directionsPerMsg[msg.id]: r = self.getNextChapterForMsg(direction.msgTo, True, depth+1) if r: return r # none found return None async def run(self, customStartMsgId = None): self.logger.info("Starting story") self.hugvey.eventLogger.info("story: start") self.timer.reset() self.isRunning = True if customStartMsgId is not None: startMsg = self.get(customStartMsgId) else: startMsg = self.startMessage await self.setCurrentMessage(startMsg) await self._renderer() def isFinished(self): if hasattr(self, 'finish_time') and self.finish_time: return time.time() - self.finish_time return False def _finish(self): """ Finish story and set hugvey to the right state """ self.finish() #stop google etc: self.hugvey.available() def finish(self): """ Finish only the story """ self.logger.info(f"Finished story for {self.hugvey.id}") self.hugvey.eventLogger.info("story: finished") self.stop() self.finish_time = time.time() self.timer.pause()