import json import logging import os import pyaudio import re import socket import os from threading import Thread from google.cloud.speech import enums from google.cloud.speech import types from google.auth.credentials import Credentials from google.cloud import speech import asyncio import threading import queue import uuid from hugvey.communication import LOG_BS import audioop import gc import time import string mainLogger = logging.getLogger("hugvey") logger = mainLogger.getChild("google") # punctuationTranslation = str.maketrans('','',string.punctuation) # has ' etc., so don't use it! punctuationTranslation = str.maketrans('','','.,!?') class RequireRestart(Exception): pass class GoogleVoiceClient(object): def __init__(self, hugvey, src_rate, credential_file, language_code = "en_GB"): self.logger = mainLogger.getChild(f"{hugvey.id}").getChild('google') self.src_rate = src_rate self.hugvey = hugvey self.language_code = language_code os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = credential_file # Create a thread-safe buffer of audio data self.buffer = queue.Queue() self.isRunning = threading.Event() self.isStarted = threading.Event() self.toBeShutdown = False self.target_rate = 16000 self.cv_laststate = None self.restart = False self.task = threading.Thread(target=self.run, name=f"hugvey#{self.hugvey.id}v") self.task.setDaemon(True) self.task.start() self.subsequentMutedFrames = 0 self.lastNonFinalTranscript = None def pause(self): self.isRunning.clear() self.restart = True def resume(self): self.buffer = queue.Queue() # have a clear queue when resuming self.isRunning.set() def start(self): """ Pause/resume are used within a story to create a break/restart in the recording eacht time hugvey speaks. however, this caused issues with the mics that stay on after a story finishes (it cause Google to resume). Hence, we add another layer to its running state: start/stop. This is not influenced by the resume and triggered by starting and stopping the story only. A bit messy but it should work ;-) """ self.logger.info("Start STT") self.resume() self.isStarted.set() def stop(self): self.logger.info("Stop STT") self.pause() self.isStarted.clear() def generator(self): self.logger.debug('start generator') while not self.toBeShutdown and self.isRunning.is_set(): try: # set a timeout, as not to wait infinitely for the buffer when # we actually want to restart yield self.buffer.get(timeout=.3) except queue.Empty as e: self.logger.debug('empty mic buffer - restart?') # print(self.isRunning.isSet()) self.logger.info('stop generator') self.restart = False # don't trigger double restart # raise RequireRestart("Restart required (generator)") def setLanguage(self, language_code): if self.language_code == language_code: return self.logger.info("Change language from {} to {}".format(self.language_code, language_code)) self.language_code = language_code self.isRunning.clear() self.restart = True def getOfficialLangCode(self): # October 2019: multiple versions for the French language exists, called fr-FR2 # Instead of rewriting the whole codebase to accept a different language id, we can also # just strip the numbers from the language code :-) return ''.join([i for i in self.language_code if not i.isdigit()]) def run(self): self.isRunning.set() # Leave this here to avoid "Too many files open" errors. self.speech_client = speech.SpeechClient() while not self.toBeShutdown: config = types.RecognitionConfig( encoding=enums.RecognitionConfig.AudioEncoding.LINEAR16, sample_rate_hertz=self.src_rate, language_code=self.getOfficialLangCode()) self.streaming_config = types.StreamingRecognitionConfig( config=config, interim_results=True) try: self.logger.log(LOG_BS,"wait for Google Voice") if not self.isRunning.wait(timeout=1): continue # re-ceck toBeShutdown self.logger.debug(f"Starting Google Voice for {self.language_code}") audio_generator = self.generator() requests = (types.StreamingRecognizeRequest(audio_content=content) for content in audio_generator) responses = self.speech_client.streaming_recognize( self.streaming_config, requests) self.logger.debug("Starting voice loop") for response in responses: if not response.results: self.logger.debug('...') continue """Iterates through server responses and prints them. The responses passed is a generator that will block until a response is provided by the server. Each response may contain multiple results, and each result may contain multiple alternatives; for details, see https://goo.gl/tjCPAU. Here we print only the transcription for the top alternative of the top result. In this case, responses are provided for interim results as well. If the response is an interim one, print a line feed at the end of it, to allow the next result to overwrite it, until the response is a final one. For the final one, print a newline to preserve the finalized transcription. """ # The `results` list is consecutive. For streaming, we only care about # the first result being considered, since once it's `is_final`, it # moves on to considering the next utterance. result = response.results[0] if not result.alternatives: continue # Display the transcription of the top alternative. transcript = result.alternatives[0].transcript # self.logger.debug("Text: ".format(transcript)) if not result.is_final: self.logger.debug(f"Text: {transcript}") self.lastNonFinalTranscript = transcript else: self.logger.info(f"Text: {transcript}") self.lastNonFinalTranscript = None # if any(p in transcript for p in string.punctuation): # self.logger.warning(f"Google returned punctuation in the string. It is removed: {transcript}") # remove any possible punctuation from string. As google has sometimes # given results _with_ punctuation, even though it should not? transcript = transcript.translate(punctuationTranslation) if result.is_final: self.logger.info("native final") msg = { "event": "speech", "is_final": result.is_final, "transcript": transcript.strip(), } self.hugvey.queueEvent(msg) if self.restart: self.restart = False raise RequireRestart("Restart required") if self.toBeShutdown: self.logger.warn("Stopping voice loop") break except RequireRestart as e: self.restart = False self.logger.warn("Restart Google Voice. Language: {}".format(self.language_code)) except Exception as e: if "305" in str(e): self.logger.warning(f"Long Google Voice: {e}") elif "TRANSIENT_FAILURE" in str(e): self.logger.critical(f"Crashed Google Voice: {e}") time.sleep(.3) self.logger.critical(f"Reload config, in an attempt/guess to fix it") config = types.RecognitionConfig( encoding=enums.RecognitionConfig.AudioEncoding.LINEAR16, sample_rate_hertz=self.src_rate, language_code=self.getOfficialLangCode()) self.streaming_config = types.StreamingRecognitionConfig( config=config, interim_results=True) else: self.logger.critical(f"Crashed Google Voice: {e}") self.logger.exception(e) # sending an extra message is deprecated since we ignore finals anyway # make sure we always send a 'final' transcript. # if self.lastNonFinalTranscript is not None: # msg = { # "event": "speech", # "is_final": True, # "transcript": self.lastNonFinalTranscript.strip(), # } # self.hugvey.queueEvent(msg) self.logger.warn("Stop google run()") # finish means wrapping of hugvey#3v thread # time.sleep(1) # for i in gc.get_referrers(self): # print(i) def receive(self, chunk): if not self.task.isAlive(): raise Exception("Voice thread died") if audioop.max(chunk, 2) == 0: # mic is muted on client side. self.subsequentMutedFrames += 1 # self.logger.debug("Muted") if self.subsequentMutedFrames > 4 and self.isRunning.is_set(): self.logger.info("Pause muted stream!") self.pause() return self.subsequentMutedFrames = 0 if not self.isStarted.is_set(): # Stopped state, so resume is not triggered return # self.logger.debug("We have mic!") if not self.isRunning.is_set(): self.logger.info("Resume voice") self.resume() if not self.isRunning.is_set(): # logger.log(LOG_BS, "Don't put to queue if google is paused") return if self.src_rate == self.target_rate: data = chunk else: data, self.cv_laststate = audioop.ratecv(chunk, 2, 1, self.src_rate, self.target_rate, self.cv_laststate) self.buffer.put_nowait(data) def shutdown(self): self.toBeShutdown = True self.hugvey = None def triggerStart(self): self.start() def __del__(self): self.logger.warn("Destroyed google object")