From 79424ccfbf4ed0cb9b9447c79b56b2a4965507be Mon Sep 17 00:00:00 2001 From: Ruben van de Ven Date: Mon, 18 Feb 2019 20:38:54 +0100 Subject: [PATCH] Fetch voice from Lyrebird --- hugvey/central_command.py | 19 ++++++++-- hugvey/communication.py | 1 + hugvey/panopticon.py | 27 +++++++++++++ hugvey/story.py | 63 +++++++++++++++++++----------- hugvey/voice.py | 80 +++++++++++++++++++++++++++++++++++++++ server_config.yml | 1 + www/js/hugvey_console.js | 29 +++++++++----- 7 files changed, 186 insertions(+), 34 deletions(-) create mode 100644 hugvey/voice.py diff --git a/hugvey/central_command.py b/hugvey/central_command.py index 7f8e1dd..5d1b32a 100644 --- a/hugvey/central_command.py +++ b/hugvey/central_command.py @@ -20,6 +20,7 @@ import json import logging import queue import threading +from hugvey.voice import VoiceStorage logger = logging.getLogger("command") @@ -73,6 +74,7 @@ class CentralCommand(object): self.loadLanguages() + self.panopticon = Panopticon(self, self.config) def loadLanguages(self): @@ -122,6 +124,7 @@ class CentralCommand(object): """ prepare command to be picked up by the sender """ + logging.debug(f"COmmand {hv_id}: {msg}") if threading.current_thread().getName() != 'MainThread': # Threading nightmares! Adding to queue from other thread/loop (not sure which is the isse) # won't trigger asyncios queue.get() so we have to do this thread @@ -226,9 +229,9 @@ class CentralCommand(object): self.tasks = {} # collect tasks so we can cancel in case of error self.tasks['eventListener'] = self.loop.create_task( - self.eventListener()) + self.catchException(self.eventListener())) self.tasks['commandSender'] = self.loop.create_task( - self.commandSender()) + self.catchException(self.commandSender())) # we want the web interface in a separate thread self.panopticon_thread = threading.Thread( @@ -239,6 +242,14 @@ class CentralCommand(object): def stop(self): self.isRunning.clear() + + async def catchException(self, awaitable): + try: + print(awaitable) + await awaitable + except Exception as e: + logger.exception(e) + logger.critical(f"Hugvey restart might be required but not implemented yet") class HugveyState(object): @@ -309,6 +320,7 @@ class HugveyState(object): async def catchException(self, awaitable): try: + print(awaitable) await awaitable except Exception as e: logger.exception(e) @@ -436,7 +448,8 @@ class HugveyState(object): await self.isRunning.wait() # new story instance on each run - self.story = Story(self) + port = self.command.config['web']['port'] + self.story = Story(self, port) startMsgId = self.startMsgId self.startMsgId = None # use only once, reset before 'run' self.logger.warn(f"Starting from {startMsgId}") diff --git a/hugvey/communication.py b/hugvey/communication.py index e6776da..968f374 100644 --- a/hugvey/communication.py +++ b/hugvey/communication.py @@ -11,6 +11,7 @@ def getTopic(hugvey_id): def zmqSend(socket, hugvey_id, msg): + logger.info("SEND: {}".format(msg)) msgData = json.dumps(msg) topic = getTopic(hugvey_id) logger.info("Send 0mq to {} containing {}".format(topic, msg)) diff --git a/hugvey/panopticon.py b/hugvey/panopticon.py index 1e8c0ef..4f15e7b 100644 --- a/hugvey/panopticon.py +++ b/hugvey/panopticon.py @@ -16,6 +16,7 @@ import asyncio import json from urllib.parse import urlparse from hugvey import central_command +from hugvey.voice import VoiceStorage logger = logging.getLogger("panopticon") @@ -165,15 +166,41 @@ def getUploadHandler(central_command): return UploadHandler +def getVoiceHandler(voiceStorage): + class VoiceHandler(tornado.web.RequestHandler): + async def get(self): + # TODO: we should be using ZMQ here... + text = self.get_argument('text') + isVariable = True if int(self.get_argument('variable')) >0 else False + fn = await voiceStorage.requestFile(text, isVariable) + if not fn: + raise Exception(f"No Filename for text: {text}") + + if int(self.get_argument('filename')) == 1: + self.set_header("Content-Type","text/plain") + self.write(fn) + else: + self.set_header("Content-Type","audio/wave") + with open(fn, 'rb') as fp: + self.write(fp.read()) + self.finish() + return VoiceHandler + + class Panopticon(object): def __init__(self, central_command, config): self.command = central_command self.config = config + + voice_dir = os.path.join(self.config['web']['files_dir'], 'voices') + self.voiceStorage = VoiceStorage(voice_dir, self.config['voice']['token']) + self.application = tornado.web.Application([ (r"/ws", getWebSocketHandler(self.command)), (r"/local/(.*)", NonCachingStaticFileHandler, {"path": config['web']['files_dir']}), (r"/upload", getUploadHandler(self.command)), + (r"/voice", getVoiceHandler(self.voiceStorage)), (r"/(.*)", tornado.web.StaticFileHandler, {"path": web_dir, "default_filename": 'index.html'}), ], debug=True) diff --git a/hugvey/story.py b/hugvey/story.py index cb0787c..e431881 100644 --- a/hugvey/story.py +++ b/hugvey/story.py @@ -3,7 +3,9 @@ import time import logging import re import asyncio +import urllib.parse from .communication import LOG_BS +from tornado.httpclient import AsyncHTTPClient, HTTPRequest logger = logging.getLogger("narrative") @@ -54,6 +56,9 @@ class Message(object): """ self.variables = re.findall('\$(\w+)', self.text) + def hasVariables(self) -> bool: + return len(self.variables) > 0 + def setReply(self, reply): self.reply = reply @@ -82,6 +87,25 @@ class Message(object): 'time': None if self.reply is None else [u.startTime for u in self.reply.utterances], 'replyText': None if self.reply is None else [u.text for u in self.reply.utterances] } + + async def getAudioFilePath(self,story): + client = AsyncHTTPClient() + queryString = urllib.parse.urlencode({ + 'text': self.text, + 'filename': 1, + 'variable': 1 if self.hasVariables() else 0 + }) + request = HTTPRequest( + url = f"http://localhost:{story.panopticon_port}/voice?{queryString}", + method="GET" + ) + logger.log(LOG_BS, request.url) + response = await client.fetch(request) + if response.code != 200: + logger.critical(f"Error when fetching filename: {response.code} for {queryString}") + return None + + return response.body.decode().strip() class Reply(object): @@ -363,9 +387,10 @@ class Story(object): # TODO should we separate 'narrative' (the graph) from the story (the # current user flow) - def __init__(self, hugvey_state): + def __init__(self, hugvey_state, panopticon_port): super(Story, self).__init__() self.hugvey = hugvey_state + self.panopticon_port = panopticon_port self.events = [] # queue of received events self.commands = [] # queue of commands to send @@ -485,7 +510,7 @@ class Story(object): if self.isRunning: self.isRunning = False - def _processPendingEvents(self): + async def _processPendingEvents(self): # Gather events: nr = len(self.events) for i in range(nr): @@ -497,7 +522,7 @@ class Story(object): # a client connected. Should only happen in the beginning or in case of error # that is, until we have a 'reset' or 'start' event. # reinitiate current message - self.setCurrentMessage(self.currentMessage) + await self.setCurrentMessage(self.currentMessage) if e['event'] == "playbackFinish": if e['msgId'] == self.currentMessage.id: @@ -518,7 +543,7 @@ class Story(object): logger.warn("Interrupt message, replay {}".format(self.previousReply.forMessage.id)) self.currentReply = self.previousReply self.previousReply.forMessage.interruptCount += 1 - self.currentMessage = self.setCurrentMessage(self.previousReply.forMessage) + self.currentMessage = await self.setCurrentMessage(self.previousReply.forMessage) # log if somebody starts speaking # TODO: implement interrupt @@ -532,7 +557,7 @@ class Story(object): utterance.setFinished(self.timer.getElapsed()) - def _processDirections(self, directions): + async def _processDirections(self, directions): for direction in directions: for condition in direction.conditions: if condition.isMet(self): @@ -542,7 +567,7 @@ class Story(object): self.addToLog(condition) self.addToLog(direction) self.currentMessage.setFinished(self.timer.getElapsed()) - self.setCurrentMessage(direction.msgTo) + await self.setCurrentMessage(direction.msgTo) return direction def addToLog(self, node): @@ -563,13 +588,13 @@ class Story(object): await self.timer.isRunning.wait() # wait for un-pause for i in range(len(self.events)): - self._processPendingEvents() + await self._processPendingEvents() if self.currentMessage.id not in self.directionsPerMsg: self.finish() directions = self.getCurrentDirections() - self._processDirections(directions) + await self._processDirections(directions) # TODO create timer event # self.commands.append({'msg':'TEST!'}) @@ -581,7 +606,7 @@ class Story(object): logger.info("Stop renderer") - def setCurrentMessage(self, message): + async def setCurrentMessage(self, message): if self.currentMessage and not self.lastMsgFinishTime: logger.info("Interrupt playback {}".format(self.currentMessage.id)) # message is playing @@ -606,18 +631,12 @@ class Story(object): message.id, message.text)) self.addToLog(message) # TODO: prep events & timer etc. - if message.audioFile: - self.hugvey.sendCommand({ - 'action': 'play', - 'file': message.audioFile, - 'id': message.id, - }) - else: - self.hugvey.sendCommand({ - 'action': 'play', - 'msg': message.text, - 'id': message.id, - }) + # TODO: preload file paths if no variables are set, or once these are loaded + self.hugvey.sendCommand({ + 'action': 'play', + 'file': await message.getAudioFilePath(self), + 'id': message.id, + }) logger.debug("Pending directions: ") @@ -640,7 +659,7 @@ class Story(object): startMsg = self.get(customStartMsgId) else: startMsg = self.startMessage - self.setCurrentMessage(startMsg) + await self.setCurrentMessage(startMsg) await self._renderer() def isFinished(self): diff --git a/hugvey/voice.py b/hugvey/voice.py new file mode 100644 index 0000000..aef8dbc --- /dev/null +++ b/hugvey/voice.py @@ -0,0 +1,80 @@ +import os +import time +import json +import logging +import threading +from requests_threads import AsyncSession +from hashlib import sha1 +import asyncio +from tornado.httpclient import AsyncHTTPClient, HTTPRequest + +logger = logging.getLogger("voice") + +class VoiceStorage(object): + """ + Store & keep voices that are not part of the story json + """ + def __init__(self, cache_dir, token): + self.cache_dir = cache_dir + if not os.path.exists(self.cache_dir): + raise Exception(f"Cache dir does not exists: {self.cache_dir}") +# self.request_session = AsyncSession(n=5) + self.pendingRequests = {} + self.token = token + + def getId(self, text): + return sha1(text.encode()).hexdigest() + + def getFilename(self, text, isVariable=False): + subdir = 'static' if not isVariable else 'variable' + id = self.getId(text) + prefix = id[:2] + storageDir = os.path.join(self.cache_dir, subdir, prefix) + fn = os.path.join(storageDir, f"{id}.wav") + return fn + + async def requestFile(self, text, isVariable=False) -> str: + id = self.getId(text) + fn = self.getFilename(text) + + if os.path.exists(fn): + return fn + + if id in self.pendingRequests: + await self.pendingRequests[id].wait() + if os.path.exists(fn): + return fn + return None + + dirname = os.path.dirname(fn) + if not os.path.exists(dirname): + logger.debug(f"create directory for file: {dirname}") + os.makedirs(dirname, exist_ok=True) + + self.pendingRequests[id] = asyncio.Event() + + http_client = AsyncHTTPClient() + request = HTTPRequest( + method="POST", + url="https://avatar.lyrebird.ai/api/v0/generate", + body=json.dumps({"text": text}), + headers={"authorization": f"Bearer {self.token}"} + ) + try: + response = await http_client.fetch(request) + except Exception as e: + logger.exception(e) + self.pendingRequests[id].set() + return None + else: + if response.code != 200: + logger.critical(f"No proper response! {response.code}") + self.pendingRequests[id].set() + return None + + logger.debug(f"Wrote body: {response.code}") + with open(fn, "wb") as f: + f.write(response.body) + self.pendingRequests[id].set() + print(type(fn), fn) + return fn diff --git a/server_config.yml b/server_config.yml index ff68099..8f3f027 100644 --- a/server_config.yml +++ b/server_config.yml @@ -7,6 +7,7 @@ voice: port: 4444 chunk: 2972 google_credentials: "../test_googlespeech/My First Project-0c7833e0d5fa.json" + token: "oauth_SOMETHING" hugveys: 25 languages: - code: en-GB diff --git a/www/js/hugvey_console.js b/www/js/hugvey_console.js index 61948e3..0ca4710 100644 --- a/www/js/hugvey_console.js +++ b/www/js/hugvey_console.js @@ -264,6 +264,11 @@ class Graph { // used eg. after a condition creation. this.showMsg( this.selectedMsg ); } + + getAudioUrlForMsg(msg) { + let isVariable = msg['text'].includes('$') ? '1' : '0'; + return `http://localhost:8888/voice?text=${encodeURIComponent(msg['text'])}&variable=${isVariable}&filename=0`; + } showMsg( msg ) { let msgEl = document.getElementById( 'msg' ); @@ -286,20 +291,19 @@ class Graph { if ( msg['start'] == true ) { startAttributes['checked'] = 'checked'; } + + let audioSrcEl = crel('source', {'src': this.getAudioUrlForMsg(msg)}); let audioSpan = crel( 'span', { 'title': msg['audio'] ? msg['audio']['file'] : "", 'class': "label-value", }, - msg['audio'] ? msg['audio']['original_name'] : "" + crel( + 'audio', {'controls': 'controls'}, + audioSrcEl + ) ); - if(msg['audio']) { - audioSpan.appendChild(crel( - 'audio', {'controls': 'controls'}, - crel('source', {'src':msg['audio']['file']}) - )); - } let msgInfoEl = crel( 'div', { 'class': 'msg__info' }, crel('div', { 'class':'btn btn--delete', @@ -319,7 +323,10 @@ class Graph { 'name': msg['@id'] + '-text', 'value': msg['text'], 'on': { - 'change': this.getEditEventListener() + 'change': this.getEditEventListener(function(){ + audioSrcEl.src = panopticon.graph.getAudioUrlForMsg(msg); + audioSrcEl.parentElement.load(); + }) } } ) ), @@ -855,7 +862,7 @@ class Graph { * Use wrapper method, because for event handlers 'this' will refer to * the input object */ - getEditEventListener() { + getEditEventListener(callback) { let graph = this; let el = function( e ) { console.info("Changed", e); @@ -880,6 +887,10 @@ class Graph { // node[field] = e.srcElement.value; graph.build(); + + if(typeof callback !== 'undefined'){ + callback(); + } } return el; }