906 lines
33 KiB
Python
906 lines
33 KiB
Python
"""
|
|
"Conscript reporting"
|
|
This server controls all hugveys and the processing of their narratives. It exposes itself for control to the panopticon server.
|
|
"""
|
|
|
|
import os
|
|
import time
|
|
import yaml
|
|
import zmq
|
|
from zmq.asyncio import Context
|
|
|
|
import asyncio
|
|
from hugvey.communication import getTopic, zmqSend, zmqReceive, LOG_BS
|
|
from hugvey.panopticon import Panopticon
|
|
from hugvey.story import Story, Stopwatch
|
|
from hugvey.speech.google import GoogleVoiceClient
|
|
from hugvey.speech.player import Player
|
|
from hugvey.speech.streamer import AudioStreamer
|
|
import json
|
|
import logging
|
|
import queue
|
|
import threading
|
|
from hugvey.voice import VoiceStorage
|
|
import multiprocessing
|
|
from hugvey.speech.recorder import Recorder
|
|
from pythonosc import udp_client, osc_server, dispatcher
|
|
import copy
|
|
from pythonosc.osc_server import AsyncIOOSCUDPServer
|
|
from hugvey.variablestore import VariableStore
|
|
import datetime
|
|
|
|
mainLogger = logging.getLogger("hugvey")
|
|
|
|
logger = mainLogger.getChild("command")
|
|
|
|
eventLogger = logging.getLogger("events")
|
|
eventLogger.setLevel(logging.DEBUG)
|
|
|
|
# def exceptionEmitter(a):
|
|
# print(a)
|
|
# def decorate(func):
|
|
# print('decorate')
|
|
# async def call(*args, **kwargs):
|
|
# print('call')
|
|
# # pre(func, *args, **kwargs)
|
|
# try:
|
|
# result = await func(*args, **kwargs)
|
|
# except Exception as e:
|
|
# logger.critical(e, "in", func)
|
|
# raise e
|
|
# # post(func, *args, **kwargs)
|
|
# return result
|
|
# return call
|
|
# return decorate
|
|
|
|
|
|
class CentralCommand(object):
|
|
"""docstring for CentralCommand."""
|
|
|
|
def __init__(self, args = {}, debug_mode=False):
|
|
self.debug = debug_mode
|
|
self.eventQueue = asyncio.Queue()
|
|
self.commandQueue = asyncio.Queue()
|
|
self.lightQueue = asyncio.Queue()
|
|
self.isRunning = threading.Event()
|
|
self.logQueue = multiprocessing.Queue()
|
|
self.hugveys = {}
|
|
self.ctx = Context.instance()
|
|
self.hugveyLock = asyncio.Lock()
|
|
self.start_time = time.time()
|
|
self.languageFiles = {}
|
|
self.languageConfig = {}
|
|
self.args = args # cli args
|
|
|
|
self.timer = Stopwatch()
|
|
|
|
eventLogger.addHandler(logging.handlers.QueueHandler(self.logQueue))
|
|
|
|
def loadConfig(self, filename):
|
|
if hasattr(self, 'config'):
|
|
raise Exception("Overriding config not supported yet")
|
|
|
|
with open(filename, 'r') as fp:
|
|
logger.debug('Load config from {}'.format(filename))
|
|
self.config = yaml.safe_load(fp)
|
|
for arg in vars(self.args):
|
|
if arg in self.config:
|
|
logger.debug("Override argument {}".format(arg))
|
|
self.config[arg] = getattr(self.args,arg)
|
|
|
|
self.hugvey_ids = [i + 1 for i in range(self.config['hugveys'])]
|
|
|
|
self.loadLanguages()
|
|
|
|
|
|
voice_dir = os.path.join(self.config['web']['files_dir'], 'voices')
|
|
self.voiceStorage = VoiceStorage(voice_dir, self.languageConfig)
|
|
varDb = os.path.join(
|
|
self.config['voice']['record_dir'],
|
|
'hugvey_variable_store.db'
|
|
)
|
|
self.variableStore = VariableStore(varDb)
|
|
|
|
self.panopticon = Panopticon(self, self.config, self.voiceStorage)
|
|
|
|
|
|
def loadLanguages(self):
|
|
logger.debug('load language files')
|
|
self.languages = {}
|
|
self.languageCache = {}
|
|
|
|
for lang in self.config['languages']:
|
|
lang_filename = os.path.join(self.config['web']['files_dir'], lang['file'])
|
|
self.languageFiles[lang['code']] = lang['file']
|
|
self.languageConfig[lang['code']] = lang
|
|
with open(lang_filename, 'r') as fp:
|
|
self.languages[lang['code']] = json.load(fp)
|
|
|
|
|
|
def getHugveyStatus(self, hv_id, isSelected = False):
|
|
status = {'id': hv_id}
|
|
if not hv_id in self.hugveys:
|
|
status['status'] = 'off'
|
|
return status
|
|
|
|
hv = self.hugveys[hv_id]
|
|
# if not hv.story:
|
|
# status['status'] = 'off'
|
|
# return status
|
|
|
|
#: :type hv: HugveyState
|
|
status['status'] = hv.getStatus()
|
|
status['light_on'] = bool(hv.lightStatus)
|
|
status['language'] = hv.language_code
|
|
status['light_id'] = hv.lightId
|
|
status['available'] = hv.isAvailable()
|
|
status['msg'] = hv.story.currentMessage.id if hv.story and hv.story.currentMessage else None
|
|
# status['finished'] = hv.story.isFinished()
|
|
status['history'] = {} if isSelected is False or not hv.story else hv.story.getLogSummary()
|
|
# status['history'] = hv.story.getLogSummary() # disabled as it is a bit slow. We now have eventLog
|
|
# status['counts'] = {t: len(a) for t, a in status['history'].items() if t != 'directions' }
|
|
status['counts'] = {} if not hv.story else hv.story.getLogCounts()
|
|
status['duration'] = 0 if not hv.story else hv.story.timer.getElapsed()
|
|
status['has_state'] = Story.hugveyHasSavedState(hv.lightId)
|
|
status['variables'] = {} if not isSelected or not hv.story else hv.story.variableValues
|
|
|
|
if not hv.story:
|
|
status['time_since_hugvey_spoke'] = '-'
|
|
status['time_since_visitor_spoke'] = '-'
|
|
else:
|
|
if not hv.story.lastMsgStartTime:
|
|
status['time_since_hugvey_spoke'] = '?'
|
|
elif not hv.story.lastMsgFinishTime:
|
|
status['time_since_hugvey_spoke'] = 'speaking'
|
|
else:
|
|
status['time_since_hugvey_spoke'] = str(datetime.timedelta(seconds=int(hv.story.timer.getElapsed() - hv.story.lastMsgFinishTime)))
|
|
|
|
if not hv.story.timer.hasMark('last_speech'):
|
|
status['time_since_visitor_spoke'] = 'never'
|
|
else:
|
|
status['time_since_visitor_spoke'] = str(datetime.timedelta(seconds=int(hv.story.timer.getElapsed('last_speech'))))
|
|
|
|
return status
|
|
|
|
def getStatusSummary(self, selected_ids = []):
|
|
status = {
|
|
'uptime': "-" if not self.start_time else (time.time() - self.start_time),
|
|
'loop_timer': self.timer.getElapsed(),
|
|
'languages': self.config['languages'],
|
|
'hugvey_ids': self.hugvey_ids,
|
|
'hugveys': [],
|
|
'logbookId': None,
|
|
'logbook': [],
|
|
}
|
|
|
|
#use this to test if any threads stay open
|
|
# eg. after killing/dying of a hugvey
|
|
# print(threading.enumerate())
|
|
|
|
for hv_id in self.hugvey_ids:
|
|
status['hugveys'].append(self.getHugveyStatus(hv_id, hv_id in selected_ids))
|
|
|
|
status['hugveys'].sort(key=lambda hv: hv['light_id'] if 'light_id' in hv else hv['id'])
|
|
# if selected_id and selected_id in self.hugveys:
|
|
# if self.hugveys[selected_id].recorder:
|
|
# status['logbook'] = self.hugveys[selected_id].recorder.currentLog
|
|
# status['logbookId'] = selected_id
|
|
|
|
return status
|
|
|
|
def setLoopTime(self, secondsAgo: int):
|
|
self.timer.setMark('start', time.time() - secondsAgo)
|
|
|
|
def commandHugvey(self, hv_id, msg):
|
|
"""
|
|
prepare command to be picked up by the sender
|
|
"""
|
|
logging.debug(f"COmmand {hv_id}: {msg}")
|
|
if threading.current_thread().getName() != 'MainThread':
|
|
# Threading nightmares! Adding to queue from other thread/loop (not sure which is the isse)
|
|
# won't trigger asyncios queue.get() so we have to do this thread
|
|
# safe, in the right loop
|
|
self.loop.call_soon_threadsafe(self._queueCommand, hv_id, msg)
|
|
else:
|
|
self._queueCommand(hv_id, msg)
|
|
|
|
def _queueCommand(self, hv_id, msg):
|
|
self.commandQueue.put_nowait((hv_id, msg))
|
|
|
|
|
|
def commandLight(self, route, data):
|
|
"""
|
|
Buffer light commands
|
|
"""
|
|
logging.debug(f"Light: {route} {data}")
|
|
if threading.current_thread().getName() != 'MainThread':
|
|
# Threading nightmares! Adding to queue from other thread/loop (not sure which is the isse)
|
|
# won't trigger asyncios queue.get() so we have to do this thread
|
|
# safe, in the right loop
|
|
self.loop.call_soon_threadsafe(self._queueLightCommand, route, data)
|
|
else:
|
|
self._queueLightCommand(route, data)
|
|
|
|
def _queueLightCommand(self, route, data):
|
|
self.lightQueue.put_nowait((route, data))
|
|
|
|
def commandAllHugveys(self, msg):
|
|
for hv_id in self.hugvey_ids:
|
|
self.commandHugvey(hv_id, msg)
|
|
|
|
def commandAllActiveHugveys(self, msg):
|
|
for hv_id in self.hugveys:
|
|
self.commandHugvey(hv_id, msg)
|
|
|
|
async def commandSender(self):
|
|
s = self.ctx.socket(zmq.PUB)
|
|
s.bind(self.config['events']['cmd_address'])
|
|
|
|
self.commandAllHugveys({'action': 'show_yourself'})
|
|
|
|
# sleep to allow pending connections to connect
|
|
await asyncio.sleep(1)
|
|
logger.info("Ready to publish commands on: {}".format(
|
|
self.config['events']['cmd_address']))
|
|
logger.debug('Already {} items in queue'.format(
|
|
self.commandQueue.qsize()))
|
|
|
|
while self.isRunning.is_set():
|
|
hv_id, cmd = await self.commandQueue.get()
|
|
logger.debug('Got command to send: {} {}'.format(hv_id, cmd))
|
|
zmqSend(s, hv_id, cmd)
|
|
|
|
logger.warn('Stopping command sender')
|
|
s.close()
|
|
|
|
|
|
async def lightSender(self):
|
|
lightConn = udp_client.SimpleUDPClient(
|
|
self.config['light']['ip'],
|
|
self.config['light']['port'])
|
|
|
|
logger.info(f"Ready to send light commands to: {self.config['light']['ip']}:{self.config['light']['port']}")
|
|
|
|
while self.isRunning.is_set():
|
|
route, data = await self.lightQueue.get()
|
|
logger.debug('Got light to send: {} {}'.format(route, data))
|
|
lightConn.send_message(route, data)
|
|
await asyncio.sleep(.06)
|
|
|
|
logger.warn('Stopping light sender')
|
|
lightConn._sock.close()
|
|
|
|
def restartTimerHandler(self, address, *args):
|
|
"""
|
|
See self.oscListener
|
|
"""
|
|
logger.warn(f"Restart loop timer")
|
|
self.timer.reset()
|
|
if len(args) > 0 and float(args[0]) > 0:
|
|
print(args, args[0])
|
|
logger.warn(f"Set timer to custom time: {float(args[0])} seconds ago")
|
|
self.timer.setMark('start', time.time() - float(args[0]))
|
|
|
|
async def oscListener(self):
|
|
"""
|
|
OSC server, listens for loop restarts
|
|
"""
|
|
dispatch = dispatcher.Dispatcher()
|
|
dispatch.map("/loop", self.restartTimerHandler)
|
|
|
|
server = osc_server.AsyncIOOSCUDPServer(
|
|
("0.0.0.0", 9000), dispatch, asyncio.get_event_loop()
|
|
)
|
|
logger.info('Start OSC server to receive loop re-starts')
|
|
# await server.serve()
|
|
transport, protocol = await server.create_serve_endpoint()
|
|
# logger.critical(f"{transport}, {protocol}")
|
|
# transport.close()
|
|
|
|
async def redLightController(self):
|
|
"""
|
|
Every second, check if no hugveys are available. If so, the red light should be
|
|
overruled to be on. If any is available, send a 0 to release the override.
|
|
"""
|
|
currentCode = None
|
|
while self.isRunning.is_set():
|
|
# statusses = [hv.getStatus() for hv in self.hugveys.values()]
|
|
# lightOn = HugveyState.STATE_AVAILABLE not in statusses
|
|
statusses = [hv.isAvailable() for hv in self.hugveys.values()]
|
|
lightOn = True not in statusses
|
|
lightCode = 1 if lightOn else 0
|
|
if lightCode != currentCode:
|
|
self.commandLight('/red', [lightCode])
|
|
currentCode = lightCode
|
|
await asyncio.sleep(1)
|
|
|
|
logger.warn('Stopping red light controller')
|
|
|
|
def instantiateHugvey(self, hugvey_id):
|
|
'''
|
|
Start a HugveyState, according to a show_yourself reply
|
|
|
|
'event': 'connection',
|
|
'id': self.hugvey_id,
|
|
'host': socket.gethostname(),
|
|
'ip': self.getIp(),
|
|
'''
|
|
# async with self.hugveyLock: # lock to prevent duplicates on creation
|
|
if not hugvey_id in self.hugveys:
|
|
thread = threading.Thread(
|
|
target=self.hugveyStateRunner, args=(hugvey_id,), name=f"hugvey#{hugvey_id}")
|
|
thread.start()
|
|
|
|
def hugveyStateRunner(self, hugvey_id):
|
|
while self.isRunning.is_set():
|
|
logger.info(f'Instantiate hugvey #{hugvey_id}')
|
|
h = HugveyState(hugvey_id, self)
|
|
# h.config(msg['host'], msg['ip'])
|
|
self.hugveys[hugvey_id] = h
|
|
r = h.run()
|
|
self.hugveys.pop(hugvey_id)
|
|
if not r:
|
|
# stop if False, ie. when stream has gone
|
|
return
|
|
|
|
logger.critical(f'Hugvey stopped (crashed?). Reinstantiate after 5 sec')
|
|
time.sleep(5)
|
|
|
|
async def timerEmitter(self):
|
|
"""
|
|
This is fixed: a one hour loop with a collective moment 10-15 minutes,
|
|
30-35 minutes and 50-55 minutes
|
|
"""
|
|
loop_duration = 60 * 60 # one hour loop
|
|
intervals = [
|
|
{
|
|
'start_time': 10*60,
|
|
'duration': 5 * 60,
|
|
},
|
|
{
|
|
'start_time': 30*60,
|
|
'duration': 5 * 60,
|
|
},
|
|
{
|
|
'start_time': 50*60,
|
|
'duration': 5 * 60,
|
|
}
|
|
]
|
|
self.start_time = time.time()
|
|
|
|
# TODO: emit start event
|
|
|
|
while self.isRunning.is_set():
|
|
|
|
pass
|
|
|
|
async def eventListener(self):
|
|
s = self.ctx.socket(zmq.SUB)
|
|
s.bind(self.config['events']['listen_address'])
|
|
logger.debug("Listen for events on: {}".format(
|
|
self.config['events']['listen_address']))
|
|
|
|
for id in self.hugvey_ids:
|
|
s.subscribe(getTopic(id))
|
|
|
|
while self.isRunning.is_set():
|
|
try:
|
|
hugvey_id, msg = await zmqReceive(s)
|
|
|
|
if hugvey_id not in self.hugvey_ids:
|
|
logger.critical(
|
|
"Message from alien Hugvey: {}".format(hugvey_id))
|
|
continue
|
|
elif hugvey_id not in self.hugveys:
|
|
# if msg['event'] == 'connection':
|
|
# # Create a hugvey
|
|
# self.instantiateHugvey(hugvey_id, msg)
|
|
# else:
|
|
logger.warning(
|
|
"Message from uninstantiated Hugvey {}".format(hugvey_id))
|
|
logger.debug("Message contains: {}".format(msg))
|
|
continue
|
|
else:
|
|
self.hugveys[hugvey_id].queueEvent(msg)
|
|
# await self.hugveys[hugvey_id].eventQueue.put(msg)
|
|
except Exception as e:
|
|
logger.critical(f"Exception while running event loop:")
|
|
logger.exception(e)
|
|
|
|
async def voiceListener(self, hugvey_id):
|
|
s = self.ctx.socket(zmq.REP) #: :type s: zmq.sugar.Socket
|
|
voiceAddr = f"ipc://voice{hugvey_id}"
|
|
s.bind(voiceAddr)
|
|
logger.debug("Listen for voice requests on: {}".format(voiceAddr))
|
|
|
|
while self.isRunning.is_set():
|
|
try:
|
|
r = await s.recv_json()
|
|
isVariable = bool(r['variable'])
|
|
text = r['text']
|
|
hv = self.hugveys[hugvey_id] #: :type hv: HugveyState
|
|
fn = await self.voiceStorage.requestFile(hv.language_code, text, isVariable)
|
|
if fn is None:
|
|
eventLogger.getChild(f"{hugvey_id}").critical("error: No voice file fetched, check logs.")
|
|
fn = 'local/crash.wav'
|
|
# TODO: trigger a repeat/crash event.
|
|
await s.send_string(fn)
|
|
except Exception as e:
|
|
logger.critical(f"Exception while running voice loop:")
|
|
logger.exception(e)
|
|
|
|
def start(self):
|
|
self.isRunning.set()
|
|
self.loop = asyncio.get_event_loop()
|
|
# self.panopticon_loop = asyncio.new_event_loop()
|
|
|
|
self.tasks = {} # collect tasks so we can cancel in case of error
|
|
self.tasks['eventListener'] = self.loop.create_task(
|
|
self.catchException(self.eventListener()))
|
|
self.tasks['commandSender'] = self.loop.create_task(
|
|
self.catchException(self.commandSender()))
|
|
self.tasks['lightSender'] = self.loop.create_task(
|
|
self.catchException(self.lightSender()))
|
|
self.tasks['oscListener'] = self.loop.create_task(
|
|
self.catchException(self.oscListener()))
|
|
self.tasks['redLightController'] = self.loop.create_task(
|
|
self.catchException(self.redLightController()))
|
|
self.tasks['variableStore'] = self.loop.create_task(
|
|
self.catchException(self.variableStore.queueProcessor()))
|
|
|
|
for hid in self.hugvey_ids:
|
|
self.tasks['voiceListener'] = self.loop.create_task(
|
|
self.catchException(self.voiceListener(hid)))
|
|
self.instantiateHugvey(hid)
|
|
|
|
# we want the web interface in a separate thread
|
|
self.panopticon_thread = threading.Thread(
|
|
target=self.panopticon.start, name="Panopticon")
|
|
self.panopticon_thread.start()
|
|
|
|
self.loop.run_forever()
|
|
|
|
def stop(self):
|
|
self.isRunning.clear()
|
|
|
|
async def catchException(self, awaitable):
|
|
try:
|
|
# print(awaitable)
|
|
await awaitable
|
|
except Exception as e:
|
|
logger.exception(e)
|
|
logger.critical(f"Hugvey restart might be required but not implemented yet")
|
|
|
|
|
|
class HugveyState(object):
|
|
"""Represents the state of a Hugvey client on the server.
|
|
Manages server connections & voice parsing etc.
|
|
"""
|
|
|
|
# all statusses can only go up or down, except for gone, which is an error state:
|
|
# off <-> blocked <-> available <-> running <-> paused
|
|
STATE_OFF = "off"
|
|
STATE_BLOCKED = "blocked"
|
|
STATE_AVAILABLE = "available"
|
|
STATE_RUNNING = "running"
|
|
STATE_PAUSE = "paused"
|
|
STATE_GONE = "gone"
|
|
|
|
def __init__(self, id: int, command: CentralCommand):
|
|
self.id = id
|
|
self.lightId = id
|
|
self.command = command
|
|
self.logger = mainLogger.getChild(f"{self.id}").getChild("command")
|
|
self.loop = asyncio.new_event_loop()
|
|
self.isConfigured = None
|
|
self.isRunning = asyncio.Event(loop=self.loop)
|
|
self.isRunning.clear()
|
|
|
|
self.eventQueue = None
|
|
self.language_code = 'en-GB'
|
|
self.story = None
|
|
self.streamer = None
|
|
self.google = None
|
|
self.player = None
|
|
self.recorder = None
|
|
self.notShuttingDown = True # TODO: allow shutdown of object
|
|
self.startMsgId = None
|
|
self.lightStatus = 0
|
|
self.eventLogger = eventLogger.getChild(f"{self.id}")
|
|
|
|
self.blockRestart = False
|
|
|
|
self.setStatus(self.STATE_GONE)
|
|
|
|
self.requireRestartAfterStop = None
|
|
|
|
def __del__(self):
|
|
self.logger.warn("Destroying hugvey object")
|
|
|
|
def isAvailable(self):
|
|
if self.command.config['story']['loop']:
|
|
if (self.status == self.STATE_RUNNING or self.status == self.STATE_PAUSE) and self.story:
|
|
if self.story.currentMessage:
|
|
if self.story.currentMessage.id == self.story.startMessage.id:
|
|
return True
|
|
return self.status == self.STATE_AVAILABLE
|
|
|
|
def getStatus(self):
|
|
return self.status
|
|
|
|
def setStatus(self, status):
|
|
self.status = status
|
|
|
|
# if the story is looping, light should not go off when the story starts
|
|
if status != self.STATE_RUNNING or self.command.config['story']['loop'] is False:
|
|
lightOn = status in [self.STATE_AVAILABLE, self.STATE_PAUSE]
|
|
intensity = self.command.config['light']['on_intensity'] if lightOn else self.command.config['light']['off_intensity']
|
|
duration = self.command.config['light']['fade_duration_id']
|
|
self.transitionLight(intensity, duration)
|
|
|
|
self.eventLogger.info(f"status: {self.status}")
|
|
|
|
def config(self, hostname, ip):
|
|
self.ip = ip
|
|
self.hostname = hostname
|
|
|
|
if self.isConfigured is not None:
|
|
# a reconfiguration/reconnection
|
|
pass
|
|
else:
|
|
self.logger.info(
|
|
f"Hugvey {self.id} at {self.ip}, host: {self.hostname}")
|
|
|
|
if self.status == self.STATE_GONE:
|
|
# turn on :-)
|
|
self.setStatus(self.STATE_BLOCKED)
|
|
|
|
self.isConfigured = time.time()
|
|
|
|
def sendCommand(self, msg):
|
|
"""
|
|
Send message or command to hugvey
|
|
@param msg: The message to be sent. Probably a dict()
|
|
"""
|
|
self.command.commandHugvey(self.id, msg)
|
|
|
|
def run(self):
|
|
self.logger.info(f"Await hugvey #{self.id}")
|
|
tasks = asyncio.gather(
|
|
self.catchException(self.processAudio()),
|
|
self.catchException(self.handleEvents()),
|
|
self.catchException(self.playStory()),
|
|
loop=self.loop)
|
|
self.loop.run_until_complete(tasks)
|
|
self.logger.warning(f"FINISHED RUNNING {self.id}")
|
|
return self.requireRestartAfterStop
|
|
|
|
async def catchException(self, awaitable):
|
|
try:
|
|
# print(awaitable)
|
|
await awaitable
|
|
except Exception as e:
|
|
self.logger.exception(e)
|
|
self.logger.critical(f"Hugvey crash")
|
|
self.eventLogger.critical(f"error: {e}")
|
|
|
|
# restart
|
|
# TODO: test proper functioning
|
|
self.shutdown()
|
|
|
|
|
|
def queueEvent(self, msg):
|
|
if 'time' not in msg:
|
|
# add time, so we can track time passed
|
|
msg['time'] = time.time()
|
|
if not self.eventQueue:
|
|
self.logger.critical("No event queue to put {}".format(msg))
|
|
else:
|
|
# Allow for both the Hugvey Command, or the Story handle the event.
|
|
self.loop.call_soon_threadsafe(self._queueEvent, msg)
|
|
|
|
def _queueEvent(self, msg):
|
|
"""
|
|
Put event in both the event loop for the story as well as the Hugvey State handler
|
|
"""
|
|
# self.logger.debug(f"Queue event in hugvey loop: {msg}") # a little less logging please :-)
|
|
self.eventQueue.put_nowait(msg)
|
|
|
|
# connection events don't need to go to the story
|
|
if msg['event'] == 'connection':
|
|
return
|
|
|
|
if self.story:
|
|
self.story.events.append(msg)
|
|
else:
|
|
self.logger.critical("Cannot queue event, as no story is present.")
|
|
|
|
async def handleEvents(self):
|
|
self.eventQueue = asyncio.Queue() # start event queue here, to avoid loop issues
|
|
while self.notShuttingDown:
|
|
try:
|
|
event = await asyncio.wait_for(self.eventQueue.get(), 2)
|
|
except asyncio.futures.TimeoutError as e:
|
|
# detect missing heartbeat:
|
|
if self.isConfigured and time.time() - self.isConfigured > 15:
|
|
self.logger.error("Hugvey did not send heartbeat.")
|
|
# self.gone()
|
|
self.shutdown()
|
|
continue
|
|
|
|
self.logger.debug("Received: {}".format(event))
|
|
if event['event'] == 'connection':
|
|
# 'event': 'connection',
|
|
# 'id': self.hugvey_id,
|
|
# 'host': socket.gethostname(),
|
|
# 'ip': self.getIp(),
|
|
self.config(event['host'], event['ip'])
|
|
|
|
|
|
if event['event'] == 'language':
|
|
self.setLanguage(event['code'])
|
|
|
|
if event['event'] == 'pause':
|
|
self.pause()
|
|
if event['event'] == 'block':
|
|
self.block()
|
|
if event['event'] == 'unblock':
|
|
self.available()
|
|
if event['event'] == 'restart':
|
|
self.restart()
|
|
if event['event'] == 'finish':
|
|
self.blockRestart = True
|
|
self.story._finish() # finish story AND hugvey state
|
|
if event['event'] == 'resume':
|
|
self.resume()
|
|
|
|
if event['event'] == 'change_language':
|
|
self.setLanguage(event['lang_code'])
|
|
if event['event'] == 'change_language_if_available':
|
|
if self.isAvailable() or self.status == self.STATE_BLOCKED:
|
|
self.setLanguage(event['lang_code'])
|
|
if event['event'] == 'change_light':
|
|
self.setLightId(event['light_id'])
|
|
if event['event'] == 'change_light_status':
|
|
self.setLightStatus(event['status'])
|
|
if event['event'] == 'play_msg':
|
|
self.logger.info(f"Play given message {event['msg_id']}")
|
|
if not self.story:
|
|
self.logger.critical("No story to play message in")
|
|
else:
|
|
if self.story is None:
|
|
return
|
|
|
|
if event['reloadStory']:
|
|
self.startMsgId = event['msg_id']
|
|
self.logger.debug(f"Restart from {self.startMsgId}")
|
|
self.restart()
|
|
else:
|
|
msg = self.story.get(event['msg_id'])
|
|
await self.story.setCurrentMessage(msg)
|
|
|
|
self.eventQueue = None
|
|
|
|
def setLanguage(self, language_code):
|
|
self.configureLanguage(language_code)
|
|
|
|
if self.isRunning.is_set():
|
|
self.restart()
|
|
|
|
def configureLanguage(self, language_code):
|
|
if language_code not in self.command.languages:
|
|
raise Exception("Invalid language {}".format(language_code))
|
|
|
|
self.logger.info(f"set language: {language_code}")
|
|
self.language_code = language_code
|
|
|
|
if self.google:
|
|
self.google.setLanguage(language_code)
|
|
|
|
def pause(self, log = True):
|
|
if log:
|
|
self.logger.info('Pause')
|
|
if self.google:
|
|
self.google.pause()
|
|
if self.story:
|
|
self.story.pause()
|
|
self.isRunning.clear()
|
|
self.setStatus(self.STATE_PAUSE)
|
|
|
|
def resume(self, log = True):
|
|
"""Start playing without reset, also used to play from a saved state"""
|
|
if log:
|
|
self.logger.info('Resume')
|
|
if self.google:
|
|
self.google.resume()
|
|
if self.story:
|
|
self.story.resume()
|
|
self.isRunning.set()
|
|
self.setStatus(self.STATE_RUNNING)
|
|
|
|
def restart(self):
|
|
"""Start playing with reset"""
|
|
self.logger.info('Restart')
|
|
if Story.hugveyHasSavedState(self.lightId):
|
|
Story.clearSavedState(self.lightId)
|
|
if self.story:
|
|
self.story.stop()
|
|
self.resume(log=False)
|
|
|
|
def block(self):
|
|
"""Block a hugvey"""
|
|
self.logger.info('block')
|
|
if self.google:
|
|
self.google.pause()
|
|
if self.story:
|
|
self.story.finish()
|
|
self.isRunning.clear()
|
|
self.setStatus(self.STATE_BLOCKED)
|
|
|
|
def available(self):
|
|
"""Put in available mode"""
|
|
self.logger.info('Finish/Await')
|
|
# TODO: Toggle running if config says so, but turn light on
|
|
self.pause(log=False)
|
|
self.setStatus(self.STATE_AVAILABLE)
|
|
|
|
def setLightStatus(self, on):
|
|
self.lightStatus = 1 if on else 0
|
|
self.logger.log(LOG_BS, f"Send /hugvey {self.lightStatus}")
|
|
|
|
self.command.commandLight('/hugvey', [self.lightId, self.lightStatus])
|
|
|
|
def transitionLight(self, intensity, duration):
|
|
"""
|
|
Intensity: 0-255
|
|
duration: an integer between 0-92 indicating the lanbox fade times
|
|
"""
|
|
self.lightIntensity = intensity
|
|
self.logger.log(LOG_BS, f"Send /hugvey_fade {self.lightIntensity} {duration}")
|
|
self.command.commandLight('/hugvey_fade', [self.lightId, intensity, int(duration)])
|
|
|
|
def setLightId(self, id):
|
|
"""
|
|
Connect hugvey to another light
|
|
"""
|
|
self.lightId = id
|
|
|
|
def gone(self):
|
|
'''Status to 'gone' as in, shutdown/crashed/whatever
|
|
'''
|
|
self.pause(log=False)
|
|
if self.story:
|
|
self.story.stop()
|
|
|
|
self.logger.warn('Gone')
|
|
self.eventLogger.warn("Gone")
|
|
self.isConfigured = None
|
|
self.setStatus(self.STATE_GONE)
|
|
|
|
def shutdown(self, definitive = False):
|
|
self.logger.info(f"Start shutdown sequence {definitive}")
|
|
self.eventLogger.critical(f"error: shutting down")
|
|
if self.streamer:
|
|
self.streamer.stop()
|
|
if self.story:
|
|
self.story.shutdown()
|
|
self.story = None
|
|
|
|
# shutdown for stream consumers already ran. Only clear references
|
|
if self.google:
|
|
self.google = None
|
|
if self.player:
|
|
self.player = None
|
|
if self.recorder:
|
|
self.recorder = None
|
|
|
|
if self.requireRestartAfterStop is None:
|
|
# prevent double setting of the same variable
|
|
# first call sometimes triggers second
|
|
self.requireRestartAfterStop = not definitive
|
|
|
|
self.notShuttingDown = False
|
|
|
|
|
|
async def playStory(self):
|
|
while self.notShuttingDown:
|
|
try:
|
|
await asyncio.wait_for(self.isRunning.wait(), 1)
|
|
except asyncio.futures.TimeoutError as e:
|
|
# timeout + catch so we can shutdown if needed without infinite await
|
|
continue
|
|
else:
|
|
# new story instance on each run
|
|
port = self.command.config['web']['port']
|
|
|
|
resuming = False
|
|
if Story.hugveyHasSavedState(self.lightId):
|
|
self.logger.info(f"Recovering from state :-)")
|
|
self.story = Story.loadStoryFromState(self)
|
|
resuming = True
|
|
if self.story.language_code != self.language_code:
|
|
self.logger.info("Changing language")
|
|
self.configureLanguage(self.story.language_code)
|
|
else:
|
|
self.story = Story(self, port)
|
|
self.story.setStoryData(copy.deepcopy(self.command.languages[self.language_code]), self.language_code)
|
|
|
|
|
|
if not self.streamer:
|
|
await asyncio.sleep(1)
|
|
|
|
self.streamer.triggerStart()
|
|
|
|
startMsgId = self.startMsgId
|
|
self.startMsgId = None # use only once, reset before 'run'
|
|
if not startMsgId and self.story.currentMessage:
|
|
startMsgId = self.story.currentMessage.id
|
|
|
|
self.logger.info(f"Starting from {startMsgId}")
|
|
|
|
self.setLightStatus(False)
|
|
await self.story.run(startMsgId, resuming)
|
|
|
|
if self.command.config['story']['loop']:
|
|
if not self.blockRestart:
|
|
self.logger.info("Loop story")
|
|
self.restart()
|
|
else:
|
|
self.logger.info("Don't loop on manual finish")
|
|
|
|
# reset a potential setting of blockRestart
|
|
self.blockRestart = False
|
|
# self.story = None
|
|
|
|
def getStreamer(self):
|
|
if not self.streamer:
|
|
self.streamer = AudioStreamer(
|
|
self.command.config['voice']['chunk'],
|
|
self.ip,
|
|
int(self.command.config['voice']['port']) + self.id,
|
|
self.id)
|
|
|
|
if self.command.config['voyeur']:
|
|
self.logger.warn("Debug on: Connecting Audio player")
|
|
self.player = Player(
|
|
self.command.config['voice']['src_rate'], self.command.config['voice']['out_rate'])
|
|
self.streamer.addConsumer(self.player)
|
|
|
|
if self.command.config['voice']['record_dir']:
|
|
self.logger.warn("Record Audio of conversation")
|
|
self.recorder = Recorder( self.id,
|
|
self.command.config['voice']['src_rate'], self.command.config['voice']['record_dir'],
|
|
self.command.config['voice']['record_voice'] if 'record_voice' in self.command.config['voice'] else False)
|
|
|
|
self.streamer.addConsumer(self.recorder)
|
|
|
|
self.logger.debug("Start Speech")
|
|
self.google = GoogleVoiceClient(
|
|
hugvey=self,
|
|
src_rate=self.command.config['voice']['src_rate'],
|
|
credential_file=self.command.config['voice']['google_credentials'],
|
|
language_code=self.language_code
|
|
)
|
|
self.streamer.addConsumer(self.google)
|
|
return self.streamer
|
|
|
|
async def processAudio(self):
|
|
'''
|
|
Start the audio streamer service
|
|
'''
|
|
|
|
self.logger.debug("Start audio loop")
|
|
|
|
while self.notShuttingDown:
|
|
try:
|
|
await asyncio.wait_for(self.isRunning.wait(), 1)
|
|
except asyncio.futures.TimeoutError as e:
|
|
# timeout + catch so we can shutdown if needed without infinite await
|
|
continue
|
|
else:
|
|
self.logger.debug("Start audio stream")
|
|
await self.getStreamer().run()
|
|
self.logger.critical(f"stream has left the building from {self.ip}")
|
|
self.eventLogger.critical(f"error: stream has left the building from {self.ip}")
|
|
# if we end up here, the streamer finished, probably meaning hte hugvey shutdown
|
|
# self.gone()
|