diff --git a/hugvey/central_command.py b/hugvey/central_command.py index 39096e2..735b273 100644 --- a/hugvey/central_command.py +++ b/hugvey/central_command.py @@ -18,6 +18,9 @@ from hugvey.voice.google import GoogleVoiceClient from hugvey.voice.player import Player from hugvey.voice.streamer import AudioStreamer import uuid +from hugvey.story import Story +import time +from _ast import Await logger = logging.getLogger("command") @@ -39,11 +42,20 @@ class CentralCommand(object): logger.debug('Load config from {}'.format(filename)) self.config = yaml.safe_load(fp) - - self.hugvey_ids = [i+1 for i in range(self.config['hugveys'])] + + # load languages: + self.languages = {} + + for lang in self.config['languages']: + with open(lang['file'], 'r') as fp: + self.languages[lang['code']] = yaml.load(fp) + def commandHugvey(self, hv_id, msg): + """ + prepare command to be picked up by the sender + """ self.commandQueue.put_nowait((hv_id, msg)) def commandAllHugveys(self, msg): @@ -119,6 +131,7 @@ class CentralCommand(object): logger.debug("Message contains: {}".format(msg)) continue else: + await self.hugveys[hugvey_id].eventQueue.put(msg) pass def start(self): @@ -140,12 +153,15 @@ class HugveyState(object): Manages server connections & voice parsing etc. """ def __init__(self, id: int, command: CentralCommand): - super(HugveyState, self).__init__() self.id = id self.command = command self.logger = logging.getLogger(f"hugvey{self.id}") self.loop = asyncio.new_event_loop() self.isConfigured = False + self.eventQueue = None + self.language_code = 'en-GB' + self.story = Story(self) + self.story.setStoryData(self.command.languages[self.language_code]) def config(self, hostname, ip): self.ip = ip @@ -158,14 +174,72 @@ class HugveyState(object): self.isConfigured = True + def sendCommand(self, msg): + """ + Send message or command to hugvey + @param msg: The message to be sent. Probably a dict() + """ + self.command.commandHugvey(self.id, msg) + def start(self): + """ + Start the tasks + """ # stop on isRunning.is_set() or wait() -# self.loop.create_task(self.startAudioProcessing()) - tasks = asyncio.gather(self.startAudioProcessing(), loop=self.loop) +# self.loop.create_task(self.processAudio()) + tasks = asyncio.gather( + self.catchException(self.processAudio()), + self.catchException(self.handleEvents()), + self.catchException(self.playStory()), + loop=self.loop) self.loop.run_until_complete(tasks) # asyncio.run_coroutine_threadsafe(self._start(), self.loop) + + async def catchException(self, awaitable): + try: + await awaitable + except Exception as e: + logger.exception(e) + logger.critical(f"Hugvey restart required but not implemented yet") + + # TODO: restart + + def queueEvent(self, msg): + if 'time' not in msg: + # add time, so we can track time passed + msg['time'] = time.time() + if not self.eventQueue: + self.logger.critical("No event queue to put {}".format(msg)) + else: + # Allow for both the Hugvey Command, or the Story handle the event. + self.eventQueue.put_nowait(msg) + self.story.events.append(msg) + + async def handleEvents(self): + self.eventQueue = asyncio.Queue() # start event queue here, to avoid loop issues + while self.command.isRunning.is_set(): + event = await self.eventQueue.get() + self.logger.info("Received: {}".format(event)) + + if event['event'] =='language': + self.setLanguage(event['code']) + + self.eventQueue = None + + def setLanguage(self, language_code): + if language_code not in self.command.languages: + raise Exception("Invalid language {}".format(language_code)) + + self.language_code = language_code + self.google.setLanguage(language_code) + + self.story.reset() + self.story.setStoryData(self.command.languages[language_code]) - async def startAudioProcessing(self): + async def playStory(self): + await self.story.start() + + async def processAudio(self): ''' Start the audio streamer service ''' @@ -177,16 +251,16 @@ class HugveyState(object): if self.command.debug: self.logger.warn("Debug on: Connecting Audio player") - player = Player(self.command.config['voice']['src_rate'], self.command.config['voice']['out_rate']) - streamer.addConsumer(player) + self.player = Player(self.command.config['voice']['src_rate'], self.command.config['voice']['out_rate']) + streamer.addConsumer(self.player) self.logger.info("Start Speech") - google = GoogleVoiceClient( - hugvey_id=self.id, + self.google = GoogleVoiceClient( + hugvey=self, src_rate=self.command.config['voice']['src_rate'], credential_file=self.command.config['voice']['google_credentials'], - language_code='en-US' + language_code=self.language_code ) - streamer.addConsumer(google) + streamer.addConsumer(self.google) await streamer.run() diff --git a/hugvey/narrative.py b/hugvey/event_emitter.py similarity index 100% rename from hugvey/narrative.py rename to hugvey/event_emitter.py diff --git a/hugvey/story.py b/hugvey/story.py new file mode 100644 index 0000000..d54f72d --- /dev/null +++ b/hugvey/story.py @@ -0,0 +1,341 @@ +import json +import time +import threading +import logging +import asyncio + + +logger = logging.getLogger("narrative") + +class Message(object): + def __init__(self, id, text): + self.id = id + self.text = text + self.isStart = False + self.reply = None + + @classmethod + def initFromJson(message, data, story): + msg = message(data['@id'], data['text']) + msg.isStart = data['start'] if 'start' in data else False + return msg; + + def setReply(self, text): + self.reply = text + + def hasReply(self): + return self.reply is not None + + def getReply(self): + if self.reply is None: + raise Exception("Getting reply while there is none! {0}".format(self.id)) + + return self.reply + + +class Condition(object): + """ + A condition, basic conditions are built in, custom condition can be given by + providing a custom method. + """ + def __init__(self, id): + self.id = id + self.method = None + self.vars = {} + + @classmethod + def initFromJson(conditionClass, data, story): + condition = conditionClass(data['@id']) + # TODO: should Condition be subclassed? + if data['type'] == "replyContains": + condition.method = condition._hasMetReplyContains + if data['type'] == "timeout": + condition.method = condition._hasMetTimeout + + if 'vars' in data: + condition.vars = data['vars'] + + return condition; + + def isMet(self, story): + """ + Validate if condition is met for the current story state + """ + return self.method(story) + + def _hasMetTimeout(self, story): + now = time.time() + # check if the message already finished playing + if not story.lastMsgFinishTime: + return False + + return now - story.lastMsgFinishTime >= self.vars['seconds'] + + def _hasMetReplyContains(self, story): + if not story.currentMessage.hasReply(): + return False + + if 'regex' in self.vars: + if 'regexCompiled' not in self.vars: + # Compile once, as we probably run it more than once + self.vars['regexCompiled'] = re.compile(self.vars['regex']) + result = re.match(self.vars['regexCompiled']) + if result is None: + return False + results = result.groupdict() + for captureGroup in results: + story.variables[captureGroup] = results[captureGroup] + logger.critical("Regex not implemented yet") + return False + + if 'contains' in self.vars: + if self.vars['contains'] == '*': + return True + return self.vars['contains'] in story.currentMessage.getReply() + + +class Direction(object): + """ + A condition based edge in the story graph + """ + def __init__(self, id, msgFrom: Message, msgTo: Message): + self.id = id + self.msgFrom = msgFrom + self.msgTo = msgTo + self.conditions = [] + + def addCondition(self, condition: Condition): + self.conditions.append(condition) + + @classmethod + def initFromJson(direction, data, story): + msgFrom = story.get(data['source']) + msgTo = story.get(data['target']) + direction = direction(data['@id'], msgFrom, msgTo) + if 'conditions' in data: + for conditionId in data['conditions']: + c = story.get(conditionId) + direction.addCondition(c) + return direction; + + +class Interruption(object): + """ + An Interruption. Used to catch events outside of story flow. + """ + def __init__(self, id): + self.id = id + self.conditions = [] + + def addCondition(self, condition: Condition): + self.conditions.append(condition) + + @classmethod + def initFromJson(interruptionClass, data, story): + interrupt = interruptionClass(data['@id']) + if 'conditions' in data: + for conditionId in data['conditions']: + c = story.get(conditionId) + interrupt.addCondition(c) + return interrupt; + +storyClasses = { + 'Msg': Message, + 'Direction': Direction, + 'Condition': Condition, + 'Interruption': Interruption, +} + +class Story(object): + """Story represents and manages a story/narrative flow""" + #TODO should we separate 'narrative' (the graph) from the story (the current user flow) + def __init__(self, hugvey_state): + super(Story, self).__init__() + self.hugvey = hugvey_state + + self.events = [] # queue of received events + self.commands = [] # queue of commands to send + self.log = [] # all nodes/elements that are triggered + self.currentMessage = None + + + def setStoryData(self, story_data): + """ + Parse self.data into a working story engine + """ + self.data = story_data + + # keep to be able to reset it in the end + currentId = self.currentMessage.id if self.currentMessage else None + + self.elements = {} + self.interruptions = [] + self.directionsPerMsg = {} + self.startMessage = None # The entrypoint to the graph + self.reset() + + for el in self.data: + className = storyClasses[el['@type']] + obj = className.initFromJson(el, self) + self.add(obj) + + logger.debug(self.elements) + logger.debug(self.directionsPerMsg) + + if currentId: + self.currentMessage = self.get(currentId) + if self.currentMessage: + logger.info(f"Reinstantiated current message: {self.currentMessage.id}") + else: + logger.warn("Could not reinstatiate current message. Starting over") + + def reset(self): + self.startTime = time.time() + self.currentMessage = None # currently active message, determines active listeners etc. + self.lastMsgTime = None + self.lastSpeechStartTime = None + self.lastSpeechEndTime = None + self.variables = {} # captured variables from replies + + def add(self, obj): + if obj.id in self.elements: + # print(obj) + raise Exception("Duplicate id for ''".format(obj.id)) + + if type(obj) == Message and obj.isStart: + self.startMessage = obj + + self.elements[obj.id] = obj + + if type(obj) == Interruption: + self.interruptions.append(obj) + + if type(obj) == Direction: + if obj.msgFrom.id not in self.directionsPerMsg: + self.directionsPerMsg[obj.msgFrom.id] = [] + self.directionsPerMsg[obj.msgFrom.id].append(obj) + + def get(self, id): + """ + Get a story element by its id + """ + if id in self.elements: + return self.elements[id] + return None + + + def stop(self): + logger.info("Stop Story") + if self.isRunning: + self.isRunning = False + + + def _processPendingEvents(self): + # Gather events: + nr = len(self.events) + for i in range(nr): + e = self.events.pop(0) + logger.info("handle '{}'".format( e )) + if e['event'] == "exit": + self.stop() + if e['event'] == 'connect': + # a client connected. Shold only happen in the beginning or in case of error + # that is, until we have a 'reset' or 'start' event. + self.setCurrentMessage(self.currentMessage) # reinitiate current message + + if e['event'] == "playbackFinish": + if e['msgId'] == self.currentMessage.id: + self.lastMsgFinishTime = time.time() + + if self.currentMessage.id not in self.directionsPerMsg: + logger.info("THE END!") + self.stop() + return + + if e['event'] == 'speech': + # log if somebody starts speaking + if self.lastSpeechStartTime is None or self.lastSpeechStartTime < self.lastMsgTime: + self.lastSpeechStartTime = e['time'] + + if e['is_final']: + # final result + self.lastSpeechEndTime = e['time'] + self.currentMessage.setReply(e['transcript']) + + def _processDirections(self, directions): + for direction in directions: + for condition in direction.conditions: + if condition.isMet(self): + logger.info("Condition is met: {0}, going to {1}".format(condition.id, direction.msgTo.id)) + self.log.append(condition) + self.log.append(direction) + self.setCurrentMessage(direction.msgTo) + return direction + + async def _renderer(self): + """ + every 1/10 sec. determine what needs to be done based on the current story state + """ + loopDuration = 0.1 # Configure fps + lastTime = time.time() + logger.info("Start renderer") + while self.isRunning: + if self.isRunning is False: + break + + for i in range(len(self.events)): + self._processPendingEvents() + + if self.currentMessage.id not in self.directionsPerMsg: + # TODO: finish! + + pass + + directions = self.getCurrentDirections() + self._processDirections(directions) + + # TODO create timer event + # self.commands.append({'msg':'TEST!'}) + + # wait for next iteration to avoid too high CPU + t = time.time() + await asyncio.sleep(max(0, loopDuration - (t - lastTime))) + lastTime = t + + logger.info("Stop renderer") + + def setCurrentMessage(self, message): + self.currentMessage = message + self.lastMsgTime = time.time() + self.lastMsgFinishTime = None # to be filled in by the event + + logger.info("Current message: ({0}) \"{1}\"".format(message.id, message.text)) + # TODO: prep events & timer etc. + self.hugvey.sendCommand({ + 'action': 'play', + 'msg': message.text, + 'id': message.id, + }) + + logger.debug("Pending directions: ") + + for direction in self.getCurrentDirections(): + conditions = [c.id for c in direction.conditions] + logger.debug("- {0} -> {1} (when: {2}) ".format(direction.msgFrom.id, direction.msgTo.id, conditions)) + + + def getCurrentDirections(self): + if self.currentMessage.id not in self.directionsPerMsg: + return [] + else: + return self.directionsPerMsg[self.currentMessage.id] + + async def start(self): + logger.info("Starting story") + self.startTime = time.time() + self.isRunning = True + self.setCurrentMessage(self.startMessage) + await self._renderer() + + + diff --git a/hugvey/voice/google.py b/hugvey/voice/google.py index 883156f..1e50ee7 100644 --- a/hugvey/voice/google.py +++ b/hugvey/voice/google.py @@ -20,10 +20,13 @@ import uuid logger = logging.getLogger("voice") +class RequireRestart(Exception): + pass class GoogleVoiceClient(object): - def __init__(self, hugvey_id, src_rate, credential_file, language_code = "en_GB"): + def __init__(self, hugvey, src_rate, credential_file, language_code = "en_GB"): self.src_rate = src_rate + self.hugvey = hugvey self.language_code = language_code os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = credential_file @@ -32,18 +35,28 @@ class GoogleVoiceClient(object): self.isRunning = False self.target_rate = 16000 self.cv_laststate = None + self.restart = False - self.task = threading.Thread(target=self.run, name=f"voice-hv{hugvey_id}" + str(uuid.uuid1())) + self.task = threading.Thread(target=self.run, name=f"hugvey#{self.hugvey.id}v") self.task.setDaemon(True) self.task.start() def generator(self): while self.isRunning: yield self.buffer.get() + + def setLanguage(self, language_code): + if self.language_code == language_code: + return + + logger.info("Change language from {} to {}".format(self.language_code, language_code)) + self.language_code = language_code + self.restart = True def run(self): self.isRunning = True + while self.isRunning: try: self.speech_client = speech.SpeechClient() @@ -97,9 +110,23 @@ class GoogleVoiceClient(object): else: logger.info(f"Text: {transcript}") + msg = { + "event": "speech", + "is_final": result.is_final, + "transcript": transcript.strip(), + } + + self.hugvey.queueEvent(msg) + + if self.restart: + self.restart = False + raise RequireRestart("Restart required") + if not self.isRunning: logger.warn("Stopping voice loop") break + except RequireRestart as e: + logger.warn("Restart Google Voice. Language: {}".format(self.language_code)) except Exception as e: logger.critical(f"Crashed Google Voice: {e}") diff --git a/requirements.server.txt b/requirements.server.txt new file mode 100644 index 0000000..667b814 --- /dev/null +++ b/requirements.server.txt @@ -0,0 +1,6 @@ +pyzmq +pyaudio +coloredlogs +pyyaml +audioop +google-cloud-speech \ No newline at end of file diff --git a/server_config.yml b/server_config.yml index 1f45314..a6967c4 100644 --- a/server_config.yml +++ b/server_config.yml @@ -7,4 +7,12 @@ voice: port: 4444 chunk: 2972 google_credentials: "/home/ruben/Documents/Projecten/2018/Hugvey/test_googlespeech/My First Project-0c7833e0d5fa.json" -hugveys: 3 \ No newline at end of file +hugveys: 3 +languages: + - code: en-GB + file: story_en.json + - code: de-DE + file: story_de.json + - code: fr-FR + file: story_fr.json + \ No newline at end of file diff --git a/story_de.json b/story_de.json new file mode 100644 index 0000000..c60d5a1 --- /dev/null +++ b/story_de.json @@ -0,0 +1,128 @@ +[ + { + "@type": "Msg", + "@id": "n1", + "text": "Welcome", + "start": true, + "file": "shallweplayagame.wav" + }, + { + "@type": "Msg", + "@id": "n2", + "text": "How have you been?", + "file": "sothegamegoeslikethis.wav" + }, + { + "@type": "Msg", + "@id": "n4", + "text": "That's a shame!", + "file": "sleep.wav" + }, + { + "@type": "Msg", + "@id": "n5", + "text": "Great to hear!", + "file": "retreat.wav" + }, + { + "@type": "Msg", + "@id": "n3", + "text": "Bye!", + "file": "istart.wav" + }, + { + "@id": "c1", + "@type": "Condition", + "type": "replyContains", + "vars": {"contains": "Well"} + }, + { + "@id": "c5", + "@type": "Condition", + "type": "replyContains", + "label": "Match any response", + "vars": {"contains": "*"} + }, + { + "@id": "c4", + "@type": "Condition", + "type": "replyContains", + "vars": {"regex": "I want more (?.*)"} + }, + { + "@id": "c2", + "@type": "Condition", + "type": "timeout", + "label": "General response timeout", + "vars": {"seconds": 10.0} + }, + { + "@id": "c3", + "@type": "Condition", + "type": "timeout", + "vars": {"seconds": 0.8} + }, + { + "@id": "d5", + "@type": "Direction", + "source": "n1", + "target": "n2", + "conditions": [ + "c2","c5" + ] + }, + { + "@id": "d1", + "@type": "Direction", + "source": "n2", + "target": "n4", + "order": 1, + "conditions": [ + "c5","c2" + ] + }, + { + "@id": "d2", + "@type": "Direction", + "source": "n2", + "target": "n5", + "order": 2, + "conditions": [ + "c2" + ] + }, + { + "@id": "d3", + "@type": "Direction", + "source": "n5", + "target": "n3", + "conditions": [ + "c2","c5" + ] + }, + { + "@id": "d4", + "@type": "Direction", + "source": "n4", + "target": "n3", + "conditions": [ + "c2","c5" + ] + }, + { + "@id": "i1", + "@type": "Interruption", + "forEvent": "laugh", + "conditions": [ + "c1" + ] + }, + { + "@id": "i2", + "@type": "Interruption", + "forEvent": "laugh", + "conditions": [ + "c1" + ] + } +] diff --git a/story_en.json b/story_en.json new file mode 100644 index 0000000..c60d5a1 --- /dev/null +++ b/story_en.json @@ -0,0 +1,128 @@ +[ + { + "@type": "Msg", + "@id": "n1", + "text": "Welcome", + "start": true, + "file": "shallweplayagame.wav" + }, + { + "@type": "Msg", + "@id": "n2", + "text": "How have you been?", + "file": "sothegamegoeslikethis.wav" + }, + { + "@type": "Msg", + "@id": "n4", + "text": "That's a shame!", + "file": "sleep.wav" + }, + { + "@type": "Msg", + "@id": "n5", + "text": "Great to hear!", + "file": "retreat.wav" + }, + { + "@type": "Msg", + "@id": "n3", + "text": "Bye!", + "file": "istart.wav" + }, + { + "@id": "c1", + "@type": "Condition", + "type": "replyContains", + "vars": {"contains": "Well"} + }, + { + "@id": "c5", + "@type": "Condition", + "type": "replyContains", + "label": "Match any response", + "vars": {"contains": "*"} + }, + { + "@id": "c4", + "@type": "Condition", + "type": "replyContains", + "vars": {"regex": "I want more (?.*)"} + }, + { + "@id": "c2", + "@type": "Condition", + "type": "timeout", + "label": "General response timeout", + "vars": {"seconds": 10.0} + }, + { + "@id": "c3", + "@type": "Condition", + "type": "timeout", + "vars": {"seconds": 0.8} + }, + { + "@id": "d5", + "@type": "Direction", + "source": "n1", + "target": "n2", + "conditions": [ + "c2","c5" + ] + }, + { + "@id": "d1", + "@type": "Direction", + "source": "n2", + "target": "n4", + "order": 1, + "conditions": [ + "c5","c2" + ] + }, + { + "@id": "d2", + "@type": "Direction", + "source": "n2", + "target": "n5", + "order": 2, + "conditions": [ + "c2" + ] + }, + { + "@id": "d3", + "@type": "Direction", + "source": "n5", + "target": "n3", + "conditions": [ + "c2","c5" + ] + }, + { + "@id": "d4", + "@type": "Direction", + "source": "n4", + "target": "n3", + "conditions": [ + "c2","c5" + ] + }, + { + "@id": "i1", + "@type": "Interruption", + "forEvent": "laugh", + "conditions": [ + "c1" + ] + }, + { + "@id": "i2", + "@type": "Interruption", + "forEvent": "laugh", + "conditions": [ + "c1" + ] + } +] diff --git a/story_fr.json b/story_fr.json new file mode 100644 index 0000000..c60d5a1 --- /dev/null +++ b/story_fr.json @@ -0,0 +1,128 @@ +[ + { + "@type": "Msg", + "@id": "n1", + "text": "Welcome", + "start": true, + "file": "shallweplayagame.wav" + }, + { + "@type": "Msg", + "@id": "n2", + "text": "How have you been?", + "file": "sothegamegoeslikethis.wav" + }, + { + "@type": "Msg", + "@id": "n4", + "text": "That's a shame!", + "file": "sleep.wav" + }, + { + "@type": "Msg", + "@id": "n5", + "text": "Great to hear!", + "file": "retreat.wav" + }, + { + "@type": "Msg", + "@id": "n3", + "text": "Bye!", + "file": "istart.wav" + }, + { + "@id": "c1", + "@type": "Condition", + "type": "replyContains", + "vars": {"contains": "Well"} + }, + { + "@id": "c5", + "@type": "Condition", + "type": "replyContains", + "label": "Match any response", + "vars": {"contains": "*"} + }, + { + "@id": "c4", + "@type": "Condition", + "type": "replyContains", + "vars": {"regex": "I want more (?.*)"} + }, + { + "@id": "c2", + "@type": "Condition", + "type": "timeout", + "label": "General response timeout", + "vars": {"seconds": 10.0} + }, + { + "@id": "c3", + "@type": "Condition", + "type": "timeout", + "vars": {"seconds": 0.8} + }, + { + "@id": "d5", + "@type": "Direction", + "source": "n1", + "target": "n2", + "conditions": [ + "c2","c5" + ] + }, + { + "@id": "d1", + "@type": "Direction", + "source": "n2", + "target": "n4", + "order": 1, + "conditions": [ + "c5","c2" + ] + }, + { + "@id": "d2", + "@type": "Direction", + "source": "n2", + "target": "n5", + "order": 2, + "conditions": [ + "c2" + ] + }, + { + "@id": "d3", + "@type": "Direction", + "source": "n5", + "target": "n3", + "conditions": [ + "c2","c5" + ] + }, + { + "@id": "d4", + "@type": "Direction", + "source": "n4", + "target": "n3", + "conditions": [ + "c2","c5" + ] + }, + { + "@id": "i1", + "@type": "Interruption", + "forEvent": "laugh", + "conditions": [ + "c1" + ] + }, + { + "@id": "i2", + "@type": "Interruption", + "forEvent": "laugh", + "conditions": [ + "c1" + ] + } +]