192 lines
6.7 KiB
Python
192 lines
6.7 KiB
Python
"""
|
|
"Conscript reporting"
|
|
This server controls all hugveys and the processing of their narratives. It exposes itself for control to the panopticon server.
|
|
"""
|
|
|
|
|
|
import asyncio
|
|
import logging
|
|
from pandas.conftest import ip
|
|
import threading
|
|
import yaml
|
|
import zmq
|
|
from zmq.asyncio import Context
|
|
|
|
from hugvey import panopticon
|
|
from hugvey.communication import getTopic, zmqSend, zmqReceive
|
|
from hugvey.voice.google import GoogleVoiceClient
|
|
from hugvey.voice.player import Player
|
|
from hugvey.voice.streamer import AudioStreamer
|
|
import uuid
|
|
|
|
|
|
# Module-level logger (named "command") used by CentralCommand in this file.
logger = logging.getLogger("command")
|
|
|
|
class CentralCommand(object):
    """Central coordinator for all hugveys.

    Publishes commands to the hugveys over a ZeroMQ PUB socket, listens for
    their events on a ZeroMQ SUB socket, and creates one HugveyState (started
    on its own thread) for every configured hugvey that announces itself with
    a 'connection' event.
    """

    def __init__(self, debug_mode=False):
        """Set up queues, locks and the shared ZeroMQ context.

        Args:
            debug_mode: stored as ``self.debug``; read by HugveyState to decide
                whether an audio player is attached to the stream.
        """
        self.debug = debug_mode
        self.eventQueue = asyncio.Queue()
        # (hugvey_id, message) tuples waiting to be published by commandSender
        self.commandQueue = asyncio.Queue()
        # set by start(), cleared by stop(); polled by the listener/sender loops
        self.isRunning = threading.Event()
        # instantiated hugveys, keyed by hugvey id
        self.hugveys = {}
        self.ctx = Context.instance()
        # serialises creation/reconfiguration of HugveyState objects
        self.hugveyLock = asyncio.Lock()

    def loadConfig(self, filename):
        """Load the YAML configuration and derive the valid hugvey ids.

        The config is stored as ``self.config``. Its 'hugveys' entry is used as
        a count: the valid ids become 1..config['hugveys'] (inclusive).

        Args:
            filename: path of the YAML configuration file.
        """
        with open(filename, 'r') as fp:
            logger.debug('Load config from {}'.format(filename))
            self.config = yaml.safe_load(fp)

        # hugvey ids are 1-based
        self.hugvey_ids = list(range(1, self.config['hugveys'] + 1))

    def commandHugvey(self, hv_id, msg):
        """Queue ``msg`` for publication to the hugvey with id ``hv_id``."""
        self.commandQueue.put_nowait((hv_id, msg))

    def commandAllHugveys(self, msg):
        """Queue ``msg`` for every configured hugvey id (connected or not)."""
        for hv_id in self.hugvey_ids:
            self.commandHugvey(hv_id, msg)

    def commandAllActiveHugveys(self, msg):
        """Queue ``msg`` for every hugvey that has been instantiated."""
        for hv_id in self.hugveys:
            self.commandHugvey(hv_id, msg)

    async def commandSender(self):
        """Publish queued commands on the configured command address.

        Binds a PUB socket to config['events']['cmd_address'], queues a
        'show_yourself' request for all configured hugveys, and then forwards
        every item from ``self.commandQueue`` for as long as ``isRunning`` is
        set. The socket is closed when the coroutine ends, including when the
        task is cancelled or an error is raised.
        """
        cmd_address = self.config['events']['cmd_address']
        s = self.ctx.socket(zmq.PUB)
        try:
            s.bind(cmd_address)

            self.commandAllHugveys({'action': 'show_yourself'})

            # sleep to allow pending connections to connect
            await asyncio.sleep(1)
            logger.info("Ready to publish commands on: {}".format(cmd_address))
            logger.debug('Already {} items in queue'.format(self.commandQueue.qsize()))

            # NOTE(review): isRunning is only re-checked after a command has
            # been received; an idle sender stays blocked on the queue.
            while self.isRunning.is_set():
                hv_id, cmd = await self.commandQueue.get()
                zmqSend(s, hv_id, cmd)
        finally:
            s.close()

    async def instantiateHugvey(self, hugvey_id, msg):
        '''
        Start a HugveyState, according to a show_yourself reply, or
        reconfigure the existing one when the id is already known.

        The reply is expected to look like:

        'event': 'connection',
        'id': self.hugvey_id,
        'host': socket.gethostname(),
        'ip': self.getIp(),

        Only 'host' and 'ip' are read here.
        '''
        async with self.hugveyLock:  # lock to prevent duplicates on creation
            if hugvey_id not in self.hugveys:
                logger.info(f'Instantiate hugvey #{hugvey_id}')
                h = HugveyState(hugvey_id, self)
                h.config(msg['host'], msg['ip'])
                self.hugveys[hugvey_id] = h
                # every hugvey runs its own event loop on a dedicated thread
                thread = threading.Thread(target=h.start, name=f"hugvey#{hugvey_id}")
                thread.start()
            else:
                logger.info(f'Reconfigure hugvey #{hugvey_id}')
                # (re)configure existing hugveys; look the instance up, as no
                # local HugveyState is created on this path
                self.hugveys[hugvey_id].config(msg['host'], msg['ip'])

    async def eventListener(self):
        """Receive hugvey events and instantiate hugveys that announce themselves.

        Binds a SUB socket to config['events']['listen_address'] and subscribes
        to the topic of every configured hugvey id. Messages from ids outside
        the configured range are logged as critical and dropped. For a
        configured but not yet instantiated hugvey only a 'connection' event is
        acted upon. The socket is closed when the coroutine ends.
        """
        listen_address = self.config['events']['listen_address']
        s = self.ctx.socket(zmq.SUB)
        try:
            s.bind(listen_address)
            logger.info("Listen for events on: {}".format(listen_address))

            for hv_id in self.hugvey_ids:
                s.subscribe(getTopic(hv_id))

            while self.isRunning.is_set():
                hugvey_id, msg = await zmqReceive(s)

                if hugvey_id not in self.hugvey_ids:
                    logger.critical("Message from alien Hugvey: {}".format(hugvey_id))
                    continue

                if hugvey_id not in self.hugveys:
                    if msg['event'] == 'connection':
                        # Create a hugvey
                        await self.instantiateHugvey(hugvey_id, msg)
                    else:
                        logger.warning("Message from uninstantiated Hugvey {}".format(hugvey_id))
                        logger.debug("Message contains: {}".format(msg))
                    continue

                # NOTE(review): events from already instantiated hugveys
                # (including repeated 'connection' events) are not handled yet.
        finally:
            s.close()

    def start(self):
        """Mark the server as running, schedule both coroutines and run the loop.

        Blocks in ``loop.run_forever()``.
        """
        self.isRunning.set()
        self.loop = asyncio.get_event_loop()
        # collect tasks so we can cancel in case of error
        self.tasks = {}
        self.tasks['eventListener'] = self.loop.create_task(self.eventListener())
        self.tasks['commandSender'] = self.loop.create_task(self.commandSender())
        self.loop.run_forever()

    def stop(self):
        """Clear the running flag polled by the listener and sender loops.

        This does not stop the event loop or cancel the tasks in ``self.tasks``.
        """
        self.isRunning.clear()
|
|
|
|
|
|
|
|
class HugveyState(object):
    """Represents the state of a Hugvey client on the server.

    Manages server connections & voice parsing etc. Each instance owns a
    private asyncio event loop, which is run by ``start()``.
    """

    def __init__(self, id: int, command: 'CentralCommand'):
        """Create the state object for one hugvey.

        Args:
            id: the hugvey id; also used in the logger name ("hugvey<id>").
            command: the owning CentralCommand; its ``config`` and ``debug``
                attributes are read when audio processing starts.
        """
        super().__init__()
        self.id = id
        self.command = command
        self.logger = logging.getLogger(f"hugvey{self.id}")
        # private loop, run by start()
        self.loop = asyncio.new_event_loop()
        self.isConfigured = False

    def config(self, hostname, ip):
        """Store the hugvey's network identity and mark it as configured.

        May be called again for an already configured hugvey; the stored
        hostname and ip are then simply overwritten.

        Args:
            hostname: host name reported by the hugvey.
            ip: address reported by the hugvey; used as the audio stream source.
        """
        self.ip = ip
        self.hostname = hostname
        self.logger.info(f"Hugvey {self.id} at {self.ip}, host: {self.hostname}")

        # NOTE: when isConfigured is already True this is a
        # reconfiguration/reconnection; no extra handling exists for that yet.
        self.isConfigured = True

    def start(self):
        """Run audio processing on this hugvey's own event loop until it ends.

        Blocks the calling thread.
        """
        # stop on isRunning.is_set() or wait()
        # The coroutine is handed to the loop directly: the `loop` argument of
        # asyncio.gather() no longer exists on Python 3.10+.
        self.loop.run_until_complete(self.startAudioProcessing())

    async def startAudioProcessing(self):
        '''
        Start the audio streamer service

        Builds an AudioStreamer from the 'voice' section of the command
        config (chunk, port) and this hugvey's ip, attaches a Player when the
        command runs in debug mode, attaches a GoogleVoiceClient, and then
        awaits the streamer.
        '''
        voice_cfg = self.command.config['voice']

        self.logger.info("Start audio stream")
        streamer = AudioStreamer(
            voice_cfg['chunk'],
            self.ip,
            int(voice_cfg['port']))

        if self.command.debug:
            self.logger.warning("Debug on: Connecting Audio player")
            player = Player(voice_cfg['src_rate'], voice_cfg['out_rate'])
            streamer.addConsumer(player)

        self.logger.info("Start Speech")
        google = GoogleVoiceClient(
            hugvey_id=self.id,
            src_rate=voice_cfg['src_rate'],
            credential_file=voice_cfg['google_credentials'],
            language_code='en-US'
        )
        streamer.addConsumer(google)

        await streamer.run()
|