hugvey/hugvey/central_command.py

193 lines
6.7 KiB
Python
Raw Normal View History

"""
"Conscript reporting"
This server controls all hugveys and the processing of their narratives. It exposes itself for control to the panopticon server.
"""
import asyncio
import logging
2019-01-17 20:28:55 +01:00
from pandas.conftest import ip
import threading
import yaml
import zmq
2019-01-17 20:28:55 +01:00
from zmq.asyncio import Context
from hugvey import panopticon
from hugvey.communication import getTopic, zmqSend, zmqReceive
2019-01-17 20:28:55 +01:00
from hugvey.voice.google import GoogleVoiceClient
from hugvey.voice.player import Player
from hugvey.voice.streamer import AudioStreamer
import uuid
logger = logging.getLogger("command")
class CentralCommand(object):
"""docstring for CentralCommand."""
def __init__(self, debug_mode = False):
self.debug = debug_mode
self.eventQueue = asyncio.Queue()
self.commandQueue = asyncio.Queue()
self.isRunning = threading.Event()
self.hugveys = {}
self.ctx = Context.instance()
2019-01-17 20:28:55 +01:00
self.hugveyLock = asyncio.Lock()
def loadConfig(self, filename):
with open(filename, 'r') as fp:
logger.debug('Load config from {}'.format(filename))
self.config = yaml.safe_load(fp)
self.hugvey_ids = [i+1 for i in range(self.config['hugveys'])]
def commandHugvey(self, hv_id, msg):
self.commandQueue.put_nowait((hv_id, msg))
def commandAllHugveys(self, msg):
for hv_id in self.hugvey_ids:
self.commandHugvey(hv_id, msg)
def commandAllActiveHugveys(self, msg):
for hv_id in self.hugveys:
self.commandHugvey(hv_id, msg)
async def commandSender(self):
s = self.ctx.socket(zmq.PUB)
s.bind(self.config['events']['cmd_address'])
self.commandAllHugveys({'action': 'show_yourself'})
# sleep to allow pending connections to connect
await asyncio.sleep(1)
logger.info("Ready to publish commands on: {}".format(self.config['events']['cmd_address']))
logger.debug('Already {} items in queue'.format(self.commandQueue.qsize()))
while self.isRunning.is_set():
hv_id, cmd = await self.commandQueue.get()
zmqSend(s, hv_id, cmd)
s.close()
2019-01-17 20:28:55 +01:00
async def instantiateHugvey(self, hugvey_id, msg):
'''
Start a HugveyState, according to a show_yourself reply
'event': 'connection',
'id': self.hugvey_id,
'host': socket.gethostname(),
'ip': self.getIp(),
'''
2019-01-17 20:28:55 +01:00
async with self.hugveyLock: # lock to prevent duplicates on creation
if not hugvey_id in self.hugveys:
logger.info(f'Instantiate hugvey #{hugvey_id}')
h = HugveyState(hugvey_id, self)
h.config(msg['host'],msg['ip'])
self.hugveys[hugvey_id] = h
thread = threading.Thread(target=h.start, name=f"hugvey#{hugvey_id}")
thread.start()
else:
logger.info(f'Reconfigure hugvey #{hugvey_id}')
# (re)configure exisitng hugveys
h.config(msg['host'],msg['ip'])
async def eventListener(self):
s = self.ctx.socket(zmq.SUB)
s.bind(self.config['events']['listen_address'])
logger.info("Listen for events on: {}".format(self.config['events']['listen_address']))
for id in self.hugvey_ids:
s.subscribe(getTopic(id))
while self.isRunning.is_set():
hugvey_id, msg = await zmqReceive(s)
if hugvey_id not in self.hugvey_ids:
logger.critical("Message from alien Hugvey: {}".format(hugvey_id))
continue
elif hugvey_id not in self.hugveys:
if msg['event'] == 'connection':
# Create a hugvey
2019-01-17 20:28:55 +01:00
await self.instantiateHugvey(hugvey_id, msg)
else:
logger.warning("Message from uninstantiated Hugvey {}".format(hugvey_id))
logger.debug("Message contains: {}".format(msg))
continue
else:
pass
def start(self):
self.isRunning.set()
self.loop = asyncio.get_event_loop()
self.tasks = {} # collect tasks so we can cancel in case of error
self.tasks['eventListener'] = self.loop.create_task(self.eventListener())
self.tasks['commandSender'] = self.loop.create_task(self.commandSender())
# self.tasks['commandSender'] = self.loop.create_task(self.commandSender())
self.loop.run_forever()
def stop(self):
self.isRunning.clear()
class HugveyState(object):
"""Represents the state of a Hugvey client on the server.
Manages server connections & voice parsing etc.
"""
def __init__(self, id: int, command: CentralCommand):
super(HugveyState, self).__init__()
self.id = id
self.command = command
self.logger = logging.getLogger(f"hugvey{self.id}")
self.loop = asyncio.new_event_loop()
2019-01-17 20:28:55 +01:00
self.isConfigured = False
def config(self, hostname, ip):
self.ip = ip
self.hostname = hostname
self.logger.info(f"Hugvey {self.id} at {self.ip}, host: {self.hostname}")
2019-01-17 20:28:55 +01:00
if self.isConfigured == True:
# a reconfiguration/reconnection
pass
self.isConfigured = True
def start(self):
# stop on isRunning.is_set() or wait()
2019-01-17 20:28:55 +01:00
# self.loop.create_task(self.startAudioProcessing())
tasks = asyncio.gather(self.startAudioProcessing(), loop=self.loop)
self.loop.run_until_complete(tasks)
# asyncio.run_coroutine_threadsafe(self._start(), self.loop)
2019-01-17 20:28:55 +01:00
async def startAudioProcessing(self):
'''
Start the audio streamer service
'''
self.logger.info("Start audio stream")
streamer = AudioStreamer(
self.command.config['voice']['chunk'],
self.ip,
int(self.command.config['voice']['port']))
if self.command.debug:
self.logger.warn("Debug on: Connecting Audio player")
player = Player(self.command.config['voice']['src_rate'], self.command.config['voice']['out_rate'])
streamer.addConsumer(player)
2019-01-17 20:28:55 +01:00
self.logger.info("Start Speech")
google = GoogleVoiceClient(
hugvey_id=self.id,
src_rate=self.command.config['voice']['src_rate'],
credential_file=self.command.config['voice']['google_credentials'],
language_code='en-US'
)
streamer.addConsumer(google)
await streamer.run()