hugvey/hugvey/central_command.py

795 lines
28 KiB
Python
Raw Normal View History

"""
"Conscript reporting"
This server controls all hugveys and the processing of their narratives. It exposes itself for control to the panopticon server.
"""
2019-01-24 14:27:04 +01:00
import os
2019-01-18 19:39:35 +01:00
import time
import yaml
import zmq
2019-01-17 20:28:55 +01:00
from zmq.asyncio import Context
2019-01-24 14:27:04 +01:00
import asyncio
from hugvey.communication import getTopic, zmqSend, zmqReceive, LOG_BS
2019-01-18 19:39:35 +01:00
from hugvey.panopticon import Panopticon
from hugvey.story import Story
from hugvey.speech.google import GoogleVoiceClient
from hugvey.speech.player import Player
from hugvey.speech.streamer import AudioStreamer
2019-01-24 14:27:04 +01:00
import json
import logging
import queue
2019-01-24 14:27:04 +01:00
import threading
2019-02-18 20:38:54 +01:00
from hugvey.voice import VoiceStorage
import multiprocessing
from hugvey.speech.recorder import Recorder
from pythonosc import udp_client
2019-05-11 15:23:55 +02:00
import copy
# Root logger of the hugvey package; component loggers are children of it.
mainLogger = logging.getLogger("hugvey")
# Logger used by the central command code in this module.
logger = mainLogger.getChild("command")
# Separate "events" logger; CentralCommand.__init__ attaches a QueueHandler
# to it so records end up in CentralCommand.logQueue.
eventLogger = logging.getLogger("events")
eventLogger.setLevel(logging.DEBUG)
# def exceptionEmitter(a):
# print(a)
# def decorate(func):
# print('decorate')
# async def call(*args, **kwargs):
# print('call')
# # pre(func, *args, **kwargs)
# try:
# result = await func(*args, **kwargs)
# except Exception as e:
# logger.critical(e, "in", func)
# raise e
# # post(func, *args, **kwargs)
# return result
# return call
# return decorate
class CentralCommand(object):
"""docstring for CentralCommand."""
def __init__(self, args=None, debug_mode=False):
    """Create the queues, synchronisation primitives and lookup tables.

    :param args: parsed command line arguments; loadConfig() reads them
        with vars(). Defaults to a fresh empty dict per instance.
    :param debug_mode: stored as ``self.debug``.
    """
    # QueueHandler lives in the logging.handlers submodule, which a plain
    # `import logging` does not load.
    import logging.handlers

    self.debug = debug_mode
    self.eventQueue = asyncio.Queue()
    self.commandQueue = asyncio.Queue()
    self.lightQueue = asyncio.Queue()
    self.isRunning = threading.Event()
    self.logQueue = multiprocessing.Queue()
    self.hugveys = {}
    self.ctx = Context.instance()
    self.hugveyLock = asyncio.Lock()
    self.start_time = time.time()
    self.languageFiles = {}
    self.languageConfig = {}
    # A None default avoids sharing one mutable dict between instances.
    self.args = {} if args is None else args  # cli args
    # Every record of the "events" logger is pushed onto logQueue.
    eventLogger.addHandler(logging.handlers.QueueHandler(self.logQueue))
def loadConfig(self, filename):
    """Read the YAML config and create the objects that depend on it.

    Sets ``self.config``, ``self.hugvey_ids``, ``self.voiceStorage`` and
    ``self.panopticon`` and loads the language files.

    :param filename: path of the YAML configuration file.
    :raises Exception: when a config was already loaded on this instance.
    """
    if hasattr(self, 'config'):
        raise Exception("Overriding config not supported yet")

    with open(filename, 'r') as fp:
        logger.debug('Load config from {}'.format(filename))
        self.config = yaml.safe_load(fp)

    # Command line arguments win, but only for keys the file already has.
    for name in vars(self.args):
        if name not in self.config:
            continue
        logger.debug("Override argument {}".format(name))
        self.config[name] = getattr(self.args, name)

    # Hugvey ids are 1-based: 1 .. config['hugveys']
    self.hugvey_ids = [nr + 1 for nr in range(self.config['hugveys'])]
    self.loadLanguages()

    files_dir = self.config['web']['files_dir']
    self.voiceStorage = VoiceStorage(
        os.path.join(files_dir, 'voices'), self.languageConfig)
    self.panopticon = Panopticon(self, self.config, self.voiceStorage)
2019-05-11 15:23:55 +02:00
def loadLanguages(self):
    """(Re)load the story JSON of every language listed in the config.

    Resets ``self.languages`` and ``self.languageCache`` and fills
    ``self.languageFiles`` / ``self.languageConfig`` keyed by language code.
    """
    logger.debug('load language files')
    self.languages = {}
    self.languageCache = {}

    for entry in self.config['languages']:
        story_path = os.path.join(
            self.config['web']['files_dir'], entry['file'])
        code = entry['code']
        self.languageFiles[code] = entry['file']
        self.languageConfig[code] = entry
        with open(story_path, 'r') as fp:
            self.languages[code] = json.load(fp)
def getHugveyStatus(self, hv_id, isSelected = False):
status = {'id': hv_id}
if not hv_id in self.hugveys:
status['status'] = 'off'
return status
hv = self.hugveys[hv_id]
# if not hv.story:
# status['status'] = 'off'
# return status
2019-05-11 15:23:55 +02:00
status['status'] = hv.getStatus()
status['language'] = hv.language_code
status['light_id'] = hv.lightId
status['msg'] = hv.story.currentMessage.id if hv.story and hv.story.currentMessage else None
# status['finished'] = hv.story.isFinished()
status['history'] = {} if isSelected is False or not hv.story else hv.story.getLogSummary()
# status['history'] = hv.story.getLogSummary() # disabled as it is a bit slow. We now have eventLog
# status['counts'] = {t: len(a) for t, a in status['history'].items() if t != 'directions' }
status['counts'] = {} if not hv.story else hv.story.getLogCounts()
2019-05-11 15:23:55 +02:00
status['duration'] = 0 if not hv.story else hv.story.timer.getElapsed()
status['has_state'] = Story.hugveyHasSavedState(hv.lightId)
status['variables'] = {} if not isSelected or not hv.story else hv.story.variableValues
return status
2019-05-17 16:39:53 +02:00
def getStatusSummary(self, selected_ids = []):
status = {
'uptime': "-" if not self.start_time else (time.time() - self.start_time),
'languages': self.config['languages'],
'hugvey_ids': self.hugvey_ids,
'hugveys': [],
'logbookId': None,
'logbook': [],
}
2019-05-11 15:23:55 +02:00
#use this to test if any threads stay open
# eg. after killing/dying of a hugvey
2019-05-11 15:23:55 +02:00
# print(threading.enumerate())
for hv_id in self.hugvey_ids:
2019-05-17 16:39:53 +02:00
status['hugveys'].append(self.getHugveyStatus(hv_id, hv_id in selected_ids))
status['hugveys'].sort(key=lambda hv: hv['light_id'] if 'light_id' in hv else hv['id'])
2019-05-17 16:39:53 +02:00
# if selected_id and selected_id in self.hugveys:
# if self.hugveys[selected_id].recorder:
# status['logbook'] = self.hugveys[selected_id].recorder.currentLog
# status['logbookId'] = selected_id
2019-05-11 15:23:55 +02:00
return status
def commandHugvey(self, hv_id, msg):
    """
    Prepare a command to be picked up by the sender.

    :param hv_id: id of the hugvey the command is meant for.
    :param msg: the command itself; queued as the tuple (hv_id, msg).
    """
    # Use the module logger (not the root logger) like the rest of the file.
    logger.debug(f"COmmand {hv_id}: {msg}")
    if threading.current_thread() is threading.main_thread():
        self._queueCommand(hv_id, msg)
    else:
        # Adding to the asyncio queue from another thread does not wake up
        # the pending queue.get(), so hand the put over to the loop that
        # start() stored in self.loop.
        self.loop.call_soon_threadsafe(self._queueCommand, hv_id, msg)
def _queueCommand(self, hv_id, msg):
self.commandQueue.put_nowait((hv_id, msg))
2019-05-11 15:23:55 +02:00
def commandLight(self, route, data):
    """
    Buffer light commands.

    :param route: address of the light message, e.g. '/hugvey' or '/red'.
    :param data: arguments of the message; queued as (route, data).
    """
    # Use the module logger (not the root logger) like the rest of the file.
    logger.debug(f"Light: {route} {data}")
    if threading.current_thread() is threading.main_thread():
        self._queueLightCommand(route, data)
    else:
        # Adding to the asyncio queue from another thread does not wake up
        # the pending queue.get(), so hand the put over to the loop that
        # start() stored in self.loop.
        self.loop.call_soon_threadsafe(self._queueLightCommand, route, data)
def _queueLightCommand(self, route, data):
self.lightQueue.put_nowait((route, data))
def commandAllHugveys(self, msg):
for hv_id in self.hugvey_ids:
self.commandHugvey(hv_id, msg)
def commandAllActiveHugveys(self, msg):
for hv_id in self.hugveys:
self.commandHugvey(hv_id, msg)
async def commandSender(self):
    """Publish queued commands on the configured ZMQ PUB address.

    Binds to config['events']['cmd_address'], queues a 'show_yourself'
    for all hugveys and then forwards every (hv_id, cmd) pair taken from
    self.commandQueue until self.isRunning is cleared.
    """
    s = self.ctx.socket(zmq.PUB)
    try:
        s.bind(self.config['events']['cmd_address'])

        self.commandAllHugveys({'action': 'show_yourself'})

        # sleep to allow pending connections to connect
        await asyncio.sleep(1)
        logger.info("Ready to publish commands on: {}".format(
            self.config['events']['cmd_address']))
        logger.debug('Already {} items in queue'.format(
            self.commandQueue.qsize()))

        # NOTE: the flag is only re-checked after a command was sent, as
        # the loop blocks on the queue in between.
        while self.isRunning.is_set():
            hv_id, cmd = await self.commandQueue.get()
            logger.debug('Got command to send: {} {}'.format(hv_id, cmd))
            zmqSend(s, hv_id, cmd)

        logger.warning('Stopping command sender')
    finally:
        # also release the socket when the task fails or is cancelled
        s.close()
2019-05-11 15:23:55 +02:00
async def lightSender(self):
    """Send queued light commands over OSC/UDP.

    Takes (route, data) pairs from self.lightQueue and sends them to
    config['light']['ip'] : config['light']['port'], pausing 0.06 s after
    each message, until self.isRunning is cleared.
    """
    lightConn = udp_client.SimpleUDPClient(
        self.config['light']['ip'],
        self.config['light']['port'])
    try:
        logger.info(f"Ready to send light commands to: {self.config['light']['ip']}:{self.config['light']['port']}")

        while self.isRunning.is_set():
            route, data = await self.lightQueue.get()
            logger.debug('Got light to send: {} {}'.format(route, data))
            lightConn.send_message(route, data)
            await asyncio.sleep(.06)

        logger.warning('Stopping light sender')
    finally:
        # also release the UDP socket when the task fails or is cancelled
        lightConn._sock.close()
2019-05-11 15:23:55 +02:00
async def redLightController(self):
    """
    Every second, check if no hugveys are available. If so, the red light should be
    overruled to be on. If any is available, send a 0 to release the override.

    A '/red' light command is only queued when the value changes.
    """
    currentCode = None
    while self.isRunning.is_set():
        # Snapshot the values first: hugvey runner threads add and remove
        # entries of self.hugveys while this coroutine looks at them.
        statusses = [hv.getStatus() for hv in list(self.hugveys.values())]
        lightOn = HugveyState.STATE_AVAILABLE not in statusses
        lightCode = 1 if lightOn else 0
        if lightCode != currentCode:
            self.commandLight('/red', [lightCode])
            currentCode = lightCode
        await asyncio.sleep(1)

    logger.warning('Stopping red light controller')
2019-04-27 11:51:11 +02:00
def instantiateHugvey(self, hugvey_id):
'''
Start a HugveyState, according to a show_yourself reply
'event': 'connection',
'id': self.hugvey_id,
'host': socket.gethostname(),
'ip': self.getIp(),
'''
2019-04-27 11:51:11 +02:00
# async with self.hugveyLock: # lock to prevent duplicates on creation
if not hugvey_id in self.hugveys:
thread = threading.Thread(
target=self.hugveyStateRunner, args=(hugvey_id,), name=f"hugvey#{hugvey_id}")
thread.start()
2019-05-11 15:23:55 +02:00
2019-04-27 11:51:11 +02:00
def hugveyStateRunner(self, hugvey_id):
while self.isRunning.is_set():
logger.info(f'Instantiate hugvey #{hugvey_id}')
h = HugveyState(hugvey_id, self)
2019-04-27 11:51:11 +02:00
# h.config(msg['host'], msg['ip'])
self.hugveys[hugvey_id] = h
r = h.run()
self.hugveys.pop(hugvey_id)
if not r:
# stop if False, ie. when stream has gone
return
logger.critical(f'Hugvey stopped (crashed?). Reinstantiate after 5 sec')
time.sleep(5)
2019-05-11 15:23:55 +02:00
async def timerEmitter(self):
"""
This is fixed: a one hour loop with a collective moment 10-15 minutes,
30-35 minutes and 50-55 minutes
"""
loop_duration = 60 * 60 # one hour loop
intervals = [
{
'start_time': 10*60,
2019-05-11 15:23:55 +02:00
'duration': 5 * 60,
},
{
'start_time': 30*60,
2019-05-11 15:23:55 +02:00
'duration': 5 * 60,
},
{
'start_time': 50*60,
2019-05-11 15:23:55 +02:00
'duration': 5 * 60,
}
]
self.start_time = time.time()
2019-05-11 15:23:55 +02:00
# TODO: emit start event
2019-05-11 15:23:55 +02:00
while self.isRunning.is_set():
2019-05-11 15:23:55 +02:00
pass
2019-05-11 15:23:55 +02:00
async def eventListener(self):
    """Receive hugvey events over ZMQ and pass them to their HugveyState.

    Binds a SUB socket to config['events']['listen_address'], subscribes
    to the topic of every configured hugvey id and queues each message on
    the matching HugveyState. Messages of unknown or uninstantiated
    hugveys are logged and dropped.
    """
    s = self.ctx.socket(zmq.SUB)
    try:
        s.bind(self.config['events']['listen_address'])
        logger.debug("Listen for events on: {}".format(
            self.config['events']['listen_address']))

        for hv_id in self.hugvey_ids:
            s.subscribe(getTopic(hv_id))

        while self.isRunning.is_set():
            try:
                hugvey_id, msg = await zmqReceive(s)

                if hugvey_id not in self.hugvey_ids:
                    logger.critical(
                        "Message from alien Hugvey: {}".format(hugvey_id))
                    continue

                # single lookup: the runner thread may pop the entry
                hv = self.hugveys.get(hugvey_id)
                if hv is None:
                    logger.warning(
                        "Message from uninstantiated Hugvey {}".format(hugvey_id))
                    logger.debug("Message contains: {}".format(msg))
                    continue

                hv.queueEvent(msg)
            except Exception as e:
                # keep listening, a single bad message must not stop the loop
                logger.critical(f"Exception while running event loop:")
                logger.exception(e)
    finally:
        # release the socket when the loop ends or the task is cancelled
        s.close()
async def voiceListener(self, hugvey_id):
    """Answer voice file requests for one hugvey on a ZMQ REP socket.

    Listens on ipc://voice<hugvey_id>. A request is a JSON object with the
    keys 'text' and 'variable'; the reply is the filename returned by
    self.voiceStorage.requestFile(), or 'local/crash.wav' when no file
    could be produced.
    """
    s = self.ctx.socket(zmq.REP)  #: :type s: zmq.sugar.Socket
    voiceAddr = f"ipc://voice{hugvey_id}"
    try:
        s.bind(voiceAddr)
        logger.debug("Listen for voice requests on: {}".format(voiceAddr))

        while self.isRunning.is_set():
            replyPending = False
            try:
                r = await s.recv_json()
                replyPending = True
                isVariable = bool(r['variable'])
                text = r['text']
                hv = self.hugveys[hugvey_id]  #: :type hv: HugveyState
                fn = await self.voiceStorage.requestFile(hv.language_code, text, isVariable)
                if fn is None:
                    eventLogger.getChild(f"{hugvey_id}").critical("error: No voice file fetched, check logs.")
                    fn = 'local/crash.wav'
                    # TODO: trigger a repeat/crash event.
                await s.send_string(fn)
                replyPending = False
            except Exception as e:
                logger.critical(f"Exception while running voice loop:")
                logger.exception(e)
                if replyPending:
                    # A REP socket has to answer a request before it can
                    # receive the next one. Without this reply every later
                    # recv would fail and the requester would wait forever.
                    try:
                        await s.send_string('local/crash.wav')
                    except Exception as sendError:
                        logger.exception(sendError)
    finally:
        # release the socket when the loop ends or the task is cancelled
        s.close()
def start(self):
    """Start all tasks and threads, then block in the event loop.

    Sets self.isRunning, schedules the listener/sender tasks on the current
    event loop, starts one voice listener and one runner thread per hugvey,
    starts the panopticon web interface in its own thread and finally
    calls loop.run_forever().
    """
    self.isRunning.set()
    self.loop = asyncio.get_event_loop()
    # self.panopticon_loop = asyncio.new_event_loop()

    self.tasks = {}  # collect tasks so we can cancel in case of error
    for name in ('eventListener', 'commandSender',
                 'lightSender', 'redLightController'):
        self.tasks[name] = self.loop.create_task(
            self.catchException(getattr(self, name)()))

    for hid in self.hugvey_ids:
        listener = self.loop.create_task(
            self.catchException(self.voiceListener(hid)))
        # One key per hugvey: with a single shared key each new task
        # replaced the previous one, leaving all but the last without a
        # reference (the event loop itself only keeps weak references).
        self.tasks[f'voiceListener{hid}'] = listener
        # Original key kept for code that still reads it; as before it
        # refers to the listener created last.
        self.tasks['voiceListener'] = listener
        self.instantiateHugvey(hid)

    # we want the web interface in a separate thread
    self.panopticon_thread = threading.Thread(
        target=self.panopticon.start, name="Panopticon")
    self.panopticon_thread.start()

    self.loop.run_forever()
def stop(self):
self.isRunning.clear()
2019-05-11 15:23:55 +02:00
2019-02-18 20:38:54 +01:00
async def catchException(self, awaitable):
try:
2019-04-27 11:51:11 +02:00
# print(awaitable)
2019-02-18 20:38:54 +01:00
await awaitable
except Exception as e:
logger.exception(e)
logger.critical(f"Hugvey restart might be required but not implemented yet")
class HugveyState(object):
"""Represents the state of a Hugvey client on the server.
Manages server connections & voice parsing etc.
"""
2019-05-11 15:23:55 +02:00
# all statusses can only go up or down, except for gone, which is an error state:
2019-04-27 11:51:11 +02:00
# off <-> blocked <-> available <-> running <-> paused
STATE_OFF = "off"
STATE_BLOCKED = "blocked"
2019-04-27 11:51:11 +02:00
STATE_AVAILABLE = "available"
STATE_RUNNING = "running"
STATE_PAUSE = "paused"
STATE_GONE = "gone"
def __init__(self, id: int, command: CentralCommand):
    """Set up the state of one hugvey; nothing runs until run() is called.

    :param id: id of the hugvey; initially also used as its light id.
    :param command: the CentralCommand this state belongs to.
    """
    self.id = id
    self.lightId = id
    self.command = command
    self.logger = mainLogger.getChild(f"{self.id}").getChild("command")
    # each hugvey gets its own event loop, driven by run()
    self.loop = asyncio.new_event_loop()
    self.isConfigured = None
    try:
        self.isRunning = asyncio.Event(loop=self.loop)
    except TypeError:
        # Python 3.10 removed the loop argument; the event then binds to
        # the loop it is first awaited in, which is self.loop.
        self.isRunning = asyncio.Event()
    self.isRunning.clear()

    self.eventQueue = None
    self.language_code = 'en-GB'
    self.story = None
    self.streamer = None
    self.google = None
    self.player = None
    self.recorder = None
    self.notShuttingDown = True  # TODO: allow shutdown of object
    self.startMsgId = None
    self.eventLogger = eventLogger.getChild(f"{self.id}")

    # setStatus() also sends the matching light command
    self.setStatus(self.STATE_GONE)

    # None until shutdown() decides; returned by run()
    self.requireRestartAfterStop = None
def __del__(self):
self.logger.warn("Destroying hugvey object")
2019-05-11 15:23:55 +02:00
def getStatus(self):
return self.status
2019-05-11 15:23:55 +02:00
def setStatus(self, status):
    """Store the new status, update the light and log the change.

    The light is switched on for the 'available' and 'paused' states and
    off for every other status.
    """
    self.status = status
    wantsLight = status in (self.STATE_AVAILABLE, self.STATE_PAUSE)
    self.setLightStatus(wantsLight)
    self.eventLogger.info(f"status: {self.status}")
2019-05-11 15:23:55 +02:00
def config(self, hostname, ip):
self.ip = ip
self.hostname = hostname
2019-04-27 11:51:11 +02:00
if self.isConfigured is not None:
2019-01-17 20:28:55 +01:00
# a reconfiguration/reconnection
pass
2019-04-27 11:51:11 +02:00
else:
self.logger.info(
f"Hugvey {self.id} at {self.ip}, host: {self.hostname}")
2019-05-11 15:23:55 +02:00
2019-04-27 11:51:11 +02:00
if self.status == self.STATE_GONE:
# turn on :-)
self.setStatus(self.STATE_BLOCKED)
2019-04-27 11:51:11 +02:00
self.isConfigured = time.time()
2019-01-18 12:42:50 +01:00
def sendCommand(self, msg):
"""
Send message or command to hugvey
@param msg: The message to be sent. Probably a dict()
"""
self.command.commandHugvey(self.id, msg)
def run(self):
2019-04-27 11:51:11 +02:00
self.logger.info(f"Await hugvey #{self.id}")
2019-01-18 12:42:50 +01:00
tasks = asyncio.gather(
self.catchException(self.processAudio()),
self.catchException(self.handleEvents()),
self.catchException(self.playStory()),
loop=self.loop)
self.loop.run_until_complete(tasks)
self.logger.warning(f"FINISHED RUNNING {self.id}")
return self.requireRestartAfterStop
2019-01-18 12:42:50 +01:00
async def catchException(self, awaitable):
try:
# print(awaitable)
2019-01-18 12:42:50 +01:00
await awaitable
except Exception as e:
self.logger.exception(e)
self.logger.critical(f"Hugvey crash")
self.eventLogger.critical(f"error: {e}")
2019-05-11 15:23:55 +02:00
# restart
# TODO: test proper functioning
self.shutdown()
2019-01-18 12:42:50 +01:00
def queueEvent(self, msg):
if 'time' not in msg:
# add time, so we can track time passed
msg['time'] = time.time()
if not self.eventQueue:
self.logger.critical("No event queue to put {}".format(msg))
else:
# Allow for both the Hugvey Command, or the Story handle the event.
2019-01-25 13:11:00 +01:00
self.loop.call_soon_threadsafe(self._queueEvent, msg)
2019-05-11 15:23:55 +02:00
2019-01-25 13:11:00 +01:00
def _queueEvent(self, msg):
2019-04-27 11:51:11 +02:00
"""
Put event in both the event loop for the story as well as the Hugvey State handler
"""
# self.logger.debug(f"Queue event in hugvey loop: {msg}") # a little less logging please :-)
2019-01-25 13:11:00 +01:00
self.eventQueue.put_nowait(msg)
2019-05-11 15:23:55 +02:00
2019-04-27 11:51:11 +02:00
# connection events don't need to go to the story
if msg['event'] == 'connection':
return
2019-05-11 15:23:55 +02:00
if self.story:
self.story.events.append(msg)
else:
self.logger.critical("Cannot queue event, as no story is present.")
2019-01-18 12:42:50 +01:00
async def handleEvents(self):
self.eventQueue = asyncio.Queue() # start event queue here, to avoid loop issues
while self.notShuttingDown:
try:
event = await asyncio.wait_for(self.eventQueue.get(), 2)
except asyncio.futures.TimeoutError as e:
2019-04-27 11:51:11 +02:00
# detect missing heartbeat:
if self.isConfigured and time.time() - self.isConfigured > 15:
self.logger.error("Hugvey did not send heartbeat.")
2019-05-11 18:12:25 +02:00
# self.gone()
self.shutdown()
continue
2019-05-11 15:23:55 +02:00
self.logger.debug("Received: {}".format(event))
2019-04-27 11:51:11 +02:00
if event['event'] == 'connection':
# 'event': 'connection',
# 'id': self.hugvey_id,
# 'host': socket.gethostname(),
# 'ip': self.getIp(),
self.config(event['host'], event['ip'])
2019-05-11 15:23:55 +02:00
if event['event'] == 'language':
2019-01-18 12:42:50 +01:00
self.setLanguage(event['code'])
2019-05-11 15:23:55 +02:00
if event['event'] == 'pause':
self.pause()
if event['event'] == 'block':
self.block()
if event['event'] == 'unblock':
2019-04-27 11:51:11 +02:00
self.available()
if event['event'] == 'restart':
self.restart()
if event['event'] == 'finish':
self.story._finish() # finish story AND hugvey state
if event['event'] == 'resume':
self.resume()
2019-05-11 15:23:55 +02:00
2019-01-25 11:59:03 +01:00
if event['event'] == 'change_language':
self.setLanguage(event['lang_code'])
if event['event'] == 'change_light':
self.setLightId(event['light_id'])
2019-01-25 15:45:46 +01:00
if event['event'] == 'play_msg':
self.logger.info(f"Play given message {event['msg_id']}")
2019-01-25 15:45:46 +01:00
if not self.story:
self.logger.critical("No story to play message in")
else:
if self.story is None:
return
if event['reloadStory']:
self.startMsgId = event['msg_id']
self.logger.debug(f"Restart from {self.startMsgId}")
self.restart()
else:
msg = self.story.get(event['msg_id'])
await self.story.setCurrentMessage(msg)
2019-01-18 12:42:50 +01:00
self.eventQueue = None
2019-01-18 12:42:50 +01:00
def setLanguage(self, language_code):
self.configureLanguage(language_code)
if self.isRunning.is_set():
self.restart()
def configureLanguage(self, language_code):
2019-01-18 12:42:50 +01:00
if language_code not in self.command.languages:
raise Exception("Invalid language {}".format(language_code))
2019-05-11 15:23:55 +02:00
2019-01-25 11:59:03 +01:00
self.logger.info(f"set language: {language_code}")
2019-01-18 12:42:50 +01:00
self.language_code = language_code
2019-05-11 15:23:55 +02:00
if self.google:
self.google.setLanguage(language_code)
2019-05-11 15:23:55 +02:00
def pause(self, log = True):
    """Pause speech recognition and the story, and set status 'paused'.

    :param log: write an info line to the logger when true.
    """
    if log:
        self.logger.info('Pause')

    # pause whatever is present: first the speech client, then the story
    for part in (self.google, self.story):
        if part:
            part.pause()

    self.isRunning.clear()
    self.setStatus(self.STATE_PAUSE)
2019-05-11 15:23:55 +02:00
def resume(self, log = True):
    """Start playing without reset, also used to play from a saved state

    :param log: write an info line to the logger when true.
    """
    if log:
        self.logger.info('Resume')

    # resume whatever is present: first the speech client, then the story
    for part in (self.google, self.story):
        if part:
            part.resume()

    self.isRunning.set()
    self.setStatus(self.STATE_RUNNING)
2019-05-11 15:23:55 +02:00
def restart(self):
    """Start playing with reset

    Clears a saved story state for this light id, stops the current story
    (if any) and resumes.
    """
    self.logger.info('Restart')

    light = self.lightId
    if Story.hugveyHasSavedState(light):
        Story.clearSavedState(light)

    current = self.story
    if current:
        current.stop()

    self.resume(log=False)
2019-05-11 15:23:55 +02:00
def block(self):
    """Block a hugvey

    Pauses the speech client, finishes the story and sets status 'blocked'.
    """
    self.logger.info('block')

    speech, current = self.google, self.story
    if speech:
        speech.pause()
    if current:
        current.finish()

    self.isRunning.clear()
    self.setStatus(self.STATE_BLOCKED)
2019-05-11 15:23:55 +02:00
2019-04-27 11:51:11 +02:00
def available(self):
    """Put in available mode

    Pauses everything first; the status ends up as 'available'.
    """
    self.logger.info('Finish/Await')
    # TODO: Toggle running if config says so, but turn light on
    self.pause(log=False)
    # pause() left the status at 'paused'; overrule it
    self.setStatus(self.STATE_AVAILABLE)
2019-05-11 15:23:55 +02:00
def setLightStatus(self, on):
    """Queue a '/hugvey' light command for this hugvey's light.

    :param on: truthy to switch the light on (sends 1), otherwise 0.
    """
    if on:
        status = 1
    else:
        status = 0
    self.logger.log(LOG_BS, f"Send /hugvey {status}")

    payload = [self.lightId, status]
    self.command.commandLight('/hugvey', payload)
2019-05-11 15:23:55 +02:00
def setLightId(self, id):
"""
Connect hugvey to another light
"""
self.lightId = id
2019-05-11 15:23:55 +02:00
def gone(self):
    '''Status to 'gone' as in, shutdown/crashed/whatever

    Pauses everything, stops the story and forgets the configuration time.
    '''
    self.pause(log=False)
    if self.story:
        self.story.stop()

    # Logger.warn() is a deprecated alias of warning()
    self.logger.warning('Gone')
    self.eventLogger.warning("Gone")
    self.isConfigured = None
    self.setStatus(self.STATE_GONE)
2019-05-11 15:23:55 +02:00
def shutdown(self, definitive = False):
self.logger.info(f"Start shutdown sequence {definitive}")
2019-04-25 19:08:27 +02:00
self.eventLogger.critical(f"error: shutting down")
if self.streamer:
self.streamer.stop()
if self.story:
self.story.shutdown()
self.story = None
2019-05-11 15:23:55 +02:00
# shutdown for stream consumers already ran. Only clear references
if self.google:
self.google = None
if self.player:
self.player = None
if self.recorder:
self.recorder = None
2019-05-11 15:23:55 +02:00
if self.requireRestartAfterStop is None:
# prevent double setting of the same variable
# first call sometimes triggers second
self.requireRestartAfterStop = not definitive
2019-05-11 15:23:55 +02:00
self.notShuttingDown = False
2019-05-11 15:23:55 +02:00
2019-01-18 12:42:50 +01:00
async def playStory(self):
2019-01-25 11:17:10 +01:00
while self.notShuttingDown:
try:
await asyncio.wait_for(self.isRunning.wait(), 1)
except asyncio.futures.TimeoutError as e:
# timeout + catch so we can shutdown if needed without infinite await
continue
else:
# new story instance on each run
port = self.command.config['web']['port']
resuming = False
if Story.hugveyHasSavedState(self.id):
self.logger.info(f"Recovering from state :-)")
self.story = Story.loadStoryFromState(self)
resuming = True
if self.story.language_code != self.language_code:
self.logger.info("Changing language")
self.configureLanguage(self.story.language_code)
else:
self.story = Story(self, port)
self.story.setStoryData(copy.deepcopy(self.command.languages[self.language_code]), self.language_code)
if not self.streamer:
await asyncio.sleep(1)
2019-05-11 15:23:55 +02:00
self.streamer.triggerStart()
startMsgId = self.startMsgId
self.startMsgId = None # use only once, reset before 'run'
if not startMsgId and self.story.currentMessage:
startMsgId = self.story.currentMessage.id
self.logger.info(f"Starting from {startMsgId}")
self.setLightStatus(False)
await self.story.run(startMsgId, resuming)
2019-01-25 11:17:10 +01:00
# self.story = None
2019-05-11 15:23:55 +02:00
def getStreamer(self):
if not self.streamer:
self.streamer = AudioStreamer(
self.command.config['voice']['chunk'],
self.ip,
2019-04-10 11:13:42 +02:00
int(self.command.config['voice']['port']) + self.id,
self.id)
2019-05-11 15:23:55 +02:00
if self.command.config['voyeur']:
self.logger.warn("Debug on: Connecting Audio player")
self.player = Player(
self.command.config['voice']['src_rate'], self.command.config['voice']['out_rate'])
self.streamer.addConsumer(self.player)
2019-05-11 15:23:55 +02:00
if self.command.config['voice']['record_dir']:
self.logger.warn("Record Audio of conversation")
self.recorder = Recorder( self.id,
self.command.config['voice']['src_rate'], self.command.config['voice']['record_dir'])
self.streamer.addConsumer(self.recorder)
2019-05-11 15:23:55 +02:00
self.logger.debug("Start Speech")
self.google = GoogleVoiceClient(
hugvey=self,
src_rate=self.command.config['voice']['src_rate'],
credential_file=self.command.config['voice']['google_credentials'],
language_code=self.language_code
)
self.streamer.addConsumer(self.google)
return self.streamer
2019-01-18 12:42:50 +01:00
async def processAudio(self):
'''
Start the audio streamer service
'''
2019-05-11 15:23:55 +02:00
self.logger.debug("Start audio loop")
while self.notShuttingDown:
try:
await asyncio.wait_for(self.isRunning.wait(), 1)
except asyncio.futures.TimeoutError as e:
# timeout + catch so we can shutdown if needed without infinite await
continue
else:
self.logger.debug("Start audio stream")
await self.getStreamer().run()
self.logger.critical(f"stream has left the building from {self.ip}")
self.eventLogger.critical(f"error: stream has left the building from {self.ip}")
# if we end up here, the streamer finished, probably meaning hte hugvey shutdown
2019-05-11 18:12:25 +02:00
# self.gone()