diff --git a/hugvey/central_command.py b/hugvey/central_command.py index 1fcdce2..39096e2 100644 --- a/hugvey/central_command.py +++ b/hugvey/central_command.py @@ -6,16 +6,18 @@ This server controls all hugveys and the processing of their narratives. It expo import asyncio import logging +from pandas.conftest import ip +import threading import yaml import zmq +from zmq.asyncio import Context from hugvey import panopticon -from hugvey.voice.streamer import AudioStreamer -from hugvey.voice.player import Player from hugvey.communication import getTopic, zmqSend, zmqReceive -from pandas.conftest import ip -from zmq.asyncio import Context -import threading +from hugvey.voice.google import GoogleVoiceClient +from hugvey.voice.player import Player +from hugvey.voice.streamer import AudioStreamer +import uuid logger = logging.getLogger("command") @@ -29,6 +31,7 @@ class CentralCommand(object): self.isRunning = threading.Event() self.hugveys = {} self.ctx = Context.instance() + self.hugveyLock = asyncio.Lock() def loadConfig(self, filename): @@ -69,7 +72,7 @@ class CentralCommand(object): s.close() - def instantiateHugvey(self, hugvey_id, msg): + async def instantiateHugvey(self, hugvey_id, msg): ''' Start a HugveyState, according to a show_yourself reply @@ -78,13 +81,20 @@ class CentralCommand(object): 'host': socket.gethostname(), 'ip': self.getIp(), ''' -# def startHugvey(): - h = HugveyState(hugvey_id, self) - h.config(msg['host'],msg['ip']) + async with self.hugveyLock: # lock to prevent duplicates on creation + if not hugvey_id in self.hugveys: + logger.info(f'Instantiate hugvey #{hugvey_id}') + h = HugveyState(hugvey_id, self) + h.config(msg['host'],msg['ip']) + self.hugveys[hugvey_id] = h + thread = threading.Thread(target=h.start, name=f"hugvey#{hugvey_id}") + thread.start() + else: + logger.info(f'Reconfigure hugvey #{hugvey_id}') + # (re)configure exisitng hugveys + h.config(msg['host'],msg['ip']) + - thread = threading.Thread(target=h.start) - thread.start() -# self.tasks['hv{}'.format(hugvey_id)] = self.loop.create_task(h.start()) async def eventListener(self): s = self.ctx.socket(zmq.SUB) @@ -103,7 +113,7 @@ class CentralCommand(object): elif hugvey_id not in self.hugveys: if msg['event'] == 'connection': # Create a hugvey - self.instantiateHugvey(hugvey_id, msg) + await self.instantiateHugvey(hugvey_id, msg) else: logger.warning("Message from uninstantiated Hugvey {}".format(hugvey_id)) logger.debug("Message contains: {}".format(msg)) @@ -135,20 +145,27 @@ class HugveyState(object): self.command = command self.logger = logging.getLogger(f"hugvey{self.id}") self.loop = asyncio.new_event_loop() + self.isConfigured = False def config(self, hostname, ip): self.ip = ip self.hostname = hostname self.logger.info(f"Hugvey {self.id} at {self.ip}, host: {self.hostname}") + + if self.isConfigured == True: + # a reconfiguration/reconnection + pass + + self.isConfigured = True def start(self): # stop on isRunning.is_set() or wait() -# self.loop.create_task(self.startAudio()) - tasks = asyncio.gather(self.startAudio(), loop=self.loop) +# self.loop.create_task(self.startAudioProcessing()) + tasks = asyncio.gather(self.startAudioProcessing(), loop=self.loop) self.loop.run_until_complete(tasks) # asyncio.run_coroutine_threadsafe(self._start(), self.loop) - async def startAudio(self): + async def startAudioProcessing(self): ''' Start the audio streamer service ''' @@ -163,4 +180,13 @@ class HugveyState(object): player = Player(self.command.config['voice']['src_rate'], self.command.config['voice']['out_rate']) streamer.addConsumer(player) + self.logger.info("Start Speech") + google = GoogleVoiceClient( + hugvey_id=self.id, + src_rate=self.command.config['voice']['src_rate'], + credential_file=self.command.config['voice']['google_credentials'], + language_code='en-US' + ) + streamer.addConsumer(google) + await streamer.run() diff --git a/hugvey/voice/google.py b/hugvey/voice/google.py index e69de29..883156f 100644 --- a/hugvey/voice/google.py +++ b/hugvey/voice/google.py @@ -0,0 +1,121 @@ +import json +import logging +import os +import pyaudio +import re +import socket +import os +from threading import Thread + +from google.cloud.speech import enums +from google.cloud.speech import types +from google.auth.credentials import Credentials + +from google.cloud import speech +import asyncio +import threading +import queue +import uuid + + +logger = logging.getLogger("voice") + + +class GoogleVoiceClient(object): + def __init__(self, hugvey_id, src_rate, credential_file, language_code = "en_GB"): + self.src_rate = src_rate + self.language_code = language_code + os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = credential_file + + # Create a thread-safe buffer of audio data + self.buffer = queue.Queue() + self.isRunning = False + self.target_rate = 16000 + self.cv_laststate = None + + self.task = threading.Thread(target=self.run, name=f"voice-hv{hugvey_id}" + str(uuid.uuid1())) + self.task.setDaemon(True) + self.task.start() + + def generator(self): + while self.isRunning: + yield self.buffer.get() + + def run(self): + self.isRunning = True + + while self.isRunning: + try: + self.speech_client = speech.SpeechClient() + config = types.RecognitionConfig( + encoding=enums.RecognitionConfig.AudioEncoding.LINEAR16, + sample_rate_hertz=self.src_rate, + language_code=self.language_code) + self.streaming_config = types.StreamingRecognitionConfig( + config=config, + interim_results=True) + + audio_generator = self.generator() + requests = (types.StreamingRecognizeRequest(audio_content=content) + for content in audio_generator) + responses = self.speech_client.streaming_recognize( + self.streaming_config, requests) + + logger.info("Starting voice loop") + for response in responses: + if not response.results: + continue + """Iterates through server responses and prints them. + + The responses passed is a generator that will block until a response + is provided by the server. + + Each response may contain multiple results, and each result may contain + multiple alternatives; for details, see https://goo.gl/tjCPAU. Here we + print only the transcription for the top alternative of the top result. + + In this case, responses are provided for interim results as well. If the + response is an interim one, print a line feed at the end of it, to allow + the next result to overwrite it, until the response is a final one. For the + final one, print a newline to preserve the finalized transcription. + """ + + # The `results` list is consecutive. For streaming, we only care about + # the first result being considered, since once it's `is_final`, it + # moves on to considering the next utterance. + result = response.results[0] + if not result.alternatives: + continue + + # Display the transcription of the top alternative. + transcript = result.alternatives[0].transcript + + # logger.debug("Text: ".format(transcript)) + + if not result.is_final: + logger.debug(f"Text: {transcript}") + else: + logger.info(f"Text: {transcript}") + + if not self.isRunning: + logger.warn("Stopping voice loop") + break + except Exception as e: + logger.critical(f"Crashed Google Voice: {e}") + + + def receive(self, chunk): + if not self.task.isAlive(): + raise Exception("Voice thread died") + + if self.src_rate == self.target_rate: + data = chunk + else: + data, self.cv_laststate = audioop.ratecv(chunk, 2, 1, self.src_rate, self.target_rate, self.cv_laststate) + self.buffer.put_nowait(data) + + def shutdown(self): + self.isRunning = False + + + diff --git a/server_config.yml b/server_config.yml index e099b51..1f45314 100644 --- a/server_config.yml +++ b/server_config.yml @@ -6,4 +6,5 @@ voice: out_rate: 44100 port: 4444 chunk: 2972 + google_credentials: "/home/ruben/Documents/Projecten/2018/Hugvey/test_googlespeech/My First Project-0c7833e0d5fa.json" hugveys: 3 \ No newline at end of file