import pyaudio import socket import select import audioop import threading import logging import time import zmq import asyncio from zmq.asyncio import Context import yaml import re from .communication import zmqReceive, zmqSend, getTopic logger = logging.getLogger("client") class VoiceServer(object): """A UDP server, providing mic data at 16 kHz""" def __init__(self, voice_port: int, input_rate: int, input_name: str = None, target_rate: int = 16000): self.voice_port = voice_port self.input_rate = input_rate self.target_rate = target_rate self.stopped = True self.clients = [] self.laststate = None self.input_name = input_name def get_input_idx(self): input_device_idx = None # input_device_idx = 6 # input_device_idx = 0 devices_count = self.p.get_device_count() for i in range(devices_count): dev = self.p.get_device_info_by_index(i) if input_device_idx is None and dev['maxInputChannels'] > 0: if (self.input_name and self.input_name in dev['name']) or \ (not self.input_name and dev['name'] != 'default'): input_device_idx = dev['index'] logger.info("Use device {0}: {1}".format(dev['index'],dev['name'])) logger.debug("{} {:0d} {}".format("* " if input_device_idx == i else "- ", i, dev['name'])) return input_device_idx def onBuffer(self, in_data, frame_count, time_info, status): if self.input_rate == self.target_rate: f = in_data else: f, self.laststate = audioop.ratecv(in_data, 2, 1, self.input_rate, self.target_rate, self.laststate) for s in self.clients: try: s.send(f) except Exception as e: self.clients.remove(s) logger.warn("Error sending to {}".format(s.getsockname())) pass return (None, pyaudio.paContinue) def start(self): FORMAT = pyaudio.paInt16 CHANNELS = 1 CHUNK = 4096 self.p = pyaudio.PyAudio() self.stopped = False stream = self.p.open( format=FORMAT, channels=CHANNELS, rate=self.input_rate, input=True, frames_per_buffer=CHUNK, stream_callback=self.onBuffer, input_device_index=self.get_input_idx() ) while not self.stopped: try: self.voice_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.voice_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.voice_socket.bind(('', self.voice_port)) self.voice_socket.listen(5) read_list = [self.voice_socket] logger.info( "Waiting for connections") while not self.stopped: (clientsocket, address) = self.voice_socket.accept() self.clients.append(clientsocket) logger.info( "Stop recording & streaming") self.voice_socket.close() # stop Recording stream.stop_stream() stream.close() self.p.terminate() except Exception as e: logging.critical("Socket Exception {}".format(e)) self.voice_socket.close() time.sleep(.5) def stop(self): self.stopped = True async def asyncStart(self, loop): future = loop.run_in_executor(None, self.start) r = await future # await self.start() class CommandHandler(object): def __init__(self, hugvey_id, cmd_address = "tcp://127.0.0.1:5555", publish_address = "tcp://0.0.0.0:5555"): self.eventQueue = [] self.ctx = Context.instance() self.hugvey_id = hugvey_id self.cmd_address = cmd_address self.publish_address = publish_address # self.showMyself() # queue message for connection request def handle(self, cmd): # self.sendMessage({'reply':'test'}) if not 'action' in cmd: logger.critical("Invalid command: {}".format(cmd)) return logger.info("Received {}".format(cmd)) if cmd['action'] == 'show_yourself': self.showMyself() if cmd['action'] == 'play': self.cmdPlay(cmd['id'], cmd['msg']) def cmdPlay(self, msgId, msgText): # espeak(msgText) # TODO kill if playing & play wave file # preferably a cat (local)/curl (remote) pipe into player logger.info("Play: {}".format(msgText)) time.sleep(2) self.sendMessage({ 'event': 'playbackFinish', 'msgId': msgId }) def showMyself(self): """Publish about this hugvey to central command """ self.sendMessage({ 'event': 'connection', 'id': self.hugvey_id, 'host': socket.gethostname(), 'ip': self.getIp(), }) @staticmethod def getIp(): s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect(("185.66.250.60", 80)) return s.getsockname()[0] def sendMessage(self, msg): self.eventQueue.append(msg) async def command_listener(self): s = self.ctx.socket(zmq.SUB) s.connect(self.cmd_address) topic = getTopic(self.hugvey_id) s.subscribe(topic) logger.info("Subscribed to commands for {} on {}".format(topic, self.cmd_address)) while True: hugvey_id, cmd = await zmqReceive(s) self.handle(cmd) # topic, msg = await s.recv_multipart() # print('received', msg, time.time()) s.close() async def event_sender(self): s = self.ctx.socket(zmq.PUB) s.connect(self.publish_address) logger.info("Publish on: {}".format(self.publish_address)) # For some reason, sending only one message is lost, perhaps due # to connect() rather than bind() ?? self.showMyself() while True: for i in range(len(self.eventQueue)): zmqSend(s, self.hugvey_id, self.eventQueue.pop(0)) if len(self.eventQueue) == 0: await asyncio.sleep(0.05) s.close() class Hugvey(object): """The Hugvey client, to be ran on the Raspberry Pi's """ def __init__(self): self.id = self.getId() pass def getId(self) -> int: """Get Hugvey ID from hostname""" h = socket.gethostname() return int(re.findall('\d+', h )[0]) def loadConfig(self, filename): with open(filename, 'r') as fp: logger.debug('Load config from {}'.format(filename)) self.config = yaml.safe_load(fp) async def startCommandListener(): return await self.cmd_server.command_listener() def start(self): self.voice_server = VoiceServer( voice_port = int(self.config['voice']['port']), input_rate = int(self.config['voice']['input_rate']), input_name = self.config['voice']['input_name'], target_rate = int(self.config['voice']['target_rate']), ) self.cmd_server = CommandHandler( hugvey_id = self.id, cmd_address = self.config['events']['cmd_address'], publish_address = self.config['events']['publish_address'], ) loop = asyncio.get_event_loop() logger.info('start') # self.voice_server.asyncStart(loop) # loop.run_until_complete(self.voice_server.start()) asyncio.ensure_future(self.voice_server.asyncStart(loop)) asyncio.ensure_future(self.cmd_server.command_listener()) asyncio.ensure_future(self.cmd_server.event_sender()) self.cmd_server.showMyself() loop.run_forever() logger.info('done')