# hugvey/hugvey/client.py
#
# 261 lines
# 8.8 KiB
# Python

import asyncio
import audioop
import logging
import pyaudio
import re
import socket
import threading
import time
import yaml
import zmq
from zmq.asyncio import Context
from .communication import zmqReceive, zmqSend, getTopic
import subprocess
# Module-level logger ("client") used by every class in this file.
logger = logging.getLogger("client")
class VoiceServer(object):
    """Publish microphone audio on a ZeroMQ PUB socket.

    Audio is captured through PyAudio as 16-bit mono PCM at ``input_rate``.
    When ``input_rate`` differs from ``target_rate`` every buffer is
    resampled with ``audioop.ratecv`` before it is published on
    ``tcp://*:<voice_port>``.
    """

    def __init__(self, loop, voice_port: int, input_rate: int, input_name: str = None, target_rate: int = 16000):
        """Store the configuration; nothing is opened until ``start()``.

        Args:
            loop: asyncio event loop on which the socket sends are scheduled.
            voice_port: TCP port the PUB socket binds to.
            input_rate: sample rate (Hz) requested from the input device.
            input_name: substring of the input device name to select; when
                falsy, the first input device not named ``'default'`` is used.
            target_rate: sample rate (Hz) of the published audio.
        """
        self.voice_port = voice_port
        self.input_rate = input_rate
        self.target_rate = target_rate
        self.stopped = True
        self.clients = []
        # Resampler state carried between successive audioop.ratecv() calls.
        self.laststate = None
        self.input_name = input_name
        self.ctx = Context.instance()
        self.loop = loop
        # Both are created in start(); None means "not available yet".
        self.p = None
        self.voice_socket = None

    def get_input_idx(self):
        """Return the index of the input device to record from.

        Picks the first device with input channels whose name contains
        ``self.input_name`` or, when no name is configured, the first one
        not called ``'default'``. Every device is listed at debug level.
        Requires ``self.p`` (the PyAudio instance) to be set.

        Returns:
            The device index, or ``None`` when no device matches.
        """
        input_device_idx = None
        devices_count = self.p.get_device_count()
        for i in range(devices_count):
            dev = self.p.get_device_info_by_index(i)
            if input_device_idx is None and dev['maxInputChannels'] > 0:
                if self.input_name:
                    matches = self.input_name in dev['name']
                else:
                    matches = dev['name'] != 'default'
                if matches:
                    input_device_idx = dev['index']
                    logger.info("Use device {0}: {1}".format(dev['index'], dev['name']))
            logger.debug("{} {:0d} {}".format("* " if input_device_idx == i else "- ", i, dev['name']))
        return input_device_idx

    def _publish(self, frames):
        """Send ``frames`` on the PUB socket, if one is currently open.

        Scheduled by ``onBuffer`` via ``call_soon_threadsafe`` so that the
        socket is only ever touched from the event loop. Frames arriving
        while no socket is bound are dropped.
        """
        sock = self.voice_socket
        if sock is not None:
            sock.send(frames)

    def onBuffer(self, in_data, frame_count, time_info, status):
        """PyAudio stream callback: resample if needed and queue for sending.

        Returns:
            ``(None, pyaudio.paContinue)`` so that recording keeps going.
        """
        if self.input_rate == self.target_rate:
            frames = in_data
        else:
            # chunk 4096, with 2 bytes per frame gives len(in_data) of 8192
            # rate converted 44k1 -> 16k gives len(frames) == 2972 (16/44.1 * 8192)
            frames, self.laststate = audioop.ratecv(
                in_data, 2, 1, self.input_rate, self.target_rate, self.laststate)

        try:
            # Hand the data over to the event loop; this callback is invoked
            # by PyAudio, not by the loop.
            self.loop.call_soon_threadsafe(self._publish, frames)
        except Exception as e:
            # Best effort: a failed hand-over must not abort the recording.
            logger.warning("Error sending to {}".format(e))
        return (None, pyaudio.paContinue)

    def _close_socket(self):
        """Detach and close the PUB socket; a no-op when none is open."""
        sock, self.voice_socket = self.voice_socket, None
        if sock is not None:
            sock.close()

    async def start(self):
        """Open the input stream and publish audio until ``stop()`` is called.

        The PUB socket is (re)bound in a loop: when binding or serving
        raises, the socket is closed and a new attempt is made after half a
        second. The stream, the PyAudio instance and the socket are always
        released on exit, including on error or task cancellation.
        """
        FORMAT = pyaudio.paInt16
        CHANNELS = 1
        CHUNK = 4096

        self.p = pyaudio.PyAudio()
        self.stopped = False
        stream = None

        try:
            stream = self.p.open(
                format=FORMAT,
                channels=CHANNELS,
                rate=self.input_rate,
                input=True,
                frames_per_buffer=CHUNK,
                stream_callback=self.onBuffer,
                input_device_index=self.get_input_idx()
            )

            while not self.stopped:
                try:
                    address = "tcp://*:{}".format(self.voice_port)
                    self.voice_socket = self.ctx.socket(zmq.PUB)
                    self.voice_socket.bind(address)
                    logger.info("Waiting for voice connections on {}".format(address))

                    while not self.stopped:
                        await asyncio.sleep(1)

                    logger.info("Stop recording & streaming")
                except asyncio.CancelledError:
                    # Never treat cancellation as a socket error to retry.
                    raise
                except Exception as e:
                    logger.critical("Socket Exception {}".format(e))
                    self._close_socket()
                    # Non-blocking pause so other tasks on the loop keep running.
                    await asyncio.sleep(.5)
        finally:
            # Stop the capture first so no new frames are queued for a
            # socket that is about to be closed.
            if stream is not None:
                stream.stop_stream()
                stream.close()
            self.p.terminate()
            self._close_socket()

    def stop(self):
        """Ask ``start()`` to finish; it notices within about a second."""
        self.stopped = True

    async def asyncStart(self, loop):
        """Run ``start()`` to completion.

        ``start()`` is a coroutine, so it is awaited directly; handing it to
        an executor would only create a coroutine object that never runs.
        ``loop`` is accepted for backward compatibility and is not used.
        """
        await self.start()
class CommandHandler(object):
    """Receive commands for this hugvey and publish its events.

    Commands arrive on a ZeroMQ SUB socket (``command_listener``) and are
    each handled in their own thread. Outgoing events are appended to
    ``eventQueue`` and flushed by ``event_sender`` on a PUB socket.
    """

    def __init__(self, hugvey_id, cmd_address="tcp://127.0.0.1:5555", publish_address="tcp://0.0.0.0:5555"):
        """Store the addresses; sockets are created by the coroutines.

        Args:
            hugvey_id: id of this hugvey, used for the topic and in events.
            cmd_address: address the SUB socket connects to for commands.
            publish_address: address the PUB socket connects to for events.
        """
        self.eventQueue = []
        self.ctx = Context.instance()
        self.hugvey_id = hugvey_id
        self.cmd_address = cmd_address
        self.publish_address = publish_address
        # Popen object of the running espeak process, None when idle.
        self.playPopen = None

    def handle(self, cmd):
        """Dispatch one received command on its ``'action'`` key.

        Known actions are ``'show_yourself'`` and ``'play'`` (which needs
        the keys ``'id'`` and ``'msg'``); other actions are only logged.
        A command without ``'action'`` is logged as critical and dropped.
        """
        if 'action' not in cmd:
            logger.critical("Invalid command: {}".format(cmd))
            return

        logger.info("Received {}".format(cmd))
        action = cmd['action']
        if action == 'show_yourself':
            self.showMyself()
        elif action == 'play':
            self.cmdPlay(cmd['id'], cmd['msg'])

    def cmdPlay(self, msgId, msgText, pitch=50):
        """Speak ``msgText`` with espeak and queue a ``playbackFinish`` event.

        Blocks until espeak exits (or is terminated through ``cmdStop``).

        Args:
            msgId: id reported back in the ``playbackFinish`` event.
            msgText: text passed to espeak.
            pitch: value for espeak's ``-p`` option.
        """
        logger.info("Play: {}".format(msgText))
        proc = subprocess.Popen(['espeak', '-p', '{0}'.format(pitch), msgText], stdout=subprocess.PIPE)
        self.playPopen = proc
        try:
            # communicate() drains the stdout pipe while waiting; a bare
            # wait() can deadlock once the child fills the pipe buffer.
            proc.communicate()
        finally:
            # Only clear the slot if it still refers to this process.
            if self.playPopen is proc:
                self.playPopen = None

        returnCode = proc.returncode
        if returnCode:
            logger.warning("Had returncode on play: {}".format(returnCode))
        else:
            logger.debug("Finished playback. Return code: {}".format(returnCode))

        self.sendMessage({
            'event': 'playbackFinish',
            'msgId': msgId
        })

    def cmdStop(self, msgId):
        """Terminate the running espeak process, if there is one.

        ``msgId`` is accepted but not used.
        """
        # Snapshot the attribute: cmdPlay resets it to None from its own
        # thread, possibly between the check and the terminate() call.
        proc = self.playPopen
        if proc:
            logger.info("Interrupting playback")
            try:
                proc.terminate()
            except Exception as e:
                logger.critical("Could not stop playback: {}".format(e))

    def showMyself(self):
        """Publish about this hugvey to central command
        """
        self.sendMessage({
            'event': 'connection',
            'id': self.hugvey_id,
            'host': socket.gethostname(),
            'ip': self.getIp(),
        })

    @staticmethod
    def getIp():
        """Return the local IP address used for outgoing traffic.

        A UDP socket is connected to a fixed public address and its own
        socket name is read back. The socket is closed before returning.

        Raises:
            OSError: when the socket cannot be connected.
        """
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.connect(("185.66.250.60", 80))
            return s.getsockname()[0]

    def sendMessage(self, msg):
        """Queue ``msg`` for publication by ``event_sender``."""
        self.eventQueue.append(msg)

    async def command_listener(self):
        """Subscribe to this hugvey's topic and handle commands forever.

        Each command is handed to ``handle`` in a new thread, so a blocking
        command such as playback does not stall the event loop. The socket
        is closed when the coroutine is cancelled or fails.
        """
        s = self.ctx.socket(zmq.SUB)
        try:
            s.connect(self.cmd_address)
            topic = getTopic(self.hugvey_id)
            s.subscribe(topic)
            logger.info("Subscribed to commands for {} on {}".format(topic, self.cmd_address))

            while True:
                _, cmd = await zmqReceive(s)
                t = threading.Thread(target=self.handle, args=(cmd,))
                t.start()
        finally:
            s.close()

    async def event_sender(self):
        """Publish queued events forever, polling the queue every 50 ms.

        Announces this hugvey (``showMyself``) one second after connecting.
        The socket is closed when the coroutine is cancelled or fails.
        """
        s = self.ctx.socket(zmq.PUB)
        try:
            s.connect(self.publish_address)
            logger.info("Publish on: {}".format(self.publish_address))

            # For some reason, sending only one message is lost, perhaps due
            # to connect() rather than bind() ??
            await asyncio.sleep(1)  # wait for connection to be proper set
            self.showMyself()

            while True:
                # Messages are appended from the handler threads, so drain
                # whatever is queued right now, oldest first.
                while self.eventQueue:
                    # NOTE(review): the result of zmqSend is not awaited,
                    # as in the original; confirm zmqSend is synchronous.
                    zmqSend(s, self.hugvey_id, self.eventQueue.pop(0))
                await asyncio.sleep(0.05)
        finally:
            s.close()
class Hugvey(object):
    """The Hugvey client, to be ran on the Raspberry Pi's

    Typical use: create an instance, call ``loadConfig()`` and then
    ``start()``, which runs the event loop until it is stopped.
    """

    def __init__(self):
        """Derive this hugvey's id from the machine's hostname."""
        self.id = self.getId()

    def getId(self, hostname: str = None) -> int:
        """Get Hugvey ID from hostname

        Args:
            hostname: name to parse; defaults to ``socket.gethostname()``.

        Returns:
            The first run of digits in the hostname, as an int.

        Raises:
            IndexError: when the hostname contains no digits.
        """
        if hostname is None:
            hostname = socket.gethostname()
        return int(re.findall(r'\d+', hostname)[0])

    def loadConfig(self, filename):
        """Load the YAML file ``filename`` into ``self.config``."""
        with open(filename, 'r') as fp:
            logger.debug('Load config from {}'.format(filename))
            self.config = yaml.safe_load(fp)

    async def startCommandListener(self):
        """Run the command listener of ``self.cmd_server``."""
        return await self.cmd_server.command_listener()

    def start(self):
        """Create the voice and command servers and run the event loop.

        Reads ``self.config`` (keys ``voice`` and ``events``), so
        ``loadConfig()`` has to be called first. Blocks in
        ``loop.run_forever()`` until the loop is stopped.
        """
        loop = asyncio.get_event_loop()

        voice_config = self.config['voice']
        self.voice_server = VoiceServer(
            loop=loop,
            voice_port=int(voice_config['port']),
            input_rate=int(voice_config['input_rate']),
            input_name=voice_config['input_name'],
            target_rate=int(voice_config['target_rate']),
        )

        events_config = self.config['events']
        self.cmd_server = CommandHandler(
            hugvey_id=self.id,
            cmd_address=events_config['cmd_address'],
            publish_address=events_config['publish_address'],
        )

        logger.info('start')
        # Schedule explicitly on the loop that was handed to VoiceServer
        # instead of relying on the implicit current loop.
        loop.create_task(self.voice_server.start())
        loop.create_task(self.cmd_server.command_listener())
        loop.create_task(self.cmd_server.event_sender())
        self.cmd_server.showMyself()
        loop.run_forever()
        logger.info('done')