import asyncio import json import logging import os import pickle import random import re import threading import time from tornado.httpclient import AsyncHTTPClient, HTTPRequest import traceback import urllib.parse import uuid import wave import zmq from zmq.asyncio import Context from pythonosc import udp_client import shortuuid import sox from .communication import LOG_BS mainLogger = logging.getLogger("hugvey") logger = mainLogger.getChild("narrative") class Utterance(object): """Part of a reply""" def __init__(self, startTime): self.startTime = startTime self.endTime = None self.text = "" self.lastUpdateTime = startTime def setText(self, text, now): self.text = text.lower() # always lowercase self.lastUpdateTime = now def hasText(self): return len(self.text) > 0 def setFinished(self, endTime): self.endTime = endTime def isFinished(self): return self.endTime is not None def __getstate__(self): # print(f'get utterance {self}') state = self.__dict__.copy() return state class Message(object): def __init__(self, id, text): self.id = id self.text = text self.label = None self.isStart = False self.isStrandStart = False self.chapterStart = False self.reply = None # self.replyTime = None self.audioFile= None self.filenameFetchLock = asyncio.Lock() self.interruptCount = 0 self.timeoutDiversionCount = 0 self.afterrunTime = 0. # the time after this message to allow for interrupts self.finishTime = None # message can be finished without finished utterance (with instant replycontains) self.params = {} self.variableValues = {} self.parseForVariables() self.uuid = None # Have a unique id each time the message is played back. self.color = None self.lightChange = None self.didRepeat = False self.fileError = False # Used by diversions, autogenerated directions should link to next chapter mark instead of the given msgTo self.generatedDirectionsJumpToChapter = False # Used by diversions, no return directions should be autogenerated, so this message becomes an ending self.dontGenerateDirections = False def __getstate__(self): # Copy the object's state from self.__dict__ which contains # all our instance attributes. Always use the dict.copy() # method to avoid modifying the original state. # print(f'get msg {self.id}') state = self.__dict__.copy() # Remove the unpicklable entries. del state['filenameFetchLock'] return state def __setstate__(self, state): self.__dict__.update(state) self.filenameFetchLock = asyncio.Lock() def setStory(self, story): self.story = story self.logger = story.logger.getChild("message") @classmethod def initFromJson(message, data, story): msg = message(data['@id'], data['text']) msg.isStart = data['beginning'] if 'beginning' in data else False msg.isStrandStart = data['start'] if 'start' in data else False msg.chapterStart = bool(data['chapterStart']) if 'chapterStart' in data else False msg.afterrunTime = data['afterrun'] if 'afterrun' in data else 0. # TODO: investigate deprecation? msg.color = data['color'] if 'color' in data else None msg.label = data['label'] if 'label' in data else None if 'audio' in data and data['audio'] is not None: msg.audioFile = data['audio']['file'] msg.setStory(story) if 'params' in data: msg.params = data['params'] if not 'vol' in msg.params: # prevent clipping on some Lyrebird tracks msg.params['vol'] = .8 msg.lightChange = data['light'] if 'light' in data else None msg.generatedDirectionsJumpToChapter = bool(data['generatedDirectionsJumpToChapter']) if 'generatedDirectionsJumpToChapter' in data else False msg.dontGenerateDirections = bool(data['dontGenerateDirections']) if 'dontGenerateDirections' in data else False msg.params['vol'] = float(msg.params['vol']) return msg def parseForVariables(self): """ Find variables in text """ self.variables = re.findall('\$(\w+)', self.text) for var in self.variables: self.variableValues[var] = None def hasVariables(self) -> bool: return len(self.variables) > 0 def setVariable(self, name, value): if name not in self.variables: self.logger.critical("Set nonexisting variable") return if self.variableValues[name] == value: return self.variableValues[name] = value self.logger.warn(f"Set variable, fetch {name}") # disable prefetching of audio files, to avoid a bunch of calls at once to the voice api's # Other possiblity would be to wrap getAudioFilePath() into a method that delays execution incrementally # with an asyncio.sleep(i) # if not None in self.variableValues.values(): # self.logger.warn(f"now fetch {name} for {self.id}") # asyncio.get_event_loop().create_task(self.getAudioFilePath()) def getVariableValue(self, var): return self.variableValues[var] if (self.variableValues[var] is not None) else self.story.configuration.nothing_text #TODO: translate nothing to each language def getText(self): # sort reverse to avoid replacing the wrong variable self.variables.sort(key=len, reverse=True) text = self.text # self.logger.debug(f"Getting text for {self.id}") for var in self.variables: self.logger.debug(f"try replacing ${var} with {self.variableValues[var]} in {text}") replacement = self.getVariableValue(var) text = text.replace('$'+var, replacement) return text def getLabel(self): """ When a label is set, return that, else the original text """ if self.label and len(self.label): return self.label return self.getText() def getTextLabel(self): """ A combination of getText and getLabel for maximum verbosity """ l = f" ({self.label})" if self.label and len(self.label) else "" return f"{self.getText()}{l}" def setReply(self, reply): self.reply = reply def hasReply(self): return self.reply is not None def getReply(self): if not self.hasReply(): raise Exception( "Getting reply while there is none! {0}".format(self.id)) return self.reply def isFinished(self): return self.finishTime is not None def setFinished(self, currentTime): self.finishTime = currentTime def getFinishedTime(self): return self.finishTime def getParams(self): return self.params def getLogSummary(self): return { 'id': self.id, 'time': None if self.reply is None else [u.startTime for u in self.reply.utterances], 'text': self.getTextLabel(), 'replyText': None if self.reply is None else [u.text for u in self.reply.utterances] } async def getAudioFilePath(self): if self.audioFile is not None: return self.audioFile text = self.getText() textlabel = self.getTextLabel() self.logger.debug(f"Fetching audio for {textlabel}") # return "test"; async with self.filenameFetchLock: # print(threading.enumerate()) info = { 'text': text, 'variable': True if self.hasVariables() else False } s = Context.instance().socket(zmq.REQ) #: :type s: zmq.sugar.Socket voiceAddr = f"ipc://voice{self.story.hugvey.id}" s.connect(voiceAddr) await s.send_json(info) filename = await s.recv_string() s.close() # TODO: should this go trough the event Queue? risking a too long delay though if filename == 'local/crash.wav' or len(filename) < 1: self.logger.warning("Noting crash") self.fileError = True # print(threading.enumerate()) self.logger.debug(f"Fetched audio for {textlabel}: {filename}") return filename class Reply(object): def __init__(self, message: Message): self.forMessage = None self.utterances = [] self.setForMessage(message) def __getstate__(self): # print(f'get reply {self}') state = self.__dict__.copy() return state def setForMessage(self, message: Message): self.forMessage = message message.setReply(self) def getLastUtterance(self) -> Utterance: if not self.hasUtterances(): return None u = self.utterances[-1] #: :type u: Utterance # attempt to fix a glitch that google does not always send is_finished if u.isFinished(): return u now = self.forMessage.story.timer.getElapsed() diff = now - u.lastUpdateTime if diff > 5: # time in seconds to force silence in utterance # useful for eg. 'hello', or 'no' self.forMessage.story.logger.warn( f"Set finish time for utterance after {diff}s {u.text}" ) u.setFinished(now) return u def getFirstUtterance(self) -> Utterance: if not self.hasUtterances(): return None return self.utterances[0] def hasUtterances(self) -> bool: return len(self.utterances) > 0 def addUtterance(self, utterance: Utterance): self.utterances.append(utterance) def getText(self) -> str: # for some reason utterances are sometimes empty.Strip these out utterances = filter(None, [u.text.strip() for u in self.utterances]) return ". ".join(utterances).strip() def getActiveUtterance(self, currentTime) -> Utterance: """ If no utterance is active, create a new one. Otherwise return non-finished utterance for update """ if len(self.utterances) < 1 or self.getLastUtterance().isFinished(): u = Utterance(currentTime) self.addUtterance(u) else: u = self.getLastUtterance() return u def isSpeaking(self): u = self.getLastUtterance() if u is not None and not u.isFinished(): return True return False def getTimeSinceLastUtterance(self): if not self.hasUtterances(): return None return self.forMessage.story.timer.getElapsed() - self.getLastUtterance().lastUpdateTime class Condition(object): """ A condition, basic conditions are built in, custom condition can be given by providing a custom method. """ def __init__(self, id): self.id = id self.method = None self.type = None self.vars = {} self.logInfo = None self.originalJsonString = None self.usedContainsDuration = None def __getstate__(self): # print(f'get condition {self.id}') state = self.__dict__.copy() return state @classmethod def initFromJson(conditionClass, data, story): condition = conditionClass(data['@id']) condition.type = data['type'] condition.originalJsonString = json.dumps(data) #: :type condition: Condition # TODO: should Condition be subclassed? if data['type'] == "replyContains": condition.method = condition._hasMetReplyContains if data['type'] == "timeout": condition.method = condition._hasMetTimeout if data['type'] == "variable": condition.method = condition._hasVariable if data['type'] == "diversion": condition.method = condition._hasDiverged if data['type'] == "audioError": condition.method = condition._hasAudioError if data['type'] == "messagePlayed": condition.method = condition._hasPlayed if data['type'] == "variableEquals": condition.method = condition._variableEquals if data['type'] == "loop_time": condition.method = condition._hasTimer if data['type'] == "variable_storage": condition.method = condition._hasVariableStorage condition.hasRan = False if 'vars' in data: condition.vars = data['vars'] if 'regex' in condition.vars: condition.vars['regex'] = condition.vars['regex'].rstrip() return condition def isMet(self, story): """ Validate if condition is met for the current story state """ try: r = self.method(story) except Exception as e: story.logger.critical("Exception condition: {self.id}, ignoring it") story.logger.exception(e) r = False return r def _hasMetTimeout(self, story): now = story.timer.getElapsed() # check if the message already finished playing if not story.lastMsgFinishTime: return False if 'onlyIfNoReply' in self.vars and self.vars['onlyIfNoReply']: if story.currentReply and story.currentReply is not None and story.currentReply.hasUtterances(): story.logger.log(LOG_BS, f'Only if no reply has text! {story.currentReply.getText()}') # 'onlyIfNoReply': only use this timeout if participants doesn't speak. return False # else: # story.logger.debug('Only if no reply has no text yet!') hasMetTimeout = now - story.lastMsgFinishTime >= story.applyTimeFactor(self.vars['seconds']) if not hasMetTimeout: return False # update stats: story.stats['timeouts'] +=1 if 'needsReply' in self.vars and self.vars['needsReply'] is True: if story.currentReply is None or not story.currentReply.hasUtterances(): story.stats['silentTimeouts'] +=1 story.stats['consecutiveSilentTimeouts'] += 1 self.logInfo = "{}s".format(self.vars['seconds']) return True def _hasVariable(self, story) -> bool: if not story.lastMsgFinishTime: return False r = story.hasVariableSet(self.vars['variable']) if r: story.logger.debug(f"Variable {self.vars['variable']} is set.") if 'notSet' in self.vars and self.vars['notSet']: # inverse: r = not r self.logInfo = "Does {} have variable {}".format( 'not' if 'notSet' in self.vars and self.vars['notSet'] else '', self.vars['variable'] ) return r def _hasTimer(self, story) -> bool: if not story.lastMsgFinishTime: return False loopTime = story.hugvey.command.timer.getElapsed() % 3600 ltTime = int(self.vars['less_than']) gtTime = int(self.vars['more_than']) if not ltTime and not gtTime: # ignore invalid times return elif not gtTime and loopTime < ltTime: r = True elif not ltTime and loopTime > gtTime: r = True elif loopTime < ltTime and loopTime > gtTime: r = True else: r = False if 'inverseMatch' in self.vars and self.vars['inverseMatch']: r = not r self.logInfo = "Looptime is {} {} < {} < {}".format( '' if r else 'not', f'{gtTime}' if gtTime else '-', loopTime, f'{ltTime}' if ltTime else '-', ) return r def _variableEquals(self, story) -> bool: v1 = story.variableValues[self.vars['variable1']] if story.hasVariableSet(self.vars['variable1']) else None v2 = story.variableValues[self.vars['variable2']] if story.hasVariableSet(self.vars['variable2']) else None if v1: story.logger.debug(f"Variable {self.vars['variable1']}: {v1}") if v2: story.logger.debug(f"Variable {self.vars['variable2']}: {v2}") if 'notEq' in self.vars and self.vars['notEq']: # inverse: r = (v1 != v2) else: r = (v1 == v2) story.logger.info("'{}' {} '{}' ({})".format(v1, '==' if v1 == v2 else '!=', v2, r)) return r def _hasDiverged(self, story) -> bool: if not story.lastMsgFinishTime: return False d = story.get(self.vars['diversionId']) if not d: story.logger.warning(f"Condition on non-existing diversion: {self.vars['diversionId']}") r = d.hasHit if r: story.logger.debug(f"Diversion {self.vars['diversionId']} has been hit.") if 'inverseMatch' in self.vars and self.vars['inverseMatch']: # inverse: r = not r self.logInfo = "Has {} diverged to {}".format( 'not' if 'inverseMatch' in self.vars and self.vars['inverseMatch'] else '', self.vars['diversionId'] ) return r def _hasAudioError(self, story) -> bool: if not story.currentMessage or not story.currentMessage.fileError: return False self.logInfo = f"Has error loading audio file for {story.currentMessage.id}" return True def _hasPlayed(self, story) -> bool: if not story.lastMsgFinishTime: return False msgId = self.vars['msgId'].strip() msg = story.get(msgId) if not msg: if not self.logInfo: # show error only once story.logger.warning(f"Condition on non-existing message: {msgId}") # assigning false to r, keeps 'inverseMatch' working, even when msgId is wrong r = False else: #: :type msg: Message r = msg.isFinished() if r: story.logger.debug(f"Msg {msgId} has been played.") if 'inverseMatch' in self.vars and self.vars['inverseMatch']: # inverse: r = not r self.logInfo = "Has {} played msg {}".format( 'not' if 'inverseMatch' in self.vars and self.vars['inverseMatch'] else '', msgId ) return r def _hasVariableStorage(self, story) -> bool: if not story.lastMsgFinishTime: return False if self.hasRan: # Prevent multiple runs of the same query within eg. waiting for a timeout. return False number = int(self.vars['number']) unique = bool(self.vars['unique']) if 'unique' in self.vars else False varValues = story.hugvey.command.variableStore.getLastOfName(self.vars['var_name'], story.language_code, number, unique) self.hasRan = True if len(varValues) < number: story.logger.warn(f"{self.id}: Too few instances of {self.vars['var_name']}, only {len(varValues)} in store") return False for i in range(number): story.setVariableValue( f"stored_{self.vars['var_name']}_{i+1}", varValues[i], store=False ) return True def _hasMetReplyContains(self, story) -> bool: """ Check the reply for specific characteristics: - regex: regular expression. If empy, just way for isFinished() - delays: an array of [{'minReplyDuration', 'waitTime'},...] - minReplyDuration: the nr of seconds the reply should take. Preferably have one with 0 - waitTime: the time to wait after isFinished() before continuing """ r = story.currentReply # make sure we keep working with the same object if not r or not r.hasUtterances(): return False capturedVariables = None t = None result = None if 'regex' in self.vars and len(self.vars['regex']): if 'regexCompiled' not in self.vars: # Compile once, as we probably run it more than once self.vars['regexCompiled'] = re.compile(self.vars['regex']) t = r.getText().lower() story.logger.log(LOG_BS, 'attempt regex: {} on {}'.format(self.vars['regex'], t)) result = self.vars['regexCompiled'].search(t) if result is None: #if there is something to match, but not found, it's never ok return False story.logger.debug('Got match on {}'.format(self.vars['regex'])) capturedVariables = result.groupdict() if 'instantMatch' in self.vars and self.vars['instantMatch']: story.logger.info(f"Instant match on {self.vars['regex']}, {self.vars}") self.logInfo = "Instant match of {}, captured {}".format( self.vars['regex'], capturedVariables ) # Set variables only when direction returns true if capturedVariables is not None: for captureGroup in capturedVariables: story.setVariableValue(captureGroup, capturedVariables[captureGroup]) return True # TODO: implement 'instant match' -> don't wait for isFinished() # print(self.vars) # either there's a match, or nothing to match at all if 'delays' in self.vars: if story.lastMsgFinishTime is None: story.logger.debug("not finished playback yet") return False # time between finishing playback and ending of speaking: replyDuration = r.getLastUtterance().lastUpdateTime - story.lastMsgFinishTime # using lastUpdateTime instead of endTime delays = sorted(self.vars['delays'], key=lambda k: float(k['minReplyDuration']), reverse=True) for delay in delays: if replyDuration > float(delay['minReplyDuration']): timeSinceReply = r.getTimeSinceLastUtterance() waitTime = story.applyTimeDelay(story.applyTimeFactor(delay['waitTime'])) story.logger.log(LOG_BS, f"check delay duration is now {replyDuration}, already waiting for {timeSinceReply}, have to wait {waitTime}") if timeSinceReply > waitTime: # if variables are captured, only set them the moment the condition matches if capturedVariables is not None: for captureGroup in capturedVariables: story.setVariableValue(captureGroup, capturedVariables[captureGroup]) # self.logInfo = "Match of {}, captured {} after, {}".format( # self.vars['regex'], # capturedVariables, # timeSinceReply # ) matchestxt = "" if not result else result.group() self.logInfo = f"{self.id} - search \"{self.vars['regex']}\" on \"{t}\" matches \"{matchestxt}\", captured {capturedVariables} - after {timeSinceReply}s" self.usedContainsDuration = waitTime return True break # don't check other delays # wait for delay to match story.logger.log(LOG_BS, "Wait for it...") return False # If there is a delay, it takes precedence of isSpeaking, since google does not always give an is_finished on short utterances (eg. "hello" or "no") if r.isSpeaking(): story.logger.log(LOG_BS, f"is speaking: {r.getLastUtterance().text} - {r.getLastUtterance().startTime}") return False # There is a match and no delay say, person finished speaking. Go ahead sir! self.logInfo = "Match" return True def getLogSummary(self): return { 'id': self.id } class Direction(object): """ A condition based edge in the story graph """ def __init__(self, id, msgFrom: Message, msgTo: Message): self.id = id self.msgFrom = msgFrom self.msgTo = msgTo #: :type self.conditions: list(Condition) self.conditions = [] self.conditionMet = None self.isDiversionReturn = False self.diversionHasReturned = False # for isDiversionReturn. def __getstate__(self): # print(f'get direction {self.id}') state = self.__dict__.copy() return state def addCondition(self, condition: Condition): self.conditions.append(condition) def setMetCondition(self, condition: Condition): self.conditionMet = condition @classmethod def initFromJson(direction, data, story): msgFrom = story.get(data['source']) msgTo = story.get(data['target']) direction = direction(data['@id'], msgFrom, msgTo) if 'conditions' in data: for conditionId in data['conditions']: c = story.get(conditionId) direction.addCondition(c) return direction def getLogSummary(self): return { 'id': self.id, 'condition': self.conditionMet.id if self.conditionMet else None } class Diversion(object): """ An Diversion. Used to catch events outside of story flow. Not sure why I'm not using subclasses here o:) """ def __init__(self, id, type: str, params: dict): self.id = id self.params = params self.finaliseMethod = None self.hasHit = False self.disabled = False self.type = type self.counter = 0 self.logInfo = self.id # default info is merely ID if type == 'no_response': self.method = self._divergeIfNoResponse self.finaliseMethod = self._returnAfterNoResponse self.counter = 0 if type == 'reply_contains': self.method = self._divergeIfReplyContains self.finaliseMethod = self._returnAfterReplyContains if len(self.params['regex']) > 0: self.regex = re.compile(self.params['regex'].rstrip()) else: self.regex = None if type == 'timeout': self.method = self._divergeIfTimeout self.finaliseMethod = self._returnAfterTimeout if type == 'collective_moment': self.method = self._divergeIfCollectiveMoment if type == 'repeat': self.method = self._divergeIfRepeatRequest self.regex = re.compile(self.params['regex'].rstrip()) if type == 'interrupt': self.method = self._divergeIfInterrupted if not self.method: raise Exception("No valid type given for diversion") def __getstate__(self): # print(f'get diversion {self.id}') state = self.__dict__.copy() return state @classmethod def initFromJson(diversionClass, data, story): diversion = diversionClass(data['@id'], data['type'], data['params']) return diversion def getLogSummary(self): return { 'id': self.id, } async def divergeIfNeeded(self, story, direction = None): """ Validate if condition is met for the current story state Returns True when diverging """ # For all diversion except repeat (which simply doesn't have the variable) if 'notAfterMsgId' in self.params and self.params['notAfterMsgId']: msg = story.get(self.params['notAfterMsgId']) if msg is None: story.logger.warn(f"Invalid message selected for diversion: {self.params['notAfterMsgId']} for {self.id}") elif story.logHasMsg(msg): # story.logger.warn(f"Block diversion {self.id} because of hit message {self.params['notAfterMsgId']}") self.disabled = True # never run it and allow following timeouts/no_responses to run return False try: r = await self.method(story, direction.msgFrom if direction else None, direction.msgTo if direction else None, direction if direction else None) if r: if self.type != 'repeat' and self.type !='interrupt': # repeat diversion should be usable infinte times self.hasHit = True story.addToLog(self) story.hugvey.eventLogger.info(f"diverge {self.id}") except Exception as e: story.logger.critical("Exception when attempting diversion") story.logger.exception(e) return False return r def createReturnDirectionsTo(self, story, startMsg, returnMsg, originalDirection = None, inheritTiming = True, timeoutDuration = .5, replyContainsDurations = None): """ The finishes of this diversion's strand should point to the return message with the right timeout/timing. If hit, this direction should also notify this diversion. replyContainsDurations: list formatted as in JSON [{ "minReplyDuration": "0", "waitTime": "3" }] """ self.counter +=1 story.logger.info(f"Creating return directions for {startMsg.id}") finishMessageIds = story.getFinishesForMsg(startMsg) finalTimeoutDuration = timeoutDuration finalContainsDurations = replyContainsDurations #: :type story: Story #: :type originalDirection: Direction # story.directionsPerMsg[story.currentMessage.id] # take the timeouts that are on the current message, and apply it to our return # as to have somewhat equal pace as to where we originate from if inheritTiming: for originalDirection in story.getCurrentDirections(): # if originalDirection: for condition in originalDirection.conditions: if condition.type == 'timeout': finalTimeoutDuration = float(condition.vars['seconds']) if condition.type == 'replyContains': finalContainsDurations = json.loads(condition.originalJsonString)['vars']['delays'] story.logger.debug(f"Finishes for {startMsg.id}: {finishMessageIds}") i = 0 # story.logger.warn(f"FINISHES: {finishMessageIds}") for msgId in finishMessageIds: # Some very ugly hack to add a direction & condition i+=1 msg = story.get(msgId) if not msg: continue if msg.dontGenerateDirections: story.logger.info(f"Diversion ending {msg.id} is story ending. Don't generate return direction.") continue usedReturnMessage = returnMsg if msg.generatedDirectionsJumpToChapter: usedReturnMessage = story.getNextChapterForMsg(returnMsg, canIncludeSelf=True) if not usedReturnMessage: # in case of a diversion in the last bit of the story, it can be there there is no return message. raise Exception(f"No return message found for {msg.id}") direction = Direction(f"{self.id}-{i}-{self.counter}", msg, usedReturnMessage) data = json.loads(f""" {{ "@id": "{self.id}-ct{i}-{self.counter}", "@type": "Condition", "type": "timeout", "label": "Autogenerated Timeout", "vars": {{ "seconds": "{finalTimeoutDuration}" }} }} """) data['vars']['onlyIfNoReply'] = finalContainsDurations is not None # TODO: also at replycontains if it exists, with the same timings condition = Condition.initFromJson(data, story) direction.addCondition(condition) if finalContainsDurations is not None: data2 = json.loads(f""" {{ "@id": "{self.id}-cr{i}-{self.counter}", "@type": "Condition", "type": "replyContains", "label": "Autogenerated Reply Contains", "vars": {{ "regex": "", "instantMatch": false }} }} """) data2['vars']['delays'] = finalContainsDurations condition2 = Condition.initFromJson(data2, story) direction.addCondition(condition2) story.add(condition2) direction.isDiversionReturn = True # will clear the currentDiversion on story story.diversionDirections.append(direction) story.logger.info(f"Created direction: {direction.id} ({msg.id} -> {usedReturnMessage.id}) {condition.id} with timeout {finalTimeoutDuration}s") story.add(condition) story.add(direction) async def finalise(self, story): """" Only used if the Diversion sets the story.currentDiversion """ story.logger.info("end of diversion") story.hugvey.eventLogger.info(f"return from {self.id}") if not self.finaliseMethod: story.logger.info(f"No finalisation for diversion {self.id}") story.currentDiversion = None return False await self.finaliseMethod(story) story.currentDiversion = None return True async def _divergeIfNoResponse(self, story, msgFrom, msgTo, direction): """ Participant doesn't speak for x consecutive replies (has had timeout) """ ':type story: Story' if story.currentDiversion or not msgFrom or not msgTo: return False if story.stats['consecutiveSilentTimeouts'] >= int(self.params['consecutiveSilences']): story.stats['diversions']['no_response'] += 1 msg = story.get(self.params['msgId']) if msg is None: story.logger.critical(f"Not a valid message id for diversion: {self.params['msgId']}") return story.logger.info(f"Diverge: No response {self.id} {story.stats}") self.returnMessage = msgTo if self.params['returnAfterStrand']: self.createReturnDirectionsTo(story, msg, msgTo, direction) await story.setCurrentMessage(msg) story.currentDiversion = self return True return async def _returnAfterNoResponse(self, story): story.logger.info(f"Finalise diversion: {self.id}") story.stats['consecutiveSilentTimeouts'] = 0 # reset counter after diverging # if self.params['returnAfterStrand']: # await story.setCurrentMessage(self.returnMessage) async def _divergeIfReplyContains(self, story, msgFrom, msgTo, _): """ Participant doesn't speak for x consecutive replies (has had timeout) """ ':type story: Story' # use story.currentReply.getTimeSinceLastUtterance() > 2 if story.currentDiversion: # or not msgFrom or not msgTo: # don't do nested diversions return False if self.hasHit: # don't match twice return if story.currentReply is None or not self.regex: return direction = story.getDefaultDirectionForMsg(story.currentMessage) if not direction: # ignore the direction argument, and only check if the current message has a valid default return waitTime = story.applyTimeDelay(story.applyTimeFactor(1.8 if 'waitTime' not in self.params else float(self.params['waitTime']))) timeSince = story.currentReply.getTimeSinceLastUtterance() if timeSince < waitTime: story.logger.log(LOG_BS, f"Waiting for replyContains: {timeSince} (needs {waitTime})") return text = story.currentReply.getText() r = self.regex.search(text) if r is None: return if 'notForColor' in self.params and self.params['notForColor'] and story.currentMessage.color: if story.currentMessage.color.lower() == self.params['notForColor'].lower(): story.logger.debug(f"Skip diversion {self.id} because of section color") return story.logger.info(f"Diverge: reply contains {self.id}") story.stats['diversions']['reply_contains'] += 1 msg = story.get(self.params['msgId']) if msg is None: story.logger.critical(f"Not a valid message id for diversion: {self.params['msgId']}") return # valid diversion: self.logInfo = f"{self.id} - search \"{r.re.pattern}\" on \"{text}\" matches \"{r.group()}\" after {timeSince}s" if 'nextChapterOnReturn' in self.params and self.params['nextChapterOnReturn']: msgTo = story.getNextChapterForMsg(story.currentMessage, False) or direction.msgTo returnInheritTiming = False else: msgTo = direction.msgTo returnInheritTiming = True self.returnMessage = msgTo if self.params['returnAfterStrand']: self.createReturnDirectionsTo(story, msg, msgTo, direction, inheritTiming=returnInheritTiming) await story.setCurrentMessage(msg) story.currentDiversion = self return True async def _returnAfterReplyContains(self, story): story.logger.info(f"Finalise diversion: {self.id}") # if self.params['returnAfterStrand']: # await story.setCurrentMessage(self.returnMessage) async def _divergeIfCollectiveMoment(self, story, msgFrom, msgTo, direction): """ Central command timer times to a collective moment """ #: :var story: Story if story.currentDiversion or not msgFrom or not msgTo: return False if not msgTo.chapterStart: # only when changing chapter return window_open_second = float(self.params['start_second']) window_close_second = window_open_second + float(self.params['window']) # Only keep a 1h loop now = story.hugvey.command.timer.getElapsed() % 3600 if now < window_open_second or now > window_close_second: return #open! msg = story.get(self.params['msgId']) if msg is None: story.logger.critical(f"Not a valid message id for diversion: {self.id} {self.params['msgId']}") return self.returnMessage = msgTo self.createReturnDirectionsTo(story, msg, msgTo, direction, inheritTiming=True) await story.setCurrentMessage(msg) story.currentDiversion = self return True async def _divergeIfRepeatRequest(self, story, msgFrom, msgTo, direction): """ Participant asks if message can be repeated. """ # if not msgFrom or not msgTo: # return # TODO: how to handle this now we sometimes use different timings. # Perhaps set isFinished when matching condition. if story.currentReply is None or story.currentReply.getTimeSinceLastUtterance() < story.applyTimeFactor(1.8): return if story.currentMessage.didRepeat: # repeat only once return r = self.regex.search(story.currentReply.getText()) if r is None: return logger.info(f"Diverge: request repeat {self.id}") story.stats['diversions']['repeat'] += 1 await story.setCurrentMessage(story.currentMessage) story.currentMessage.didRepeat = True return True async def _divergeIfTimeout(self, story, msgFrom, msgTo, direction): """ (1) last spoken at all (2) or duration for this last reply only Only can kick in if there's no 'timeout' condition set. """ if story.currentDiversion: return if msgFrom or msgTo: # not applicable a direction has been chosen return if not story.lastMsgFinishTime: # not during play back return # not applicable when timeout is set directions = story.getCurrentDirections() for direction in directions: for condition in direction.conditions: if condition.type == 'timeout': return now = story.timer.getElapsed() if now - story.lastMsgFinishTime < story.applyTimeFactor(self.params['minTimeAfterMessage']): # not less than x sec after it return interval = story.applyTimeFactor(self.params['interval']) if not self.params['fromLastMessage']: # (1) last spoken at all timeSince = story.timer.getElapsed('last_speech') if story.timer.hasMark('last_speech') else story.timer.getElapsed('start') if story.timer.hasMark('last_diversion_timeout') and story.timer.getElapsed('last_diversion_timeout') > timeSince: timeSince = story.timer.getElapsed('last_diversion_timeout') if timeSince < interval: return story.stats['diversions']['timeout_total'] += 1 else: if story.currentMessage is None: return # if story.currentMessage.timeoutDiversionCount + 1 if story.currentReply is not None: # still playing back # or somebody has spoken already (timeout only works on silences) return if now - story.lastMsgFinishTime < interval: return story.currentMessage.timeoutDiversionCount += 1 story.stats['diversions']['timeout_last'] += 1 # if we're still here, there's a match! story.logger.info(f"Diverge: Timeout {self.id} of {self.params['interval']}") story.stats['diversions']['timeout'] += 1 msg = story.get(self.params['msgId']) if msg is None: story.logger.critical(f"Not a valid message id for diversion: {self.params['msgId']}") return # fall back to the currentMessage to return on. # TODO: maybe, if not chapter is found, the diversion should be # blocked alltogether? self.returnMessage = story.getNextChapterForMsg(story.currentMessage, False) or story.currentMessage if self.params['returnAfterStrand']: # no direction is here, as this diversion triggers before a direction is taken self.createReturnDirectionsTo(story, msg, self.returnMessage, inheritTiming=False) await story.setCurrentMessage(msg, allowReplyInterrupt=True) story.currentDiversion = self story.timer.setMark('last_diversion_timeout') return True async def _returnAfterTimeout(self, story): story.logger.info(f"Finalise diversion: {self.id}") async def _divergeIfInterrupted(self, story, msgFrom, msgTo, direction): """ This is here as a placeholder for the interruption diversion. These will however be triggered differently """ if story.currentDiversion or story.allowReplyInterrupt: return False msg = story.get(self.params['msgId']) if msg is None: story.logger.critical(f"Not a valid message id for diversion: {self.params['msgId']}") return self.returnMessage = story.currentMessage # no direction is here, as this diversion triggers before a direction is taken self.createReturnDirectionsTo(story, msg, self.returnMessage, inheritTiming=False, timeoutDuration=3, replyContainsDurations = [{ "minReplyDuration": "0", "waitTime": "2" }]) await story.setCurrentMessage(msg) story.currentDiversion = self return True class Configuration(object): id = 'configuration' volume = 1 # Volume multiplier for 'play' command nothing_text = "nothing" # When variable is not set, but used in sentence, replace it with this word. time_factor = 1 # time is multiplied to timeouts etc. (not playback) time_extra_delay = 0 # time adder for reply contains diversion/condition (see applyTimeDelay()) tempo_factor = 1 # tempo is multiplied (playback) pitch_modifier = 1 # pitch is added (playback) light0_intensity = 0 light0_fade = 30. # fade duration in seconds light0_isSophie = False light1_intensity = 150 light1_fade = 10. light1_isSophie = False light2_intensity = 75 light2_fade = 10. light2_isSophie = False light3_intensity = 150 light3_fade = 10. light3_isSophie = False light4_intensity = 150 light4_fade = 10. light4_isSophie = False @classmethod def initFromJson(configClass, data, story): config = Configuration() config.__dict__.update(data) return config def getLightPresets(self): c = self.__dict__ l = [] for i in range(5): l.append({ 'intensity': int(c[f"light{i}_intensity"]), 'fade': float(c[f"light{i}_fade"]), 'isSophie': float(c[f"light{i}_isSophie"]) }) return l storyClasses = { 'Msg': Message, 'Direction': Direction, 'Condition': Condition, 'Diversion': Diversion, 'Configuration': Configuration, } class Stopwatch(object): """ Keep track of elapsed time. Use multiple markers, but a single pause/resume button """ def __init__(self): self.isRunning = asyncio.Event() self.reset() def getElapsed(self, since_mark='start'): t = time.time() if self.paused_at != 0: pause_duration = t - self.paused_at else: pause_duration = 0 return t - self.marks[since_mark] - pause_duration def pause(self): self.paused_at = time.time() self.isRunning.clear() def resume(self): if self.paused_at == 0: return pause_duration = time.time() - self.paused_at for m in self.marks: self.marks[m] += pause_duration self.paused_at = 0 self.isRunning.set() def reset(self): self.marks = {} self.setMark('start') self.paused_at = 0 self.isRunning.set() def setMark(self, name, overrideValue = None): """ Set a marker to current time. Or , if given, to any float one desires """ self.marks[name] = overrideValue if overrideValue else time.time() def hasMark(self, name): return name in self.marks def clearMark(self, name): if name in self.marks: self.marks.pop(name) def __getstate__(self): # print(f'get stopwatch') state = self.__dict__.copy() state['isRunning'] = self.isRunning.is_set() return state def __setstate__(self, state): self.__dict__.update(state) self.isRunning = asyncio.Event() if 'isRunning' in state and state['isRunning']: self.isRunning.set() else: self.isRunning.clear() class StoryState(object): """ Because Story not only contains state, but also logic/control variables, we need a separate class to keep track of the state of things. This way, we can recreate the exact state in which a story was before. """ msgLog = [] currentMessage = None currentDiversion = None currentReply = None allowReplyInterrupt = False timer = Stopwatch() isRunning = False lastMsgTime = None lastSpeechStartTime = None lastSpeechEndTime = None variableValues = {} # captured variables from replies finish_time = False events = [] # queue of received events commands = [] # queue of commands to send log = [] # all nodes/elements that are triggered msgLog = [] stats = { 'timeouts': 0, 'silentTimeouts': 0, 'consecutiveSilentTimeouts': 0, 'diversions': { 'no_response': 0, 'repeat': 0, 'reply_contains': 0, 'timeout': 0, 'timeout_total': 0, 'timeout_last': 0 } } def __init__(self): pass # class Story(object): """Story represents and manages a story/narrative flow""" # TODO should we separate 'narrative' (the graph) from the story (the # current user flow) def __init__(self, hugvey_state, panopticon_port): super(Story, self).__init__() self.hugvey = hugvey_state self.panopticon_port = panopticon_port self.events = [] # queue of received events self.commands = [] # queue of commands to send self.log = [] # all nodes/elements that are triggered self.msgLog = [] # hit messages self.logger = mainLogger.getChild(f"{self.hugvey.id}").getChild("story") self.currentMessage = None self.currentDiversion = None self.currentReply = None self.allowReplyInterrupt = False self.timer = Stopwatch() self.isRunning = False self.diversions = [] self.interruptionDiversions = [] self.variables = {} self.variableValues = {} # captured variables from replies self.runId = uuid.uuid4().hex self.currentLightPresetNr = None def pause(self): self.logger.debug('pause hugvey') self.timer.pause() def resume(self): self.logger.debug('resume hugvey') self.timer.resume() def getLogSummary(self): summary = { # e[0]: the entity, e[1]: the logged time 'messages': [(e[0].getLogSummary(), e[1]) for e in self.log if isinstance(e[0], Message)], 'directions': [(e[0].getLogSummary(), e[1]) for e in self.log if isinstance(e[0], Direction)], 'diversions': [(e[0].getLogSummary(), e[1]) for e in self.log if isinstance(e[0], Diversion)], } # print(self.log) return summary def getLogCounts(self): return { 'messages': len([1 for e in self.log if isinstance(e[0], Message)]), 'diversions': len([1 for e in self.log if isinstance(e[0], Diversion)]), } def registerVariable(self, variableName, message): if variableName not in self.variables: self.variables[variableName] = [message] else: self.variables[variableName].append(message) def setVariableValue(self, name, value, store=True): if name not in self.variables: self.logger.warn(f"Set variable that is not needed in the story: {name}") if name in self.variableValues and self.variableValues[name] == value: self.logger.debug(f"Skip double setting of variable {name} to {value}") return self.logger.debug(f"Set variable {name} to {value}") self.variableValues[name] = value if store: self.hugvey.command.variableStore.addVariable(self.runId, name, value, self.hugvey.id, self.language_code) if name not in self.variables: return for message in self.variables[name]: message.setVariable(name, value) def hasVariableSet(self, name) -> bool: return name in self.variableValues and self.variableValues is not None def setStoryData(self, story_data, language_code): """ Parse self.data into a working story engine """ self.data = story_data self.language_code = language_code # keep to be able to reset it in the end currentId = self.currentMessage.id if self.currentMessage else None self.elements = {} self.strands = {} self.diversions = [] self.interruptionDiversions = [] self.directionsPerMsg = {} self.startMessage = None # The entrypoint to the graph self.variables = {} self.reset() for el in self.data: try: className = storyClasses[el['@type']] obj = className.initFromJson(el, self) self.add(obj) except Exception as e: self.logger.critical(f"Error loading story element: {el}") self.logger.exception(e) raise e # self.logger.debug(self.elements) # self.logger.debug(self.directionsPerMsg) self.diversions = [el for el in self.elements.values() if type(el) == Diversion] self.interruptionDiversions = [el for el in self.elements.values() if type(el) == Diversion and el.type == 'interrupt'] configurations = [el for el in self.elements.values() if type(el) == Configuration] self.configuration = configurations[0] if len(configurations) else Configuration() if currentId: self.currentMessage = self.get(currentId) if self.currentMessage: self.logger.info( f"Reinstantiated current message: {self.currentMessage.id}") else: self.logger.warn( "Could not reinstatiate current message. Starting over") # Register variables for msg in self.getMessages(): # print(msg.id, msg.hasVariables()) if not msg.hasVariables(): continue for var in msg.variables: self.registerVariable(var, msg) self.logger.debug(f'has variables: {self.variables}') self.logger.debug(f'has {len(self.strands)} strands: {self.strands}') # self.logger.info(f"Directions: {self.directionsPerMsg}") self.calculateFinishesForStrands() def reset(self): self.timer.reset() # self.startTime = time.time() # currently active message, determines active listeners etc. self.currentMessage = None self.currentDiversion = None self.lastMsgTime = None self.lastSpeechStartTime = None self.lastSpeechEndTime = None self.variableValues = {} # captured variables from replies self.finish_time = False self.runId = uuid.uuid4().hex self.diversionDirections = [] self.gaveErrorForNotContinuing = False self.events = [] # queue of received events self.commands = [] # queue of commands to send self.log = [] # all nodes/elements that are triggered self.msgLog = [] self.currentReply = None self.stats = { 'timeouts': 0, 'silentTimeouts': 0, 'consecutiveSilentTimeouts': 0, 'diversions': { 'no_response': 0, 'repeat': 0, 'reply_contains': 0, 'timeout': 0, 'timeout_total': 0, 'timeout_last': 0 } } for msg in self.getMessages(): pass def add(self, obj): if obj.id in self.elements: # print(obj) raise Exception("Duplicate id for ''".format(obj.id)) self.elements[obj.id] = obj if type(obj) == Diversion: self.diversions.append(obj) if type(obj) == Message: if obj.isStart: #confusingly, isStart is 'beginning' in the story json file self.startMessage = obj if obj.isStrandStart: self.strands[obj.id] = [] if type(obj) == Direction: if obj.msgFrom.id not in self.directionsPerMsg: self.directionsPerMsg[obj.msgFrom.id] = [] self.directionsPerMsg[obj.msgFrom.id].append(obj) def get(self, id): """ Get a story element by its id """ if id in self.elements: return self.elements[id] return None def getMessages(self): return [el for el in self.elements.values() if type(el) == Message] def stop(self): self.logger.info("Stop Story") if self.isRunning: self.isRunning = False def shutdown(self): self.stop() self.hugvey = None async def _processPendingEvents(self): # Gather events: nr = len(self.events) for i in range(nr): e = self.events.pop(0) self.logger.debug("handle '{}'".format(e)) if e['event'] == "exit": self.stop() if e['event'] == 'connect': # a client connected. Should only happen in the beginning or in case of error # that is, until we have a 'reset' or 'start' event. # reinitiate current message await self.setCurrentMessage(self.currentMessage) if e['event'] == "playbackStart": if e['msgId'] != self.currentMessage.id: continue self.lastMsgStartTime = self.timer.getElapsed() self.logger.debug("Start playback") if e['event'] == "playbackFinish": if e['msgId'] == self.currentMessage.id: #TODO: migrate value to Messagage instead of Story self.lastMsgFinishTime = self.timer.getElapsed() self.hugvey.eventLogger.info(f"message: {self.currentMessage.id} {self.currentMessage.uuid} done") # 2019-02-22 temporary disable listening while playing audio: # if self.hugvey.google is not None: # self.logger.warn("Temporary 'fix' -> resume recording?") # self.hugvey.google.resume() if self.currentMessage.id not in self.directionsPerMsg: # print(self.currentDiversion) # if self.currentDiversion is not None: # await self.currentDiversion.finalise(self) # else: # TODO: check if direction that exists are diversion returns, and if they are already taken. Otherwise story blocks self.logger.info("THE END!") self._finish() return if e['event'] == 'speech': # TODO if transcript is empty, ignore (happened sometimes in french) if len(e['transcript'].strip()) < 1: self.logger.warning(f'ignore empty transcription {e}') continue # participants speaks, reset counter self.stats['consecutiveSilentTimeouts'] = 0 # if self.currentMessage and not self.lastMsgStartTime: if self.currentMessage and not self.lastMsgFinishTime: # Ignore incoming speech events until we receive a 'playbackStart' event. # After that moment the mic will be muted, so nothing should come in _anyway_ # unless google is really slow on us. But by taking the start time we don't ignore # messages that come in, in the case google is faster than our playbackFinish event. # (if this setup doesn't work, try to test on self.lastMsgFinish time anyway) # it keeps tricky with all these run conditions # if len(self.interruptionDiversions) and not self.currentDiversion and not self.allowReplyInterrupt: # self.logger.warn("diverge when speech during playing message") # diversion = random.choice(self.interruptionDiversions) # #: :type diversion: Diversion # r = await diversion.divergeIfNeeded(self, None) # print(r) # is always needed :-) # else: self.logger.info("ignore speech during playing message") continue # log if somebody starts speaking if self.currentReply is None: self.logger.info("Start speaking") self.currentReply= Reply(self.currentMessage) now = self.timer.getElapsed() utterance = self.currentReply.getActiveUtterance(now) # The 'is_final' from google sometimes comes 2 sec after finishing speaking # therefore, we ignore the timing of this transcription if something has been said already if e['is_final'] and utterance.hasText(): self.logger.debug(f'ignore timing: {now} use {utterance.lastUpdateTime}') utterance.setText(e['transcript'], utterance.lastUpdateTime) else: utterance.setText(e['transcript'], now) self.hugvey.eventLogger.debug("speaking: content {} \"{}\"".format(id(utterance), e['transcript'])) if not self.timer.hasMark('first_speech'): self.timer.setMark('first_speech') self.timer.setMark('last_speech') if e['is_final']: utterance.setFinished(self.timer.getElapsed()) self.hugvey.eventLogger.info("speaking: stop {}".format(id(utterance))) if self.hugvey.recorder: self.hugvey.recorder.updateTranscription(self.currentReply.getText()) def _processDirection(self, direction): """ return matching condition """ for condition in direction.conditions: if condition.isMet(self): self.logger.info("Condition is met: {0} ({2}), going to {1}".format( condition.id, direction.msgTo.id, condition.type)) self.hugvey.eventLogger.info("condition: {0}".format(condition.id)) self.hugvey.eventLogger.info("direction: {0}".format(direction.id)) direction.setMetCondition(condition) return condition return None async def _processDirections(self, directions): ':type directions: list(Direction)' chosenDirection = None metCondition = None for direction in directions: if direction.isDiversionReturn and direction.diversionHasReturned: # Prevent that returns created from the same message send you # back to a previous point in time. # self.logger.warn("Skipping double direction for diversion") continue condition = self._processDirection(direction) if not condition: continue self.addToLog(condition) self.addToLog(direction) self.currentMessage.setFinished(self.timer.getElapsed()) chosenDirection = direction metCondition = condition break isDiverging = await self._processDiversions(chosenDirection) allowReplyInterrupt = False # in some cases, conditions should be allowed to interrupt the reply if metCondition: if metCondition.type == 'timeout' and not ('onlyIfNoReply' in metCondition.vars and metCondition.vars['onlyIfNoReply']): allowReplyInterrupt = True if metCondition.usedContainsDuration is not None and metCondition.usedContainsDuration < 0.1: allowReplyInterrupt = True if not isDiverging and chosenDirection: if chosenDirection.isDiversionReturn and self.currentDiversion: for direction in self.diversionDirections: if direction.isDiversionReturn and not direction.diversionHasReturned: self.logger.info(f"Mark diversion as returned for return direction {direction.id}") direction.diversionHasReturned = True # chosenDirection.diversionHasReturned = True await self.currentDiversion.finalise(self) await self.setCurrentMessage(chosenDirection.msgTo, allowReplyInterrupt=allowReplyInterrupt) return chosenDirection async def _processDiversions(self, direction: None) -> bool: """ Process the diversions on stack. If diverging, return True, else False msgFrom and msgTo contain the source and target of a headed direction if given Else, they are None """ diverge = False activeDiversions = [] activeTimeoutDiv = None activeTimeoutLastDiv = None activeNoResponseDiv = None for diversion in self.diversions: #: :type diversion: Diversion if diversion.disabled or diversion.hasHit or diversion.type == 'interrupt': # interruptions are triggered somewhere else. continue if diversion.type == 'timeout': if diversion.params['timesOccured'] > 0: if not diversion.params['fromLastMessage']: # perhaps neater if we collect them in a list, and then sort by key, but this works just as well if not activeTimeoutDiv or activeTimeoutDiv.params['timesOccured'] > diversion.params['timesOccured']: activeTimeoutDiv = diversion else: if not activeTimeoutLastDiv or activeTimeoutLastDiv.params['timesOccured'] > diversion.params['timesOccured']: activeTimeoutLastDiv = diversion continue if diversion.type == 'no_response': if diversion.params['timesOccured'] > 0: if not activeNoResponseDiv or activeNoResponseDiv.params['timesOccured'] > diversion.params['timesOccured']: activeNoResponseDiv = diversion continue activeDiversions.append(diversion) if activeTimeoutDiv: activeDiversions.append(activeTimeoutDiv) if activeTimeoutLastDiv: activeDiversions.append(activeTimeoutLastDiv) if activeNoResponseDiv: activeDiversions.append(activeNoResponseDiv) for diversion in activeDiversions: # TODO: collect diversions and order by times + timesOccured (for timeout & no_response) d = await diversion.divergeIfNeeded(self, direction) if d: diverge = True return diverge def addToLog(self, node): self.log.append((node, self.timer.getElapsed())) if isinstance(node, Message): self.msgLog.append(node) if self.hugvey.recorder: if isinstance(node, Message): self.hugvey.recorder.log('hugvey', node.text, node.id) if isinstance(node, Diversion): self.hugvey.recorder.log('diversion',node.logInfo, node.id) if isinstance(node, Condition): self.hugvey.recorder.log('condition',node.logInfo, node.id) def logHasMsg(self, node): return node in self.msgLog def checkIfGone(self): ''' Make a guestimation if the audience has left... just really a simple timer check. If we do think so, give an error and stop the conversation ''' if not self.lastMsgFinishTime: # don't do it when hugvey is speaking return if self.timer.hasMark('last_speech') and self.timer.getElapsed('last_speech') > 30*60: self.hugvey.eventLogger.warning("Audience is quiet for too long...stopping") self.logger.warning("Audience is quiet, force END!") self._finish() def checkIfHanging(self): ''' Make a guestimation if the story is hanging at a message. Raise exception once. ''' if not self.lastMsgFinishTime or self.gaveErrorForNotContinuing: # don't do it when hugvey is speaking # or when it already gave the error for this message return diff = self.timer.getElapsed() - self.lastMsgFinishTime safeDiff = self.hugvey.command.config['story']['hugvey_critical_silence'] if 'hugvey_critical_silence' in self.hugvey.command.config['story'] else 90 if diff > safeDiff: self.hugvey.eventLogger.warning("Hugvey is quiet for very long!") self.logger.critical("Hugvey is quiet for very long!") # critical messages are forwarded to telegram self.gaveErrorForNotContinuing = True async def _renderer(self): """ every 1/10 sec. determine what needs to be done based on the current story state """ loopDuration = 0.1 # Configure fps lastTime = time.time() self.logger.debug("Start renderer") while self.isRunning: if self.isRunning is False: break # pause on timer paused await self.timer.isRunning.wait() # wait for un-pause for i in range(len(self.events)): await self._processPendingEvents() directions = self.getCurrentDirections() await self._processDirections(directions) self.checkIfGone() self.checkIfHanging() # TODO create timer event # self.commands.append({'msg':'TEST!'}) # Test stability of Central Command with deliberate crash # if self.timer.getElapsed() > 10: # raise Exception("Test exception") if not self.timer.hasMark('state_save') or self.timer.getElapsed('state_save') > 30: self.storeState() self.timer.setMark('state_save') # wait for next iteration to avoid too high CPU t = time.time() await asyncio.sleep(max(0, loopDuration - (t - lastTime))) lastTime = t self.logger.debug("Stop renderer") async def setCurrentMessage(self, message, useReply = None, allowReplyInterrupt = False): """ Use Reply allows to pre-initiate a reply to use with the message. This is used eg. when doing an interruption. """ if self.currentMessage and not self.lastMsgFinishTime: self.logger.info("Interrupt playback {}".format(self.currentMessage.id)) self.hugvey.eventLogger.info("interrupt") # message is playing self.hugvey.sendCommand({ 'action': 'stop', 'id': self.currentMessage.id, }) message.uuid = shortuuid.uuid() self.currentMessage = message self.lastMsgTime = time.time() self.lastMsgFinishTime = None # to be filled in by the event self.lastMsgStartTime = None # to be filled in by the event self.gaveErrorForNotContinuing = False self.allowReplyInterrupt = allowReplyInterrupt # if not reset: self.previousReply = self.currentReply # we can use this for interrptions self.currentReply = useReply #self.currentMessage.reply # send command to already mute mic self.hugvey.sendCommand({ 'action': 'prepare', 'id': message.id }) # else: # # if we press 'save & play', it should not remember it's last reply to that msg # self.previousReply = self.currentReply # we can use this for interrptions # self.currentReply = self.currentMessage.reply self.logger.info("Current message: ({0}) \"{1}\"".format( message.id, message.getTextLabel())) if message.id != self.startMessage.id: self.addToLog(message) self.hugvey.eventLogger.info(f"message: {message.id} {message.uuid} start \"{message.getLabel()}\"") # TODO: prep events & timer etc. fn = await message.getAudioFilePath() # get duration of audio file, so the client can detect a hang of 'play' try: duration = sox.file_info.duration(fn) except Exception as e: self.hugvey.eventLogger.critical(f"error: crash when reading wave file: {fn}") self.logger.critical(f"error: crash when reading wave file: {fn}") self.logger.exception(e) duration = 10 # some default duration to have something to fall back to params = message.getParams().copy() params['vol'] = params['vol'] * self.configuration.volume if 'vol' in params else self.configuration.volume params['vol'] = "{:.4f}".format(params['vol']) params['tempo'] = (float(params['tempo']) if 'tempo' in params else 1) * (float(self.configuration.tempo_factor) if hasattr(self.configuration, 'tempo_factor') else 1) duration = float(duration) / params['tempo'] params['tempo'] = "{:.4f}".format(params['tempo']) params['pitch'] = (float(params['pitch']) if 'pitch' in params else 0)\ + (float(self.configuration.pitch_modifier) if hasattr(self.configuration, 'pitch_modifier') else 0) params['pitch'] = "{:.4f}".format(params['pitch']) # self.hugvey.google.pause() # pause STT to avoid text events while decision is made self.hugvey.sendCommand({ 'action': 'play', 'file': fn, 'id': message.id, 'params': params, 'duration': duration }) if message.lightChange is not None: self.fadeLightPreset(message.lightChange) # self.hugvey.setLightStatus(message.lightChange) # 2019-02-22 temporary disable listening while playing audio: # if self.hugvey.google is not None: # self.logger.warn("Temporary 'fix' -> stop recording") # self.hugvey.google.pause() logmsg = "Pending directions:" for direction in self.getCurrentDirections(): conditions = [c.id for c in direction.conditions] logmsg += "\n- {0} -> {1} (when: {2}) ".format(direction.msgFrom.id, direction.msgTo.id, conditions) self.logger.log(LOG_BS,logmsg) # if message.id != self.startMessage.id: # self.storeState() def fadeLightPreset(self, presetNr: int): if presetNr < 0 or presetNr > 4: self.logger.critical(f"Error parsing light fade preset code '{presetNr}'") return preset = self.configuration.getLightPresets()[presetNr] self.currentLightPresetNr = presetNr self.hugvey.transitionLight(preset['intensity'], preset['fade'], preset['isSophie']) def getCurrentDirections(self): if self.currentMessage.id not in self.directionsPerMsg: return [] else: return self.directionsPerMsg[self.currentMessage.id] def getNextChapterForMsg(self, msg, canIncludeSelf = True, depth = 0): if canIncludeSelf and msg.chapterStart: self.logger.info(f"Next chapter: {msg.id}") return msg if depth >= 70: # protection against infinite loop? return None if msg.id not in self.directionsPerMsg: return None for direction in self.directionsPerMsg[msg.id]: r = self.getNextChapterForMsg(direction.msgTo, True, depth+1) if r: return r # none found return None async def run(self, customStartMsgId = None, resuming = False): self.logger.info("Starting story") if not resuming: self.hugvey.eventLogger.info("story: start") self.timer.reset() self.isRunning = True if customStartMsgId is not None: startMsg = self.get(customStartMsgId) else: startMsg = self.startMessage await self.setCurrentMessage(startMsg) else: self.hugvey.eventLogger.info(f"story: resume from {self.currentMessage}") self.isRunning = True if not self.lastMsgFinishTime and self.currentMessage: await self.setCurrentMessage(self.currentMessage) await self._renderer() def isFinished(self): if hasattr(self, 'finish_time') and self.finish_time: return time.time() - self.finish_time return False def _finish(self): """ Finish story and set hugvey to the right state """ self.finish() #stop google etc: self.hugvey.available(log=False) def finish(self): """ Finish only the story """ self.logger.info(f"Finished story for {self.hugvey.id}") self.hugvey.eventLogger.info("story: finished") self.stop() self.finish_time = time.time() self.timer.pause() if self.hugvey.google: self.hugvey.google.stop() def calculateFinishesForMsg(self, msgId, depth = 0, checked = []): """ BEWARE: checked = [] is evaluated at creation time of the method. Meaning that each call to this method which doesn't explicitly specify the checked list, relies upon the list created at parse time. This means subsequent call to the method make the list larger!! So the default should actually never be used. (found out the hard way ;-) ) """ # print(checked) if msgId in checked: # self.logger.log(LOG_BS, f"Finish for {msgId} already checked") return [] checked.append(msgId) if not msgId in self.directionsPerMsg or len(self.directionsPerMsg[msgId]) < 1: # is finish return [msgId] if depth == 400: self.logger.warn(f"Very deep hidden message to calculate finish for: msgId {msgId}") # return [] finishes = [] for d in self.directionsPerMsg[msgId]: if d.msgTo.id == msgId: continue finishes.extend(self.calculateFinishesForMsg(d.msgTo.id, depth+1, checked)) # de-duplicate before returning return list(set(finishes)) def calculateFinishesForStrands(self): for startMsgId in self.strands: msg = self.get(startMsgId) #: :type msg: Message if msg.isStart: # ignore for the beginning continue self.logger.log(LOG_BS, f"Get finishes for {startMsgId}") self.strands[startMsgId] = self.calculateFinishesForMsg(startMsgId, checked=[]) self.logger.log(LOG_BS, f"Finishes: {self.strands}") def getFinishesForMsg(self, msg): """ Find the end of strands Most often they will be 'start's so to speed up these are pre-calculated Others can be calculated on the spot returns message ids """ self.logger.debug(f"Get finishes for {msg.id} from {self.strands}") if msg.id in self.strands: return self.strands[msg.id] return self.calculateFinishesForMsg(msg.id, checked=[]) def applyTimeFactor(self, time) -> float: """ Apply the particularities of the configuration.time_factor """ time = float(time) if time < 2: # short timings are not influenced by this factor return time return time * self.configuration.time_factor def applyTimeDelay(self, time) -> float: """ Since Moscow: apparently the interval at which Google returns interim results differs per language, or we have anther cause of irregular results, either way, this screws up the short waitTimes that are crucial for the replyContains condition/diversion. Therefore, have a story configuration option with which we can extra delay to the timings (if non-zero) """ time = float(time) if time > 0: #if zero, it should always match instantly. time += self.configuration.time_extra_delay return time def getDefaultDirectionForMsg(self, msg): """ There is only a default direction (for reply contains diversion) if it has one, and only one, direction to go. If there's more, it should do nothing. """ if not msg.id in self.directionsPerMsg: # is finish return None if len(self.directionsPerMsg[msg.id]) > 1: return None # TODO: should the direction have at least a timeout condition set, or not perse? return self.directionsPerMsg[msg.id][0] @classmethod def getStateDir(self): # return "/tmp" return "./state" # day = time.strftime("%Y%m%d") # t = time.strftime("%H:%M:%S") # # self.out_folder = os.path.join(self.main_folder, day, f"{self.hv_id}", t) # if not os.path.exists(self.out_folder): # self.logger.debug(f"Create directory {self.out_folder}") # self.target_folder = os.makedirs(self.out_folder, exist_ok=True) @classmethod def getStateFilename(cls, hv_id): return os.path.join(cls.getStateDir(), f"state_hugvey{hv_id}") def storeState(self): # TODO: stop stopwatch fn = self.getStateFilename(self.hugvey.lightId) tmpfn = fn + '.tmp' self.stateSave = time.time() self.lightStateSave = self.hugvey.lightStatus with open(tmpfn, 'wb') as fp: pickle.dump(self, fp) # write atomic to disk: flush, close, rename fp.flush() os.fsync(fp.fileno()) os.rename(tmpfn, fn) duration = time.time() - self.stateSave self.logger.debug(f"saved state to {fn} in {duration}s") def hasSavedState(self): return self.hugveyHasSavedState(self.hugvey.lightId) @classmethod def hugveyHasSavedState(cls, hv_id): # print(os.path.exists(cls.getStateFilename(hv_id)), cls.getStateFilename(hv_id)) return os.path.exists(cls.getStateFilename(hv_id)) @classmethod def loadStoryFromState(cls, hugvey_state): # restart stopwatch with open(cls.getStateFilename(hugvey_state.lightId), 'rb') as fp: story = pickle.load(fp) story.hugvey = hugvey_state #: :type story: Story story.logger = mainLogger.getChild(f"{story.hugvey.id}").getChild("story") # TODO: this is not really working because it is overridden by the set-status later. # story.hugvey.setLightStatus(story.lightStateSave) story.logger.critical(f"Light preset {story.currentLightPresetNr}") if story.currentLightPresetNr is not None: story.fadeLightPreset(story.currentLightPresetNr) return story @classmethod def clearSavedState(cls, hv_id): fn = cls.getStateFilename(hv_id) if os.path.exists(fn): os.unlink(fn) mainLogger.info(f"Removed state: {fn}") # def __getstate__(self): # Copy the object's state from self.__dict__ which contains # all our instance attributes. Always use the dict.copy() # method to avoid modifying the original state. state = self.__dict__.copy() # Remove the unpicklable entries. del state['hugvey'] del state['logger'] # del state['isRunning'] return state def __setstate__(self, state): self.__dict__.update(state)