Logging made fit for cutelog / client audio stream now on port = port_nr + hugvey_id'

This commit is contained in:
Ruben van de Ven 2019-03-23 18:18:52 +01:00
parent 3eb4c78ae4
commit d95709b9a1
10 changed files with 102 additions and 85 deletions

View File

@ -22,8 +22,8 @@ import queue
import threading
from hugvey.voice import VoiceStorage
logger = logging.getLogger("command")
mainLogger = logging.getLogger("hugvey")
logger = mainLogger.getChild("command")
# def exceptionEmitter(a):
# print(a)
@ -159,7 +159,7 @@ class CentralCommand(object):
while self.isRunning.is_set():
hv_id, cmd = await self.commandQueue.get()
logger.info('Got command to send: {} {}'.format(hv_id, cmd))
logger.debug('Got command to send: {} {}'.format(hv_id, cmd))
zmqSend(s, hv_id, cmd)
logger.warn('Stopping command sender')
@ -191,7 +191,7 @@ class CentralCommand(object):
async def eventListener(self):
s = self.ctx.socket(zmq.SUB)
s.bind(self.config['events']['listen_address'])
logger.info("Listen for events on: {}".format(
logger.debug("Listen for events on: {}".format(
self.config['events']['listen_address']))
for id in self.hugvey_ids:
@ -262,10 +262,9 @@ class HugveyState(object):
STATE_RUNNING = "running"
def __init__(self, id: int, command: CentralCommand):
self.id = id
self.command = command
self.logger = logging.getLogger(f"hugvey{self.id}")
self.logger = mainLogger.getChild(f"{self.id}").getChild("command")
self.loop = asyncio.new_event_loop()
self.isConfigured = False
self.isRunning = asyncio.Event(loop=self.loop)
@ -323,8 +322,8 @@ class HugveyState(object):
print(awaitable)
await awaitable
except Exception as e:
logger.exception(e)
logger.critical(f"Hugvey restart required but not implemented yet")
self.logger.exception(e)
self.logger.critical(f"Hugvey restart required but not implemented yet")
# TODO: restart
@ -348,7 +347,7 @@ class HugveyState(object):
self.eventQueue = asyncio.Queue() # start event queue here, to avoid loop issues
while self.command.isRunning.is_set():
event = await self.eventQueue.get()
self.logger.info("Received: {}".format(event))
self.logger.debug("Received: {}".format(event))
if event['event'] == 'connection' and not self.isRunning.is_set():
self.restart()
@ -462,7 +461,7 @@ class HugveyState(object):
self.streamer = AudioStreamer(
self.command.config['voice']['chunk'],
self.ip,
int(self.command.config['voice']['port']))
int(self.command.config['voice']['port']) + self.id)
if self.command.config['voyeur']:
self.logger.warn("Debug on: Connecting Audio player")
@ -470,7 +469,7 @@ class HugveyState(object):
self.command.config['voice']['src_rate'], self.command.config['voice']['out_rate'])
self.streamer.addConsumer(self.player)
self.logger.info("Start Speech")
self.logger.debug("Start Speech")
self.google = GoogleVoiceClient(
hugvey=self,
src_rate=self.command.config['voice']['src_rate'],
@ -485,13 +484,13 @@ class HugveyState(object):
Start the audio streamer service
'''
self.logger.info("Start audio stream")
self.logger.debug("Start audio stream")
while self.notShuttingDown:
await self.isRunning.wait()
self.logger.info("Start audio stream")
self.logger.debug("Start audio stream")
await self.getStreamer().run()
self.logger.warn(f"stream has left the building from {self.ip}")
self.logger.critical(f"stream has left the building from {self.ip}")
# if we end up here, the streamer finished, probably meaning hte hugvey shutdown
self.gone()

View File

@ -98,7 +98,7 @@ class VoiceServer(object):
while not self.stopped:
try:
address = "tcp://*:{}".format(self.voice_port)
address = "tcp://*:{}".format(self.voice_port + self.hugvey.id)
self.voice_socket = self.ctx.socket(zmq.PUB)
self.voice_socket.bind(address)

View File

@ -1,7 +1,8 @@
import json
import logging
logger = logging.getLogger("communication")
mainLogger = logging.getLogger("hugvey")
# hyper verbose log level. Have it here, becase it needs to be _somewhere_
LOG_BS = 5
@ -11,14 +12,17 @@ def getTopic(hugvey_id):
def zmqSend(socket, hugvey_id, msg):
logger.info("SEND: {}".format(msg))
log = mainLogger.getChild(f"{hugvey_id}").getChild("communication")
log.debug("SEND: {}".format(msg))
msgData = json.dumps(msg)
topic = getTopic(hugvey_id)
logger.info("Send 0mq to {} containing {}".format(topic, msg))
log.debug("Send 0mq to {} containing {}".format(topic, msg))
socket.send_multipart([topic.encode(), msgData.encode()])
async def zmqReceive(socket):
topic, msg = await socket.recv_multipart()
hugvey_id = topic.decode()[2:]
logger.info("Received 0mq messages for Hugvey #{} containing {}".format(hugvey_id, msg.decode()))
mainLogger.getChild(f"{hugvey_id}").getChild("communication").debug(
"Received 0mq messages for Hugvey #{} containing {}".format(hugvey_id, msg.decode())
)
return int(hugvey_id), json.loads(msg.decode())

View File

@ -18,7 +18,8 @@ from urllib.parse import urlparse
from hugvey import central_command
from hugvey.voice import VoiceStorage
logger = logging.getLogger("panopticon")
mainLogger = logging.getLogger("hugvey")
logger = mainLogger.getChild("panopticon")
web_dir = os.path.join(os.path.split(__file__)[0], '..', 'www')

View File

@ -18,14 +18,15 @@ import queue
import uuid
from hugvey.communication import LOG_BS
logger = logging.getLogger("speech")
mainLogger = logging.getLogger("hugvey")
logger = mainLogger.getChild("speech")
class RequireRestart(Exception):
pass
class GoogleVoiceClient(object):
def __init__(self, hugvey, src_rate, credential_file, language_code = "en_GB"):
self.logger = mainLogger.getChild(f"{hugvey.id}").getChild('speech')
self.src_rate = src_rate
self.hugvey = hugvey
self.language_code = language_code
@ -60,7 +61,7 @@ class GoogleVoiceClient(object):
if self.language_code == language_code:
return
logger.info("Change language from {} to {}".format(self.language_code, language_code))
self.logger.info("Change language from {} to {}".format(self.language_code, language_code))
self.language_code = language_code
self.restart = True
@ -71,7 +72,7 @@ class GoogleVoiceClient(object):
while not self.toBeShutdown:
try:
self.isRunning.wait()
logger.info("Starting Google Voice")
self.logger.info("Starting Google Voice")
self.speech_client = speech.SpeechClient()
config = types.RecognitionConfig(
@ -88,7 +89,7 @@ class GoogleVoiceClient(object):
responses = self.speech_client.streaming_recognize(
self.streaming_config, requests)
logger.info("Starting voice loop")
self.logger.info("Starting voice loop")
for response in responses:
if not response.results:
continue
@ -117,12 +118,12 @@ class GoogleVoiceClient(object):
# Display the transcription of the top alternative.
transcript = result.alternatives[0].transcript
# logger.debug("Text: ".format(transcript))
# self.logger.debug("Text: ".format(transcript))
if not result.is_final:
logger.debug(f"Text: {transcript}")
self.logger.debug(f"Text: {transcript}")
else:
logger.info(f"Text: {transcript}")
self.logger.info(f"Text: {transcript}")
msg = {
"event": "speech",
@ -137,12 +138,12 @@ class GoogleVoiceClient(object):
raise RequireRestart("Restart required")
if self.toBeShutdown:
logger.warn("Stopping voice loop")
self.logger.warn("Stopping voice loop")
break
except RequireRestart as e:
logger.warn("Restart Google Voice. Language: {}".format(self.language_code))
self.logger.warn("Restart Google Voice. Language: {}".format(self.language_code))
except Exception as e:
logger.critical(f"Crashed Google Voice: {e}")
self.logger.critical(f"Crashed Google Voice: {e}")
def receive(self, chunk):

View File

@ -2,8 +2,8 @@ import pyaudio
import logging
import audioop
logger = logging.getLogger("player")
mainLogger = logging.getLogger("hugvey")
logger = mainLogger.getChild("player")
class Player:
"""

View File

@ -6,7 +6,8 @@ import logging
from zmq.asyncio import Context
import zmq
logger = logging.getLogger("streamer")
mainLogger = logging.getLogger("hugvey")
logger = mainLogger.getChild("streamer")
class AudioStreamer(object):
def __init__(self, chunk, address: str, port: int):
@ -37,7 +38,7 @@ class AudioStreamer(object):
# s.connect((self.address, self.port))
#
try:
while self.isRunning:
while self.isRunning:
data = await self.socket.recv()
# logger.debug('chunk received')
self.process(data)

View File

@ -7,7 +7,8 @@ import urllib.parse
from .communication import LOG_BS
from tornado.httpclient import AsyncHTTPClient, HTTPRequest
logger = logging.getLogger("narrative")
mainLogger = logging.getLogger("hugvey")
logger = mainLogger.getChild("narrative")
class Utterance(object):
"""Part of a reply"""
@ -44,6 +45,7 @@ class Message(object):
def setStory(self, story):
self.story = story
self.logger = story.logger.getChild("message")
@classmethod
def initFromJson(message, data, story):
@ -70,7 +72,7 @@ class Message(object):
def setVariable(self, name, value):
if name not in self.variables:
logger.critical("Set nonexisting variable")
self.logger.critical("Set nonexisting variable")
return
if self.variableValues[name] == value:
@ -78,21 +80,21 @@ class Message(object):
self.variableValues[name] = value
logger.warn(f"Set variable, now fetch {name}")
self.story.warn(f"Set variable, now fetch {name}")
if not None in self.variableValues.values():
logger.warn(f"now fetch indeed {name}")
self.story.warn(f"now fetch indeed {name}")
asyncio.get_event_loop().create_task(self.getAudioFilePath())
# asyncio.get_event_loop().call_soon_threadsafe(self.getAudioFilePath)
logger.warn(f"started {name}")
self.story.warn(f"started {name}")
def getText(self):
# sort reverse to avoid replacing the wrong variable
self.variables.sort(key=len, reverse=True)
text = self.text
logger.info(f"Getting text for {self.id}")
logger.debug(self.variables)
self.logger.debug(f"Getting text for {self.id}")
self.logger.debug(self.variables)
for var in self.variables:
logger.debug(f"try replacing ${var} with {self.variableValues[var]} in {text}")
self.logger.debug(f"try replacing ${var} with {self.variableValues[var]} in {text}")
replacement = self.variableValues[var] if (self.variableValues[var] is not None) else "nothing" #TODO: translate nothing to each language
text = text.replace('$'+var, replacement)
return text
@ -134,7 +136,7 @@ class Message(object):
if self.audioFile is not None:
return self.audioFile
logger.warn(f"Fetching audio for {self.getText()}")
self.logger.debug(f"Fetching audio for {self.getText()}")
async with self.filenameFetchLock:
client = AsyncHTTPClient()
queryString = urllib.parse.urlencode({
@ -146,14 +148,14 @@ class Message(object):
url = f"http://localhost:{self.story.panopticon_port}/voice?{queryString}",
method="GET"
)
logger.log(LOG_BS, request.url)
self.logger.log(LOG_BS, request.url)
response = await client.fetch(request)
if response.code != 200:
logger.critical(f"Error when fetching filename: {response.code} for {queryString}")
self.logger.critical(f"Error when fetching filename: {response.code} for {queryString}")
return None
logger.warn(f"Fetched audio for {self.getText()}")
self.logger.debug(f"Fetched audio for {self.getText()}")
return response.body.decode().strip()
@ -242,11 +244,11 @@ class Condition(object):
if 'onlyIfNoReply' in self.vars and self.vars['onlyIfNoReply']:
if story.currentReply and story.currentReply is not None and story.currentReply.hasUtterances():
logger.log(LOG_BS, f'Only if no reply has text! {story.currentReply.getText()}')
story.logger.log(LOG_BS, f'Only if no reply has text! {story.currentReply.getText()}')
# 'onlyIfNoReply': only use this timeout if participants doesn't speak.
return False
# else:
# logger.debug('Only if no reply has no text yet!')
# story.logger.debug('Only if no reply has no text yet!')
hasMetTimeout = now - story.lastMsgFinishTime >= float(self.vars['seconds'])
if not hasMetTimeout:
@ -278,12 +280,12 @@ class Condition(object):
self.vars['regexCompiled'] = re.compile(self.vars['regex'])
t = r.getText().lower()
logger.log(LOG_BS, 'attempt regex: {} on {}'.format(self.vars['regex'], t))
story.logger.log(LOG_BS, 'attempt regex: {} on {}'.format(self.vars['regex'], t))
result = self.vars['regexCompiled'].search(t)
if result is None:
#if there is something to match, but not found, it's never ok
return False
logger.debug('Got match on {}'.format(self.vars['regex']))
story.logger.debug('Got match on {}'.format(self.vars['regex']))
if ('instantMatch' in self.vars and self.vars['instantMatch']) or not r.isSpeaking():
# try to avoid setting variables for intermediate strings
@ -292,19 +294,19 @@ class Condition(object):
story.setVariableValue(captureGroup, results[captureGroup])
if 'instantMatch' in self.vars and self.vars['instantMatch']:
logger.info(f"Instant match on {self.vars['regex']}, {self.vars}")
story.logger.info(f"Instant match on {self.vars['regex']}, {self.vars}")
return True
# TODO: implement 'instant match' -> don't wait for isFinished()
if r.isSpeaking():
logger.log(LOG_BS, f"is speaking: {r.getLastUtterance().text} - {r.getLastUtterance().startTime}")
story.logger.log(LOG_BS, f"is speaking: {r.getLastUtterance().text} - {r.getLastUtterance().startTime}")
return False
# print(self.vars)
# either there's a match, or nothing to match at all
if 'delays' in self.vars:
if story.lastMsgFinishTime is None:
logger.debug("not finished playback yet")
story.logger.debug("not finished playback yet")
return False
# time between finishing playback and ending of speaking:
replyDuration = r.getLastUtterance().endTime - story.lastMsgFinishTime
@ -312,12 +314,12 @@ class Condition(object):
for delay in delays:
if replyDuration > float(delay['minReplyDuration']):
timeSinceReply = story.timer.getElapsed() - r.getLastUtterance().endTime
logger.log(LOG_BS, f"check delay duration is now {replyDuration}, already waiting for {timeSinceReply}, have to wait {delay['waitTime']}")
story.logger.log(LOG_BS, f"check delay duration is now {replyDuration}, already waiting for {timeSinceReply}, have to wait {delay['waitTime']}")
if timeSinceReply > float(delay['waitTime']):
return True
break # don't check other delays
# wait for delay to match
logger.debug("Wait for it...")
story.logger.debug("Wait for it...")
return False
# There is a match and no delay say, person finished speaking. Go ahead sir!
@ -425,10 +427,10 @@ class Diversion(object):
story.stats['diversions']['no_response'] += 1
msg = story.get(self.params['msgId'])
if msg is None:
logger.critical(f"Not a valid message id for diversion: {self.params['msgId']}")
story.logger.critical(f"Not a valid message id for diversion: {self.params['msgId']}")
return
logger.info(f"Diverge: No response {self.id} {story.stats}")
story.logger.info(f"Diverge: No response {self.id} {story.stats}")
self.returnMessage = msgTo
await story.setCurrentMessage(msg)
story.currentDiversion = self
@ -437,7 +439,7 @@ class Diversion(object):
return
async def _returnAfterNoResponse(self, story):
logger.info(f"Finalise diversion: {self.id}")
story.logger.info(f"Finalise diversion: {self.id}")
story.stats['consecutiveSilentTimeouts'] = 0 # reset counter after diverging
if self.params['returnAfterStrand']:
await story.setCurrentMessage(self.returnMessage)
@ -525,6 +527,7 @@ class Story(object):
self.events = [] # queue of received events
self.commands = [] # queue of commands to send
self.log = [] # all nodes/elements that are triggered
self.logger = mainLogger.getChild(f"{self.hugvey.id}").getChild("story")
self.currentMessage = None
self.currentDiversion = None
self.currentReply = None
@ -534,11 +537,11 @@ class Story(object):
self.variables = {}
def pause(self):
logger.debug('pause hugvey')
self.logger.debug('pause hugvey')
self.timer.pause()
def resume(self):
logger.debug('resume hugvey')
self.logger.debug('resume hugvey')
self.timer.resume()
def getLogSummary(self):
@ -559,7 +562,7 @@ class Story(object):
def setVariableValue(self, name, value):
if name not in self.variables:
logger.warn(f"Set variable that is not needed in the story: {name}")
self.logger.warn(f"Set variable that is not needed in the story: {name}")
self.variableValues[name] = value
for message in self.variables[name]:
@ -586,18 +589,18 @@ class Story(object):
obj = className.initFromJson(el, self)
self.add(obj)
logger.debug(self.elements)
logger.debug(self.directionsPerMsg)
self.logger.debug(self.elements)
self.logger.debug(self.directionsPerMsg)
self.diversions = [el for el in self.elements.values() if type(el) == Diversion]
if currentId:
self.currentMessage = self.get(currentId)
if self.currentMessage:
logger.info(
self.logger.info(
f"Reinstantiated current message: {self.currentMessage.id}")
else:
logger.warn(
self.logger.warn(
"Could not reinstatiate current message. Starting over")
# Register variables
@ -610,7 +613,7 @@ class Story(object):
self.registerVariable(var, msg)
logger.info(f'has variables: {self.variables}')
self.logger.info(f'has variables: {self.variables}')
def reset(self):
self.timer.reset()
@ -672,7 +675,7 @@ class Story(object):
return [el for el in self.elements.values() if type(el) == Message]
def stop(self):
logger.info("Stop Story")
self.logger.info("Stop Story")
if self.isRunning:
self.isRunning = False
@ -681,7 +684,7 @@ class Story(object):
nr = len(self.events)
for i in range(nr):
e = self.events.pop(0)
logger.info("handle '{}'".format(e))
self.logger.debug("handle '{}'".format(e))
if e['event'] == "exit":
self.stop()
if e['event'] == 'connect':
@ -697,16 +700,16 @@ class Story(object):
# 2019-02-22 temporary disable listening while playing audio:
# if self.hugvey.google is not None:
# logger.warn("Temporary 'fix' -> resume recording?")
# self.logger.warn("Temporary 'fix' -> resume recording?")
# self.hugvey.google.resume()
if self.currentMessage.id not in self.directionsPerMsg:
if self.currentDiversion is not None:
logger.info("end of diversion")
self.logger.info("end of diversion")
await self.currentDiversion.finalise(self)
self.currentDiversion = None
else:
logger.info("THE END!")
self.logger.info("THE END!")
self.stop()
return
@ -719,7 +722,7 @@ class Story(object):
timeDiff = self.timer.getElapsed() - self.previousReply.forMessage.getFinishedTime()
if self.previousReply.forMessage.afterrunTime > timeDiff:
#interrupt only in given interval:
logger.warn("Interrupt message, replay {}".format(self.previousReply.forMessage.id))
self.logger.warn("Interrupt message, replay {}".format(self.previousReply.forMessage.id))
self.currentReply = self.previousReply
self.previousReply.forMessage.interruptCount += 1
self.currentMessage = await self.setCurrentMessage(self.previousReply.forMessage, self.previousReply)
@ -727,6 +730,7 @@ class Story(object):
# log if somebody starts speaking
# TODO: implement interrupt
if self.currentReply is None:
self.logger.info("Start speaking")
self.currentReply= Reply(self.currentMessage)
utterance = self.currentReply.getActiveUtterance(self.timer.getElapsed())
@ -741,7 +745,7 @@ class Story(object):
for direction in directions:
for condition in direction.conditions:
if condition.isMet(self):
logger.info("Condition is met: {0}, going to {1}".format(
self.logger.info("Condition is met: {0}, going to {1}".format(
condition.id, direction.msgTo.id))
direction.setMetCondition(condition)
self.addToLog(condition)
@ -772,7 +776,7 @@ class Story(object):
"""
loopDuration = 0.1 # Configure fps
lastTime = time.time()
logger.info("Start renderer")
self.logger.debug("Start renderer")
while self.isRunning:
if self.isRunning is False:
break
@ -797,14 +801,14 @@ class Story(object):
await asyncio.sleep(max(0, loopDuration - (t - lastTime)))
lastTime = t
logger.info("Stop renderer")
self.logger.debug("Stop renderer")
async def setCurrentMessage(self, message, useReply = None):
"""
Use Reply allows to pre-initiate a reply to use with the message. This is used eg. when doing an interruption.
"""
if self.currentMessage and not self.lastMsgFinishTime:
logger.info("Interrupt playback {}".format(self.currentMessage.id))
self.logger.info("Interrupt playback {}".format(self.currentMessage.id))
# message is playing
self.hugvey.sendCommand({
'action': 'stop',
@ -823,7 +827,7 @@ class Story(object):
# self.previousReply = self.currentReply # we can use this for interrptions
# self.currentReply = self.currentMessage.reply
logger.info("Current message: ({0}) \"{1}\"".format(
self.logger.info("Current message: ({0}) \"{1}\"".format(
message.id, message.text))
self.addToLog(message)
# TODO: prep events & timer etc.
@ -837,14 +841,14 @@ class Story(object):
# 2019-02-22 temporary disable listening while playing audio:
# if self.hugvey.google is not None:
# logger.warn("Temporary 'fix' -> stop recording")
# self.logger.warn("Temporary 'fix' -> stop recording")
# self.hugvey.google.pause()
logger.debug("Pending directions: ")
self.logger.debug("Pending directions: ")
for direction in self.getCurrentDirections():
conditions = [c.id for c in direction.conditions]
logger.debug(
self.logger.debug(
"- {0} -> {1} (when: {2}) ".format(direction.msgFrom.id, direction.msgTo.id, conditions))
def getCurrentDirections(self):
@ -854,7 +858,7 @@ class Story(object):
return self.directionsPerMsg[self.currentMessage.id]
async def run(self, customStartMsgId = None):
logger.info("Starting story")
self.logger.info("Starting story")
self.timer.reset()
self.isRunning = True
if customStartMsgId is not None:
@ -871,7 +875,7 @@ class Story(object):
return False
def finish(self):
logger.info(f"Finished story for {self.hugvey.id}")
self.logger.info(f"Finished story for {self.hugvey.id}")
self.hugvey.pause()
self.finish_time = time.time()
self.timer.pause()

View File

@ -8,7 +8,8 @@ from hashlib import sha1
import asyncio
from tornado.httpclient import AsyncHTTPClient, HTTPRequest
logger = logging.getLogger("voice")
mainLogger = logging.getLogger("hugvey")
logger = mainLogger.getChild("voice")
class VoiceStorage(object):
"""

View File

@ -37,7 +37,13 @@ if __name__ == '__main__':
# default: "%(asctime)s %(hostname)s %(name)s[%(process)d] %(levelname)s %(message)s"
fmt="%(asctime)s %(hostname)s %(name)s[%(process)d,%(threadName)s] %(levelname)s %(message)s"
)
logger = logging.getLogger("hugvey")
# logger.setLevel(1) # to send all records to cutelog
socket_handler = logging.handlers.SocketHandler('127.0.0.1', 19996) # default listening address
logger.addHandler(socket_handler);
logger.info("Start server")
command = CentralCommand(args=args, debug_mode=args.verbose > 0)
command.loadConfig(args.config)
command.start()