264 lines
9.7 KiB
Python
264 lines
9.7 KiB
Python
import json
|
|
import logging
|
|
import os
|
|
import pyaudio
|
|
import re
|
|
import socket
|
|
import os
|
|
from threading import Thread
|
|
|
|
from google.cloud.speech import enums
|
|
from google.cloud.speech import types
|
|
from google.auth.credentials import Credentials
|
|
|
|
from google.cloud import speech
|
|
import asyncio
|
|
import threading
|
|
import queue
|
|
import uuid
|
|
from hugvey.communication import LOG_BS
|
|
import audioop
|
|
import gc
|
|
|
|
mainLogger = logging.getLogger("hugvey")
|
|
logger = mainLogger.getChild("google")
|
|
|
|
class RequireRestart(Exception):
|
|
pass
|
|
|
|
class GoogleVoiceClient(object):
|
|
def __init__(self, hugvey, src_rate, credential_file, language_code = "en_GB"):
|
|
self.logger = mainLogger.getChild(f"{hugvey.id}").getChild('google')
|
|
self.src_rate = src_rate
|
|
self.hugvey = hugvey
|
|
self.language_code = language_code
|
|
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = credential_file
|
|
|
|
# Create a thread-safe buffer of audio data
|
|
self.buffer = queue.Queue()
|
|
self.isRunning = threading.Event()
|
|
self.isStarted = threading.Event()
|
|
self.toBeShutdown = False
|
|
self.target_rate = 16000
|
|
self.cv_laststate = None
|
|
self.restart = False
|
|
|
|
|
|
self.task = threading.Thread(target=self.run, name=f"hugvey#{self.hugvey.id}v")
|
|
self.task.setDaemon(True)
|
|
self.task.start()
|
|
self.subsequentMutedFrames = 0
|
|
|
|
self.lastNonFinalTranscript = None
|
|
|
|
def pause(self):
|
|
self.isRunning.clear()
|
|
self.restart = True
|
|
|
|
def resume(self):
|
|
self.buffer = queue.Queue() # have a clear queue when resuming
|
|
self.isRunning.set()
|
|
|
|
def start(self):
|
|
"""
|
|
Pause/resume are used within a story to create a break/restart in the recording
|
|
eacht time hugvey speaks. however, this caused issues with the mics that stay on after
|
|
a story finishes (it cause Google to resume). Hence, we add another layer to its running
|
|
state: start/stop. This is not influenced by the resume and triggered by starting and
|
|
stopping the story only.
|
|
A bit messy but it should work ;-)
|
|
"""
|
|
self.logger.info("Start STT")
|
|
self.resume()
|
|
self.isStarted.set()
|
|
|
|
|
|
def stop(self):
|
|
self.logger.info("Stop STT")
|
|
self.pause()
|
|
self.isStarted.clear()
|
|
|
|
def generator(self):
|
|
self.logger.debug('start generator')
|
|
while not self.toBeShutdown and self.isRunning.is_set():
|
|
try:
|
|
# set a timeout, as not to wait infinitely for the buffer when
|
|
# we actually want to restart
|
|
yield self.buffer.get(timeout=.3)
|
|
except queue.Empty as e:
|
|
self.logger.debug('empty mic buffer - restart?')
|
|
# print(self.isRunning.isSet())
|
|
self.logger.info('stop generator')
|
|
self.restart = False # don't trigger double restart
|
|
|
|
# raise RequireRestart("Restart required (generator)")
|
|
|
|
def setLanguage(self, language_code):
|
|
if self.language_code == language_code:
|
|
return
|
|
|
|
self.logger.info("Change language from {} to {}".format(self.language_code, language_code))
|
|
self.language_code = language_code
|
|
self.isRunning.clear()
|
|
self.restart = True
|
|
|
|
def getOfficialLangCode(self):
|
|
# October 2019: multiple versions for the French language exists, called fr-FR2
|
|
# Instead of rewriting the whole codebase to accept a different language id, we can also
|
|
# just strip the numbers from the language code :-)
|
|
return ''.join([i for i in self.language_code if not i.isdigit()])
|
|
|
|
def run(self):
|
|
self.isRunning.set()
|
|
|
|
# Leave this here to avoid "Too many files open" errors.
|
|
self.speech_client = speech.SpeechClient()
|
|
|
|
while not self.toBeShutdown:
|
|
config = types.RecognitionConfig(
|
|
encoding=enums.RecognitionConfig.AudioEncoding.LINEAR16,
|
|
sample_rate_hertz=self.src_rate,
|
|
language_code=self.getOfficialLangCode())
|
|
self.streaming_config = types.StreamingRecognitionConfig(
|
|
config=config,
|
|
interim_results=True)
|
|
|
|
try:
|
|
self.logger.log(LOG_BS,"wait for Google Voice")
|
|
if not self.isRunning.wait(timeout=1):
|
|
continue # re-ceck toBeShutdown
|
|
self.logger.debug("Starting Google Voice")
|
|
|
|
|
|
audio_generator = self.generator()
|
|
requests = (types.StreamingRecognizeRequest(audio_content=content)
|
|
for content in audio_generator)
|
|
responses = self.speech_client.streaming_recognize(
|
|
self.streaming_config, requests)
|
|
|
|
self.logger.debug("Starting voice loop")
|
|
for response in responses:
|
|
if not response.results:
|
|
self.logger.debug('...')
|
|
continue
|
|
"""Iterates through server responses and prints them.
|
|
|
|
The responses passed is a generator that will block until a response
|
|
is provided by the server.
|
|
|
|
Each response may contain multiple results, and each result may contain
|
|
multiple alternatives; for details, see https://goo.gl/tjCPAU. Here we
|
|
print only the transcription for the top alternative of the top result.
|
|
|
|
In this case, responses are provided for interim results as well. If the
|
|
response is an interim one, print a line feed at the end of it, to allow
|
|
the next result to overwrite it, until the response is a final one. For the
|
|
final one, print a newline to preserve the finalized transcription.
|
|
"""
|
|
|
|
# The `results` list is consecutive. For streaming, we only care about
|
|
# the first result being considered, since once it's `is_final`, it
|
|
# moves on to considering the next utterance.
|
|
result = response.results[0]
|
|
if not result.alternatives:
|
|
continue
|
|
|
|
# Display the transcription of the top alternative.
|
|
transcript = result.alternatives[0].transcript
|
|
|
|
# self.logger.debug("Text: ".format(transcript))
|
|
|
|
if not result.is_final:
|
|
self.logger.debug(f"Text: {transcript}")
|
|
self.lastNonFinalTranscript = transcript
|
|
else:
|
|
self.logger.info(f"Text: {transcript}")
|
|
self.lastNonFinalTranscript = None
|
|
|
|
if result.is_final:
|
|
self.logger.info("native final")
|
|
msg = {
|
|
"event": "speech",
|
|
"is_final": result.is_final,
|
|
"transcript": transcript.strip(),
|
|
}
|
|
|
|
self.hugvey.queueEvent(msg)
|
|
|
|
if self.restart:
|
|
self.restart = False
|
|
raise RequireRestart("Restart required")
|
|
|
|
if self.toBeShutdown:
|
|
self.logger.warn("Stopping voice loop")
|
|
break
|
|
except RequireRestart as e:
|
|
self.restart = False
|
|
self.logger.warn("Restart Google Voice. Language: {}".format(self.language_code))
|
|
except Exception as e:
|
|
if "305" in str(e):
|
|
self.logger.warning(f"Long Google Voice: {e}")
|
|
else:
|
|
self.logger.critical(f"Crashed Google Voice: {e}")
|
|
|
|
# sending an extra message is deprecated since we ignore finals anyway
|
|
# make sure we always send a 'final' transcript.
|
|
# if self.lastNonFinalTranscript is not None:
|
|
# msg = {
|
|
# "event": "speech",
|
|
# "is_final": True,
|
|
# "transcript": self.lastNonFinalTranscript.strip(),
|
|
# }
|
|
# self.hugvey.queueEvent(msg)
|
|
|
|
self.logger.warn("Stop google run()") # finish means wrapping of hugvey#3v thread
|
|
|
|
# time.sleep(1)
|
|
# for i in gc.get_referrers(self):
|
|
# print(i)
|
|
|
|
|
|
def receive(self, chunk):
|
|
if not self.task.isAlive():
|
|
raise Exception("Voice thread died")
|
|
|
|
|
|
if audioop.max(chunk, 2) == 0:
|
|
# mic is muted on client side.
|
|
self.subsequentMutedFrames += 1
|
|
# self.logger.debug("Muted")
|
|
if self.subsequentMutedFrames > 4 and self.isRunning.is_set():
|
|
self.logger.info("Pause muted stream!")
|
|
self.pause()
|
|
return
|
|
|
|
self.subsequentMutedFrames = 0
|
|
|
|
if not self.isStarted.is_set():
|
|
# Stopped state, so resume is not triggered
|
|
return
|
|
|
|
# self.logger.debug("We have mic!")
|
|
if not self.isRunning.is_set():
|
|
self.logger.info("Resume voice")
|
|
self.resume()
|
|
|
|
if not self.isRunning.is_set():
|
|
# logger.log(LOG_BS, "Don't put to queue if google is paused")
|
|
return
|
|
|
|
if self.src_rate == self.target_rate:
|
|
data = chunk
|
|
else:
|
|
data, self.cv_laststate = audioop.ratecv(chunk, 2, 1, self.src_rate, self.target_rate, self.cv_laststate)
|
|
self.buffer.put_nowait(data)
|
|
|
|
def shutdown(self):
|
|
self.toBeShutdown = True
|
|
self.hugvey = None
|
|
|
|
def triggerStart(self):
|
|
self.start()
|
|
|
|
def __del__(self):
|
|
self.logger.warn("Destroyed google object")
|