hugvey/hugvey/client.py

534 lines
20 KiB
Python
Raw Normal View History

import asyncio
2019-01-15 21:40:44 +01:00
import audioop
import logging
import pyaudio
import re
import socket
import threading
2019-01-15 21:40:44 +01:00
import time
import yaml
2019-01-15 21:40:44 +01:00
import zmq
from zmq.asyncio import Context
import sys
2019-02-28 18:58:03 +01:00
from hugvey.communication import LOG_BS
import os
import collections
import math
2019-11-13 11:19:21 +01:00
import urllib.request
import io
2019-01-28 17:27:38 +01:00
try:
2019-02-28 18:58:03 +01:00
import alsaaudio
2019-01-28 17:27:38 +01:00
except ImportError:
2019-02-28 18:58:03 +01:00
print("No volume settings available")
2019-01-28 17:27:38 +01:00
2019-01-15 21:40:44 +01:00
from .communication import zmqReceive, zmqSend, getTopic
import subprocess
2019-01-15 21:40:44 +01:00
logger = logging.getLogger("client")
2019-04-10 11:13:42 +02:00
def setLogger(hv_id):
global logger
logger = logging.getLogger("hugvey").getChild("{}".format(hv_id)).getChild("client")
2019-02-28 18:58:03 +01:00
2019-01-15 21:40:44 +01:00
class VoiceServer(object):
"""A UDP server, providing mic data at 16 kHz"""
2019-02-28 18:58:03 +01:00
def __init__(self, loop, hugvey, config):
self.config = config
self.input_rate = self.config['voice']['input_rate']
self.target_rate = self.config['voice']['target_rate']
2019-01-15 21:40:44 +01:00
self.stopped = True
self.clients = []
self.laststate = None
self.ctx = Context.instance()
self.loop = loop
self.hugvey = hugvey
self.chunk = 4096
2019-04-12 11:56:47 +02:00
self.mic_prerol_sec = .2
self.prerol_frame_count = math.ceil((self.input_rate / self.chunk) * self.mic_prerol_sec)
self.prerol_frames = collections.deque(maxlen = self.prerol_frame_count)
self.p = pyaudio.PyAudio()
# wait a sec for the input devices to come up
2019-04-12 11:56:47 +02:00
logger.debug("Use a mic prerol of {} frames".format(self.prerol_frame_count))
logger.debug('wait for mic')
time.sleep(3)
logger.debug('done waiting for mic')
self.info = self.get_card_info()
def get_card_info(self):
output_device_idx = None
2019-01-15 21:40:44 +01:00
input_device_idx = None
2019-01-15 21:40:44 +01:00
devices_count = self.p.get_device_count()
for i in range(devices_count):
dev = self.p.get_device_info_by_index(i)
if output_device_idx is None and dev['maxOutputChannels'] > 0:
if (self.config['voice']['output_name'] and self.config['voice']['output_name'] in dev['name']) or \
(not self.config['voice']['output_name'] and dev['name'] != 'default'):
output_device_idx = dev['index']
logger.info("Use output device {0}: {1}".format(
dev['index'], dev['name']))
if input_device_idx is None and dev['maxInputChannels'] > 0:
if (self.config['voice']['input_name'] and self.config['voice']['input_name'] in dev['name']) or \
(not self.config['voice']['input_name'] and dev['name'] != 'default'):
input_device_idx = dev['index']
logger.info("Use input device {0}: {1}".format(
2019-02-28 18:58:03 +01:00
dev['index'], dev['name']))
logger.debug("{} {:0d} {} (i: {}, o: {})".format(
"< " if output_device_idx == i else "> " if input_device_idx == i else "- ", i, dev['name'],
dev['maxInputChannels'],
dev['maxOutputChannels']))
# Don't continue without pyAudio indexes
if input_device_idx is None:
raise Exception("Input device is not found: {}".format(self.config['voice']['input_name']))
if output_device_idx is None:
raise Exception("Output device is not found: {}".format(self.config['voice']['output_name']))
try:
# get eg: "hw:1,0" or "hw:0,3" -> used by Sox' play
output_device_name = self.p.get_device_info_by_index(output_device_idx)['name'].split("(",1)[1][:-1]
# get eg: "hw:1" or "hw:0" -> used by alsaaudio.Mixer(device=..)
output_card_name = output_device_name.split(",",1)[0]
except IndexError as e:
output_device_name = None
output_card_name = None
try:
input_device_name = self.p.get_device_info_by_index(input_device_idx)['name'].split("(",1)[1][:-1]
input_card_name = input_device_name.split(",",1)[0]
except IndexError as e:
input_device_name = None
input_card_name = None
logger.debug("Output: {}, Input: {}".format(output_device_name, input_device_name))
return {
'input': {
'idx': input_device_idx,
'device': input_device_name,
'card': input_card_name
},
'output': {
'idx': output_device_idx,
'device': output_device_name,
'card': output_card_name
}
}
#
# def get_output_idxs(self):
# pass
#
# def get_input_idx(self):
# input_device_idx = None
# # input_device_idx = 6
# # input_device_idx = 0
# devices_count = self.p.get_device_count()
# for i in range(devices_count):
# dev = self.p.get_device_info_by_index(i)
# if input_device_idx is None and dev['maxInputChannels'] > 0:
# if (self.input_name and self.input_name in dev['name']) or \
# (not self.input_name and dev['name'] != 'default'):
# input_device_idx = dev['index']
# logger.info("Use device {0}: {1}".format(
# dev['index'], dev['name']))
# logger.debug("{} {:0d} {}".format(
# "* " if input_device_idx == i else "- ", i, dev['name']))
# return input_device_idx
2019-01-15 21:40:44 +01:00
def onBuffer(self, in_data, frame_count, time_info, status):
if self.input_rate == self.target_rate:
f = in_data
else:
# chunk 4096, with 2 bytes per frame gives len(in_data) of 8192
# rate converted 44k1 -> 16k gives len(f) == 2972 (16/44.1 * 8192)
2019-02-28 18:58:03 +01:00
f, self.laststate = audioop.ratecv(
in_data, 2, 1, self.input_rate, self.target_rate, self.laststate)
try:
if self.hugvey.cmd_server.muteMic:
# logger.log(LOG_BS, 'block recording {}' .format(
# self.hugvey.cmd_server.muteMic))
2019-02-28 18:58:03 +01:00
# multiply by 0 to disable audio recording while playback
f = audioop.mul(f, 2, 0)
self.loop.call_soon_threadsafe(self.voice_socket.send, f)
except Exception as e:
logger.warn("Error sending to {}".format(e))
pass
2019-01-15 21:40:44 +01:00
return (None, pyaudio.paContinue)
async def start(self):
2019-01-15 21:40:44 +01:00
FORMAT = pyaudio.paInt16
CHANNELS = 1
CHUNK = 4096
2019-01-15 21:40:44 +01:00
self.stopped = False
if 'alsaaudio' in sys.modules:
if self.config['voice']['input_mixer'] and self.config['voice']['input_volume'] and self.info['input']['card']:
logger.info("Set input volume on {}/{} to {}".format(
self.config['voice']['input_mixer'],
self.info['input']['card'],
self.config['voice']['input_volume']
))
alsaaudio.Mixer(self.config['voice']['input_mixer'], device=self.info['input']['card']).setvolume(
self.config['voice']['input_volume'])
if self.config['voice']['output_mixer'] and self.config['voice']['output_volume'] and self.info['output']['card']:
logger.info("Set output volume on {}/{} to {}".format(
self.config['voice']['output_mixer'],
self.info['output']['card'],
self.config['voice']['output_volume']
))
alsaaudio.Mixer(self.config['voice']['output_mixer'], device=self.info['output']['card']).setvolume(
self.config['voice']['output_volume'])
2019-01-15 21:40:44 +01:00
stream = self.p.open(
format=FORMAT,
channels=CHANNELS,
rate=self.input_rate,
input=True,
frames_per_buffer=CHUNK,
stream_callback=self.onBuffer,
input_device_index=self.info['input']['idx']
2019-02-28 18:58:03 +01:00
)
2019-01-15 21:40:44 +01:00
while not self.stopped:
try:
address = "tcp://*:{}".format(self.config['voice']['port'] + self.hugvey.id)
self.voice_socket = self.ctx.socket(zmq.PUB)
self.voice_socket.set_hwm(15)
self.voice_socket.bind(address)
2019-02-28 18:58:03 +01:00
logger.info(
"Waiting for voice connections on {}".format(address))
2019-01-15 21:40:44 +01:00
while not self.stopped:
await asyncio.sleep(1)
2019-01-15 21:40:44 +01:00
2019-02-28 18:58:03 +01:00
logger.info("Stop recording & streaming")
2019-01-15 21:40:44 +01:00
self.voice_socket.close()
# stop Recording
stream.stop_stream()
stream.close()
self.p.terminate()
except Exception as e:
logging.critical("Socket Exception {}".format(e))
self.voice_socket.close()
time.sleep(.5)
def stop(self):
self.stopped = True
2019-01-15 23:34:59 +01:00
async def asyncStart(self, loop):
future = loop.run_in_executor(None, self.start)
r = await future
2019-02-28 18:58:03 +01:00
2019-01-15 21:40:44 +01:00
class CommandHandler(object):
2019-05-14 18:33:37 +02:00
def __init__(self, hugvey_id, cmd_address, publish_address, file_address, play_audiodev = None, play_audiodriver=None, remote_ip='8.8.8.8'):
2019-01-15 21:40:44 +01:00
self.eventQueue = []
self.ctx = Context.instance()
self.hugvey_id = hugvey_id
2019-01-15 23:34:59 +01:00
self.cmd_address = cmd_address
self.publish_address = publish_address
self.muteMic = False
self.playPopen = None
self.file_address = file_address
self.playingMsgId = None
self.play_audiodev = play_audiodev
self.play_audiodriver = play_audiodriver
2019-05-14 18:33:37 +02:00
self.remote_ip = remote_ip
2019-01-16 09:00:49 +01:00
# self.showMyself() # queue message for connection request
2019-01-15 21:40:44 +01:00
def handle(self, cmd):
2019-11-13 11:19:21 +01:00
try:
print('handle', cmd)
# self.sendMessage({'reply':'test'})
if not 'action' in cmd:
logger.critical("Invalid command: {}".format(cmd))
return
logger.info("Received {}".format(cmd))
if cmd['action'] == 'show_yourself':
self.showMyself()
if cmd['action'] == 'prepare':
self.muteMic = True
if cmd['action'] == 'play':
self.cmdPlay(cmd)
if cmd['action'] == 'stop':
self.cmdStop(cmd['id'])
except Exception as e:
logger.critical("Exception during handling command: {}".format(cmd))
logger.exception(e)
2019-01-15 21:40:44 +01:00
def cmdPlay(self, cmd):
self.muteMic = True
2019-02-28 18:58:03 +01:00
msgId = cmd['id']
pitch = cmd['pitch'] if 'pitch' in cmd else 50
2019-11-13 11:19:21 +01:00
filepath = cmd['file'] if 'file' in cmd else None
text = cmd['msg'] if 'msg' in cmd else None
2019-02-28 18:58:03 +01:00
params = cmd['params'] if 'params' in cmd else {}
# use duration for timing the popen duration (and redo it if needed)
duration = cmd['duration'] if 'duration' in cmd else None
self.playingMsgId = msgId
if self.playPopen:
logger.info("Interrupting playback of {}".format(self.playingMsgId))
self.playPopen.kill()
2019-02-28 18:58:03 +01:00
2019-04-26 18:49:30 +02:00
err = None
2019-11-13 11:19:21 +01:00
if filepath is None and text is None:
logger.critical("No file nor text given: {}".format(cmd))
else:
2019-11-13 11:19:21 +01:00
if filepath is not None:
file = self.file_address + "/" + filepath
logger.debug("Fetch to play: {}".format(filepath))
start = time.time()
#: var response: http.client.HTTPResponse
response = urllib.request.urlopen(file, timeout=4)
fetchend = time.time()
logger.info("Fetched {} in {}s".format(file, fetchend-start))
if fetchend-start > 1:
logger.warning("Super slow fetching of {} in {}s".format(file, fetchend-start))
if response.getcode() != 200:
logger.critical("Error fetching: {} - {}".format(file, response))
else:
audioFile = io.BytesIO(response.read())
logger.info("Play: {}".format(filepath))
# logger.debug(['play', file])
playCmd = ['play', '-']
for param, value in params.items():
if not value:
continue
playCmd.append(param)
print(param, value)
if value is True:
continue
playCmd.append(str(value))
environment_vars = dict(os.environ)
if self.play_audiodriver is not None:
environment_vars['AUDIODRIVER'] = self.play_audiodriver
elif self.play_audiodev is not None:
environment_vars['AUDIODEV'] = self.play_audiodev
logger.debug(playCmd)
t = None
if duration is not None:
t = threading.Timer(duration+3, self.checkPopen, (msgId,duration+3))
t.start()
self.playPopen = subprocess.Popen(
playCmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=environment_vars)
self.sendMessage({
'event': 'playbackStart',
'msgId': msgId
})
out, err = self.playPopen.communicate(input=audioFile.getvalue())
playend = time.time()
returnCode = self.playPopen.returncode if self.playPopen else 0
logger.info('finished playing {} in {:.4f}s (duration: {}s)'.format(filepath, playend-fetchend, duration))
self.playPopen = None
if t is not None:
t.cancel()
2019-02-28 18:58:03 +01:00
else:
logger.info("Speak: {}".format(text))
playCmd = ['espeak', '-p', '{0}'.format(pitch), text]
2019-02-28 18:58:03 +01:00
self.playPopen = subprocess.Popen(
playCmd, stdout=subprocess.PIPE)
2019-04-16 12:51:06 +02:00
out, err = self.playPopen.communicate()
returnCode = self.playPopen.returncode
self.playPopen = None
2019-02-28 18:58:03 +01:00
if returnCode:
2019-04-16 12:51:06 +02:00
logger.critical("Had returncode {} on play: {}\n\n{}\n{}".format(returnCode, playCmd, out, err))
else:
logger.debug("Finished playback.")
2019-02-28 18:58:03 +01:00
self.playingMsgId = None
self.muteMic = False
2019-01-15 23:34:59 +01:00
self.sendMessage({
2019-01-15 21:40:44 +01:00
'event': 'playbackFinish',
'msgId': msgId
})
2019-11-12 16:13:57 +01:00
def checkPopen(self, msgId, duration):
if self.playingMsgId != msgId:
return
if self.playPopen is None:
return
# prevent a lock of the story, no repeat or anything for now
2019-11-12 16:13:57 +01:00
logger.critical("Interrupting playback after timeout of {}: {}".format(str(duration), self.playingMsgId))
self.playPopen.kill()
2019-02-28 18:58:03 +01:00
def cmdStop(self, msgId):
if self.playPopen and self.playingMsgId == msgId:
logger.info("Interrupting playback")
try:
self.playPopen.terminate()
except Exception as e:
logger.critical("Could not stop playback: {}".format(e))
else:
logger.warn("Interrupt ignored")
2019-01-15 21:40:44 +01:00
2019-01-16 09:00:49 +01:00
def showMyself(self):
"""Publish about this hugvey to central command
"""
self.sendMessage({
'event': 'connection',
'id': self.hugvey_id,
'host': socket.gethostname(),
'ip': self.getIp(),
})
2019-05-14 18:33:37 +02:00
def getIp(self):
2019-01-16 09:00:49 +01:00
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
2019-04-27 11:51:11 +02:00
# TODO: make it a local ip, eg. 192.168.1.1
2019-05-14 18:33:37 +02:00
s.connect((self.remote_ip, 80))
ip = s.getsockname()[0]
s.close()
return ip
2019-01-16 09:00:49 +01:00
2019-01-15 21:40:44 +01:00
def sendMessage(self, msg):
self.eventQueue.append(msg)
async def command_listener(self):
s = self.ctx.socket(zmq.SUB)
s.set_hwm(15)
2019-01-15 23:34:59 +01:00
s.connect(self.cmd_address)
topic = getTopic(self.hugvey_id)
s.subscribe(topic)
2019-02-28 18:58:03 +01:00
logger.info("Subscribed to commands for {} on {}".format(
topic, self.cmd_address))
2019-01-15 21:40:44 +01:00
while True:
2019-01-15 23:34:59 +01:00
hugvey_id, cmd = await zmqReceive(s)
# print("GOGOG", hugvey_id, cmd)
t = threading.Thread(target=self.handle, args=(cmd,))
t.start()
2019-01-15 21:40:44 +01:00
# topic, msg = await s.recv_multipart()
2019-01-15 23:34:59 +01:00
# print('received', msg, time.time())
2019-01-15 21:40:44 +01:00
s.close()
2019-01-15 23:34:59 +01:00
async def event_sender(self):
2019-01-15 21:40:44 +01:00
s = self.ctx.socket(zmq.PUB)
s.set_hwm(15)
2019-01-16 09:00:49 +01:00
s.connect(self.publish_address)
2019-01-15 23:34:59 +01:00
logger.info("Publish on: {}".format(self.publish_address))
2019-01-16 09:00:49 +01:00
# For some reason, sending only one message is lost, perhaps due
# to connect() rather than bind() ??
2019-02-28 18:58:03 +01:00
await asyncio.sleep(1) # wait for connection to be proper set
2019-01-16 09:00:49 +01:00
self.showMyself()
2019-02-28 18:58:03 +01:00
2019-01-15 21:40:44 +01:00
while True:
for i in range(len(self.eventQueue)):
2019-01-16 09:00:49 +01:00
zmqSend(s, self.hugvey_id, self.eventQueue.pop(0))
2019-01-15 21:40:44 +01:00
if len(self.eventQueue) == 0:
await asyncio.sleep(0.05)
s.close()
2019-04-27 11:51:11 +02:00
async def heartbeat(self):
while True:
self.showMyself()
await asyncio.sleep(3)
2019-01-15 21:40:44 +01:00
2019-02-28 18:58:03 +01:00
2019-01-15 21:40:44 +01:00
class Hugvey(object):
"""The Hugvey client, to be ran on the Raspberry Pi's
"""
2019-02-28 18:58:03 +01:00
2019-03-07 21:34:50 +01:00
def __init__(self, id = None):
self.id = self.getId() if id is None else id
2019-04-10 11:13:42 +02:00
setLogger(self.id)
2019-01-15 21:40:44 +01:00
2019-01-15 23:34:59 +01:00
def getId(self) -> int:
"""Get Hugvey ID from hostname"""
2019-01-28 17:27:38 +01:00
try:
h = socket.gethostname()
2019-02-28 18:58:03 +01:00
id = int(re.findall('\d+', h)[0])
2019-01-28 17:27:38 +01:00
except Exception:
logger.critical("No automatic ID, fall back to 1")
id = 1
return id
2019-01-15 23:34:59 +01:00
2019-01-15 21:40:44 +01:00
def loadConfig(self, filename):
2019-01-15 23:34:59 +01:00
with open(filename, 'r') as fp:
logger.debug('Load config from {}'.format(filename))
self.config = yaml.safe_load(fp)
2019-01-15 21:40:44 +01:00
async def startCommandListener(self):
2019-01-15 21:40:44 +01:00
return await self.cmd_server.command_listener()
def start(self):
logger.debug('Hugvey {}, reporting'.format(self.id))
self.loop = asyncio.get_event_loop()
self.voice_server = VoiceServer(
loop=self.loop,
hugvey=self,
config=self.config
)
2019-05-14 18:33:37 +02:00
remote_ip = self.config['events']['remote_ip'] if 'remote_ip' in self.config['events'] else '8.8.8.8'
logger.debug("Using remote_ip for getIp: {}".format(remote_ip))
self.cmd_server = CommandHandler(
2019-02-28 18:58:03 +01:00
hugvey_id=self.id,
cmd_address=self.config['events']['cmd_address'],
publish_address=self.config['events']['publish_address'],
file_address=self.config['voice']['file_address'],
play_audiodev=self.voice_server.info['output']['device'],
play_audiodriver=self.config['voice']['output_driver'] if 'output_driver' in self.config['voice'] else None,
2019-05-14 18:33:37 +02:00
remote_ip=remote_ip,
2019-02-28 18:58:03 +01:00
)
2019-01-15 21:40:44 +01:00
logger.info('start')
2019-01-15 23:34:59 +01:00
# self.voice_server.asyncStart(loop)
# loop.run_until_complete(self.voice_server.start())
asyncio.ensure_future(self.catchException(self.voice_server.start()))
asyncio.ensure_future(self.catchException(self.cmd_server.command_listener()))
asyncio.ensure_future(self.catchException(self.cmd_server.event_sender()))
asyncio.ensure_future(self.catchException(self.cmd_server.heartbeat()))
self.loop.run_forever()
2019-01-15 21:40:44 +01:00
logger.info('done')
async def catchException(self, awaitable):
try:
await awaitable
except Exception as e:
logger.exception(e)
logger.critical("Hugvey quiting")
# self.loop.stop() # not fully quits program for reboot
exit()