74 lines
No EOL
2.3 KiB
Python
74 lines
No EOL
2.3 KiB
Python
"""
|
|
Consume a given Hugvey audio socket, and stream into the given services to emit events to the given server
|
|
"""
|
|
import socket
|
|
import logging
|
|
from zmq.asyncio import Context
|
|
import zmq
|
|
|
|
mainLogger = logging.getLogger("hugvey")
|
|
logger = mainLogger.getChild("streamer")
|
|
|
|
class AudioStreamer(object):
|
|
def __init__(self, chunk, address: str, port: int, hv_id: int):
|
|
self.logger = mainLogger.getChild(f"{hv_id}").getChild('streamer')
|
|
|
|
self.consumers = []
|
|
|
|
self.chunk = chunk
|
|
self.address = address
|
|
self.port = port
|
|
|
|
self.isRunning = False
|
|
|
|
def addConsumer(self, consumer):
|
|
self.consumers.append(consumer)
|
|
|
|
async def run(self):
|
|
self.isRunning = True
|
|
|
|
address = "tcp://{}:{}".format(self.address, self.port)
|
|
self.ctx = Context.instance()
|
|
self.socket = self.ctx.socket(zmq.SUB)
|
|
self.socket.setsockopt(zmq.RCVTIMEO, 8000) # timeout: 8 sec
|
|
self.socket.subscribe('')
|
|
# self.socket.setsockopt(zmq.CONFLATE, 1)
|
|
self.socket.connect(address)
|
|
|
|
# s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
self.logger.info("0mq SUBSCRIBE for audio stream on {}:{}".format(self.address, self.port))
|
|
# s.connect((self.address, self.port))
|
|
#
|
|
while self.isRunning:
|
|
try:
|
|
data = await self.socket.recv()
|
|
# self.logger.debug('chunk received')
|
|
self.process(data)
|
|
except zmq.error.Again as timeout_e:
|
|
self.logger.warn("Timeout of audiostream. Hugvey shutdown?")
|
|
|
|
self.logger.info("Close socket on {}:{}".format(self.address, self.port))
|
|
self.socket.close()
|
|
|
|
def stop(self):
|
|
self.isRunning = False
|
|
|
|
for consumer in self.consumers:
|
|
consumer.shutdown()
|
|
|
|
self.consumers = []
|
|
|
|
|
|
def process(self, chunk):
|
|
# self.logger.debug("Received chunk")
|
|
for consumer in self.consumers:
|
|
consumer.receive(chunk)
|
|
|
|
def triggerStart(self):
|
|
# start a (new) run on the hugvey. Send it to the consumers that need it
|
|
for consumer in self.consumers:
|
|
consumer.triggerStart()
|
|
|
|
|
|
def __del__(self):
|
|
self.logger.warn("Destroyed streamer object") |