512 lines
18 KiB
Python
512 lines
18 KiB
Python
import asyncio
|
|
import audioop
|
|
import logging
|
|
import pyaudio
|
|
import re
|
|
import socket
|
|
import threading
|
|
import time
|
|
import yaml
|
|
import zmq
|
|
from zmq.asyncio import Context
|
|
import sys
|
|
from hugvey.communication import LOG_BS
|
|
import os
|
|
import collections
|
|
import math
|
|
|
|
try:
|
|
import alsaaudio
|
|
except ImportError:
|
|
print("No volume settings available")
|
|
|
|
from .communication import zmqReceive, zmqSend, getTopic
|
|
import subprocess
|
|
|
|
|
|
# Module-level logger used throughout this file; setLogger() rebinds it to a
# child logger named after the Hugvey id.
logger = logging.getLogger("client")
|
|
|
|
|
|
|
|
def setLogger(hv_id):
    """Rebind the module-level ``logger`` to ``hugvey.<hv_id>.client``.

    :param hv_id: identifier of this Hugvey; formatted with ``"{}"`` to build
        the middle part of the logger name.
    """
    global logger
    hugvey_logger = logging.getLogger("hugvey").getChild("{}".format(hv_id))
    logger = hugvey_logger.getChild("client")
|
|
|
|
|
|
class VoiceServer(object):
    """Capture microphone audio with PyAudio and publish it on a ZeroMQ PUB socket.

    Audio is captured at ``config['voice']['input_rate']``. When that differs
    from ``config['voice']['target_rate']`` every buffer is resampled with
    ``audioop.ratecv`` before it is published. While
    ``hugvey.cmd_server.muteMic`` is truthy the samples are multiplied by 0, so
    silence is streamed instead of the microphone signal.
    """

    def __init__(self, loop, hugvey, config):
        """Open PyAudio and select the input/output devices.

        :param loop: asyncio event loop on which socket sends are scheduled.
        :param hugvey: owning object; ``hugvey.id`` and
            ``hugvey.cmd_server.muteMic`` are read later on.
        :param config: configuration mapping; the ``'voice'`` section is used.
        :raises Exception: from :meth:`get_card_info` when no device matches.

        Note: blocks for 3 seconds to give the audio devices time to come up.
        """
        self.config = config

        self.input_rate = self.config['voice']['input_rate']
        self.target_rate = self.config['voice']['target_rate']
        self.stopped = True
        self.clients = []
        # Resampler state handed back and forth to audioop.ratecv.
        self.laststate = None
        self.ctx = Context.instance()
        self.loop = loop
        self.hugvey = hugvey
        # Created in start(). The capture callback can fire before the socket
        # is bound, so onBuffer() has to cope with None.
        self.voice_socket = None

        self.chunk = 4096
        self.mic_prerol_sec = .2
        self.prerol_frame_count = math.ceil(
            (self.input_rate / self.chunk) * self.mic_prerol_sec)
        self.prerol_frames = collections.deque(maxlen=self.prerol_frame_count)

        self.p = pyaudio.PyAudio()
        logger.debug("Use a mic prerol of {} frames".format(self.prerol_frame_count))
        # wait a sec for the input devices to come up
        logger.debug('wait for mic')
        time.sleep(3)
        logger.debug('done waiting for mic')

        self.info = self.get_card_info()

    @staticmethod
    def _device_matches(wanted_name, device_name):
        """Return True when ``device_name`` satisfies the configured name.

        A non-empty ``wanted_name`` must be a substring of ``device_name``;
        without one, any device not literally called ``'default'`` matches.
        """
        if wanted_name:
            return wanted_name in device_name
        return device_name != 'default'

    @staticmethod
    def _alsa_names(device_name):
        """Split a PyAudio device name into ``(device, card)`` identifiers.

        The text after the first ``(`` minus its last character is taken as
        the device (eg. ``"hw:1,0"``, used by Sox' play); the part before the
        first comma as the card (eg. ``"hw:1"``, used by alsaaudio.Mixer).
        Returns ``(None, None)`` when the name contains no ``(``.
        """
        try:
            device = device_name.split("(", 1)[1][:-1]
        except IndexError:
            return None, None
        return device, device.split(",", 1)[0]

    def get_card_info(self):
        """Pick the first matching input and output device.

        :returns: ``{'input': {...}, 'output': {...}}`` where each entry holds
            ``idx`` (PyAudio index), ``device`` and ``card`` (see
            :meth:`_alsa_names`; both may be None).
        :raises Exception: when no input or no output device matches.
        """
        voice_cfg = self.config['voice']
        output_device_idx = None
        input_device_idx = None

        for i in range(self.p.get_device_count()):
            dev = self.p.get_device_info_by_index(i)
            if output_device_idx is None and dev['maxOutputChannels'] > 0:
                if self._device_matches(voice_cfg['output_name'], dev['name']):
                    output_device_idx = dev['index']
                    logger.info("Use output device {0}: {1}".format(
                        dev['index'], dev['name']))
            if input_device_idx is None and dev['maxInputChannels'] > 0:
                if self._device_matches(voice_cfg['input_name'], dev['name']):
                    input_device_idx = dev['index']
                    logger.info("Use input device {0}: {1}".format(
                        dev['index'], dev['name']))

            # "< " marks the chosen output, "> " the chosen input.
            if output_device_idx == i:
                marker = "< "
            elif input_device_idx == i:
                marker = "> "
            else:
                marker = "- "
            logger.debug("{} {:0d} {} (i: {}, o: {})".format(
                marker, i, dev['name'],
                dev['maxInputChannels'],
                dev['maxOutputChannels']))

        # Don't continue without pyAudio indexes
        if input_device_idx is None:
            raise Exception("Input device is not found: {}".format(voice_cfg['input_name']))
        if output_device_idx is None:
            raise Exception("Output device is not found: {}".format(voice_cfg['output_name']))

        output_device_name, output_card_name = self._alsa_names(
            self.p.get_device_info_by_index(output_device_idx)['name'])
        input_device_name, input_card_name = self._alsa_names(
            self.p.get_device_info_by_index(input_device_idx)['name'])

        logger.debug("Output: {}, Input: {}".format(output_device_name, input_device_name))

        return {
            'input': {
                'idx': input_device_idx,
                'device': input_device_name,
                'card': input_card_name
            },
            'output': {
                'idx': output_device_idx,
                'device': output_device_name,
                'card': output_card_name
            }
        }

    def onBuffer(self, in_data, frame_count, time_info, status):
        """PyAudio stream callback: resample, optionally silence, and publish.

        The send is handed to the event loop with ``call_soon_threadsafe``.
        Buffers that arrive while no socket is bound are dropped.

        :returns: ``(None, pyaudio.paContinue)`` so capturing keeps running.
        """
        if self.input_rate == self.target_rate:
            f = in_data
        else:
            # chunk 4096, with 2 bytes per frame gives len(in_data) of 8192
            # rate converted 44k1 -> 16k gives len(f) == 2972 (16/44.1 * 8192)
            f, self.laststate = audioop.ratecv(
                in_data, 2, 1, self.input_rate, self.target_rate, self.laststate)

        try:
            if self.hugvey.cmd_server.muteMic:
                # multiply by 0 to disable audio recording while playback
                f = audioop.mul(f, 2, 0)

            # Read the attribute once: start() may clear it concurrently.
            sock = self.voice_socket
            if sock is not None:
                self.loop.call_soon_threadsafe(sock.send, f)
        except Exception as e:
            logger.warning("Error sending audio: {}".format(e))
        return (None, pyaudio.paContinue)

    def _apply_mixer_volumes(self):
        """Set the configured ALSA mixer volumes, if alsaaudio was imported."""
        if 'alsaaudio' not in sys.modules:
            return

        voice_cfg = self.config['voice']
        for direction in ('input', 'output'):
            # Short-circuit order kept: mixer, then volume, then detected card.
            if not (voice_cfg[direction + '_mixer']
                    and voice_cfg[direction + '_volume']
                    and self.info[direction]['card']):
                continue
            mixer = voice_cfg[direction + '_mixer']
            volume = voice_cfg[direction + '_volume']
            card = self.info[direction]['card']
            logger.info("Set {} volume on {}/{} to {}".format(
                direction, mixer, card, volume))
            alsaaudio.Mixer(mixer, device=card).setvolume(volume)

    async def start(self):
        """Start capturing and stream audio until :meth:`stop` is called.

        Binds a PUB socket on ``tcp://*:<config['voice']['port'] + hugvey.id>``.
        When setting up the socket fails, the error is logged and a new
        attempt is made after half a second. The capture stream and PyAudio
        are always released when this coroutine ends.
        """
        self.stopped = False
        self._apply_mixer_volumes()

        stream = self.p.open(
            format=pyaudio.paInt16,
            channels=1,
            rate=self.input_rate,
            input=True,
            frames_per_buffer=self.chunk,
            stream_callback=self.onBuffer,
            input_device_index=self.info['input']['idx']
        )

        try:
            while not self.stopped:
                sock = None
                try:
                    address = "tcp://*:{}".format(
                        self.config['voice']['port'] + self.hugvey.id)
                    sock = self.ctx.socket(zmq.PUB)
                    sock.set_hwm(15)
                    sock.bind(address)
                    # Publish the socket to onBuffer() only once it is bound.
                    self.voice_socket = sock

                    logger.info(
                        "Waiting for voice connections on {}".format(address))
                    while not self.stopped:
                        await asyncio.sleep(1)

                    logger.info("Stop recording & streaming")
                except Exception as e:
                    logger.critical("Socket Exception {}".format(e))
                    # Non-blocking pause so the event loop keeps running.
                    await asyncio.sleep(.5)
                finally:
                    # Hide the socket from onBuffer() before closing it.
                    self.voice_socket = None
                    if sock is not None:
                        sock.close()
        finally:
            # stop Recording
            stream.stop_stream()
            stream.close()
            self.p.terminate()

    def stop(self):
        """Ask :meth:`start` to finish; it notices within about a second."""
        self.stopped = True

    async def asyncStart(self, loop):
        """Run :meth:`start` to completion.

        ``start`` is a coroutine function, so it is awaited directly; handing
        it to an executor would only create a coroutine object that never
        runs. ``loop`` is accepted for backward compatibility and is unused.
        """
        await self.start()
|
|
|
|
|
|
class CommandHandler(object):
    """Receive commands over ZeroMQ, play audio and queue events to publish.

    Every received command is handled in its own thread (see
    :meth:`command_listener`), so the playback methods may run concurrently.
    Shared attributes are therefore read into a local before use and only
    reset by the playback that still owns them.
    """

    def __init__(self, hugvey_id, cmd_address, publish_address, file_address, play_audiodev = None, play_audiodriver=None, remote_ip='8.8.8.8'):
        """Store the connection settings; no sockets are opened here.

        :param hugvey_id: id of this Hugvey, used for the topic and events.
        :param cmd_address: ZeroMQ address to subscribe to for commands.
        :param publish_address: ZeroMQ address to publish events to.
        :param file_address: prefix joined with ``/`` to the ``file`` of a
            play command.
        :param play_audiodev: value for the ``AUDIODEV`` environment variable
            of ``play``; only used when ``play_audiodriver`` is None.
        :param play_audiodriver: value for ``AUDIODRIVER`` of ``play``.
        :param remote_ip: address used by :meth:`getIp` to find the local ip.
        """
        self.eventQueue = []
        self.ctx = Context.instance()
        self.hugvey_id = hugvey_id
        self.cmd_address = cmd_address
        self.publish_address = publish_address
        self.muteMic = False
        self.playPopen = None
        self.file_address = file_address
        self.playingMsgId = None
        self.play_audiodev = play_audiodev
        self.play_audiodriver = play_audiodriver
        self.remote_ip = remote_ip

    def handle(self, cmd):
        """Dispatch a received command on its ``'action'`` key.

        Known actions: ``show_yourself``, ``prepare`` (mute the mic), ``play``
        and ``stop`` (needs ``'id'``). Commands without ``'action'`` are
        logged and ignored; unknown actions are silently ignored.
        """
        if 'action' not in cmd:
            logger.critical("Invalid command: {}".format(cmd))
            return

        logger.info("Received {}".format(cmd))

        action = cmd['action']
        if action == 'show_yourself':
            self.showMyself()
        elif action == 'prepare':
            self.muteMic = True
        elif action == 'play':
            self.cmdPlay(cmd)
        elif action == 'stop':
            self.cmdStop(cmd['id'])

    @staticmethod
    def _build_play_command(path, params):
        """Build the argv for ``play``: the file followed by its effects.

        Falsy values are skipped, ``True`` adds only the parameter name, any
        other value adds the name followed by ``str(value)``.
        """
        command = ['play', path]
        for param, value in params.items():
            if not value:
                continue
            command.append(param)
            if value is not True:
                command.append(str(value))
        return command

    def _run_playback(self, msgId, playCmd, popenKwargs, watchdogDelay=None, announceStart=False):
        """Run ``playCmd`` to completion and log how it ended.

        :param watchdogDelay: seconds after which :meth:`checkPopen` kills a
            playback that is still running; None disables the watchdog.
        :param announceStart: queue a ``playbackStart`` event once started.
        """
        try:
            proc = subprocess.Popen(playCmd, **popenKwargs)
        except OSError as e:
            # eg. the player binary is missing; the caller still unmutes.
            logger.critical("Could not start playback {}: {}".format(playCmd, e))
            return
        self.playPopen = proc

        watchdog = None
        if watchdogDelay is not None:
            watchdog = threading.Timer(watchdogDelay, self.checkPopen, (msgId,))
            watchdog.start()

        out = err = None
        try:
            if announceStart:
                self.sendMessage({
                    'event': 'playbackStart',
                    'msgId': msgId
                })
            out, err = proc.communicate()
        finally:
            if watchdog is not None:
                watchdog.cancel()
            # A newer playback may already have replaced us; leave its handle.
            if self.playPopen is proc:
                self.playPopen = None

        if proc.returncode:
            logger.critical("Had returncode {} on play: {}\n\n{}\n{}".format(
                proc.returncode, playCmd, out, err))
        else:
            logger.debug("Finished playback.")

    def cmdPlay(self, cmd):
        """Play a file with ``play`` or speak a text with ``espeak``.

        Blocks until the player exits. Keys read from ``cmd``: ``id``
        (required), ``file``, ``msg``, ``pitch`` (default 50, espeak only),
        ``params`` (extra ``play`` arguments) and ``duration`` (seconds; a
        file playback still running 3 seconds after it is killed).

        The mic is muted for the duration. A ``playbackFinish`` event is
        always queued at the end, also when nothing could be played.
        """
        self.muteMic = True

        msgId = cmd['id']
        pitch = cmd.get('pitch', 50)
        file = cmd.get('file')
        text = cmd.get('msg')
        params = cmd.get('params', {})
        # use duration for timing the popen duration (and redo it if needed)
        duration = cmd.get('duration')

        # Log the interrupted id before it is replaced by the new one.
        running = self.playPopen
        if running:
            logger.info("Interrupting playback of {}".format(self.playingMsgId))
            running.kill()
        self.playingMsgId = msgId

        if file is None and text is None:
            logger.critical("No file nor text given: {}".format(cmd))
        elif file is not None:
            logger.info("Play: {}".format(file))
            playCmd = self._build_play_command(self.file_address + "/" + file, params)

            environment_vars = dict(os.environ)
            if self.play_audiodriver is not None:
                environment_vars['AUDIODRIVER'] = self.play_audiodriver
            elif self.play_audiodev is not None:
                environment_vars['AUDIODEV'] = self.play_audiodev

            logger.debug(playCmd)
            self._run_playback(
                msgId, playCmd,
                {'stdout': subprocess.PIPE, 'stderr': subprocess.PIPE, 'env': environment_vars},
                watchdogDelay=None if duration is None else duration + 3,
                announceStart=True)
        else:
            logger.info("Speak: {}".format(text))
            playCmd = ['espeak', '-p', '{0}'.format(pitch), text]
            self._run_playback(msgId, playCmd, {'stdout': subprocess.PIPE})

        # Only reset the shared state if no newer playback has taken over;
        # otherwise the mic would be unmuted while that one is still playing.
        if self.playingMsgId == msgId:
            self.playingMsgId = None
            self.muteMic = False
        self.sendMessage({
            'event': 'playbackFinish',
            'msgId': msgId
        })

    def checkPopen(self, msgId):
        """Watchdog: kill the playback of ``msgId`` if it is still running."""
        if self.playingMsgId != msgId:
            return

        proc = self.playPopen
        if proc is None:
            return

        # prevent a lock of the story, no repeat or anything for now
        logger.critical("Interrupting playback after timeout: {}".format(self.playingMsgId))
        proc.kill()

    def cmdStop(self, msgId):
        """Terminate the running playback if it belongs to ``msgId``."""
        proc = self.playPopen
        if proc and self.playingMsgId == msgId:
            logger.info("Interrupting playback")
            try:
                proc.terminate()
            except Exception as e:
                logger.critical("Could not stop playback: {}".format(e))
        else:
            logger.warning("Interrupt ignored")

    def showMyself(self):
        """Publish about this hugvey to central command
        """
        self.sendMessage({
            'event': 'connection',
            'id': self.hugvey_id,
            'host': socket.gethostname(),
            'ip': self.getIp(),
        })

    def getIp(self):
        """Return the local address used to reach ``self.remote_ip``.

        Connects a UDP socket to port 80 of ``remote_ip`` and reads the local
        socket name. The socket is closed even when connecting fails.
        """
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            # TODO: make it a local ip, eg. 192.168.1.1
            s.connect((self.remote_ip, 80))
            return s.getsockname()[0]

    def sendMessage(self, msg):
        """Queue ``msg``; :meth:`event_sender` takes it off the queue."""
        self.eventQueue.append(msg)

    async def command_listener(self):
        """Subscribe to this Hugvey's topic and handle commands forever.

        Each command runs :meth:`handle` in a new thread, so a blocking
        playback does not stall the event loop. The socket is closed when the
        coroutine is cancelled or fails.
        """
        s = self.ctx.socket(zmq.SUB)
        try:
            s.set_hwm(15)
            s.connect(self.cmd_address)
            topic = getTopic(self.hugvey_id)
            s.subscribe(topic)
            logger.info("Subscribed to commands for {} on {}".format(
                topic, self.cmd_address))
            while True:
                hugvey_id, cmd = await zmqReceive(s)
                threading.Thread(target=self.handle, args=(cmd,)).start()
        finally:
            s.close()

    async def event_sender(self):
        """Publish queued events forever, polling the queue every 50 ms.

        The socket is closed when the coroutine is cancelled or fails.
        """
        s = self.ctx.socket(zmq.PUB)
        try:
            s.set_hwm(15)
            s.connect(self.publish_address)
            logger.info("Publish on: {}".format(self.publish_address))
            # For some reason, sending only one message is lost, perhaps due
            # to connect() rather than bind() ??
            await asyncio.sleep(1)  # wait for connection to be proper set

            self.showMyself()

            while True:
                # Send what was queued at the start of this round; messages
                # appended meanwhile are picked up in the next round.
                for _ in range(len(self.eventQueue)):
                    zmqSend(s, self.hugvey_id, self.eventQueue.pop(0))
                if not self.eventQueue:
                    await asyncio.sleep(0.05)
        finally:
            s.close()

    async def heartbeat(self):
        """Queue a ``connection`` event every 3 seconds."""
        while True:
            self.showMyself()
            await asyncio.sleep(3)
|
|
|
|
|
|
class Hugvey(object):
    """The Hugvey client, to be ran on the Raspberry Pi's

    Typical use: construct, :meth:`loadConfig`, then :meth:`start`.
    """

    def __init__(self, id = None):
        """Set the id (derived from the hostname when None) and the logger."""
        self.id = self.getId() if id is None else id
        setLogger(self.id)

    @staticmethod
    def _id_from_hostname(hostname):
        """Return the first run of digits in ``hostname`` as int, or None."""
        match = re.search(r'\d+', hostname)
        return int(match.group()) if match else None

    def getId(self) -> int:
        """Get Hugvey ID from hostname; falls back to 1 when there is none."""
        try:
            hv_id = self._id_from_hostname(socket.gethostname())
        except OSError:
            hv_id = None

        if hv_id is None:
            logger.critical("No automatic ID, fall back to 1")
            return 1
        return hv_id

    def loadConfig(self, filename):
        """Read the YAML file ``filename`` into ``self.config``."""
        with open(filename, 'r') as fp:
            logger.debug('Load config from {}'.format(filename))
            self.config = yaml.safe_load(fp)

    async def startCommandListener(self):
        """Run the command listener of ``self.cmd_server``."""
        return await self.cmd_server.command_listener()

    def start(self):
        """Create the voice and command servers and run the event loop.

        Requires :meth:`loadConfig` to have been called. Blocks in
        ``run_forever`` until the loop is stopped or a task exits the program
        (see :meth:`catchException`).
        """
        logger.debug('Hugvey {}, reporting'.format(self.id))

        self.loop = asyncio.get_event_loop()

        self.voice_server = VoiceServer(
            loop=self.loop,
            hugvey=self,
            config=self.config
        )

        events_cfg = self.config['events']
        voice_cfg = self.config['voice']
        remote_ip = events_cfg['remote_ip'] if 'remote_ip' in events_cfg else '8.8.8.8'
        logger.debug("Using remote_ip for getIp: {}".format(remote_ip))
        self.cmd_server = CommandHandler(
            hugvey_id=self.id,
            cmd_address=events_cfg['cmd_address'],
            publish_address=events_cfg['publish_address'],
            file_address=voice_cfg['file_address'],
            play_audiodev=self.voice_server.info['output']['device'],
            play_audiodriver=voice_cfg['output_driver'] if 'output_driver' in voice_cfg else None,
            remote_ip=remote_ip,
        )

        logger.info('start')
        services = (
            self.voice_server.start(),
            self.cmd_server.command_listener(),
            self.cmd_server.event_sender(),
            self.cmd_server.heartbeat(),
        )
        for service in services:
            # Bind to our loop explicitly instead of an implicit current loop.
            asyncio.ensure_future(self.catchException(service), loop=self.loop)
        self.loop.run_forever()
        logger.info('done')

    async def catchException(self, awaitable):
        """Await ``awaitable``; on any Exception log it and exit the program."""
        try:
            await awaitable
        except Exception as e:
            logger.exception(e)
            logger.critical("Hugvey quiting")
            # self.loop.stop() # not fully quits program for reboot
            # sys.exit() instead of the site-provided exit(), which is not
            # guaranteed to exist outside interactive use.
            sys.exit()
|