diff --git a/hugvey/central_command.py b/hugvey/central_command.py index f5b60f4..90555ae 100644 --- a/hugvey/central_command.py +++ b/hugvey/central_command.py @@ -17,13 +17,33 @@ from hugvey.story import Story from hugvey.voice.google import GoogleVoiceClient from hugvey.voice.player import Player from hugvey.voice.streamer import AudioStreamer +import queue logger = logging.getLogger("command") +# def exceptionEmitter(a): +# print(a) +# def decorate(func): +# print('decorate') +# async def call(*args, **kwargs): +# print('call') +# # pre(func, *args, **kwargs) +# try: +# result = await func(*args, **kwargs) +# except Exception as e: +# logger.critical(e, "in", func) +# raise e +# # post(func, *args, **kwargs) +# return result +# return call +# return decorate + + class CentralCommand(object): """docstring for CentralCommand.""" - def __init__(self, debug_mode = False): + + def __init__(self, debug_mode=False): self.debug = debug_mode self.eventQueue = asyncio.Queue() self.commandQueue = asyncio.Queue() @@ -31,39 +51,67 @@ class CentralCommand(object): self.hugveys = {} self.ctx = Context.instance() self.hugveyLock = asyncio.Lock() - + self.start_time = time.time() def loadConfig(self, filename): if hasattr(self, 'config'): raise Exception("Overriding config not supported yet") - + with open(filename, 'r') as fp: logger.debug('Load config from {}'.format(filename)) self.config = yaml.safe_load(fp) - - self.hugvey_ids = [i+1 for i in range(self.config['hugveys'])] - + + self.hugvey_ids = [i + 1 for i in range(self.config['hugveys'])] + # load languages: self.languages = {} - + for lang in self.config['languages']: with open(lang['file'], 'r') as fp: self.languages[lang['code']] = yaml.load(fp) - + self.panopticon = Panopticon(self, self.config) - - + + def getHugveyStatus(self, hv_id): + status = {'id': hv_id} + if not hv_id in self.hugveys: + status['status'] = 'off' + return status + + hv = self.hugveys[hv_id] + status['status'] = 'running' if hv.isRunning.is_set() else 'paused' + status['language'] = hv.language_code + status['msg'] = hv.story.currentMessage.id + status['counts'] = hv.story.getStoryCounts() + status['finished'] = hv.story.isFinished() + + + return status + + def getStatusSummary(self): + status = { + 'uptime': time.time() - self.start_time, + 'languages': self.config['languages'], + 'hugveys': [], + } + + for hv_id in self.hugvey_ids: + status['hugveys'].append(self.getHugveyStatus(hv_id)) + + return status + def commandHugvey(self, hv_id, msg): """ prepare command to be picked up by the sender """ if threading.current_thread().getName() != 'MainThread': # Threading nightmares! Adding to queue from other thread/loop (not sure which is the isse) - # won't trigger asyncios queue.get() so we have to do this thread safe, in the right loop - self.loop.call_soon_threadsafe( self._queueCommand, hv_id, msg ) + # won't trigger asyncios queue.get() so we have to do this thread + # safe, in the right loop + self.loop.call_soon_threadsafe(self._queueCommand, hv_id, msg) else: self._queueCommand(hv_id, msg) - + def _queueCommand(self, hv_id, msg): self.commandQueue.put_nowait((hv_id, msg)) # if msg['action'] == 'play': @@ -72,155 +120,166 @@ class CentralCommand(object): # 'msg': "This is an interrption", # 'id': 'test', # })) - + def commandAllHugveys(self, msg): for hv_id in self.hugvey_ids: self.commandHugvey(hv_id, msg) - + def commandAllActiveHugveys(self, msg): for hv_id in self.hugveys: self.commandHugvey(hv_id, msg) - + async def commandSender(self): s = self.ctx.socket(zmq.PUB) s.bind(self.config['events']['cmd_address']) - + self.commandAllHugveys({'action': 'show_yourself'}) - + # sleep to allow pending connections to connect await asyncio.sleep(1) - logger.info("Ready to publish commands on: {}".format(self.config['events']['cmd_address'])) - logger.debug('Already {} items in queue'.format(self.commandQueue.qsize())) - + logger.info("Ready to publish commands on: {}".format( + self.config['events']['cmd_address'])) + logger.debug('Already {} items in queue'.format( + self.commandQueue.qsize())) + while self.isRunning.is_set(): hv_id, cmd = await self.commandQueue.get() logger.info('Got command to send: {} {}'.format(hv_id, cmd)) zmqSend(s, hv_id, cmd) - + logger.warn('Stopping command sender') s.close() - + async def instantiateHugvey(self, hugvey_id, msg): ''' Start a HugveyState, according to a show_yourself reply - + 'event': 'connection', 'id': self.hugvey_id, 'host': socket.gethostname(), 'ip': self.getIp(), ''' - async with self.hugveyLock: # lock to prevent duplicates on creation + async with self.hugveyLock: # lock to prevent duplicates on creation if not hugvey_id in self.hugveys: logger.info(f'Instantiate hugvey #{hugvey_id}') + print('a') h = HugveyState(hugvey_id, self) - h.config(msg['host'],msg['ip']) + print('a') + h.config(msg['host'], msg['ip']) + print('b') self.hugveys[hugvey_id] = h - thread = threading.Thread(target=h.start, name=f"hugvey#{hugvey_id}") + thread = threading.Thread( + target=h.start, name=f"hugvey#{hugvey_id}") thread.start() + print('c') else: logger.info(f'Reconfigure hugvey #{hugvey_id}') # (re)configure exisitng hugveys - h.config(msg['host'],msg['ip']) - - - + h.config(msg['host'], msg['ip']) + async def eventListener(self): s = self.ctx.socket(zmq.SUB) s.bind(self.config['events']['listen_address']) - logger.info("Listen for events on: {}".format(self.config['events']['listen_address'])) - + logger.info("Listen for events on: {}".format( + self.config['events']['listen_address'])) + for id in self.hugvey_ids: s.subscribe(getTopic(id)) - + while self.isRunning.is_set(): - hugvey_id, msg = await zmqReceive(s) - - if hugvey_id not in self.hugvey_ids: - logger.critical("Message from alien Hugvey: {}".format(hugvey_id)) - continue - elif hugvey_id not in self.hugveys: - if msg['event'] == 'connection': - # Create a hugvey - await self.instantiateHugvey(hugvey_id, msg) + try: + hugvey_id, msg = await zmqReceive(s) + + if hugvey_id not in self.hugvey_ids: + logger.critical( + "Message from alien Hugvey: {}".format(hugvey_id)) + continue + elif hugvey_id not in self.hugveys: + if msg['event'] == 'connection': + # Create a hugvey + await self.instantiateHugvey(hugvey_id, msg) + else: + logger.warning( + "Message from uninstantiated Hugvey {}".format(hugvey_id)) + logger.debug("Message contains: {}".format(msg)) + continue else: - logger.warning("Message from uninstantiated Hugvey {}".format(hugvey_id)) - logger.debug("Message contains: {}".format(msg)) - continue - else: - await self.hugveys[hugvey_id].eventQueue.put(msg) - pass - -# def getPanopticon(self): -# self.panopticon = - + await self.hugveys[hugvey_id].eventQueue.put(msg) + except Exception as e: + logger.critical(f"Exception while running event loop:") + logger.exception(e) + + def start(self): self.isRunning.set() self.loop = asyncio.get_event_loop() # self.panopticon_loop = asyncio.new_event_loop() - - self.tasks = {} # collect tasks so we can cancel in case of error - self.tasks['eventListener'] = self.loop.create_task(self.eventListener()) - self.tasks['commandSender'] = self.loop.create_task(self.commandSender()) - - print(threading.current_thread()) + + self.tasks = {} # collect tasks so we can cancel in case of error + self.tasks['eventListener'] = self.loop.create_task( + self.eventListener()) + self.tasks['commandSender'] = self.loop.create_task( + self.commandSender()) + # we want the web interface in a separate thread - self.panopticon_thread = threading.Thread(target=self.panopticon.start, name="Panopticon") + self.panopticon_thread = threading.Thread( + target=self.panopticon.start, name="Panopticon") self.panopticon_thread.start() - print(threading.current_thread()) self.loop.run_forever() - + def stop(self): self.isRunning.clear() - class HugveyState(object): """Represents the state of a Hugvey client on the server. Manages server connections & voice parsing etc. """ + def __init__(self, id: int, command: CentralCommand): + self.id = id self.command = command self.logger = logging.getLogger(f"hugvey{self.id}") self.loop = asyncio.new_event_loop() self.isConfigured = False + self.isRunning = threading.Event() self.eventQueue = None self.language_code = 'en-GB' self.story = Story(self) self.story.setStoryData(self.command.languages[self.language_code]) - + def config(self, hostname, ip): self.ip = ip self.hostname = hostname - self.logger.info(f"Hugvey {self.id} at {self.ip}, host: {self.hostname}") - + self.logger.info( + f"Hugvey {self.id} at {self.ip}, host: {self.hostname}") + if self.isConfigured == True: # a reconfiguration/reconnection pass - + self.isConfigured = True - + def sendCommand(self, msg): """ Send message or command to hugvey @param msg: The message to be sent. Probably a dict() """ self.command.commandHugvey(self.id, msg) - + def start(self): """ Start the tasks """ - # stop on isRunning.is_set() or wait() -# self.loop.create_task(self.processAudio()) tasks = asyncio.gather( self.catchException(self.processAudio()), self.catchException(self.handleEvents()), self.catchException(self.playStory()), loop=self.loop) self.loop.run_until_complete(tasks) -# asyncio.run_coroutine_threadsafe(self._start(), self.loop) + self.isRunning.set() async def catchException(self, awaitable): try: @@ -228,7 +287,7 @@ class HugveyState(object): except Exception as e: logger.exception(e) logger.critical(f"Hugvey restart required but not implemented yet") - + # TODO: restart def queueEvent(self, msg): @@ -243,51 +302,74 @@ class HugveyState(object): self.story.events.append(msg) async def handleEvents(self): - self.eventQueue = asyncio.Queue() # start event queue here, to avoid loop issues + self.eventQueue = asyncio.Queue() # start event queue here, to avoid loop issues while self.command.isRunning.is_set(): event = await self.eventQueue.get() self.logger.info("Received: {}".format(event)) - - if event['event'] =='language': + + if event['event'] == 'language': self.setLanguage(event['code']) + if event['event'] == 'pause': + self.pause() + if event['event'] == 'restart': + self.restart() + if event['event'] == 'resume': + self.resume() + self.eventQueue = None - + def setLanguage(self, language_code): if language_code not in self.command.languages: raise Exception("Invalid language {}".format(language_code)) - + self.language_code = language_code self.google.setLanguage(language_code) - + self.story.reset() self.story.setStoryData(self.command.languages[language_code]) + def pause(self): + self.google.pause() + self.story.pause() + self.isRunning.clear() + + def resume(self): + self.google.resume() + self.story.resume() + self.isRunning.set() + + def restart(self): + self.story.reset() + self.resume() + self.isRunning.set() + async def playStory(self): await self.story.start() - + async def processAudio(self): ''' Start the audio streamer service ''' self.logger.info("Start audio stream") streamer = AudioStreamer( - self.command.config['voice']['chunk'], + self.command.config['voice']['chunk'], self.ip, int(self.command.config['voice']['port'])) - + if self.command.debug: self.logger.warn("Debug on: Connecting Audio player") - self.player = Player(self.command.config['voice']['src_rate'], self.command.config['voice']['out_rate']) + self.player = Player( + self.command.config['voice']['src_rate'], self.command.config['voice']['out_rate']) streamer.addConsumer(self.player) - + self.logger.info("Start Speech") self.google = GoogleVoiceClient( hugvey=self, src_rate=self.command.config['voice']['src_rate'], credential_file=self.command.config['voice']['google_credentials'], language_code=self.language_code - ) + ) streamer.addConsumer(self.google) - + await streamer.run() diff --git a/hugvey/panopticon.py b/hugvey/panopticon.py index b33d824..adf8b23 100644 --- a/hugvey/panopticon.py +++ b/hugvey/panopticon.py @@ -11,28 +11,73 @@ import tornado.ioloop import os from pytz.reference import Central import asyncio +import json logger = logging.getLogger("panopticon") -web_dir = os.path.join(os.path.split(__file__)[0], '..','www') +web_dir = os.path.join(os.path.split(__file__)[0], '..', 'www') print(web_dir) -class WebSocketHandler(tornado.websocket.WebSocketHandler): - connections = set() - # the client connected - def open(self): - self.connections.add(self) - print("New client connected") +def getWebSocketHandler(central_command): + class WebSocketHandler(tornado.websocket.WebSocketHandler): + connections = set() - # the client sent the message - def on_message(self, message): - [con.write_message(message) for con in self.connections] + # the client connected + def open(self): + self.connections.add(self) + logger.info("New client connected") - # client disconnected - def on_close(self): - self.connections.remove(self) - print("Client disconnected") + # the client sent the message + def on_message(self, message): + try: + msg = json.loads(message) + if msg['action'] == 'init': + self.msgInit() + if msg['action'] == 'get_status': + self.msgStatus() + if msg['action'] == 'resume': + self.msgResume(msg['hugvey']) + if msg['action'] == 'pause': + self.msgPause(msg['hugvey']) + if msg['action'] == 'restart': + self.msgRestart(msg['hugvey']) + + except Exception as e: + self.send({'alert': 'Invalid request: {}'.format(e)}) + + def send(self, message): + j = json.dumps(message) + [con.write_message(j) for con in self.connections] + + # client disconnected + def on_close(self): + self.connections.remove(self) + logger.info("Client disconnected") + + def getStatusMsg(self): + msg = central_command.getStatusSummary() + msg['action'] = 'status' + + return msg + + def msgStatus(self): + self.send(self.getStatusMsg()) + + def msgInit(self): + msg = self.getStatusMsg() + self.send(msg) + + def msgResume(self, hv_id): + central_command.hugveys[hv_id].eventQueue.put({'event': 'resume'}) + + def msgPause(self, hv_id): + central_command.hugveys[hv_id].eventQueue.put({'event': 'pause'}) + + def msgRestart(self, hv_id): + central_command.hugveys[hv_id].eventQueue.put({'event': 'restart'}) + + return WebSocketHandler class Panopticon(object): @@ -40,22 +85,23 @@ class Panopticon(object): self.command = central_command self.config = config self.application = tornado.web.Application([ - (r"/ws", WebSocketHandler), - (r"/uploads/(.*)", tornado.web.StaticFileHandler, {"path": config['web']['files_dir']}), - (r"/(.*)", tornado.web.StaticFileHandler, {"path": web_dir, "default_filename": 'index.html'}), + (r"/ws", getWebSocketHandler(self.command)), + (r"/uploads/(.*)", tornado.web.StaticFileHandler, + {"path": config['web']['files_dir']}), + (r"/(.*)", tornado.web.StaticFileHandler, + {"path": web_dir, "default_filename": 'index.html'}), ], debug=True) - - + self.application.listen(config['web']['port']) # self.loop.configure(evt_loop) def start(self): evt_loop = asyncio.new_event_loop() asyncio.set_event_loop(evt_loop) - + self.loop = tornado.ioloop.IOLoop.current() logger.info(f"Start Panopticon on port {self.config['web']['port']}") self.loop.start() - + def stop(self): self.loop.stop() diff --git a/hugvey/story.py b/hugvey/story.py index d54f72d..a5dee2f 100644 --- a/hugvey/story.py +++ b/hugvey/story.py @@ -7,6 +7,7 @@ import asyncio logger = logging.getLogger("narrative") + class Message(object): def __init__(self, id, text): self.id = id @@ -16,9 +17,9 @@ class Message(object): @classmethod def initFromJson(message, data, story): - msg = message(data['@id'], data['text']) + msg = message(data['@id'], data['text']) msg.isStart = data['start'] if 'start' in data else False - return msg; + return msg def setReply(self, text): self.reply = text @@ -28,7 +29,8 @@ class Message(object): def getReply(self): if self.reply is None: - raise Exception("Getting reply while there is none! {0}".format(self.id)) + raise Exception( + "Getting reply while there is none! {0}".format(self.id)) return self.reply @@ -38,6 +40,7 @@ class Condition(object): A condition, basic conditions are built in, custom condition can be given by providing a custom method. """ + def __init__(self, id): self.id = id self.method = None @@ -45,7 +48,7 @@ class Condition(object): @classmethod def initFromJson(conditionClass, data, story): - condition = conditionClass(data['@id']) + condition = conditionClass(data['@id']) # TODO: should Condition be subclassed? if data['type'] == "replyContains": condition.method = condition._hasMetReplyContains @@ -55,7 +58,7 @@ class Condition(object): if 'vars' in data: condition.vars = data['vars'] - return condition; + return condition def isMet(self, story): """ @@ -98,6 +101,7 @@ class Direction(object): """ A condition based edge in the story graph """ + def __init__(self, id, msgFrom: Message, msgTo: Message): self.id = id self.msgFrom = msgFrom @@ -111,18 +115,19 @@ class Direction(object): def initFromJson(direction, data, story): msgFrom = story.get(data['source']) msgTo = story.get(data['target']) - direction = direction(data['@id'], msgFrom, msgTo) + direction = direction(data['@id'], msgFrom, msgTo) if 'conditions' in data: for conditionId in data['conditions']: c = story.get(conditionId) direction.addCondition(c) - return direction; + return direction class Interruption(object): """ An Interruption. Used to catch events outside of story flow. """ + def __init__(self, id): self.id = id self.conditions = [] @@ -132,12 +137,13 @@ class Interruption(object): @classmethod def initFromJson(interruptionClass, data, story): - interrupt = interruptionClass(data['@id']) + interrupt = interruptionClass(data['@id']) if 'conditions' in data: for conditionId in data['conditions']: c = story.get(conditionId) interrupt.addCondition(c) - return interrupt; + return interrupt + storyClasses = { 'Msg': Message, @@ -146,34 +152,103 @@ storyClasses = { 'Interruption': Interruption, } + +class Stopwatch(object): + """ + Keep track of elapsed time. Use multiple markers, but a single pause/resume button + """ + def __init__(self): + self.isRunning = asyncio.Event() + self.reset() + + def getElapsed(self, since_mark='start'): + t = time.time() + if self.paused_at != 0: + pause_duration = t - self.paused_at + else: + pause_duration = 0 + return t - self.marks[since_mark] - pause_duration + + def pause(self): + self.paused_at = time.time() + self.isRunning.clear() + + def resume(self): + if self.paused_at == 0: + return + + pause_duration = time.time() - self.paused_at + for m in self.marks: + self.marks[m] += pause_duration + + self.paused_at = 0 + self.isRunning.set() + + def reset(self): + self.marks = {} + self.setMark('start') + self.paused_at = 0 + self.isRunning.set() + + def setMark(self, name): + self.marks[name] = time.time() + + + def clearMark(self, name): + if name in self.marks: + self.marks.pop(name) + class Story(object): """Story represents and manages a story/narrative flow""" - #TODO should we separate 'narrative' (the graph) from the story (the current user flow) + # TODO should we separate 'narrative' (the graph) from the story (the + # current user flow) + def __init__(self, hugvey_state): super(Story, self).__init__() self.hugvey = hugvey_state - - self.events = [] # queue of received events - self.commands = [] # queue of commands to send - self.log = [] # all nodes/elements that are triggered - self.currentMessage = None + self.events = [] # queue of received events + self.commands = [] # queue of commands to send + self.log = [] # all nodes/elements that are triggered + self.currentMessage = None + self.timer = Stopwatch() + + def pause(self): + logger.debug('pause hugvey') + self.timer.pause() + + def resume(self): + logger.debug('resume hugvey') + self.timer.resume() + + def getStoryCounts(self): +# counts = {} +# for item in self.log: +# n =item.__class__.__name__ +# if n not in counts: +# counts[n] = 0 +# counts[n] += 1 +# return counts + return { + 'messages': len([e for e in self.log if isinstance(e, Message)]), + 'interruptions': len([e for e in self.log if isinstance(e, Interruption)]) + } def setStoryData(self, story_data): """ Parse self.data into a working story engine """ self.data = story_data - + # keep to be able to reset it in the end currentId = self.currentMessage.id if self.currentMessage else None - + self.elements = {} self.interruptions = [] self.directionsPerMsg = {} - self.startMessage = None # The entrypoint to the graph + self.startMessage = None # The entrypoint to the graph self.reset() - + for el in self.data: className = storyClasses[el['@type']] obj = className.initFromJson(el, self) @@ -181,22 +256,31 @@ class Story(object): logger.debug(self.elements) logger.debug(self.directionsPerMsg) - + if currentId: self.currentMessage = self.get(currentId) if self.currentMessage: - logger.info(f"Reinstantiated current message: {self.currentMessage.id}") + logger.info( + f"Reinstantiated current message: {self.currentMessage.id}") else: - logger.warn("Could not reinstatiate current message. Starting over") + logger.warn( + "Could not reinstatiate current message. Starting over") def reset(self): - self.startTime = time.time() - self.currentMessage = None # currently active message, determines active listeners etc. + self.timer.reset() +# self.startTime = time.time() + # currently active message, determines active listeners etc. + self.currentMessage = None self.lastMsgTime = None self.lastSpeechStartTime = None self.lastSpeechEndTime = None - self.variables = {} # captured variables from replies + self.variables = {} # captured variables from replies + self.finish_time = False + self.events = [] # queue of received events + self.commands = [] # queue of commands to send + self.log = [] # all nodes/elements that are triggered + def add(self, obj): if obj.id in self.elements: # print(obj) @@ -223,25 +307,24 @@ class Story(object): return self.elements[id] return None - def stop(self): logger.info("Stop Story") if self.isRunning: self.isRunning = False - def _processPendingEvents(self): # Gather events: nr = len(self.events) for i in range(nr): e = self.events.pop(0) - logger.info("handle '{}'".format( e )) + logger.info("handle '{}'".format(e)) if e['event'] == "exit": self.stop() if e['event'] == 'connect': # a client connected. Shold only happen in the beginning or in case of error # that is, until we have a 'reset' or 'start' event. - self.setCurrentMessage(self.currentMessage) # reinitiate current message + # reinitiate current message + self.setCurrentMessage(self.currentMessage) if e['event'] == "playbackFinish": if e['msgId'] == self.currentMessage.id: @@ -254,6 +337,7 @@ class Story(object): if e['event'] == 'speech': # log if somebody starts speaking + # TODO: use pausing timer if self.lastSpeechStartTime is None or self.lastSpeechStartTime < self.lastMsgTime: self.lastSpeechStartTime = e['time'] @@ -266,7 +350,8 @@ class Story(object): for direction in directions: for condition in direction.conditions: if condition.isMet(self): - logger.info("Condition is met: {0}, going to {1}".format(condition.id, direction.msgTo.id)) + logger.info("Condition is met: {0}, going to {1}".format( + condition.id, direction.msgTo.id)) self.log.append(condition) self.log.append(direction) self.setCurrentMessage(direction.msgTo) @@ -276,20 +361,21 @@ class Story(object): """ every 1/10 sec. determine what needs to be done based on the current story state """ - loopDuration = 0.1 # Configure fps + loopDuration = 0.1 # Configure fps lastTime = time.time() logger.info("Start renderer") while self.isRunning: if self.isRunning is False: break + + # pause on timer paused + await self.timer.isRunning.wait() # wait for un-pause for i in range(len(self.events)): self._processPendingEvents() if self.currentMessage.id not in self.directionsPerMsg: - # TODO: finish! - - pass + self.finish() directions = self.getCurrentDirections() self._processDirections(directions) @@ -307,9 +393,11 @@ class Story(object): def setCurrentMessage(self, message): self.currentMessage = message self.lastMsgTime = time.time() - self.lastMsgFinishTime = None # to be filled in by the event + self.lastMsgFinishTime = None # to be filled in by the event - logger.info("Current message: ({0}) \"{1}\"".format(message.id, message.text)) + logger.info("Current message: ({0}) \"{1}\"".format( + message.id, message.text)) + self.log.append(message) # TODO: prep events & timer etc. self.hugvey.sendCommand({ 'action': 'play', @@ -321,8 +409,8 @@ class Story(object): for direction in self.getCurrentDirections(): conditions = [c.id for c in direction.conditions] - logger.debug("- {0} -> {1} (when: {2}) ".format(direction.msgFrom.id, direction.msgTo.id, conditions)) - + logger.debug( + "- {0} -> {1} (when: {2}) ".format(direction.msgFrom.id, direction.msgTo.id, conditions)) def getCurrentDirections(self): if self.currentMessage.id not in self.directionsPerMsg: @@ -332,10 +420,21 @@ class Story(object): async def start(self): logger.info("Starting story") - self.startTime = time.time() + self.timer.reset() +# self.startTime = time.time() self.isRunning = True self.setCurrentMessage(self.startMessage) await self._renderer() - - + def isFinished(self): + if hasattr(self, 'finish_time'): + return self.finish_time + + return False + + def finish(self): + logger.info(f"Finished story for {self.hugvey.id}") + self.hugvey.pause() + self.finish_time = time.time() + self.timer.pause() + diff --git a/hugvey/voice/google.py b/hugvey/voice/google.py index 1e50ee7..21003f5 100644 --- a/hugvey/voice/google.py +++ b/hugvey/voice/google.py @@ -32,17 +32,26 @@ class GoogleVoiceClient(object): # Create a thread-safe buffer of audio data self.buffer = queue.Queue() - self.isRunning = False + self.isRunning = threading.Event() + self.toBeShutdown = False self.target_rate = 16000 self.cv_laststate = None self.restart = False + self.task = threading.Thread(target=self.run, name=f"hugvey#{self.hugvey.id}v") self.task.setDaemon(True) self.task.start() + + def pause(self): + self.isRunning.clear() + self.restart = True + + def resume(self): + self.isRunning.set() def generator(self): - while self.isRunning: + while not self.toBeShutdown: yield self.buffer.get() def setLanguage(self, language_code): @@ -54,11 +63,13 @@ class GoogleVoiceClient(object): self.restart = True def run(self): - self.isRunning = True + self.isRunning.set() - while self.isRunning: + while not self.toBeShutdown: try: + self.isRunning.wait() + self.speech_client = speech.SpeechClient() config = types.RecognitionConfig( encoding=enums.RecognitionConfig.AudioEncoding.LINEAR16, @@ -122,7 +133,7 @@ class GoogleVoiceClient(object): self.restart = False raise RequireRestart("Restart required") - if not self.isRunning: + if self.toBeShutdown: logger.warn("Stopping voice loop") break except RequireRestart as e: @@ -142,7 +153,7 @@ class GoogleVoiceClient(object): self.buffer.put_nowait(data) def shutdown(self): - self.isRunning = False + self.toBeShutdown = True diff --git a/local/.gitignore b/local/.gitignore new file mode 100644 index 0000000..a5baada --- /dev/null +++ b/local/.gitignore @@ -0,0 +1,3 @@ +* +!.gitignore + diff --git a/server_config.yml b/server_config.yml index 63e77db..b8cc1d0 100644 --- a/server_config.yml +++ b/server_config.yml @@ -7,7 +7,7 @@ voice: port: 4444 chunk: 2972 google_credentials: "/home/ruben/Documents/Projecten/2018/Hugvey/test_googlespeech/My First Project-0c7833e0d5fa.json" -hugveys: 3 +hugveys: 25 languages: - code: en-GB file: story_en.json diff --git a/www/hugvey_console.js b/www/hugvey_console.js new file mode 100644 index 0000000..436e11c --- /dev/null +++ b/www/hugvey_console.js @@ -0,0 +1,100 @@ +var panopticon; + +class Panopticon { + constructor() { + console.log( "Init panopticon" ); + this.hugveys = new Vue( { + el: "#status", + data: { + uptime: 0, + languages: [], + hugveys: [] + }, + methods: { + time_passed: function (hugvey, property) { + console.log("property!", Date(hugvey[property] * 1000)); + return moment(Date(hugvey[property] * 1000)).fromNow(); + } + } + } ); + + + this.socket = new ReconnectingWebSocket( "ws://localhost:8888/ws", null, { debug: true, reconnectInterval: 3000 } ); + + + this.socket.addEventListener( 'open', ( e ) => { + this.send( { action: 'init' } ); + } ); + + this.socket.addEventListener( 'close', function( e ) { + console.log( 'Closed connection' ); + } ); + this.socket.addEventListener( 'message', ( e ) => { + let msg = JSON.parse( e.data ); + if ( typeof msg['alert'] !== 'undefined' ) { + alert(msg['alert']); + } + + if ( typeof msg['action'] === 'undefined' ) { + console.error( "not a valid message: " + e.data ); + return; + } + + switch ( msg['action'] ) { + + case 'status': + this.hugveys.uptime = this.stringToHHMMSS(msg['uptime']); + this.hugveys.languages = msg['languages']; + this.hugveys.hugveys = msg['hugveys']; + break; + } + } ); + } + + send( msg ) { + this.socket.send( JSON.stringify( msg ) ); + } + + getStatus() { +// console.log('get status', this, panopticon); + panopticon.send( { action: 'get_status' } ); + } + + init() { + setInterval( this.getStatus, 3000 ); + } + + stringToHHMMSS (string) { + var sec_num = parseInt(string, 10); // don't forget the second param + var hours = Math.floor(sec_num / 3600); + var minutes = Math.floor((sec_num - (hours * 3600)) / 60); + var seconds = sec_num - (hours * 3600) - (minutes * 60); + + if (hours < 10) {hours = "0"+hours;} + if (minutes < 10) {minutes = "0"+minutes;} + if (seconds < 10) {seconds = "0"+seconds;} + return hours+':'+minutes+':'+seconds; + } + + + loadNarrative(code, file) { + + } + + resume(hv_id) { + this.send({ action: 'resume', hugvey: hv_id }) + } + pause(hv_id) { + this.send({ action: 'play', hugvey: hv_id }) + } + restart(hv_id) { + this.send({ action: 'restart', hugvey: hv_id }) + } +} + + + +window.addEventListener( 'load', function() { + panopticon = new Panopticon(); + panopticon.init(); +}) \ No newline at end of file diff --git a/www/index.html b/www/index.html index 3f40d66..0734a71 100644 --- a/www/index.html +++ b/www/index.html @@ -1,17 +1,43 @@
-, or missing
. Bailing hydration and performing ' + + 'full client-side render.' + ); + } + } + // either not server-rendered, or hydration failed. + // create an empty node and replace it + oldVnode = emptyNodeAt(oldVnode); + } + + // replacing existing element + var oldElm = oldVnode.elm; + var parentElm = nodeOps.parentNode(oldElm); + + // create new node + createElm( + vnode, + insertedVnodeQueue, + // extremely rare edge case: do not insert if old element is in a + // leaving transition. Only happens when combining transition + + // keep-alive + HOCs. (#4590) + oldElm._leaveCb ? null : parentElm, + nodeOps.nextSibling(oldElm) + ); + + // update parent placeholder node element, recursively + if (isDef(vnode.parent)) { + var ancestor = vnode.parent; + var patchable = isPatchable(vnode); + while (ancestor) { + for (var i = 0; i < cbs.destroy.length; ++i) { + cbs.destroy[i](ancestor); + } + ancestor.elm = vnode.elm; + if (patchable) { + for (var i$1 = 0; i$1 < cbs.create.length; ++i$1) { + cbs.create[i$1](emptyNode, ancestor); + } + // #6513 + // invoke insert hooks that may have been merged by create hooks. + // e.g. for directives that uses the "inserted" hook. + var insert = ancestor.data.hook.insert; + if (insert.merged) { + // start at index 1 to avoid re-invoking component mounted hook + for (var i$2 = 1; i$2 < insert.fns.length; i$2++) { + insert.fns[i$2](); + } + } + } else { + registerRef(ancestor); + } + ancestor = ancestor.parent; + } + } + + // destroy old node + if (isDef(parentElm)) { + removeVnodes(parentElm, [oldVnode], 0, 0); + } else if (isDef(oldVnode.tag)) { + invokeDestroyHook(oldVnode); + } + } + } + + invokeInsertHook(vnode, insertedVnodeQueue, isInitialPatch); + return vnode.elm + } + } + + /* */ + + var directives = { + create: updateDirectives, + update: updateDirectives, + destroy: function unbindDirectives (vnode) { + updateDirectives(vnode, emptyNode); + } + }; + + function updateDirectives (oldVnode, vnode) { + if (oldVnode.data.directives || vnode.data.directives) { + _update(oldVnode, vnode); + } + } + + function _update (oldVnode, vnode) { + var isCreate = oldVnode === emptyNode; + var isDestroy = vnode === emptyNode; + var oldDirs = normalizeDirectives$1(oldVnode.data.directives, oldVnode.context); + var newDirs = normalizeDirectives$1(vnode.data.directives, vnode.context); + + var dirsWithInsert = []; + var dirsWithPostpatch = []; + + var key, oldDir, dir; + for (key in newDirs) { + oldDir = oldDirs[key]; + dir = newDirs[key]; + if (!oldDir) { + // new directive, bind + callHook$1(dir, 'bind', vnode, oldVnode); + if (dir.def && dir.def.inserted) { + dirsWithInsert.push(dir); + } + } else { + // existing directive, update + dir.oldValue = oldDir.value; + callHook$1(dir, 'update', vnode, oldVnode); + if (dir.def && dir.def.componentUpdated) { + dirsWithPostpatch.push(dir); + } + } + } + + if (dirsWithInsert.length) { + var callInsert = function () { + for (var i = 0; i < dirsWithInsert.length; i++) { + callHook$1(dirsWithInsert[i], 'inserted', vnode, oldVnode); + } + }; + if (isCreate) { + mergeVNodeHook(vnode, 'insert', callInsert); + } else { + callInsert(); + } + } + + if (dirsWithPostpatch.length) { + mergeVNodeHook(vnode, 'postpatch', function () { + for (var i = 0; i < dirsWithPostpatch.length; i++) { + callHook$1(dirsWithPostpatch[i], 'componentUpdated', vnode, oldVnode); + } + }); + } + + if (!isCreate) { + for (key in oldDirs) { + if (!newDirs[key]) { + // no longer present, unbind + callHook$1(oldDirs[key], 'unbind', oldVnode, oldVnode, isDestroy); + } + } + } + } + + var emptyModifiers = Object.create(null); + + function normalizeDirectives$1 ( + dirs, + vm + ) { + var res = Object.create(null); + if (!dirs) { + // $flow-disable-line + return res + } + var i, dir; + for (i = 0; i < dirs.length; i++) { + dir = dirs[i]; + if (!dir.modifiers) { + // $flow-disable-line + dir.modifiers = emptyModifiers; + } + res[getRawDirName(dir)] = dir; + dir.def = resolveAsset(vm.$options, 'directives', dir.name, true); + } + // $flow-disable-line + return res + } + + function getRawDirName (dir) { + return dir.rawName || ((dir.name) + "." + (Object.keys(dir.modifiers || {}).join('.'))) + } + + function callHook$1 (dir, hook, vnode, oldVnode, isDestroy) { + var fn = dir.def && dir.def[hook]; + if (fn) { + try { + fn(vnode.elm, dir, vnode, oldVnode, isDestroy); + } catch (e) { + handleError(e, vnode.context, ("directive " + (dir.name) + " " + hook + " hook")); + } + } + } + + var baseModules = [ + ref, + directives + ]; + + /* */ + + function updateAttrs (oldVnode, vnode) { + var opts = vnode.componentOptions; + if (isDef(opts) && opts.Ctor.options.inheritAttrs === false) { + return + } + if (isUndef(oldVnode.data.attrs) && isUndef(vnode.data.attrs)) { + return + } + var key, cur, old; + var elm = vnode.elm; + var oldAttrs = oldVnode.data.attrs || {}; + var attrs = vnode.data.attrs || {}; + // clone observed objects, as the user probably wants to mutate it + if (isDef(attrs.__ob__)) { + attrs = vnode.data.attrs = extend({}, attrs); + } + + for (key in attrs) { + cur = attrs[key]; + old = oldAttrs[key]; + if (old !== cur) { + setAttr(elm, key, cur); + } + } + // #4391: in IE9, setting type can reset value for input[type=radio] + // #6666: IE/Edge forces progress value down to 1 before setting a max + /* istanbul ignore if */ + if ((isIE || isEdge) && attrs.value !== oldAttrs.value) { + setAttr(elm, 'value', attrs.value); + } + for (key in oldAttrs) { + if (isUndef(attrs[key])) { + if (isXlink(key)) { + elm.removeAttributeNS(xlinkNS, getXlinkProp(key)); + } else if (!isEnumeratedAttr(key)) { + elm.removeAttribute(key); + } + } + } + } + + function setAttr (el, key, value) { + if (el.tagName.indexOf('-') > -1) { + baseSetAttr(el, key, value); + } else if (isBooleanAttr(key)) { + // set attribute for blank value + // e.g. + if (isFalsyAttrValue(value)) { + el.removeAttribute(key); + } else { + // technically allowfullscreen is a boolean attribute for