hugvey/hugvey/client.py

453 lines
16 KiB
Python

import asyncio
import audioop
import logging
import pyaudio
import re
import socket
import threading
import time
import yaml
import zmq
from zmq.asyncio import Context
import sys
from hugvey.communication import LOG_BS
import os
import collections
import math
try:
import alsaaudio
except ImportError:
print("No volume settings available")
from .communication import zmqReceive, zmqSend, getTopic
import subprocess
logger = logging.getLogger("client")
def setLogger(hv_id):
global logger
logger = logging.getLogger("hugvey").getChild("{}".format(hv_id)).getChild("client")
class VoiceServer(object):
"""A UDP server, providing mic data at 16 kHz"""
def __init__(self, loop, hugvey, config):
self.config = config
self.input_rate = self.config['voice']['input_rate']
self.target_rate = self.config['voice']['target_rate']
self.stopped = True
self.clients = []
self.laststate = None
self.ctx = Context.instance()
self.loop = loop
self.hugvey = hugvey
self.chunk = 4096
self.mic_prerol_sec = .3
self.prerol_frame_count = math.ceil(self.input_rate / self.chunk)
self.prerol_frames = collections.deque(maxlen = self.prerol_frame_count)
self.p = pyaudio.PyAudio()
# wait a sec for the input devices to come up
logger.debug('wait for mic')
time.sleep(3)
logger.debug('done waiting for mic')
self.info = self.get_card_info()
def get_card_info(self):
output_device_idx = None
input_device_idx = None
devices_count = self.p.get_device_count()
for i in range(devices_count):
dev = self.p.get_device_info_by_index(i)
if output_device_idx is None and dev['maxOutputChannels'] > 0:
if (self.config['voice']['output_name'] and self.config['voice']['output_name'] in dev['name']) or \
(not self.config['voice']['output_name'] and dev['name'] != 'default'):
output_device_idx = dev['index']
logger.info("Use output device {0}: {1}".format(
dev['index'], dev['name']))
if input_device_idx is None and dev['maxInputChannels'] > 0:
if (self.config['voice']['input_name'] and self.config['voice']['input_name'] in dev['name']) or \
(not self.config['voice']['input_name'] and dev['name'] != 'default'):
input_device_idx = dev['index']
logger.info("Use input device {0}: {1}".format(
dev['index'], dev['name']))
logger.debug("{} {:0d} {} (i: {}, o: {})".format(
"< " if output_device_idx == i else "> " if input_device_idx == i else "- ", i, dev['name'],
dev['maxInputChannels'],
dev['maxOutputChannels']))
# Don't continue without pyAudio indexes
if input_device_idx is None:
raise Exception("Input device is not found: {}".format(self.config['voice']['input_name']))
if output_device_idx is None:
raise Exception("Output device is not found: {}".format(self.config['voice']['output_name']))
try:
# get eg: "hw:1,0" or "hw:0,3" -> used by Sox' play
output_device_name = self.p.get_device_info_by_index(output_device_idx)['name'].split("(",1)[1][:-1]
# get eg: "hw:1" or "hw:0" -> used by alsaaudio.Mixer(device=..)
output_card_name = output_device_name.split(",",1)[0]
except IndexError as e:
output_device_name = None
output_card_name = None
try:
input_device_name = self.p.get_device_info_by_index(input_device_idx)['name'].split("(",1)[1][:-1]
input_card_name = input_device_name.split(",",1)[0]
except IndexError as e:
input_device_name = None
input_card_name = None
print(output_device_name, input_device_name)
return {
'input': {
'idx': input_device_idx,
'device': input_device_name,
'card': input_card_name
},
'output': {
'idx': output_device_idx,
'device': output_device_name,
'card': output_card_name
}
}
#
# def get_output_idxs(self):
# pass
#
# def get_input_idx(self):
# input_device_idx = None
# # input_device_idx = 6
# # input_device_idx = 0
# devices_count = self.p.get_device_count()
# for i in range(devices_count):
# dev = self.p.get_device_info_by_index(i)
# if input_device_idx is None and dev['maxInputChannels'] > 0:
# if (self.input_name and self.input_name in dev['name']) or \
# (not self.input_name and dev['name'] != 'default'):
# input_device_idx = dev['index']
# logger.info("Use device {0}: {1}".format(
# dev['index'], dev['name']))
# logger.debug("{} {:0d} {}".format(
# "* " if input_device_idx == i else "- ", i, dev['name']))
# return input_device_idx
def onBuffer(self, in_data, frame_count, time_info, status):
if self.input_rate == self.target_rate:
f = in_data
else:
# chunk 4096, with 2 bytes per frame gives len(in_data) of 8192
# rate converted 44k1 -> 16k gives len(f) == 2972 (16/44.1 * 8192)
f, self.laststate = audioop.ratecv(
in_data, 2, 1, self.input_rate, self.target_rate, self.laststate)
try:
if self.hugvey.cmd_server.playPopen is not None:
logger.debug('block recording {}' .format(
self.hugvey.cmd_server.playPopen))
# if recording is blocked, store the latest n frames. So we can release
# this buffer to the mic stream
self.prerol_frames.append(f)
# multiply by 0 to disable audio recording while playback
f = audioop.mul(f, 2, 0)
elif len(self.prerol_frames) > 0:
self.prerol_frames.append(f)
f = b''.join(self.prerol_frames)
self.prerol_frames.clear()
logger.info('used buffer, len now {}'.format(len(f)))
self.loop.call_soon_threadsafe(self.voice_socket.send, f)
except Exception as e:
logger.warn("Error sending to {}".format(e))
pass
return (None, pyaudio.paContinue)
async def start(self):
FORMAT = pyaudio.paInt16
CHANNELS = 1
CHUNK = 4096
self.stopped = False
if 'alsaaudio' in sys.modules:
if self.config['voice']['input_mixer'] and self.config['voice']['input_volume'] and self.info['input']['card']:
logger.info("Set input volume on {}/{} to {}".format(
self.config['voice']['input_mixer'],
self.info['input']['card'],
self.config['voice']['input_volume']
))
alsaaudio.Mixer(self.config['voice']['input_mixer'], device=self.info['input']['card']).setvolume(
self.config['voice']['input_volume'])
if self.config['voice']['output_mixer'] and self.config['voice']['output_volume'] and self.info['output']['card']:
logger.info("Set output volume on {}/{} to {}".format(
self.config['voice']['output_mixer'],
self.info['output']['card'],
self.config['voice']['output_volume']
))
alsaaudio.Mixer(self.config['voice']['output_mixer'], device=self.info['output']['card']).setvolume(
self.config['voice']['output_volume'])
stream = self.p.open(
format=FORMAT,
channels=CHANNELS,
rate=self.input_rate,
input=True,
frames_per_buffer=CHUNK,
stream_callback=self.onBuffer,
input_device_index=self.info['input']['idx']
)
while not self.stopped:
try:
address = "tcp://*:{}".format(self.config['voice']['port'] + self.hugvey.id)
self.voice_socket = self.ctx.socket(zmq.PUB)
self.voice_socket.bind(address)
logger.info(
"Waiting for voice connections on {}".format(address))
while not self.stopped:
await asyncio.sleep(1)
logger.info("Stop recording & streaming")
self.voice_socket.close()
# stop Recording
stream.stop_stream()
stream.close()
self.p.terminate()
except Exception as e:
logging.critical("Socket Exception {}".format(e))
self.voice_socket.close()
time.sleep(.5)
def stop(self):
self.stopped = True
async def asyncStart(self, loop):
future = loop.run_in_executor(None, self.start)
r = await future
class CommandHandler(object):
def __init__(self, hugvey_id, cmd_address, publish_address, file_address, play_audiodev = None):
self.eventQueue = []
self.ctx = Context.instance()
self.hugvey_id = hugvey_id
self.cmd_address = cmd_address
self.publish_address = publish_address
self.playPopen = None
self.file_address = file_address
self.playingMsgId = None
self.play_audiodev = play_audiodev
# self.showMyself() # queue message for connection request
def handle(self, cmd):
print('handle', cmd)
# self.sendMessage({'reply':'test'})
if not 'action' in cmd:
logger.critical("Invalid command: {}".format(cmd))
return
logger.info("Received {}".format(cmd))
if cmd['action'] == 'show_yourself':
self.showMyself()
if cmd['action'] == 'play':
self.cmdPlay(cmd)
if cmd['action'] == 'stop':
self.cmdPlay(cmd, cmd['id'])
def cmdPlay(self, cmd):
msgId = cmd['id']
pitch = cmd['pitch'] if 'pitch' in cmd else 50
file = cmd['file'] if 'file' in cmd else None
text = cmd['msg'] if 'msg' in cmd else None
params = cmd['params'] if 'params' in cmd else {}
self.playingMsgId = msgId
if file is None and text is None:
logger.critical("No file nor text given: {}".format(cmd))
else:
if file is not None:
logger.info("Play: {}".format(file))
file = self.file_address + "/" + file
# logger.debug(['play', file])
playCmd = ['play', file]
for param, value in params.items():
if not value:
continue
playCmd.append(param)
print(param, value)
if value is True:
continue
playCmd.append(str(value))
environment_vars = dict(os.environ)
if self.play_audiodev is not None:
environment_vars['AUDIODEV'] = self.play_audiodev
logger.debug(playCmd)
self.playPopen = subprocess.Popen(
playCmd, stdout=subprocess.PIPE, env=environment_vars)
returnCode = self.playPopen.wait()
logger.debug('finished')
self.playPopen = None
else:
logger.info("Speak: {}".format(text))
self.playPopen = subprocess.Popen(
['espeak', '-p', '{0}'.format(pitch), text], stdout=subprocess.PIPE)
returnCode = self.playPopen.wait()
self.playPopen = None
if returnCode:
logger.critical("Had returncode on play: {}".format(returnCode))
else:
logger.debug(
"Finished playback. Return code: {}".format(returnCode))
self.playingMsgId = None
self.sendMessage({
'event': 'playbackFinish',
'msgId': msgId
})
def cmdStop(self, msgId):
if self.playPopen and self.playingMsgId == msgId:
logger.info("Interrupting playback")
try:
self.playPopen.terminate()
except Exception as e:
logger.critical("Could not stop playback: {}".format(e))
else:
logger.warn("Interrupt ignored")
def showMyself(self):
"""Publish about this hugvey to central command
"""
self.sendMessage({
'event': 'connection',
'id': self.hugvey_id,
'host': socket.gethostname(),
'ip': self.getIp(),
})
@staticmethod
def getIp():
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("185.66.250.60", 80))
return s.getsockname()[0]
def sendMessage(self, msg):
self.eventQueue.append(msg)
async def command_listener(self):
s = self.ctx.socket(zmq.SUB)
s.connect(self.cmd_address)
topic = getTopic(self.hugvey_id)
s.subscribe(topic)
logger.info("Subscribed to commands for {} on {}".format(
topic, self.cmd_address))
while True:
hugvey_id, cmd = await zmqReceive(s)
# print("GOGOG", hugvey_id, cmd)
t = threading.Thread(target=self.handle, args=(cmd,))
t.start()
# topic, msg = await s.recv_multipart()
# print('received', msg, time.time())
s.close()
async def event_sender(self):
s = self.ctx.socket(zmq.PUB)
s.connect(self.publish_address)
logger.info("Publish on: {}".format(self.publish_address))
# For some reason, sending only one message is lost, perhaps due
# to connect() rather than bind() ??
await asyncio.sleep(1) # wait for connection to be proper set
self.showMyself()
while True:
for i in range(len(self.eventQueue)):
zmqSend(s, self.hugvey_id, self.eventQueue.pop(0))
if len(self.eventQueue) == 0:
await asyncio.sleep(0.05)
s.close()
class Hugvey(object):
"""The Hugvey client, to be ran on the Raspberry Pi's
"""
def __init__(self, id = None):
self.id = self.getId() if id is None else id
setLogger(self.id)
def getId(self) -> int:
"""Get Hugvey ID from hostname"""
try:
h = socket.gethostname()
id = int(re.findall('\d+', h)[0])
except Exception:
logger.critical("No automatic ID, fall back to 1")
id = 1
return id
def loadConfig(self, filename):
with open(filename, 'r') as fp:
logger.debug('Load config from {}'.format(filename))
self.config = yaml.safe_load(fp)
async def startCommandListener(self):
return await self.cmd_server.command_listener()
def start(self):
logger.debug('Hugvey {}, reporting'.format(self.id))
loop = asyncio.get_event_loop()
self.voice_server = VoiceServer(
loop=loop,
hugvey=self,
config=self.config
)
self.cmd_server = CommandHandler(
hugvey_id=self.id,
cmd_address=self.config['events']['cmd_address'],
publish_address=self.config['events']['publish_address'],
file_address=self.config['voice']['file_address'],
play_audiodev=self.voice_server.info['output']['device']
)
logger.info('start')
# self.voice_server.asyncStart(loop)
# loop.run_until_complete(self.voice_server.start())
asyncio.ensure_future(self.voice_server.start())
asyncio.ensure_future(self.cmd_server.command_listener())
asyncio.ensure_future(self.cmd_server.event_sender())
self.cmd_server.showMyself()
loop.run_forever()
logger.info('done')