""" Consume a given Hugvey audio socket, and stream into the given services to emit events to the given server """ import socket import logging from zmq.asyncio import Context import zmq mainLogger = logging.getLogger("hugvey") logger = mainLogger.getChild("streamer") class AudioStreamer(object): def __init__(self, chunk, address: str, port: int, hv_id: int): self.logger = mainLogger.getChild(f"{hv_id}").getChild('streamer') self.consumers = [] self.chunk = chunk self.address = address self.port = port self.isRunning = False def addConsumer(self, consumer): self.consumers.append(consumer) async def run(self): self.isRunning = True address = "tcp://{}:{}".format(self.address, self.port) self.ctx = Context.instance() self.socket = self.ctx.socket(zmq.SUB) self.socket.setsockopt(zmq.RCVTIMEO, 4000) # timeout: 8 sec self.socket.subscribe('') # self.socket.setsockopt(zmq.CONFLATE, 1) self.socket.connect(address) # s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.logger.info("Attempt connection on {}:{}".format(self.address, self.port)) # s.connect((self.address, self.port)) # try: while self.isRunning: data = await self.socket.recv() # self.logger.debug('chunk received') self.process(data) except zmq.error.Again as timeout_e: self.logger.warn("Timeout of audiostream. Hugvey shutdown?") finally: self.logger.info("Close socket on {}:{}".format(self.address, self.port)) self.socket.close() def stop(self): self.isRunning = False for consumer in self.consumers: consumer.shutdown() def process(self, chunk): # self.logger.debug("Received chunk") for consumer in self.consumers: consumer.receive(chunk) def triggerStart(self): # start a (new) run on the hugvey. Send it to the consumers that need it for consumer in self.consumers: consumer.triggerStart()