hugvey/hugvey/voice/google.py

160 lines
5.8 KiB
Python

import json
import logging
import os
import pyaudio
import re
import socket
import os
from threading import Thread
from google.cloud.speech import enums
from google.cloud.speech import types
from google.auth.credentials import Credentials
from google.cloud import speech
import asyncio
import threading
import queue
import uuid
logger = logging.getLogger("voice")
class RequireRestart(Exception):
pass
class GoogleVoiceClient(object):
def __init__(self, hugvey, src_rate, credential_file, language_code = "en_GB"):
self.src_rate = src_rate
self.hugvey = hugvey
self.language_code = language_code
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = credential_file
# Create a thread-safe buffer of audio data
self.buffer = queue.Queue()
self.isRunning = threading.Event()
self.toBeShutdown = False
self.target_rate = 16000
self.cv_laststate = None
self.restart = False
self.task = threading.Thread(target=self.run, name=f"hugvey#{self.hugvey.id}v")
self.task.setDaemon(True)
self.task.start()
def pause(self):
self.isRunning.clear()
self.restart = True
def resume(self):
self.isRunning.set()
def generator(self):
while not self.toBeShutdown:
yield self.buffer.get()
def setLanguage(self, language_code):
if self.language_code == language_code:
return
logger.info("Change language from {} to {}".format(self.language_code, language_code))
self.language_code = language_code
self.restart = True
def run(self):
self.isRunning.set()
while not self.toBeShutdown:
try:
self.isRunning.wait()
self.speech_client = speech.SpeechClient()
config = types.RecognitionConfig(
encoding=enums.RecognitionConfig.AudioEncoding.LINEAR16,
sample_rate_hertz=self.src_rate,
language_code=self.language_code)
self.streaming_config = types.StreamingRecognitionConfig(
config=config,
interim_results=True)
audio_generator = self.generator()
requests = (types.StreamingRecognizeRequest(audio_content=content)
for content in audio_generator)
responses = self.speech_client.streaming_recognize(
self.streaming_config, requests)
logger.info("Starting voice loop")
for response in responses:
if not response.results:
continue
"""Iterates through server responses and prints them.
The responses passed is a generator that will block until a response
is provided by the server.
Each response may contain multiple results, and each result may contain
multiple alternatives; for details, see https://goo.gl/tjCPAU. Here we
print only the transcription for the top alternative of the top result.
In this case, responses are provided for interim results as well. If the
response is an interim one, print a line feed at the end of it, to allow
the next result to overwrite it, until the response is a final one. For the
final one, print a newline to preserve the finalized transcription.
"""
# The `results` list is consecutive. For streaming, we only care about
# the first result being considered, since once it's `is_final`, it
# moves on to considering the next utterance.
result = response.results[0]
if not result.alternatives:
continue
# Display the transcription of the top alternative.
transcript = result.alternatives[0].transcript
# logger.debug("Text: ".format(transcript))
if not result.is_final:
logger.debug(f"Text: {transcript}")
else:
logger.info(f"Text: {transcript}")
msg = {
"event": "speech",
"is_final": result.is_final,
"transcript": transcript.strip(),
}
self.hugvey.queueEvent(msg)
if self.restart:
self.restart = False
raise RequireRestart("Restart required")
if self.toBeShutdown:
logger.warn("Stopping voice loop")
break
except RequireRestart as e:
logger.warn("Restart Google Voice. Language: {}".format(self.language_code))
except Exception as e:
logger.critical(f"Crashed Google Voice: {e}")
def receive(self, chunk):
if not self.task.isAlive():
raise Exception("Voice thread died")
if self.src_rate == self.target_rate:
data = chunk
else:
data, self.cv_laststate = audioop.ratecv(chunk, 2, 1, self.src_rate, self.target_rate, self.cv_laststate)
self.buffer.put_nowait(data)
def shutdown(self):
self.toBeShutdown = True