""" "Conscript reporting" This server controls all hugveys and the processing of their narratives. It exposes itself for control to the panopticon server. """ import asyncio import logging import yaml import zmq from hugvey import panopticon from hugvey.voice.streamer import AudioStreamer from hugvey.voice.player import Player from hugvey.communication import getTopic, zmqSend, zmqReceive from pandas.conftest import ip from zmq.asyncio import Context import threading logger = logging.getLogger("command") class CentralCommand(object): """docstring for CentralCommand.""" def __init__(self, debug_mode = False): self.debug = debug_mode self.eventQueue = asyncio.Queue() self.commandQueue = asyncio.Queue() self.isRunning = threading.Event() self.hugveys = {} self.ctx = Context.instance() def loadConfig(self, filename): with open(filename, 'r') as fp: logger.debug('Load config from {}'.format(filename)) self.config = yaml.safe_load(fp) self.hugvey_ids = [i+1 for i in range(self.config['hugveys'])] def commandHugvey(self, hv_id, msg): self.commandQueue.put_nowait((hv_id, msg)) def commandAllHugveys(self, msg): for hv_id in self.hugvey_ids: self.commandHugvey(hv_id, msg) def commandAllActiveHugveys(self, msg): for hv_id in self.hugveys: self.commandHugvey(hv_id, msg) async def commandSender(self): s = self.ctx.socket(zmq.PUB) s.bind(self.config['events']['cmd_address']) self.commandAllHugveys({'action': 'show_yourself'}) # sleep to allow pending connections to connect await asyncio.sleep(1) logger.info("Ready to publish commands on: {}".format(self.config['events']['cmd_address'])) logger.debug('Already {} items in queue'.format(self.commandQueue.qsize())) while self.isRunning.is_set(): hv_id, cmd = await self.commandQueue.get() zmqSend(s, hv_id, cmd) s.close() def instantiateHugvey(self, hugvey_id, msg): ''' Start a HugveyState, according to a show_yourself reply 'event': 'connection', 'id': self.hugvey_id, 'host': socket.gethostname(), 'ip': self.getIp(), ''' # def startHugvey(): h = HugveyState(hugvey_id, self) h.config(msg['host'],msg['ip']) thread = threading.Thread(target=h.start) thread.start() # self.tasks['hv{}'.format(hugvey_id)] = self.loop.create_task(h.start()) async def eventListener(self): s = self.ctx.socket(zmq.SUB) s.bind(self.config['events']['listen_address']) logger.info("Listen for events on: {}".format(self.config['events']['listen_address'])) for id in self.hugvey_ids: s.subscribe(getTopic(id)) while self.isRunning.is_set(): hugvey_id, msg = await zmqReceive(s) if hugvey_id not in self.hugvey_ids: logger.critical("Message from alien Hugvey: {}".format(hugvey_id)) continue elif hugvey_id not in self.hugveys: if msg['event'] == 'connection': # Create a hugvey self.instantiateHugvey(hugvey_id, msg) else: logger.warning("Message from uninstantiated Hugvey {}".format(hugvey_id)) logger.debug("Message contains: {}".format(msg)) continue else: pass def start(self): self.isRunning.set() self.loop = asyncio.get_event_loop() self.tasks = {} # collect tasks so we can cancel in case of error self.tasks['eventListener'] = self.loop.create_task(self.eventListener()) self.tasks['commandSender'] = self.loop.create_task(self.commandSender()) # self.tasks['commandSender'] = self.loop.create_task(self.commandSender()) self.loop.run_forever() def stop(self): self.isRunning.clear() class HugveyState(object): """Represents the state of a Hugvey client on the server. Manages server connections & voice parsing etc. """ def __init__(self, id: int, command: CentralCommand): super(HugveyState, self).__init__() self.id = id self.command = command self.logger = logging.getLogger(f"hugvey{self.id}") self.loop = asyncio.new_event_loop() def config(self, hostname, ip): self.ip = ip self.hostname = hostname self.logger.info(f"Hugvey {self.id} at {self.ip}, host: {self.hostname}") def start(self): # stop on isRunning.is_set() or wait() # self.loop.create_task(self.startAudio()) tasks = asyncio.gather(self.startAudio(), loop=self.loop) self.loop.run_until_complete(tasks) # asyncio.run_coroutine_threadsafe(self._start(), self.loop) async def startAudio(self): ''' Start the audio streamer service ''' self.logger.info("Start audio stream") streamer = AudioStreamer( self.command.config['voice']['chunk'], self.ip, int(self.command.config['voice']['port'])) if self.command.debug: self.logger.warn("Debug on: Connecting Audio player") player = Player(self.command.config['voice']['src_rate'], self.command.config['voice']['out_rate']) streamer.addConsumer(player) await streamer.run()