import asyncio import audioop import logging import pyaudio import re import socket import threading import time import yaml import alsaaudio import zmq from zmq.asyncio import Context from .communication import zmqReceive, zmqSend, getTopic import subprocess logger = logging.getLogger("client") class VoiceServer(object): """A UDP server, providing mic data at 16 kHz""" def __init__(self, loop, voice_port: int, input_rate: int, input_name: str = None, target_rate: int = 16000): self.voice_port = voice_port self.input_rate = input_rate self.target_rate = target_rate self.stopped = True self.clients = [] self.laststate = None self.input_name = input_name self.ctx = Context.instance() self.loop = loop def get_input_idx(self): input_device_idx = None # input_device_idx = 6 # input_device_idx = 0 devices_count = self.p.get_device_count() for i in range(devices_count): dev = self.p.get_device_info_by_index(i) if input_device_idx is None and dev['maxInputChannels'] > 0: if (self.input_name and self.input_name in dev['name']) or \ (not self.input_name and dev['name'] != 'default'): input_device_idx = dev['index'] logger.info("Use device {0}: {1}".format(dev['index'],dev['name'])) logger.debug("{} {:0d} {}".format("* " if input_device_idx == i else "- ", i, dev['name'])) return input_device_idx def onBuffer(self, in_data, frame_count, time_info, status): if self.input_rate == self.target_rate: f = in_data else: # chunk 4096, with 2 bytes per frame gives len(in_data) of 8192 # rate converted 44k1 -> 16k gives len(f) == 2972 (16/44.1 * 8192) f, self.laststate = audioop.ratecv(in_data, 2, 1, self.input_rate, self.target_rate, self.laststate) # for s in self.clients: try: # self.loop.call_soon_threadsafe() self.loop.call_soon_threadsafe( self.voice_socket.send, f ) # s.send(f) except Exception as e: # self.clients.remove(s) logger.warn("Error sending to {}".format(e)) pass return (None, pyaudio.paContinue) async def start(self): FORMAT = pyaudio.paInt16 CHANNELS = 1 CHUNK = 4096 self.p = pyaudio.PyAudio() self.stopped = False stream = self.p.open( format=FORMAT, channels=CHANNELS, rate=self.input_rate, input=True, frames_per_buffer=CHUNK, stream_callback=self.onBuffer, input_device_index=self.get_input_idx() ) while not self.stopped: try: address = "tcp://*:{}".format(self.voice_port) self.voice_socket = self.ctx.socket(zmq.PUB) self.voice_socket.bind(address) logger.info( "Waiting for voice connections on {}".format(address) ) while not self.stopped: await asyncio.sleep(1) logger.info( "Stop recording & streaming") self.voice_socket.close() # stop Recording stream.stop_stream() stream.close() self.p.terminate() except Exception as e: logging.critical("Socket Exception {}".format(e)) self.voice_socket.close() time.sleep(.5) def stop(self): self.stopped = True async def asyncStart(self, loop): future = loop.run_in_executor(None, self.start) r = await future class CommandHandler(object): def __init__(self, hugvey_id, cmd_address, publish_address, file_address): self.eventQueue = [] self.ctx = Context.instance() self.hugvey_id = hugvey_id self.cmd_address = cmd_address self.publish_address = publish_address self.playPopen = None self.file_address = file_address # self.showMyself() # queue message for connection request def handle(self, cmd): print('handle', cmd) # self.sendMessage({'reply':'test'}) if not 'action' in cmd: logger.critical("Invalid command: {}".format(cmd)) return logger.info("Received {}".format(cmd)) if cmd['action'] == 'show_yourself': self.showMyself() if cmd['action'] == 'play': self.cmdPlay(cmd) def cmdPlay(self, cmd): msgId= cmd['id'] pitch = cmd['pitch'] if 'pitch' in cmd else 50 file = cmd['file'] if 'file' in cmd else None text = cmd['msg'] if 'msg' in cmd else None if file is None and text is None: logger.critical("No file nor text given: {}".format(cmd)) else: if file is not None: logger.info("Play: {}".format(file)) file = self.file_address + "/" + file self.playPopen = subprocess.Popen(['play', file], stdout=subprocess.PIPE) returnCode = self.playPopen.wait() self.playPopen = None else: logger.info("Speak: {}".format(text)) self.playPopen = subprocess.Popen(['espeak', '-p','{0}'.format(pitch), text], stdout=subprocess.PIPE) returnCode = self.playPopen.wait() self.playPopen = None if returnCode: logger.warn("Had returncode on play: {}".format(returnCode)) else: logger.debug("Finished playback. Return code: {}".format(returnCode)) self.sendMessage({ 'event': 'playbackFinish', 'msgId': msgId }) def cmdStop(self, msgId): if self.playPopen: logger.info("Interrupting playback") try: self.playPopen.terminate() except Exception as e: logger.critical("Could not stop playback: {}".format(e)) def showMyself(self): """Publish about this hugvey to central command """ self.sendMessage({ 'event': 'connection', 'id': self.hugvey_id, 'host': socket.gethostname(), 'ip': self.getIp(), }) @staticmethod def getIp(): s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect(("185.66.250.60", 80)) return s.getsockname()[0] def sendMessage(self, msg): self.eventQueue.append(msg) async def command_listener(self): s = self.ctx.socket(zmq.SUB) s.connect(self.cmd_address) topic = getTopic(self.hugvey_id) s.subscribe(topic) logger.info("Subscribed to commands for {} on {}".format(topic, self.cmd_address)) while True: hugvey_id, cmd = await zmqReceive(s) # print("GOGOG", hugvey_id, cmd) t = threading.Thread(target=self.handle, args=(cmd,)) t.start() # topic, msg = await s.recv_multipart() # print('received', msg, time.time()) s.close() async def event_sender(self): s = self.ctx.socket(zmq.PUB) s.connect(self.publish_address) logger.info("Publish on: {}".format(self.publish_address)) # For some reason, sending only one message is lost, perhaps due # to connect() rather than bind() ?? await asyncio.sleep(1) # wait for connection to be proper set self.showMyself() while True: for i in range(len(self.eventQueue)): zmqSend(s, self.hugvey_id, self.eventQueue.pop(0)) if len(self.eventQueue) == 0: await asyncio.sleep(0.05) s.close() class Hugvey(object): """The Hugvey client, to be ran on the Raspberry Pi's """ def __init__(self): self.id = self.getId() pass def getId(self) -> int: """Get Hugvey ID from hostname""" h = socket.gethostname() return int(re.findall('\d+', h )[0]) def loadConfig(self, filename): with open(filename, 'r') as fp: logger.debug('Load config from {}'.format(filename)) self.config = yaml.safe_load(fp) async def startCommandListener(self): return await self.cmd_server.command_listener() def start(self): loop = asyncio.get_event_loop() if self.config['voice']['play_device']: alsaaudio.Mixer(self.config['voice']['play_device']).setvolume(self.config['voice']['play_volume']) self.voice_server = VoiceServer( loop = loop, voice_port = int(self.config['voice']['port']), input_rate = int(self.config['voice']['input_rate']), input_name = self.config['voice']['input_name'], target_rate = int(self.config['voice']['target_rate']), ) self.cmd_server = CommandHandler( hugvey_id = self.id, cmd_address = self.config['events']['cmd_address'], publish_address = self.config['events']['publish_address'], file_address = self.config['voice']['file_address'] ) logger.info('start') # self.voice_server.asyncStart(loop) # loop.run_until_complete(self.voice_server.start()) asyncio.ensure_future(self.voice_server.start()) asyncio.ensure_future(self.cmd_server.command_listener()) asyncio.ensure_future(self.cmd_server.event_sender()) self.cmd_server.showMyself() loop.run_forever() logger.info('done')