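# hugvey/hugvey/client.py
#
# Hugvey client: streams microphone audio to the central server and plays
# back the audio files / speech it is told to.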

import asyncio
import audioop
import collections
import io
import logging
import math
import os
import re
import socket
import subprocess
import sys
import threading
import time
import urllib.request

import pyaudio
import yaml
import zmq
from zmq.asyncio import Context

try:
    import alsaaudio
except ImportError:
    print("No volume settings available")

from hugvey.communication import LOG_BS
from .communication import zmqReceive, zmqSend, getTopic
logger = logging.getLogger("client")


def setLogger(hv_id):
    """Rebind this module's ``logger`` to ``hugvey.<hv_id>.client``.

    Functions in this module look ``logger`` up at call time, so once the
    Hugvey id is known every later log line carries it in the logger name.
    """
    global logger
    child_name = "hugvey.{}.client".format(hv_id)
    logger = logging.getLogger(child_name)
class VoiceServer(object):
    """Stream microphone audio to the server over a ZMQ PUB socket.

    Audio is captured with PyAudio at ``config['voice']['input_rate']``,
    resampled to ``config['voice']['target_rate']`` when the two differ, and
    published on ``tcp://*:<voice.port + hugvey.id>``. While the command
    handler reports ``muteMic``, silence is published instead of the
    recorded signal.
    """

    def __init__(self, loop, hugvey, config):
        """Set up PyAudio and select the input/output devices.

        Blocks for 3 seconds first so the input devices can come up.

        :param loop: asyncio loop the PUB socket is used on; PyAudio's
            callback thread hands each buffer to it via
            ``call_soon_threadsafe``.
        :param hugvey: owning Hugvey (provides ``id`` and ``cmd_server``).
        :param config: parsed config dict; its ``voice`` section is used.
        :raises Exception: when no matching input/output device is found.
        """
        self.config = config
        self.input_rate = self.config['voice']['input_rate']
        self.target_rate = self.config['voice']['target_rate']
        self.stopped = True
        self.clients = []
        self.laststate = None  # audioop.ratecv state, carried between buffers
        self.ctx = Context.instance()
        self.loop = loop
        self.hugvey = hugvey
        # PUB socket, created in start(). None while nothing is bound, so
        # onBuffer can drop audio instead of failing on a missing attribute.
        self.voice_socket = None
        self.chunk = 4096
        self.mic_prerol_sec = .2
        self.prerol_frame_count = math.ceil((self.input_rate / self.chunk) * self.mic_prerol_sec)
        self.prerol_frames = collections.deque(maxlen=self.prerol_frame_count)
        self.p = pyaudio.PyAudio()
        logger.debug("Use a mic prerol of {} frames".format(self.prerol_frame_count))
        # wait a sec for the input devices to come up
        logger.debug('wait for mic')
        time.sleep(3)
        logger.debug('done waiting for mic')
        self.info = self.get_card_info()

    @staticmethod
    def _matchesDevice(deviceName, wantedName):
        """Return whether the device called ``deviceName`` should be used.

        With a configured ``wantedName``, a device matches when its name
        contains it. Without one, any device except the one named
        ``'default'`` matches.
        """
        if wantedName:
            return wantedName in deviceName
        return deviceName != 'default'

    @staticmethod
    def _parseAlsaName(deviceName):
        """Extract the ALSA device and card names from a PyAudio device name.

        For ``"USB Audio: - (hw:1,0)"`` this returns ``("hw:1,0", "hw:1")``.
        The device name is used by Sox' ``play``; the card name is used by
        ``alsaaudio.Mixer(device=..)``. Returns ``(None, None)`` when the
        name has no parenthesised part.
        """
        try:
            # text after the first "(", minus the closing ")"
            device = deviceName.split("(", 1)[1][:-1]
        except IndexError:
            return None, None
        return device, device.split(",", 1)[0]

    def get_card_info(self):
        """Select the PyAudio input and output devices from the config.

        The first device with input (resp. output) channels that matches
        ``voice.input_name`` (resp. ``voice.output_name``) is chosen; see
        :meth:`_matchesDevice`.

        :returns: ``{'input': {...}, 'output': {...}}``, each holding the
            PyAudio ``idx`` and the ALSA ``device``/``card`` names (``None``
            when they cannot be parsed from the device name).
        :raises Exception: when no matching input or output device exists.
        """
        voice = self.config['voice']
        output_device_idx = None
        input_device_idx = None
        for i in range(self.p.get_device_count()):
            dev = self.p.get_device_info_by_index(i)
            if output_device_idx is None and dev['maxOutputChannels'] > 0 \
                    and self._matchesDevice(dev['name'], voice['output_name']):
                output_device_idx = dev['index']
                logger.info("Use output device {0}: {1}".format(
                    dev['index'], dev['name']))
            if input_device_idx is None and dev['maxInputChannels'] > 0 \
                    and self._matchesDevice(dev['name'], voice['input_name']):
                input_device_idx = dev['index']
                logger.info("Use input device {0}: {1}".format(
                    dev['index'], dev['name']))
            if output_device_idx == i:
                marker = "< "
            elif input_device_idx == i:
                marker = "> "
            else:
                marker = "- "
            logger.debug("{} {:0d} {} (i: {}, o: {})".format(
                marker, i, dev['name'],
                dev['maxInputChannels'],
                dev['maxOutputChannels']))

        # Don't continue without pyAudio indexes
        if input_device_idx is None:
            raise Exception("Input device is not found: {}".format(voice['input_name']))
        if output_device_idx is None:
            raise Exception("Output device is not found: {}".format(voice['output_name']))

        output_device_name, output_card_name = self._parseAlsaName(
            self.p.get_device_info_by_index(output_device_idx)['name'])
        input_device_name, input_card_name = self._parseAlsaName(
            self.p.get_device_info_by_index(input_device_idx)['name'])
        logger.debug("Output: {}, Input: {}".format(output_device_name, input_device_name))
        return {
            'input': {
                'idx': input_device_idx,
                'device': input_device_name,
                'card': input_card_name
            },
            'output': {
                'idx': output_device_idx,
                'device': output_device_name,
                'card': output_card_name
            }
        }

    def onBuffer(self, in_data, frame_count, time_info, status):
        """PyAudio stream callback: resample, optionally mute, and publish.

        Runs on PyAudio's callback thread, so the send is handed to the
        asyncio loop with ``call_soon_threadsafe``. Buffers that arrive
        while no socket is bound are dropped.
        """
        if self.input_rate == self.target_rate:
            f = in_data
        else:
            # chunk 4096, with 2 bytes per frame gives len(in_data) of 8192
            # rate converted 44k1 -> 16k gives len(f) == 2972 (16/44.1 * 8192)
            f, self.laststate = audioop.ratecv(
                in_data, 2, 1, self.input_rate, self.target_rate, self.laststate)
        sock = self.voice_socket
        if sock is None:
            return (None, pyaudio.paContinue)
        try:
            if self.hugvey.cmd_server.muteMic:
                # multiply by 0 to disable audio recording while playback
                f = audioop.mul(f, 2, 0)
            self.loop.call_soon_threadsafe(sock.send, f)
        except Exception as e:
            logger.warning("Error sending to {}".format(e))
        return (None, pyaudio.paContinue)

    def _setMixerVolume(self, direction):
        """Apply ``voice.<direction>_volume`` to ALSA mixer ``voice.<direction>_mixer``.

        ``direction`` is ``'input'`` or ``'output'``. Nothing happens unless
        the mixer, the volume and the parsed card name are all set.
        """
        voice = self.config['voice']
        mixer = voice[direction + '_mixer']
        if not mixer:
            return
        volume = voice[direction + '_volume']
        if not volume:
            return
        card = self.info[direction]['card']
        if not card:
            return
        logger.info("Set {} volume on {}/{} to {}".format(direction, mixer, card, volume))
        alsaaudio.Mixer(mixer, device=card).setvolume(volume)

    async def start(self):
        """Capture and publish mic audio until :meth:`stop` is called.

        Sets the configured ALSA mixer volumes (when ``alsaaudio`` was
        imported), opens the PyAudio input stream and binds the PUB socket.
        A failed bind is retried every 0.5 s. On stop or cancellation, the
        stream, socket and PyAudio instance are released.
        """
        self.stopped = False
        if 'alsaaudio' in sys.modules:
            self._setMixerVolume('input')
            self._setMixerVolume('output')

        stream = self.p.open(
            format=pyaudio.paInt16,
            channels=1,
            rate=self.input_rate,
            input=True,
            frames_per_buffer=self.chunk,
            stream_callback=self.onBuffer,
            input_device_index=self.info['input']['idx']
        )
        try:
            while not self.stopped:
                sock = None
                try:
                    address = "tcp://*:{}".format(self.config['voice']['port'] + self.hugvey.id)
                    sock = self.ctx.socket(zmq.PUB)
                    # not too high, otherwise we flood the network after a short
                    # interruption (with stuff that is 'old' anyway)
                    sock.set_hwm(25)
                    sock.bind(address)
                    self.voice_socket = sock
                    logger.info(
                        "Waiting for voice connections on {}".format(address))
                    while not self.stopped:
                        await asyncio.sleep(1)
                    logger.info("Stop recording & streaming")
                except Exception as e:
                    logger.critical("Socket Exception {}".format(e))
                    self.voice_socket = None
                    if sock is not None:
                        sock.close()
                    # back off without blocking the event loop
                    await asyncio.sleep(.5)
        finally:
            sock = self.voice_socket
            self.voice_socket = None
            # stop Recording before closing the socket it feeds
            stream.stop_stream()
            stream.close()
            if sock is not None:
                sock.close()
            self.p.terminate()

    def stop(self):
        """Ask :meth:`start` to shut down at its next check (within ~1 s)."""
        self.stopped = True

    async def asyncStart(self, loop):
        """Run :meth:`start` to completion.

        ``loop`` is kept for backward compatibility and is unused. ``start``
        is a coroutine and must be awaited. Submitting it to an executor would
        only create a coroutine object that never runs.
        """
        return await self.start()
class CommandHandler(object):
    """Execute commands from the central server and report events back.

    Commands are received on a ZMQ SUB socket (:meth:`command_listener`).
    Each one is handled on its own thread, because playback blocks until it
    has finished. Events are queued with :meth:`sendMessage` and published on
    a ZMQ PUB socket by :meth:`event_sender`.
    """

    def __init__(self, hugvey_id, cmd_address, publish_address, file_address, play_audiodev = None, play_audiodriver=None, remote_ip='8.8.8.8'):
        """
        :param hugvey_id: id of this Hugvey; used to build the subscription
            topic and sent along with every event.
        :param cmd_address: ZMQ address commands are received from.
        :param publish_address: ZMQ address events are published to.
        :param file_address: base URL audio files are fetched from.
        :param play_audiodev: ``AUDIODEV`` value for the ``play`` process.
        :param play_audiodriver: ``AUDIODRIVER`` value for the ``play``
            process; takes precedence over ``play_audiodev``.
        :param remote_ip: address :meth:`getIp` routes towards to find the
            local IP.
        """
        self.eventQueue = []  # events waiting for event_sender; appended from handler threads
        self.ctx = Context.instance()
        self.hugvey_id = hugvey_id
        self.cmd_address = cmd_address
        self.publish_address = publish_address
        self.muteMic = False
        self.playPopen = None  # currently running play/espeak process, if any
        self.file_address = file_address
        self.playingMsgId = None  # id of the message currently being played
        self.play_audiodev = play_audiodev
        self.play_audiodriver = play_audiodriver
        self.remote_ip = remote_ip

    def handle(self, cmd):
        """Dispatch one command dict by its ``action`` key; never raises."""
        try:
            if 'action' not in cmd:
                logger.critical("Invalid command: {}".format(cmd))
                return
            logger.info("Received {}".format(cmd))
            action = cmd['action']
            if action == 'show_yourself':
                self.showMyself()
            elif action == 'prepare':
                self.muteMic = True
            elif action == 'play':
                self.cmdPlay(cmd)
            elif action == 'stop':
                self.cmdStop(cmd['id'])
        except Exception as e:
            logger.critical("Exception during handling command: {}".format(cmd))
            logger.exception(e)

    def cmdPlay(self, cmd):
        """Play a message: an audio file if ``cmd['file']`` is set, else speak ``cmd['msg']``.

        Recognised keys: ``id`` (required), ``file`` (path below
        ``file_address``), ``msg`` (text for espeak), ``pitch`` (espeak
        pitch, default 50), ``params`` (extra flags for Sox' ``play``) and
        ``duration`` (expected length in seconds; playback still running 3 s
        later is killed). A playback that is already running is killed
        first.

        Blocks until playback ends. A ``playbackFinish`` event is always
        queued afterwards, also after errors, so the server can continue.
        """
        msgId = cmd['id']
        self.muteMic = True
        pitch = cmd.get('pitch', 50)
        filepath = cmd.get('file')
        text = cmd.get('msg')
        params = cmd.get('params') or {}
        # use duration for timing the popen duration (and redo it if needed)
        duration = cmd.get('duration')

        interruptedMsgId = self.playingMsgId
        self.playingMsgId = msgId
        try:
            running = self.playPopen
            if running:
                logger.info("Interrupting playback of {}".format(interruptedMsgId))
                running.kill()
            if filepath is None and text is None:
                logger.critical("No file nor text given: {}".format(cmd))
            elif filepath is not None:
                self._playFile(msgId, filepath, params, duration)
            else:
                self._speak(text, pitch)
        except Exception as e:
            # sometimes playback gives a timeout, then a playbackFinish still
            # needs to be sent to inform the server to continue
            logger.critical("Exception during playing of message")
            logger.exception(e)
        self._finishPlayback(msgId)

    @staticmethod
    def _buildPlayCmd(params):
        """Build the Sox ``play`` command line that reads audio from stdin.

        Falsy values in ``params`` are skipped. ``True`` adds just the flag.
        Any other value adds the flag followed by ``str(value)``.
        """
        playCmd = ['play', '-']
        for param, value in params.items():
            if not value:
                continue
            playCmd.append(param)
            if value is not True:
                playCmd.append(str(value))
        return playCmd

    def _playFile(self, msgId, filepath, params, duration):
        """Fetch ``filepath`` from ``file_address`` and pipe it into ``play``."""
        file = self.file_address + "/" + filepath
        logger.debug("Fetch to play: {}".format(filepath))
        start = time.time()
        #: var response: http.client.HTTPResponse
        with urllib.request.urlopen(file, timeout=4) as response:
            fetchend = time.time()
            logger.info("Fetched {} in {:.4f}s".format(file, fetchend - start))
            if fetchend - start > 1:
                logger.warning("Super slow fetching of {} in {}s".format(file, fetchend - start))
            if response.getcode() != 200:
                logger.critical("Error fetching: {} - {}".format(file, response))
                return
            audioData = response.read()

        logger.info("Play: {}".format(filepath))
        playCmd = self._buildPlayCmd(params)
        environment_vars = dict(os.environ)
        if self.play_audiodriver is not None:
            environment_vars['AUDIODRIVER'] = self.play_audiodriver
        elif self.play_audiodev is not None:
            environment_vars['AUDIODEV'] = self.play_audiodev
        logger.debug(playCmd)

        proc = subprocess.Popen(
            playCmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=environment_vars)
        self.playPopen = proc
        timer = None
        if duration is not None:
            # guard against a hanging player: kill it 3s after the expected end
            timer = threading.Timer(duration + 3, self.checkPopen, (msgId, duration + 3))
            timer.start()
        try:
            self.sendMessage({
                'event': 'playbackStart',
                'msgId': msgId
            })
            out, err = proc.communicate(input=audioData)
        finally:
            if timer is not None:
                timer.cancel()
            self._clearPopen(proc)
        playend = time.time()
        logger.info('finished playing {} in {:.4f}s (duration: {}s)'.format(filepath, playend - fetchend, duration))
        self._logPlayResult(proc.returncode, playCmd, out, err)

    def _speak(self, text, pitch):
        """Speak ``text`` with espeak at ``pitch``."""
        logger.info("Speak: {}".format(text))
        playCmd = ['espeak', '-p', '{0}'.format(pitch), text]
        proc = subprocess.Popen(
            playCmd, stdout=subprocess.PIPE)
        self.playPopen = proc
        try:
            out, err = proc.communicate()
        finally:
            self._clearPopen(proc)
        self._logPlayResult(proc.returncode, playCmd, out, err)

    @staticmethod
    def _logPlayResult(returnCode, playCmd, out, err):
        """Log a non-zero player exit status as critical (a kill counts too)."""
        if returnCode:
            logger.critical("Had returncode {} on play: {}\n\n{}\n{}".format(returnCode, playCmd, out, err))
        else:
            logger.debug("Finished playback.")

    def _clearPopen(self, proc):
        """Forget ``proc`` as the running playback, unless a newer one replaced it."""
        if self.playPopen is proc:
            self.playPopen = None

    def _finishPlayback(self, msgId):
        """Queue ``playbackFinish`` for ``msgId``, releasing the mic if still current.

        Commands run on separate threads. When a newer play command has
        interrupted this one, ``playingMsgId`` and ``muteMic`` belong to that
        newer playback and are left alone.
        """
        if self.playingMsgId == msgId:
            self.playingMsgId = None
            self.muteMic = False
        self.sendMessage({
            'event': 'playbackFinish',
            'msgId': msgId
        })

    def checkPopen(self, msgId, duration):
        """Timer callback: kill playback of ``msgId`` if it is still running."""
        if self.playingMsgId != msgId:
            return
        proc = self.playPopen
        if proc is None:
            return
        # prevent a lock of the story, no repeat or anything for now
        logger.critical("Interrupting playback after timeout of {}: {}".format(str(duration), self.playingMsgId))
        proc.kill()

    def cmdStop(self, msgId):
        """Terminate the running playback if it is for ``msgId``."""
        proc = self.playPopen
        if proc and self.playingMsgId == msgId:
            logger.info("Interrupting playback")
            try:
                proc.terminate()
            except Exception as e:
                logger.critical("Could not stop playback: {}".format(e))
        else:
            logger.warning("Interrupt ignored")

    def showMyself(self):
        """Publish about this hugvey to central command
        """
        self.sendMessage({
            'event': 'connection',
            'id': self.hugvey_id,
            'host': socket.gethostname(),
            'ip': self.getIp(),
        })

    def getIp(self):
        """Return the local IP address of the interface that routes to ``remote_ip``.

        A UDP socket is connected to ``remote_ip`` so the OS selects the
        outgoing interface, whose address is then read back.
        """
        # TODO: make it a local ip, eg. 192.168.1.1
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.connect((self.remote_ip, 80))
            return s.getsockname()[0]

    def sendMessage(self, msg):
        """Queue ``msg`` for publication by :meth:`event_sender`."""
        self.eventQueue.append(msg)

    async def command_listener(self):
        """Receive commands for this hugvey and handle each on a new thread."""
        s = self.ctx.socket(zmq.SUB)
        try:
            s.set_hwm(15)
            s.connect(self.cmd_address)
            topic = getTopic(self.hugvey_id)
            s.subscribe(topic)
            logger.info("Subscribed to commands for {} on {}".format(
                topic, self.cmd_address))
            while True:
                hugvey_id, cmd = await zmqReceive(s)
                # handle on a thread: playback blocks until it has finished
                t = threading.Thread(target=self.handle, args=(cmd,))
                t.start()
        finally:
            s.close()

    async def event_sender(self):
        """Publish queued events, polling the queue every 50 ms."""
        s = self.ctx.socket(zmq.PUB)
        try:
            s.set_hwm(15)
            s.connect(self.publish_address)
            logger.info("Publish on: {}".format(self.publish_address))
            # For some reason, sending only one message is lost, perhaps due
            # to connect() rather than bind() ??
            await asyncio.sleep(1)  # wait for connection to be proper set
            self.showMyself()
            while True:
                while self.eventQueue:
                    zmqSend(s, self.hugvey_id, self.eventQueue.pop(0))
                # always yield to the event loop between drains
                await asyncio.sleep(0.05)
        finally:
            s.close()

    async def heartbeat(self):
        """Announce this hugvey every 3 seconds."""
        while True:
            self.showMyself()
            await asyncio.sleep(3)
class Hugvey(object):
    """The Hugvey client, to be run on the Raspberry Pis.

    Runs the VoiceServer (mic streaming) and the CommandHandler (commands,
    events and heartbeat) together on one asyncio loop.
    """

    def __init__(self, id = None):
        """:param id: Hugvey id; derived from the hostname when ``None``."""
        self.id = self.getId() if id is None else id
        setLogger(self.id)

    @staticmethod
    def _idFromHostname(hostname):
        """Return the first run of digits in ``hostname`` as an int, or ``None``.

        >>> Hugvey._idFromHostname('hugvey12')
        12
        """
        match = re.search(r'\d+', hostname)
        return int(match.group()) if match else None

    def getId(self) -> int:
        """Get Hugvey ID from hostname (its first number), falling back to 1."""
        try:
            hostId = self._idFromHostname(socket.gethostname())
        except Exception:
            hostId = None
        if hostId is None:
            logger.critical("No automatic ID, fall back to 1")
            hostId = 1
        return hostId

    def loadConfig(self, filename):
        """Load the YAML config from ``filename`` into ``self.config``."""
        with open(filename, 'r') as fp:
            logger.debug('Load config from {}'.format(filename))
            self.config = yaml.safe_load(fp)

    async def startCommandListener(self):
        """Run the command handler's listener (requires :meth:`start` to have set it up)."""
        return await self.cmd_server.command_listener()

    def start(self):
        """Create the voice server and command handler, then run the loop forever.

        Config keys read here: ``events.cmd_address``,
        ``events.publish_address``, optional ``events.remote_ip`` (default
        ``'8.8.8.8'``), ``voice.file_address`` and optional
        ``voice.output_driver``. VoiceServer reads the rest of ``voice``.
        """
        logger.debug('Hugvey {}, reporting'.format(self.id))
        self.loop = asyncio.get_event_loop()
        self.voice_server = VoiceServer(
            loop=self.loop,
            hugvey=self,
            config=self.config
        )
        events = self.config['events']
        voice = self.config['voice']
        remote_ip = events.get('remote_ip', '8.8.8.8')
        logger.debug("Using remote_ip for getIp: {}".format(remote_ip))
        self.cmd_server = CommandHandler(
            hugvey_id=self.id,
            cmd_address=events['cmd_address'],
            publish_address=events['publish_address'],
            file_address=voice['file_address'],
            play_audiodev=self.voice_server.info['output']['device'],
            play_audiodriver=voice.get('output_driver'),
            remote_ip=remote_ip,
        )
        logger.info('start')
        # Keep references to the tasks: the loop only holds weak references,
        # so unreferenced tasks may be garbage collected mid-run.
        self.tasks = [
            self.loop.create_task(self.catchException(coro))
            for coro in (
                self.voice_server.start(),
                self.cmd_server.command_listener(),
                self.cmd_server.event_sender(),
                self.cmd_server.heartbeat(),
            )
        ]
        self.loop.run_forever()
        logger.info('done')

    async def catchException(self, awaitable):
        """Await ``awaitable``; on any exception, log it and exit the process."""
        try:
            await awaitable
        except Exception as e:
            logger.exception(e)
            logger.critical("Hugvey quiting")
            # self.loop.stop() does not fully quit the program, which is
            # needed for a reboot
            sys.exit()