# Source: hugvey/hugvey/client.py (179 lines, 5.8 KiB, Python)

import pyaudio
import socket
import select
import audioop
import threading
import logging
import time
import zmq
import asyncio
from zmq.asyncio import Context
from .communication import zmqReceive, zmqSend, getTopic
# Module-level logger used by the classes in this file.
logger = logging.getLogger("client")
class VoiceServer(object):
    """A TCP server that streams microphone audio to every connected client.

    Audio is captured through PyAudio as 16-bit mono at ``input_rate``. When
    that differs from ``target_rate`` (16 kHz by default) each buffer is
    resampled with ``audioop.ratecv`` before it is sent.
    """

    CHANNELS = 1                # mono capture
    SAMPLE_WIDTH = 2            # bytes per sample; matches pyaudio.paInt16
    CHUNK = 4096                # frames per PyAudio buffer
    ACCEPT_POLL_INTERVAL = 0.5  # seconds between checks of the stop flag
    RETRY_DELAY = 0.5           # seconds to wait after a socket error

    def __init__(self, voice_port, input_rate, input_name=None, target_rate=16000):
        """Store the configuration; nothing is opened until ``start()``.

        Args:
            voice_port: TCP port to listen on for audio clients.
            input_rate: Sample rate (Hz) requested from the input device.
            input_name: Optional substring of the input device name to use.
            target_rate: Sample rate (Hz) of the audio sent to clients.
        """
        self.voice_port = voice_port
        self.input_rate = input_rate
        self.target_rate = target_rate
        self.stopped = True
        self.clients = []
        # Resampler state carried between successive audioop.ratecv calls.
        self.laststate = None
        self.input_name = input_name

    def get_input_idx(self):
        """Return the index of the input device to record from.

        The first device with input channels is chosen whose name contains
        ``input_name``, or, when no name was given, whose name is not
        ``'default'``. Every device is listed at debug level.

        Returns:
            The PyAudio device index, or ``None`` when no device matched.
        """
        input_device_idx = None
        for i in range(self.p.get_device_count()):
            dev = self.p.get_device_info_by_index(i)
            if input_device_idx is None and dev['maxInputChannels'] > 0:
                if self.input_name:
                    matches = self.input_name in dev['name']
                else:
                    matches = dev['name'] != 'default'
                if matches:
                    input_device_idx = dev['index']
                    logger.info("Use device {0}: {1}".format(dev['index'], dev['name']))
            logger.debug("{} {:0d} {}".format(
                "* " if input_device_idx == i else "- ", i, dev['name']))
        if input_device_idx is None and self.input_name:
            logger.warning("No input device matching {!r} found".format(self.input_name))
        return input_device_idx

    def onBuffer(self, in_data, frame_count, time_info, status):
        """PyAudio stream callback: resample one buffer and send it out.

        Args:
            in_data: Raw recorded bytes (16-bit mono).
            frame_count, time_info, status: Supplied by PyAudio; unused.

        Returns:
            ``(None, pyaudio.paContinue)`` so that recording keeps running.
        """
        if self.input_rate == self.target_rate:
            f = in_data
        else:
            f, self.laststate = audioop.ratecv(
                in_data, self.SAMPLE_WIDTH, self.CHANNELS,
                self.input_rate, self.target_rate, self.laststate)
        self._broadcast(f)
        return (None, pyaudio.paContinue)

    def _broadcast(self, data):
        """Send ``data`` to every client, dropping clients whose send fails."""
        # Iterate over a snapshot: removing from the list being iterated
        # would silently skip the client following a failed one.
        for s in list(self.clients):
            try:
                # sendall() so a partial write cannot split a 16-bit sample.
                s.sendall(data)
            except OSError:
                self._drop_client(s)

    def _drop_client(self, s):
        """Forget and close a client socket that can no longer be written to."""
        try:
            self.clients.remove(s)
        except ValueError:
            pass  # already removed
        try:
            name = s.getsockname()
        except OSError:
            # A dead socket may refuse getsockname(); fall back to its repr
            # instead of letting the error escape from the audio callback.
            name = s
        logger.warning("Error sending to {}".format(name))
        s.close()

    def _close_clients(self):
        """Close every remaining client connection and empty the list."""
        while self.clients:
            self.clients.pop().close()

    async def _serve_clients(self, loop):
        """Listen on ``voice_port`` and register clients until stopped.

        Returns when ``stop()`` has been called or after a socket error; in
        the latter case the error is logged and ``RETRY_DELAY`` is awaited so
        the caller can simply call this again.
        """
        self.voice_socket = None
        try:
            self.voice_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.voice_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.voice_socket.bind(('', self.voice_port))
            self.voice_socket.listen(5)
            # loop.sock_accept() requires a non-blocking listening socket.
            self.voice_socket.setblocking(False)
            logger.info("Waiting for connections")
            while not self.stopped:
                try:
                    # The timeout lets the loop re-check self.stopped even
                    # when nobody connects.
                    clientsocket, address = await asyncio.wait_for(
                        loop.sock_accept(self.voice_socket),
                        timeout=self.ACCEPT_POLL_INTERVAL)
                except asyncio.TimeoutError:
                    continue
                # onBuffer() writes with plain blocking sends.
                clientsocket.setblocking(True)
                self.clients.append(clientsocket)
        except asyncio.CancelledError:
            # Never treat task cancellation as a socket error to retry.
            raise
        except Exception as e:
            logger.critical("Socket Exception {}".format(e))
            await asyncio.sleep(self.RETRY_DELAY)
        finally:
            if self.voice_socket is not None:
                self.voice_socket.close()

    async def start(self):
        """Open the microphone stream and serve clients until ``stop()``.

        The recording stream, the PyAudio instance and all client sockets
        are released when the coroutine ends, whether it ends normally, by
        an exception or by cancellation.
        """
        loop = asyncio.get_event_loop()
        self.p = pyaudio.PyAudio()
        self.stopped = False
        try:
            stream = self.p.open(
                format=pyaudio.paInt16,
                channels=self.CHANNELS,
                rate=self.input_rate,
                input=True,
                frames_per_buffer=self.CHUNK,
                stream_callback=self.onBuffer,
                input_device_index=self.get_input_idx()
            )
            try:
                while not self.stopped:
                    await self._serve_clients(loop)
            finally:
                logger.info("Stop recording & streaming")
                stream.stop_stream()
                stream.close()
        finally:
            self.p.terminate()
            self._close_clients()

    def stop(self):
        """Ask ``start()`` to shut down; it notices within the poll interval."""
        self.stopped = True
class CommandHandler(object):
    """Receives commands for one Hugvey over ZeroMQ and queues events to send back."""

    # Seconds cmdPlay() waits to stand in for the (not yet implemented) playback.
    PLAYBACK_DELAY = 2

    def __init__(self, hugvey_id, event_address="tcp://127.0.0.1:5555",
                 event_send_address=None):
        """Store connection settings; sockets are opened by the coroutines.

        Args:
            hugvey_id: Identifier of this Hugvey, used to build the topic.
            event_address: ZeroMQ address that commands are received from.
            event_send_address: ZeroMQ address that events are published to.
                Must be set before ``event_sender()`` is run.
        """
        self.eventQueue = []
        self.ctx = Context.instance()
        self.hugvey_id = hugvey_id
        self.event_address = event_address
        self.event_send_address = event_send_address

    def handle(self, cmd):
        """Dispatch one received command dict according to its ``'action'``.

        Commands without an ``'action'`` key, and ``'play'`` commands without
        ``'id'`` or ``'msg'``, are logged as invalid and ignored.
        """
        if 'action' not in cmd:
            logger.critical("Invalid command: {}".format(cmd))
            return
        logger.info("Received {}".format(cmd))
        if cmd['action'] == 'play':
            if 'id' not in cmd or 'msg' not in cmd:
                logger.critical("Invalid command: {}".format(cmd))
                return
            self.cmdPlay(cmd['id'], cmd['msg'])

    def cmdPlay(self, msgId, msgText):
        """Simulate playing ``msgText`` and queue a ``playbackFinish`` event.

        Args:
            msgId: Identifier echoed back in the event as ``'msgId'``.
            msgText: Text to play; currently only logged.
        """
        # espeak(msgText)
        logger.info("Play: {}".format(msgText))
        # NOTE(review): time.sleep() blocks the calling thread; if this is
        # ever invoked from the asyncio loop it will stall it - confirm.
        time.sleep(self.PLAYBACK_DELAY)
        self.sendMessage({
            'event': 'playbackFinish',
            'msgId': msgId
        })

    def sendMessage(self, msg):
        """Queue ``msg`` for publication by ``event_sender()``."""
        self.eventQueue.append(msg)

    async def command_listener(self):
        """Subscribe to this Hugvey's command topic and receive messages forever.

        The socket is closed when the coroutine is cancelled or fails.
        """
        s = self.ctx.socket(zmq.SUB)
        try:
            s.connect(self.event_address)
            queueName = 'hv{}'.format(self.hugvey_id)
            s.subscribe(queueName)
            logger.info("Subscribed to commands on {}".format(queueName))
            while True:
                hugvey_id, msg = await zmqReceive(s)
                # topic, msg = await s.recv_multipart()
                # NOTE(review): messages are only printed, not passed to
                # handle() - presumably still to be wired up; confirm.
                print('received', msg)
        finally:
            s.close()

    async def event_sender(self):
        """Publish queued events forever, polling the queue every 50 ms.

        Raises:
            ValueError: if no ``event_send_address`` has been configured.
        """
        if self.event_send_address is None:
            raise ValueError("event_send_address must be set before sending events")
        s = self.ctx.socket(zmq.PUB)
        try:
            # A PUB socket only sends, so there is nothing to subscribe to;
            # zmqSend() is given the hugvey id to address each message.
            s.connect(self.event_send_address)
            logger.info("Publishing events to {}".format(self.event_send_address))
            while True:
                # Drain everything queued so far, oldest first.
                while self.eventQueue:
                    await zmqSend(s, self.hugvey_id, self.eventQueue.pop(0))
                await asyncio.sleep(0.05)
        finally:
            s.close()
class Hugvey(object):
    """The Hugvey client, to be run on the Raspberry Pis.

    Owns a ``VoiceServer`` that streams microphone audio and a
    ``CommandHandler`` that listens for commands.
    """

    def __init__(self):
        # Both are created by start().
        self.voice_server = None
        self.cmd_server = None

    def loadConfig(self, filename):
        """Placeholder for loading configuration from ``filename``; does nothing yet."""
        # filename
        pass

    async def startCommandListener(self):
        """Run the command handler's listener and return its result."""
        return await self.cmd_server.command_listener()

    def start(self):
        """Create the voice and command servers and run them until they finish.

        Both coroutines are scheduled together on the event loop, so the
        command listener does not have to wait for the voice server to end.
        """
        self.voice_server = VoiceServer(4444, 44100)
        self.cmd_server = CommandHandler(1)
        loop = asyncio.get_event_loop()
        logger.info('start')
        # NOTE(review): command_listener() used to be scheduled twice here;
        # the second call was presumably meant to start
        # CommandHandler.event_sender() - confirm before adding it.
        loop.run_until_complete(asyncio.gather(
            self.voice_server.start(),
            self.cmd_server.command_listener(),
        ))
        logger.info('done')