301 lines
10 KiB
Python
301 lines
10 KiB
Python
import asyncio
|
|
import audioop
|
|
import logging
|
|
import pyaudio
|
|
import re
|
|
import socket
|
|
import threading
|
|
import time
|
|
import yaml
|
|
import zmq
|
|
from zmq.asyncio import Context
|
|
|
|
try:
|
|
import alsaaudio
|
|
except ImportError:
|
|
print("No volume settings available")
|
|
|
|
from .communication import zmqReceive, zmqSend, getTopic
|
|
import subprocess
|
|
|
|
|
|
logger = logging.getLogger("client")
|
|
|
|
class VoiceServer(object):
    """Publish microphone audio over a ZeroMQ PUB socket.

    Audio is captured through a PyAudio callback stream at ``input_rate`` and,
    when that differs from ``target_rate`` (16 kHz by default), resampled with
    ``audioop.ratecv`` before it is sent on a ``tcp://*:<voice_port>`` socket.
    """

    def __init__(self, loop, hugvey, voice_port: int, input_rate: int, input_name: str = None, target_rate: int = 16000):
        """Store the configuration; no audio device or socket is opened yet.

        Args:
            loop: asyncio event loop on which the socket sends are scheduled.
            hugvey: owning client; ``hugvey.cmd_server.playPopen`` is read to
                mute the microphone while something is being played.
            voice_port: TCP port the PUB socket binds to.
            input_rate: sample rate (Hz) the input device is opened with.
            input_name: substring of the device name to select; when empty,
                the first input device not named 'default' is used.
            target_rate: sample rate (Hz) of the published audio.
        """
        self.voice_port = voice_port
        self.input_rate = input_rate
        self.target_rate = target_rate
        self.stopped = True
        self.clients = []
        # Resampler state carried between successive audioop.ratecv calls.
        self.laststate = None
        self.input_name = input_name
        self.ctx = Context.instance()
        self.loop = loop
        self.hugvey = hugvey
        # Created in start(); None while no socket is bound.
        self.voice_socket = None

    def get_input_idx(self):
        """Return the index of the input device to record from, or None.

        The first device with input channels wins if its name contains
        ``self.input_name``, or, when no name is configured, if its name is
        not 'default'. Every device is listed at debug level, the chosen one
        marked with ``*``.
        """
        input_device_idx = None
        devices_count = self.p.get_device_count()
        for i in range(devices_count):
            dev = self.p.get_device_info_by_index(i)
            if input_device_idx is None and dev['maxInputChannels'] > 0:
                if self.input_name:
                    matches = self.input_name in dev['name']
                else:
                    matches = dev['name'] != 'default'
                if matches:
                    input_device_idx = dev['index']
                    logger.info("Use device {0}: {1}".format(dev['index'],dev['name']))
            logger.debug("{} {:0d} {}".format("* " if input_device_idx == i else "- ", i, dev['name']))
        return input_device_idx

    def _publish(self, frames):
        """Send ``frames`` on the voice socket, if one is currently open.

        Scheduled onto the event loop by onBuffer, so the check for an open
        socket happens at the moment of sending rather than in the callback.
        """
        if self.voice_socket is not None:
            self.voice_socket.send(frames)

    def _closeVoiceSocket(self):
        """Close the voice socket if it exists and forget it."""
        if self.voice_socket is not None:
            self.voice_socket.close()
            self.voice_socket = None

    def onBuffer(self, in_data, frame_count, time_info, status):
        """PyAudio stream callback: resample, optionally mute, and publish.

        Returns:
            ``(None, pyaudio.paContinue)`` so the stream keeps running.
        """
        if self.input_rate == self.target_rate:
            frames = in_data
        else:
            # chunk 4096, with 2 bytes per frame gives len(in_data) of 8192
            # rate converted 44k1 -> 16k gives len(frames) == 2972 (16/44.1 * 8192)
            frames, self.laststate = audioop.ratecv(in_data, 2, 1, self.input_rate, self.target_rate, self.laststate)

        try:
            if self.hugvey.cmd_server.playPopen is not None:
                logger.debug('block recording {}' .format( self.hugvey.cmd_server.playPopen))
                # multiply by 0 to disable audio recording while playback
                frames = audioop.mul(frames, 2, 0)

            # Hand the send over to the event loop instead of touching the
            # socket from the callback.
            self.loop.call_soon_threadsafe(self._publish, frames)
        except Exception as e:
            logger.warning("Error sending to {}".format(e))
        return (None, pyaudio.paContinue)

    async def start(self):
        """Open the input stream, bind the PUB socket and serve until stopped.

        The coroutine idles in one-second sleeps while the PyAudio callback
        publishes audio. When ``stop()`` is called the stream, PyAudio and the
        socket are shut down. If anything in the serving block raises, the
        socket is closed and setup is retried after half a second.
        """
        sample_format = pyaudio.paInt16
        channels = 1
        chunk = 4096

        self.p = pyaudio.PyAudio()
        self.stopped = False

        stream = self.p.open(
            format=sample_format,
            channels=channels,
            rate=self.input_rate,
            input=True,
            frames_per_buffer=chunk,
            stream_callback=self.onBuffer,
            input_device_index=self.get_input_idx()
        )

        while not self.stopped:
            try:
                address = "tcp://*:{}".format(self.voice_port)
                self.voice_socket = self.ctx.socket(zmq.PUB)
                self.voice_socket.bind(address)

                logger.info( "Waiting for voice connections on {}".format(address) )
                while not self.stopped:
                    await asyncio.sleep(1)

                logger.info( "Stop recording & streaming")
                # Stop recording first so no further buffers are queued for a
                # socket that is about to be closed.
                stream.stop_stream()
                stream.close()
                self.p.terminate()
                self._closeVoiceSocket()
            except Exception as e:
                logger.critical("Socket Exception {}".format(e))
                self._closeVoiceSocket()
                # Back off without blocking the event loop.
                await asyncio.sleep(.5)

    def stop(self):
        """Ask the serving loop in start() to shut down."""
        self.stopped = True

    async def asyncStart(self, loop):
        """Run start() to completion.

        ``start`` is a coroutine function, so it is awaited directly; handing
        it to an executor would only create a coroutine object that never runs.

        Args:
            loop: unused; kept so existing callers keep working.
        """
        await self.start()
|
|
|
|
class CommandHandler(object):
    """Receive commands for this Hugvey and queue events to send back.

    Commands arrive on a ZeroMQ SUB socket (``command_listener``) and are
    handled on a separate thread each; outgoing events are appended to
    ``eventQueue`` and sent by ``event_sender``.
    """

    def __init__(self, hugvey_id, cmd_address, publish_address, file_address):
        """Store addresses and reset playback state; no socket is opened yet.

        Args:
            hugvey_id: id used for the command topic and for outgoing events.
            cmd_address: address the SUB socket connects to.
            publish_address: address the PUB socket connects to.
            file_address: prefix joined with '/' and the file name of a play
                command before it is passed to ``play``.
        """
        self.eventQueue = []
        self.ctx = Context.instance()
        self.hugvey_id = hugvey_id
        self.cmd_address = cmd_address
        self.publish_address = publish_address
        # Popen object of the running playback, None when idle.
        self.playPopen = None
        self.file_address = file_address
        # 'id' of the command currently being played, None when idle.
        self.playingMsgId = None

    def handle(self, cmd):
        """Dispatch one command dict on its 'action' key.

        Known actions are 'show_yourself', 'play' and 'stop'; a command
        without 'action' is logged and dropped, other actions are ignored.
        """
        logger.debug("handle {}".format(cmd))
        if 'action' not in cmd:
            logger.critical("Invalid command: {}".format(cmd))
            return

        logger.info("Received {}".format(cmd))
        action = cmd['action']
        if action == 'show_yourself':
            self.showMyself()
        elif action == 'play':
            self.cmdPlay(cmd)
        elif action == 'stop':
            # Interrupt the playback belonging to this message id.
            self.cmdStop(cmd['id'])

    def _runPlayback(self, args):
        """Run the player command ``args`` to completion; return its exit code.

        ``playPopen`` holds the process while it runs (so cmdStop can
        terminate it) and is reset to None afterwards, also on errors.
        """
        try:
            # The player's stdout is never read; discarding it avoids the
            # child blocking on a full pipe while we sit in wait().
            self.playPopen = subprocess.Popen(args, stdout=subprocess.DEVNULL)
            return self.playPopen.wait()
        finally:
            self.playPopen = None

    def cmdPlay(self, cmd):
        """Play a file or speak a text, blocking until playback has ended.

        Reads 'id' (required), 'file', 'msg' and 'pitch' (default 50) from
        ``cmd``. A file is played with ``play``; otherwise the text is spoken
        with ``espeak``. A 'playbackFinish' event carrying the message id is
        queued afterwards, also when the command held neither file nor text.
        """
        msgId = cmd['id']
        pitch = cmd.get('pitch', 50)
        file = cmd.get('file')
        text = cmd.get('msg')
        self.playingMsgId = msgId

        if file is not None:
            logger.info("Play: {}".format(file))
            args = ['play', self.file_address + "/" + file]
        elif text is not None:
            logger.info("Speak: {}".format(text))
            args = ['espeak', '-p', '{0}'.format(pitch), text]
        else:
            logger.critical("No file nor text given: {}".format(cmd))
            args = None

        try:
            if args is not None:
                logger.debug(args)
                returnCode = self._runPlayback(args)
                if returnCode:
                    logger.warning("Had returncode on play: {}".format(returnCode))
                else:
                    logger.debug("Finished playback. Return code: {}".format(returnCode))
        finally:
            self.playingMsgId = None

        self.sendMessage({
            'event': 'playbackFinish',
            'msgId': msgId
        })

    def cmdStop(self, msgId):
        """Terminate the running playback if it belongs to ``msgId``."""
        if self.playPopen and self.playingMsgId == msgId:
            logger.info("Interrupting playback")
            try:
                self.playPopen.terminate()
            except Exception as e:
                logger.critical("Could not stop playback: {}".format(e))
        else:
            logger.warning("Interrupt ignored")

    def showMyself(self):
        """Publish about this hugvey to central command
        """
        self.sendMessage({
            'event': 'connection',
            'id': self.hugvey_id,
            'host': socket.gethostname(),
            'ip': self.getIp(),
        })

    @staticmethod
    def getIp():
        """Return the local address used for outgoing IPv4 traffic.

        Connects a UDP socket to a fixed remote address and reads back the
        local socket name; the socket is closed again before returning.
        """
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.connect(("185.66.250.60", 80))
            return s.getsockname()[0]

    def sendMessage(self, msg):
        """Queue ``msg`` for event_sender to publish."""
        self.eventQueue.append(msg)

    async def command_listener(self):
        """Subscribe to this Hugvey's topic and handle every command received.

        Each command is handled on its own thread, so a blocking playback
        does not hold up receiving the next command (e.g. a 'stop').
        """
        s = self.ctx.socket(zmq.SUB)
        try:
            s.connect(self.cmd_address)
            topic = getTopic(self.hugvey_id)
            s.subscribe(topic)
            logger.info("Subscribed to commands for {} on {}".format(topic, self.cmd_address))
            while True:
                hugvey_id, cmd = await zmqReceive(s)
                t = threading.Thread(target=self.handle, args=(cmd,))
                t.start()
        finally:
            # The loop only ends through an exception or cancellation.
            s.close()

    async def event_sender(self):
        """Publish queued events, polling the queue every 50 ms when idle."""
        s = self.ctx.socket(zmq.PUB)
        try:
            s.connect(self.publish_address)
            logger.info("Publish on: {}".format(self.publish_address))
            # For some reason, sending only one message is lost, perhaps due
            # to connect() rather than bind() ??

            await asyncio.sleep(1) # wait for connection to be proper set

            self.showMyself()

            while True:
                while self.eventQueue:
                    # NOTE(review): zmqSend is called without await, as
                    # before; confirm in .communication that it is not a
                    # coroutine function.
                    zmqSend(s, self.hugvey_id, self.eventQueue.pop(0))
                await asyncio.sleep(0.05)
        finally:
            # The loop only ends through an exception or cancellation.
            s.close()
|
|
|
|
class Hugvey(object):
    """The Hugvey client, to be run on the Raspberry Pis.

    Call ``loadConfig`` before ``start``: ``start`` reads ``self.config``.
    """

    def __init__(self):
        """Derive this client's id from the hostname."""
        self.id = self.getId()

    def getId(self) -> int:
        """Get Hugvey ID from hostname

        The first run of digits in the hostname is used; without one (or if
        the hostname cannot be read) the id falls back to 1.
        """
        try:
            hostname = socket.gethostname()
            return int(re.findall(r'\d+', hostname)[0])
        except (OSError, IndexError, ValueError):
            logger.critical("No automatic ID, fall back to 1")
            return 1

    def loadConfig(self, filename):
        """Load the YAML file ``filename`` into ``self.config``."""
        with open(filename, 'r') as fp:
            logger.debug('Load config from {}'.format(filename))
            self.config = yaml.safe_load(fp)

    async def startCommandListener(self):
        """Run the command handler's listener; requires start() to have set cmd_server."""
        return await self.cmd_server.command_listener()

    def start(self):
        """Create the command handler and voice server and run the event loop.

        Reads the 'voice' and 'events' sections of ``self.config``. When a
        play device is configured and the optional ``alsaaudio`` module was
        importable, its mixer volume is set first. Blocks in
        ``loop.run_forever()``.
        """
        # Local import: only needed for the optional-module check below.
        import sys

        loop = asyncio.get_event_loop()
        voice_config = self.config['voice']
        events_config = self.config['events']

        if voice_config['play_device'] and 'alsaaudio' in sys.modules:
            alsaaudio.Mixer(voice_config['play_device']).setvolume(voice_config['play_volume'])

        self.cmd_server = CommandHandler(
            hugvey_id = self.id,
            cmd_address = events_config['cmd_address'],
            publish_address = events_config['publish_address'],
            file_address = voice_config['file_address']
        )
        self.voice_server = VoiceServer(
            loop = loop,
            hugvey = self,
            voice_port = int(voice_config['port']),
            input_rate = int(voice_config['input_rate']),
            input_name = voice_config['input_name'],
            target_rate = int(voice_config['target_rate']),
        )
        logger.info('start')
        asyncio.ensure_future(self.voice_server.start())
        asyncio.ensure_future(self.cmd_server.command_listener())
        asyncio.ensure_future(self.cmd_server.event_sender())
        self.cmd_server.showMyself()
        loop.run_forever()
        logger.info('done')
|