# hugvey/hugvey/speech/streamer.py
#
# 62 lines
# 1.9 KiB
# Python

"""
Consume a given Hugvey audio socket, and stream into the given services to emit events to the given server
"""
import socket
import logging
from zmq.asyncio import Context
import zmq
# Module-level logger; AudioStreamer.run() uses it to report connection
# attempts, receive timeouts and socket shutdown.
logger = logging.getLogger("streamer")
class AudioStreamer(object):
    """Receive audio data from a ZeroMQ SUB socket and fan it out to consumers.

    Consumers are arbitrary objects registered through ``addConsumer``. This
    class calls ``consumer.receive(chunk)`` for every message received and
    ``consumer.shutdown()`` when ``stop`` is invoked.
    """

    # Receive timeout in milliseconds (4 seconds). When no message arrives
    # within this window, ``recv`` raises ``zmq.error.Again`` and ``run`` ends.
    RECEIVE_TIMEOUT_MS = 4000

    def __init__(self, chunk, address: str, port: int):
        """Store the connection parameters; no socket is opened here.

        Args:
            chunk: Stored on the instance but not otherwise used by this
                class. Presumably the audio chunk size -- TODO confirm
                against callers.
            address: Host name or IP of the publishing endpoint.
            port: TCP port of the publishing endpoint.
        """
        self.consumers = []
        self.chunk = chunk
        self.address = address
        self.port = port
        self.isRunning = False
        # Created lazily in run(); defined here so the attributes always exist.
        self.ctx = None
        self.socket = None

    def addConsumer(self, consumer):
        """Register an object that provides ``receive(chunk)`` and ``shutdown()``."""
        self.consumers.append(consumer)

    async def run(self):
        """Connect to the publisher and forward messages until stopped.

        The loop ends when ``stop`` clears ``isRunning`` (checked after each
        received message) or when no message arrives within
        ``RECEIVE_TIMEOUT_MS``. The socket is always closed on exit and
        ``isRunning`` is reset so the flag reflects the actual state.
        """
        address = "tcp://{}:{}".format(self.address, self.port)
        self.ctx = Context.instance()
        self.socket = self.ctx.socket(zmq.SUB)
        self.isRunning = True

        try:
            # Socket configuration lives inside the try block so that the
            # socket is closed even when setup or connect fails.
            self.socket.setsockopt(zmq.RCVTIMEO, self.RECEIVE_TIMEOUT_MS)
            self.socket.subscribe('')  # empty prefix: receive every message
            self.socket.connect(address)
            logger.info("Attempt connection on %s:%s", self.address, self.port)

            while self.isRunning:
                data = await self.socket.recv()
                self.process(data)
        except zmq.error.Again:
            logger.warning("Timeout of audiostream. Hugvey shutdown?")
        finally:
            logger.info("Close socket on %s:%s", self.address, self.port)
            self.socket.close()
            # The loop has ended (stop, timeout or error): do not leave the
            # flag claiming that the streamer is still running.
            self.isRunning = False

    def stop(self):
        """Ask ``run`` to finish and shut down every registered consumer."""
        self.isRunning = False
        for consumer in self.consumers:
            consumer.shutdown()

    def process(self, chunk):
        """Hand one received message to every consumer, in registration order."""
        for consumer in self.consumers:
            consumer.receive(chunk)