hugvey/hugvey/client.py

209 lines
7.1 KiB
Python

import pyaudio
import socket
import select
import audioop
import threading
import logging
import time
import zmq
import asyncio
from zmq.asyncio import Context
import yaml
import re
from .communication import zmqReceive, zmqSend, getTopic
logger = logging.getLogger("client")
class VoiceServer(object):
"""A UDP server, providing mic data at 16 kHz"""
def __init__(self, voice_port: int, input_rate: int, input_name: str = None, target_rate: int = 16000):
self.voice_port = voice_port
self.input_rate = input_rate
self.target_rate = target_rate
self.stopped = True
self.clients = []
self.laststate = None
self.input_name = input_name
def get_input_idx(self):
input_device_idx = None
# input_device_idx = 6
# input_device_idx = 0
devices_count = self.p.get_device_count()
for i in range(devices_count):
dev = self.p.get_device_info_by_index(i)
if input_device_idx is None and dev['maxInputChannels'] > 0:
if (self.input_name and self.input_name in dev['name']) or \
(not self.input_name and dev['name'] != 'default'):
input_device_idx = dev['index']
logger.info("Use device {0}: {1}".format(dev['index'],dev['name']))
logger.debug("{} {:0d} {}".format("* " if input_device_idx == i else "- ", i, dev['name']))
return input_device_idx
def onBuffer(self, in_data, frame_count, time_info, status):
if self.input_rate == self.target_rate:
f = in_data
else:
f, self.laststate = audioop.ratecv(in_data, 2, 1, self.input_rate, self.target_rate, self.laststate)
for s in self.clients:
try:
s.send(f)
except Exception as e:
self.clients.remove(s)
logger.warn("Error sending to {}".format(s.getsockname()))
pass
return (None, pyaudio.paContinue)
def start(self):
FORMAT = pyaudio.paInt16
CHANNELS = 1
CHUNK = 4096
self.p = pyaudio.PyAudio()
self.stopped = False
stream = self.p.open(
format=FORMAT,
channels=CHANNELS,
rate=self.input_rate,
input=True,
frames_per_buffer=CHUNK,
stream_callback=self.onBuffer,
input_device_index=self.get_input_idx()
)
while not self.stopped:
try:
self.voice_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.voice_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.voice_socket.bind(('', self.voice_port))
self.voice_socket.listen(5)
read_list = [self.voice_socket]
logger.info( "Waiting for connections")
while not self.stopped:
(clientsocket, address) = self.voice_socket.accept()
self.clients.append(clientsocket)
logger.info( "Stop recording & streaming")
self.voice_socket.close()
# stop Recording
stream.stop_stream()
stream.close()
self.p.terminate()
except Exception as e:
logging.critical("Socket Exception {}".format(e))
self.voice_socket.close()
time.sleep(.5)
def stop(self):
self.stopped = True
async def asyncStart(self, loop):
future = loop.run_in_executor(None, self.start)
r = await future
# await self.start()
class CommandHandler(object):
def __init__(self, hugvey_id, cmd_address = "tcp://127.0.0.1:5555", publish_address = "tcp://0.0.0.0:5555"):
self.eventQueue = []
self.ctx = Context.instance()
self.hugvey_id = hugvey_id
self.cmd_address = cmd_address
self.publish_address = publish_address
def handle(self, cmd):
# self.sendMessage({'reply':'test'})
if not 'action' in cmd:
logger.critical("Invalid command: {}".format(cmd))
return
logger.info("Received {}".format(cmd))
if cmd['action'] == 'play':
self.cmdPlay(cmd['id'], cmd['msg'])
def cmdPlay(self, msgId, msgText):
# espeak(msgText)
# TODO kill if playing & play wave file
# preferably a cat (local)/curl (remote) pipe into player
logger.info("Play: {}".format(msgText))
time.sleep(2)
self.sendMessage({
'event': 'playbackFinish',
'msgId': msgId
})
def sendMessage(self, msg):
self.eventQueue.append(msg)
async def command_listener(self):
s = self.ctx.socket(zmq.SUB)
s.connect(self.cmd_address)
topic = getTopic(self.hugvey_id)
s.subscribe(topic)
logger.info("Subscribed to commands for {} on {}".format(topic, self.cmd_address))
while True:
hugvey_id, cmd = await zmqReceive(s)
self.handle(cmd)
# topic, msg = await s.recv_multipart()
# print('received', msg, time.time())
s.close()
async def event_sender(self):
s = self.ctx.socket(zmq.PUB)
# TODO: see if we can connect() here. So we can PUSH(??) the ip
s.bind(self.publish_address)
logger.info("Publish on: {}".format(self.publish_address))
while True:
for i in range(len(self.eventQueue)):
await zmqSend(s, self.hugvey_id, self.eventQueue.pop(0))
if len(self.eventQueue) == 0:
await asyncio.sleep(0.05)
s.close()
class Hugvey(object):
"""The Hugvey client, to be ran on the Raspberry Pi's
"""
def __init__(self):
self.id = self.getId()
pass
def getId(self) -> int:
"""Get Hugvey ID from hostname"""
h = socket.gethostname()
return int(re.findall('\d+', h )[0])
def loadConfig(self, filename):
with open(filename, 'r') as fp:
logger.debug('Load config from {}'.format(filename))
self.config = yaml.safe_load(fp)
async def startCommandListener():
return await self.cmd_server.command_listener()
def start(self):
self.voice_server = VoiceServer(
voice_port = int(self.config['voice']['port']),
input_rate = int(self.config['voice']['input_rate']),
input_name = self.config['voice']['input_name'],
target_rate = int(self.config['voice']['target_rate']),
)
self.cmd_server = CommandHandler(
hugvey_id = self.id,
cmd_address = self.config['events']['cmd_address'],
publish_address = self.config['events']['publish_address'],
)
loop = asyncio.get_event_loop()
logger.info('start')
# self.voice_server.asyncStart(loop)
# loop.run_until_complete(self.voice_server.start())
asyncio.ensure_future(self.voice_server.asyncStart(loop))
asyncio.ensure_future(self.cmd_server.command_listener())
asyncio.ensure_future(self.cmd_server.event_sender())
loop.run_forever()
logger.info('done')