Reimplemented existing story line

This commit is contained in:
Ruben van de Ven 2019-01-18 12:42:50 +01:00
parent e8bfa8a6da
commit 13b61225d7
9 changed files with 855 additions and 15 deletions

View file

@ -18,6 +18,9 @@ from hugvey.voice.google import GoogleVoiceClient
from hugvey.voice.player import Player
from hugvey.voice.streamer import AudioStreamer
import uuid
from hugvey.story import Story
import time
from _ast import Await
logger = logging.getLogger("command")
@ -39,11 +42,20 @@ class CentralCommand(object):
logger.debug('Load config from {}'.format(filename))
self.config = yaml.safe_load(fp)
self.hugvey_ids = [i+1 for i in range(self.config['hugveys'])]
# load languages:
self.languages = {}
for lang in self.config['languages']:
with open(lang['file'], 'r') as fp:
self.languages[lang['code']] = yaml.load(fp)
def commandHugvey(self, hv_id, msg):
"""
prepare command to be picked up by the sender
"""
self.commandQueue.put_nowait((hv_id, msg))
def commandAllHugveys(self, msg):
@ -119,6 +131,7 @@ class CentralCommand(object):
logger.debug("Message contains: {}".format(msg))
continue
else:
await self.hugveys[hugvey_id].eventQueue.put(msg)
pass
def start(self):
@ -140,12 +153,15 @@ class HugveyState(object):
Manages server connections & voice parsing etc.
"""
def __init__(self, id: int, command: CentralCommand):
super(HugveyState, self).__init__()
self.id = id
self.command = command
self.logger = logging.getLogger(f"hugvey{self.id}")
self.loop = asyncio.new_event_loop()
self.isConfigured = False
self.eventQueue = None
self.language_code = 'en-GB'
self.story = Story(self)
self.story.setStoryData(self.command.languages[self.language_code])
def config(self, hostname, ip):
self.ip = ip
@ -158,14 +174,72 @@ class HugveyState(object):
self.isConfigured = True
def sendCommand(self, msg):
"""
Send message or command to hugvey
@param msg: The message to be sent. Probably a dict()
"""
self.command.commandHugvey(self.id, msg)
def start(self):
"""
Start the tasks
"""
# stop on isRunning.is_set() or wait()
# self.loop.create_task(self.startAudioProcessing())
tasks = asyncio.gather(self.startAudioProcessing(), loop=self.loop)
# self.loop.create_task(self.processAudio())
tasks = asyncio.gather(
self.catchException(self.processAudio()),
self.catchException(self.handleEvents()),
self.catchException(self.playStory()),
loop=self.loop)
self.loop.run_until_complete(tasks)
# asyncio.run_coroutine_threadsafe(self._start(), self.loop)
async def catchException(self, awaitable):
try:
await awaitable
except Exception as e:
logger.exception(e)
logger.critical(f"Hugvey restart required but not implemented yet")
# TODO: restart
def queueEvent(self, msg):
if 'time' not in msg:
# add time, so we can track time passed
msg['time'] = time.time()
if not self.eventQueue:
self.logger.critical("No event queue to put {}".format(msg))
else:
# Allow for both the Hugvey Command, or the Story handle the event.
self.eventQueue.put_nowait(msg)
self.story.events.append(msg)
async def handleEvents(self):
self.eventQueue = asyncio.Queue() # start event queue here, to avoid loop issues
while self.command.isRunning.is_set():
event = await self.eventQueue.get()
self.logger.info("Received: {}".format(event))
if event['event'] =='language':
self.setLanguage(event['code'])
self.eventQueue = None
def setLanguage(self, language_code):
if language_code not in self.command.languages:
raise Exception("Invalid language {}".format(language_code))
self.language_code = language_code
self.google.setLanguage(language_code)
self.story.reset()
self.story.setStoryData(self.command.languages[language_code])
async def startAudioProcessing(self):
async def playStory(self):
await self.story.start()
async def processAudio(self):
'''
Start the audio streamer service
'''
@ -177,16 +251,16 @@ class HugveyState(object):
if self.command.debug:
self.logger.warn("Debug on: Connecting Audio player")
player = Player(self.command.config['voice']['src_rate'], self.command.config['voice']['out_rate'])
streamer.addConsumer(player)
self.player = Player(self.command.config['voice']['src_rate'], self.command.config['voice']['out_rate'])
streamer.addConsumer(self.player)
self.logger.info("Start Speech")
google = GoogleVoiceClient(
hugvey_id=self.id,
self.google = GoogleVoiceClient(
hugvey=self,
src_rate=self.command.config['voice']['src_rate'],
credential_file=self.command.config['voice']['google_credentials'],
language_code='en-US'
language_code=self.language_code
)
streamer.addConsumer(google)
streamer.addConsumer(self.google)
await streamer.run()

341
hugvey/story.py Normal file
View file

@ -0,0 +1,341 @@
import json
import time
import threading
import logging
import asyncio
logger = logging.getLogger("narrative")
class Message(object):
def __init__(self, id, text):
self.id = id
self.text = text
self.isStart = False
self.reply = None
@classmethod
def initFromJson(message, data, story):
msg = message(data['@id'], data['text'])
msg.isStart = data['start'] if 'start' in data else False
return msg;
def setReply(self, text):
self.reply = text
def hasReply(self):
return self.reply is not None
def getReply(self):
if self.reply is None:
raise Exception("Getting reply while there is none! {0}".format(self.id))
return self.reply
class Condition(object):
"""
A condition, basic conditions are built in, custom condition can be given by
providing a custom method.
"""
def __init__(self, id):
self.id = id
self.method = None
self.vars = {}
@classmethod
def initFromJson(conditionClass, data, story):
condition = conditionClass(data['@id'])
# TODO: should Condition be subclassed?
if data['type'] == "replyContains":
condition.method = condition._hasMetReplyContains
if data['type'] == "timeout":
condition.method = condition._hasMetTimeout
if 'vars' in data:
condition.vars = data['vars']
return condition;
def isMet(self, story):
"""
Validate if condition is met for the current story state
"""
return self.method(story)
def _hasMetTimeout(self, story):
now = time.time()
# check if the message already finished playing
if not story.lastMsgFinishTime:
return False
return now - story.lastMsgFinishTime >= self.vars['seconds']
def _hasMetReplyContains(self, story):
if not story.currentMessage.hasReply():
return False
if 'regex' in self.vars:
if 'regexCompiled' not in self.vars:
# Compile once, as we probably run it more than once
self.vars['regexCompiled'] = re.compile(self.vars['regex'])
result = re.match(self.vars['regexCompiled'])
if result is None:
return False
results = result.groupdict()
for captureGroup in results:
story.variables[captureGroup] = results[captureGroup]
logger.critical("Regex not implemented yet")
return False
if 'contains' in self.vars:
if self.vars['contains'] == '*':
return True
return self.vars['contains'] in story.currentMessage.getReply()
class Direction(object):
"""
A condition based edge in the story graph
"""
def __init__(self, id, msgFrom: Message, msgTo: Message):
self.id = id
self.msgFrom = msgFrom
self.msgTo = msgTo
self.conditions = []
def addCondition(self, condition: Condition):
self.conditions.append(condition)
@classmethod
def initFromJson(direction, data, story):
msgFrom = story.get(data['source'])
msgTo = story.get(data['target'])
direction = direction(data['@id'], msgFrom, msgTo)
if 'conditions' in data:
for conditionId in data['conditions']:
c = story.get(conditionId)
direction.addCondition(c)
return direction;
class Interruption(object):
"""
An Interruption. Used to catch events outside of story flow.
"""
def __init__(self, id):
self.id = id
self.conditions = []
def addCondition(self, condition: Condition):
self.conditions.append(condition)
@classmethod
def initFromJson(interruptionClass, data, story):
interrupt = interruptionClass(data['@id'])
if 'conditions' in data:
for conditionId in data['conditions']:
c = story.get(conditionId)
interrupt.addCondition(c)
return interrupt;
storyClasses = {
'Msg': Message,
'Direction': Direction,
'Condition': Condition,
'Interruption': Interruption,
}
class Story(object):
"""Story represents and manages a story/narrative flow"""
#TODO should we separate 'narrative' (the graph) from the story (the current user flow)
def __init__(self, hugvey_state):
super(Story, self).__init__()
self.hugvey = hugvey_state
self.events = [] # queue of received events
self.commands = [] # queue of commands to send
self.log = [] # all nodes/elements that are triggered
self.currentMessage = None
def setStoryData(self, story_data):
"""
Parse self.data into a working story engine
"""
self.data = story_data
# keep to be able to reset it in the end
currentId = self.currentMessage.id if self.currentMessage else None
self.elements = {}
self.interruptions = []
self.directionsPerMsg = {}
self.startMessage = None # The entrypoint to the graph
self.reset()
for el in self.data:
className = storyClasses[el['@type']]
obj = className.initFromJson(el, self)
self.add(obj)
logger.debug(self.elements)
logger.debug(self.directionsPerMsg)
if currentId:
self.currentMessage = self.get(currentId)
if self.currentMessage:
logger.info(f"Reinstantiated current message: {self.currentMessage.id}")
else:
logger.warn("Could not reinstatiate current message. Starting over")
def reset(self):
self.startTime = time.time()
self.currentMessage = None # currently active message, determines active listeners etc.
self.lastMsgTime = None
self.lastSpeechStartTime = None
self.lastSpeechEndTime = None
self.variables = {} # captured variables from replies
def add(self, obj):
if obj.id in self.elements:
# print(obj)
raise Exception("Duplicate id for ''".format(obj.id))
if type(obj) == Message and obj.isStart:
self.startMessage = obj
self.elements[obj.id] = obj
if type(obj) == Interruption:
self.interruptions.append(obj)
if type(obj) == Direction:
if obj.msgFrom.id not in self.directionsPerMsg:
self.directionsPerMsg[obj.msgFrom.id] = []
self.directionsPerMsg[obj.msgFrom.id].append(obj)
def get(self, id):
"""
Get a story element by its id
"""
if id in self.elements:
return self.elements[id]
return None
def stop(self):
logger.info("Stop Story")
if self.isRunning:
self.isRunning = False
def _processPendingEvents(self):
# Gather events:
nr = len(self.events)
for i in range(nr):
e = self.events.pop(0)
logger.info("handle '{}'".format( e ))
if e['event'] == "exit":
self.stop()
if e['event'] == 'connect':
# a client connected. Shold only happen in the beginning or in case of error
# that is, until we have a 'reset' or 'start' event.
self.setCurrentMessage(self.currentMessage) # reinitiate current message
if e['event'] == "playbackFinish":
if e['msgId'] == self.currentMessage.id:
self.lastMsgFinishTime = time.time()
if self.currentMessage.id not in self.directionsPerMsg:
logger.info("THE END!")
self.stop()
return
if e['event'] == 'speech':
# log if somebody starts speaking
if self.lastSpeechStartTime is None or self.lastSpeechStartTime < self.lastMsgTime:
self.lastSpeechStartTime = e['time']
if e['is_final']:
# final result
self.lastSpeechEndTime = e['time']
self.currentMessage.setReply(e['transcript'])
def _processDirections(self, directions):
for direction in directions:
for condition in direction.conditions:
if condition.isMet(self):
logger.info("Condition is met: {0}, going to {1}".format(condition.id, direction.msgTo.id))
self.log.append(condition)
self.log.append(direction)
self.setCurrentMessage(direction.msgTo)
return direction
async def _renderer(self):
"""
every 1/10 sec. determine what needs to be done based on the current story state
"""
loopDuration = 0.1 # Configure fps
lastTime = time.time()
logger.info("Start renderer")
while self.isRunning:
if self.isRunning is False:
break
for i in range(len(self.events)):
self._processPendingEvents()
if self.currentMessage.id not in self.directionsPerMsg:
# TODO: finish!
pass
directions = self.getCurrentDirections()
self._processDirections(directions)
# TODO create timer event
# self.commands.append({'msg':'TEST!'})
# wait for next iteration to avoid too high CPU
t = time.time()
await asyncio.sleep(max(0, loopDuration - (t - lastTime)))
lastTime = t
logger.info("Stop renderer")
def setCurrentMessage(self, message):
self.currentMessage = message
self.lastMsgTime = time.time()
self.lastMsgFinishTime = None # to be filled in by the event
logger.info("Current message: ({0}) \"{1}\"".format(message.id, message.text))
# TODO: prep events & timer etc.
self.hugvey.sendCommand({
'action': 'play',
'msg': message.text,
'id': message.id,
})
logger.debug("Pending directions: ")
for direction in self.getCurrentDirections():
conditions = [c.id for c in direction.conditions]
logger.debug("- {0} -> {1} (when: {2}) ".format(direction.msgFrom.id, direction.msgTo.id, conditions))
def getCurrentDirections(self):
if self.currentMessage.id not in self.directionsPerMsg:
return []
else:
return self.directionsPerMsg[self.currentMessage.id]
async def start(self):
logger.info("Starting story")
self.startTime = time.time()
self.isRunning = True
self.setCurrentMessage(self.startMessage)
await self._renderer()

View file

@ -20,10 +20,13 @@ import uuid
logger = logging.getLogger("voice")
class RequireRestart(Exception):
pass
class GoogleVoiceClient(object):
def __init__(self, hugvey_id, src_rate, credential_file, language_code = "en_GB"):
def __init__(self, hugvey, src_rate, credential_file, language_code = "en_GB"):
self.src_rate = src_rate
self.hugvey = hugvey
self.language_code = language_code
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = credential_file
@ -32,18 +35,28 @@ class GoogleVoiceClient(object):
self.isRunning = False
self.target_rate = 16000
self.cv_laststate = None
self.restart = False
self.task = threading.Thread(target=self.run, name=f"voice-hv{hugvey_id}" + str(uuid.uuid1()))
self.task = threading.Thread(target=self.run, name=f"hugvey#{self.hugvey.id}v")
self.task.setDaemon(True)
self.task.start()
def generator(self):
while self.isRunning:
yield self.buffer.get()
def setLanguage(self, language_code):
if self.language_code == language_code:
return
logger.info("Change language from {} to {}".format(self.language_code, language_code))
self.language_code = language_code
self.restart = True
def run(self):
self.isRunning = True
while self.isRunning:
try:
self.speech_client = speech.SpeechClient()
@ -97,9 +110,23 @@ class GoogleVoiceClient(object):
else:
logger.info(f"Text: {transcript}")
msg = {
"event": "speech",
"is_final": result.is_final,
"transcript": transcript.strip(),
}
self.hugvey.queueEvent(msg)
if self.restart:
self.restart = False
raise RequireRestart("Restart required")
if not self.isRunning:
logger.warn("Stopping voice loop")
break
except RequireRestart as e:
logger.warn("Restart Google Voice. Language: {}".format(self.language_code))
except Exception as e:
logger.critical(f"Crashed Google Voice: {e}")

6
requirements.server.txt Normal file
View file

@ -0,0 +1,6 @@
pyzmq
pyaudio
coloredlogs
pyyaml
audioop
google-cloud-speech

View file

@ -7,4 +7,12 @@ voice:
port: 4444
chunk: 2972
google_credentials: "/home/ruben/Documents/Projecten/2018/Hugvey/test_googlespeech/My First Project-0c7833e0d5fa.json"
hugveys: 3
hugveys: 3
languages:
- code: en-GB
file: story_en.json
- code: de-DE
file: story_de.json
- code: fr-FR
file: story_fr.json

128
story_de.json Normal file
View file

@ -0,0 +1,128 @@
[
{
"@type": "Msg",
"@id": "n1",
"text": "Welcome",
"start": true,
"file": "shallweplayagame.wav"
},
{
"@type": "Msg",
"@id": "n2",
"text": "How have you been?",
"file": "sothegamegoeslikethis.wav"
},
{
"@type": "Msg",
"@id": "n4",
"text": "That's a shame!",
"file": "sleep.wav"
},
{
"@type": "Msg",
"@id": "n5",
"text": "Great to hear!",
"file": "retreat.wav"
},
{
"@type": "Msg",
"@id": "n3",
"text": "Bye!",
"file": "istart.wav"
},
{
"@id": "c1",
"@type": "Condition",
"type": "replyContains",
"vars": {"contains": "Well"}
},
{
"@id": "c5",
"@type": "Condition",
"type": "replyContains",
"label": "Match any response",
"vars": {"contains": "*"}
},
{
"@id": "c4",
"@type": "Condition",
"type": "replyContains",
"vars": {"regex": "I want more (?<whatIwant>.*)"}
},
{
"@id": "c2",
"@type": "Condition",
"type": "timeout",
"label": "General response timeout",
"vars": {"seconds": 10.0}
},
{
"@id": "c3",
"@type": "Condition",
"type": "timeout",
"vars": {"seconds": 0.8}
},
{
"@id": "d5",
"@type": "Direction",
"source": "n1",
"target": "n2",
"conditions": [
"c2","c5"
]
},
{
"@id": "d1",
"@type": "Direction",
"source": "n2",
"target": "n4",
"order": 1,
"conditions": [
"c5","c2"
]
},
{
"@id": "d2",
"@type": "Direction",
"source": "n2",
"target": "n5",
"order": 2,
"conditions": [
"c2"
]
},
{
"@id": "d3",
"@type": "Direction",
"source": "n5",
"target": "n3",
"conditions": [
"c2","c5"
]
},
{
"@id": "d4",
"@type": "Direction",
"source": "n4",
"target": "n3",
"conditions": [
"c2","c5"
]
},
{
"@id": "i1",
"@type": "Interruption",
"forEvent": "laugh",
"conditions": [
"c1"
]
},
{
"@id": "i2",
"@type": "Interruption",
"forEvent": "laugh",
"conditions": [
"c1"
]
}
]

128
story_en.json Normal file
View file

@ -0,0 +1,128 @@
[
{
"@type": "Msg",
"@id": "n1",
"text": "Welcome",
"start": true,
"file": "shallweplayagame.wav"
},
{
"@type": "Msg",
"@id": "n2",
"text": "How have you been?",
"file": "sothegamegoeslikethis.wav"
},
{
"@type": "Msg",
"@id": "n4",
"text": "That's a shame!",
"file": "sleep.wav"
},
{
"@type": "Msg",
"@id": "n5",
"text": "Great to hear!",
"file": "retreat.wav"
},
{
"@type": "Msg",
"@id": "n3",
"text": "Bye!",
"file": "istart.wav"
},
{
"@id": "c1",
"@type": "Condition",
"type": "replyContains",
"vars": {"contains": "Well"}
},
{
"@id": "c5",
"@type": "Condition",
"type": "replyContains",
"label": "Match any response",
"vars": {"contains": "*"}
},
{
"@id": "c4",
"@type": "Condition",
"type": "replyContains",
"vars": {"regex": "I want more (?<whatIwant>.*)"}
},
{
"@id": "c2",
"@type": "Condition",
"type": "timeout",
"label": "General response timeout",
"vars": {"seconds": 10.0}
},
{
"@id": "c3",
"@type": "Condition",
"type": "timeout",
"vars": {"seconds": 0.8}
},
{
"@id": "d5",
"@type": "Direction",
"source": "n1",
"target": "n2",
"conditions": [
"c2","c5"
]
},
{
"@id": "d1",
"@type": "Direction",
"source": "n2",
"target": "n4",
"order": 1,
"conditions": [
"c5","c2"
]
},
{
"@id": "d2",
"@type": "Direction",
"source": "n2",
"target": "n5",
"order": 2,
"conditions": [
"c2"
]
},
{
"@id": "d3",
"@type": "Direction",
"source": "n5",
"target": "n3",
"conditions": [
"c2","c5"
]
},
{
"@id": "d4",
"@type": "Direction",
"source": "n4",
"target": "n3",
"conditions": [
"c2","c5"
]
},
{
"@id": "i1",
"@type": "Interruption",
"forEvent": "laugh",
"conditions": [
"c1"
]
},
{
"@id": "i2",
"@type": "Interruption",
"forEvent": "laugh",
"conditions": [
"c1"
]
}
]

128
story_fr.json Normal file
View file

@ -0,0 +1,128 @@
[
{
"@type": "Msg",
"@id": "n1",
"text": "Welcome",
"start": true,
"file": "shallweplayagame.wav"
},
{
"@type": "Msg",
"@id": "n2",
"text": "How have you been?",
"file": "sothegamegoeslikethis.wav"
},
{
"@type": "Msg",
"@id": "n4",
"text": "That's a shame!",
"file": "sleep.wav"
},
{
"@type": "Msg",
"@id": "n5",
"text": "Great to hear!",
"file": "retreat.wav"
},
{
"@type": "Msg",
"@id": "n3",
"text": "Bye!",
"file": "istart.wav"
},
{
"@id": "c1",
"@type": "Condition",
"type": "replyContains",
"vars": {"contains": "Well"}
},
{
"@id": "c5",
"@type": "Condition",
"type": "replyContains",
"label": "Match any response",
"vars": {"contains": "*"}
},
{
"@id": "c4",
"@type": "Condition",
"type": "replyContains",
"vars": {"regex": "I want more (?<whatIwant>.*)"}
},
{
"@id": "c2",
"@type": "Condition",
"type": "timeout",
"label": "General response timeout",
"vars": {"seconds": 10.0}
},
{
"@id": "c3",
"@type": "Condition",
"type": "timeout",
"vars": {"seconds": 0.8}
},
{
"@id": "d5",
"@type": "Direction",
"source": "n1",
"target": "n2",
"conditions": [
"c2","c5"
]
},
{
"@id": "d1",
"@type": "Direction",
"source": "n2",
"target": "n4",
"order": 1,
"conditions": [
"c5","c2"
]
},
{
"@id": "d2",
"@type": "Direction",
"source": "n2",
"target": "n5",
"order": 2,
"conditions": [
"c2"
]
},
{
"@id": "d3",
"@type": "Direction",
"source": "n5",
"target": "n3",
"conditions": [
"c2","c5"
]
},
{
"@id": "d4",
"@type": "Direction",
"source": "n4",
"target": "n3",
"conditions": [
"c2","c5"
]
},
{
"@id": "i1",
"@type": "Interruption",
"forEvent": "laugh",
"conditions": [
"c1"
]
},
{
"@id": "i2",
"@type": "Interruption",
"forEvent": "laugh",
"conditions": [
"c1"
]
}
]