From e8bfa8a6da6de8d3c06b26c6936bb94c50fd3b76 Mon Sep 17 00:00:00 2001 From: Ruben van de Ven Date: Thu, 17 Jan 2019 21:23:05 +0100 Subject: [PATCH] Use PUB/Sub for voice sending (auto socket recovery_ --- hugvey/client.py | 53 +++++++++++++++++++++++++--------------- hugvey/voice/streamer.py | 19 ++++++++++---- 2 files changed, 47 insertions(+), 25 deletions(-) diff --git a/hugvey/client.py b/hugvey/client.py index dd54c83..da3a0a3 100644 --- a/hugvey/client.py +++ b/hugvey/client.py @@ -14,7 +14,7 @@ logger = logging.getLogger("client") class VoiceServer(object): """A UDP server, providing mic data at 16 kHz""" - def __init__(self, voice_port: int, input_rate: int, input_name: str = None, target_rate: int = 16000): + def __init__(self, loop, voice_port: int, input_rate: int, input_name: str = None, target_rate: int = 16000): self.voice_port = voice_port self.input_rate = input_rate self.target_rate = target_rate @@ -22,6 +22,8 @@ class VoiceServer(object): self.clients = [] self.laststate = None self.input_name = input_name + self.ctx = Context.instance() + self.loop = loop def get_input_idx(self): input_device_idx = None @@ -46,17 +48,20 @@ class VoiceServer(object): # chunk 4096, with 2 bytes per frame gives len(in_data) of 8192 # rate converted 44k1 -> 16k gives len(f) == 2972 (16/44.1 * 8192) f, self.laststate = audioop.ratecv(in_data, 2, 1, self.input_rate, self.target_rate, self.laststate) - - for s in self.clients: - try: - s.send(f) - except Exception as e: - self.clients.remove(s) - logger.warn("Error sending to {}, {}".format(s.getsockname(), e)) - pass + + +# for s in self.clients: + try: +# self.loop.call_soon_threadsafe() + self.loop.call_soon_threadsafe( self.voice_socket.send, f ) +# s.send(f) + except Exception as e: +# self.clients.remove(s) + logger.warn("Error sending to {}".format(e)) + pass return (None, pyaudio.paContinue) - def start(self): + async def start(self): FORMAT = pyaudio.paInt16 CHANNELS = 1 CHUNK = 4096 @@ -76,18 +81,24 @@ class VoiceServer(object): while not self.stopped: try: - self.voice_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.voice_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - address = ('', self.voice_port) + address = "tcp://*:{}".format(self.voice_port) + self.voice_socket = self.ctx.socket(zmq.PUB) +# self.voice_socket.setsockopt(zmq.CONFLATE, 1) self.voice_socket.bind(address) - self.voice_socket.listen(5) + +# self.voice_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +# self.voice_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) +# address = ('', self.voice_port) +# self.voice_socket.bind(address) +# self.voice_socket.listen(5) - read_list = [self.voice_socket] +# read_list = [self.voice_socket] logger.info( "Waiting for voice connections on {}".format(address) ) while not self.stopped: - (clientsocket, address) = self.voice_socket.accept() - logger.info( "Got voice connection from {}".format(address)) - self.clients.append(clientsocket) + await asyncio.sleep(1) +# (clientsocket, address) = self.voice_socket.accept() +# logger.info( "Got voice connection from {}".format(address)) +# self.clients.append(clientsocket) logger.info( "Stop recording & streaming") self.voice_socket.close() @@ -214,7 +225,10 @@ class Hugvey(object): return await self.cmd_server.command_listener() def start(self): + loop = asyncio.get_event_loop() + self.voice_server = VoiceServer( + loop = loop, voice_port = int(self.config['voice']['port']), input_rate = int(self.config['voice']['input_rate']), input_name = self.config['voice']['input_name'], @@ -225,11 +239,10 @@ class Hugvey(object): cmd_address = self.config['events']['cmd_address'], publish_address = self.config['events']['publish_address'], ) - loop = asyncio.get_event_loop() logger.info('start') # self.voice_server.asyncStart(loop) # loop.run_until_complete(self.voice_server.start()) - asyncio.ensure_future(self.voice_server.asyncStart(loop)) + asyncio.ensure_future(self.voice_server.start()) asyncio.ensure_future(self.cmd_server.command_listener()) asyncio.ensure_future(self.cmd_server.event_sender()) self.cmd_server.showMyself() diff --git a/hugvey/voice/streamer.py b/hugvey/voice/streamer.py index ea2a159..570fd6f 100644 --- a/hugvey/voice/streamer.py +++ b/hugvey/voice/streamer.py @@ -3,6 +3,8 @@ Consume a given Hugvey audio socket, and stream into the given services to emit """ import socket import logging +from zmq.asyncio import Context +import zmq logger = logging.getLogger("streamer") @@ -22,17 +24,24 @@ class AudioStreamer(object): async def run(self): self.isRunning = True - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + address = "tcp://{}:{}".format(self.address, self.port) + self.ctx = Context.instance() + self.socket = self.ctx.socket(zmq.SUB) + self.socket.subscribe('') +# self.socket.setsockopt(zmq.CONFLATE, 1) + self.socket.connect(address) + +# s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) logger.info("Attempt connection on {}:{}".format(self.address, self.port)) - s.connect((self.address, self.port)) - +# s.connect((self.address, self.port)) +# while self.isRunning: - data = s.recv(self.chunk) + data = await self.socket.recv() # logger.debug('chunk received') self.process(data) logger.info("Close socket on {}:{}".format(self.address, self.port)) - s.close() + self.socket.close() def stop(self): self.isRunning = False