Interruption diversions for specific timeouts etc.

This commit is contained in:
Ruben van de Ven 2019-05-01 18:27:10 +02:00
parent 1550bdeb80
commit 0861215794
6 changed files with 177 additions and 43 deletions

View file

@ -96,6 +96,7 @@ class CentralCommand(object):
def loadLanguages(self): def loadLanguages(self):
logger.debug('load language files') logger.debug('load language files')
self.languages = {} self.languages = {}
self.languageCache = {}
for lang in self.config['languages']: for lang in self.config['languages']:
lang_filename = os.path.join(self.config['web']['files_dir'], lang['file']) lang_filename = os.path.join(self.config['web']['files_dir'], lang['file'])
@ -511,7 +512,8 @@ class HugveyState(object):
event = await asyncio.wait_for(self.eventQueue.get(), 2) event = await asyncio.wait_for(self.eventQueue.get(), 2)
except asyncio.futures.TimeoutError as e: except asyncio.futures.TimeoutError as e:
# detect missing heartbeat: # detect missing heartbeat:
if self.isConfigured and time.time() - self.isConfigured > 5: if self.isConfigured and time.time() - self.isConfigured > 15:
self.logger.error("Hugvey did not send heartbeat.")
self.gone() self.gone()
continue continue

View file

@ -285,6 +285,10 @@ class CommandHandler(object):
duration = cmd['duration'] if 'duration' in cmd else None duration = cmd['duration'] if 'duration' in cmd else None
self.playingMsgId = msgId self.playingMsgId = msgId
if self.playPopen:
logger.info("Interrupting playback of {}".format(self.playingMsgId))
self.playPopen.terminate()
err = None err = None
if file is None and text is None: if file is None and text is None:
logger.critical("No file nor text given: {}".format(cmd)) logger.critical("No file nor text given: {}".format(cmd))

View file

@ -270,6 +270,6 @@ class Panopticon(object):
msg['args'] = items[2] msg['args'] = items[2]
j = json.dumps(msg) j = json.dumps(msg)
print(j) logger.debug(j)
self.loop.add_callback(wsHandler.write_to_clients, j) self.loop.add_callback(wsHandler.write_to_clients, j)

View file

@ -30,25 +30,25 @@ class AudioStreamer(object):
address = "tcp://{}:{}".format(self.address, self.port) address = "tcp://{}:{}".format(self.address, self.port)
self.ctx = Context.instance() self.ctx = Context.instance()
self.socket = self.ctx.socket(zmq.SUB) self.socket = self.ctx.socket(zmq.SUB)
self.socket.setsockopt(zmq.RCVTIMEO, 6000) # timeout: 8 sec self.socket.setsockopt(zmq.RCVTIMEO, 8000) # timeout: 8 sec
self.socket.subscribe('') self.socket.subscribe('')
# self.socket.setsockopt(zmq.CONFLATE, 1) # self.socket.setsockopt(zmq.CONFLATE, 1)
self.socket.connect(address) self.socket.connect(address)
# s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.logger.info("Attempt connection on {}:{}".format(self.address, self.port)) self.logger.info("0mq SUBSCRIBE for audio stream on {}:{}".format(self.address, self.port))
# s.connect((self.address, self.port)) # s.connect((self.address, self.port))
# #
try: while self.isRunning:
while self.isRunning: try:
data = await self.socket.recv() data = await self.socket.recv()
# self.logger.debug('chunk received') # self.logger.debug('chunk received')
self.process(data) self.process(data)
except zmq.error.Again as timeout_e: except zmq.error.Again as timeout_e:
self.logger.warn("Timeout of audiostream. Hugvey shutdown?") self.logger.warn("Timeout of audiostream. Hugvey shutdown?")
finally:
self.logger.info("Close socket on {}:{}".format(self.address, self.port)) self.logger.info("Close socket on {}:{}".format(self.address, self.port))
self.socket.close() self.socket.close()
def stop(self): def stop(self):
self.isRunning = False self.isRunning = False

View file

@ -14,6 +14,7 @@ from zmq.asyncio import Context
import zmq import zmq
import wave import wave
from pythonosc import udp_client from pythonosc import udp_client
import random
mainLogger = logging.getLogger("hugvey") mainLogger = logging.getLogger("hugvey")
logger = mainLogger.getChild("narrative") logger = mainLogger.getChild("narrative")
@ -254,11 +255,14 @@ class Condition(object):
self.type = None self.type = None
self.vars = {} self.vars = {}
self.logInfo = None self.logInfo = None
self.originalJsonString = None
self.usedContainsDuration = None
@classmethod @classmethod
def initFromJson(conditionClass, data, story): def initFromJson(conditionClass, data, story):
condition = conditionClass(data['@id']) condition = conditionClass(data['@id'])
condition.type = data['type'] condition.type = data['type']
condition.originalJsonString = json.dumps(data)
# TODO: should Condition be subclassed? # TODO: should Condition be subclassed?
if data['type'] == "replyContains": if data['type'] == "replyContains":
@ -416,6 +420,7 @@ class Condition(object):
capturedVariables, capturedVariables,
timeSinceReply timeSinceReply
) )
self.usedContainsDuration = float(delay['waitTime'])
return True return True
break # don't check other delays break # don't check other delays
# wait for delay to match # wait for delay to match
@ -487,6 +492,7 @@ class Diversion(object):
self.hasHit = False self.hasHit = False
self.disabled = False self.disabled = False
self.type = type self.type = type
self.counter = 0
if type == 'no_response': if type == 'no_response':
self.method = self._divergeIfNoResponse self.method = self._divergeIfNoResponse
self.finaliseMethod = self._returnAfterNoResponse self.finaliseMethod = self._returnAfterNoResponse
@ -505,6 +511,8 @@ class Diversion(object):
if type == 'repeat': if type == 'repeat':
self.method = self._divergeIfRepeatRequest self.method = self._divergeIfRepeatRequest
self.regex = re.compile(self.params['regex']) self.regex = re.compile(self.params['regex'])
if type == 'interrupt':
self.method = self._divergeIfInterrupted
if not self.method: if not self.method:
raise Exception("No valid type given for diversion") raise Exception("No valid type given for diversion")
@ -521,7 +529,7 @@ class Diversion(object):
'id': self.id, 'id': self.id,
} }
async def divergeIfNeeded(self, story, direction: None): async def divergeIfNeeded(self, story, direction = None):
""" """
Validate if condition is met for the current story state Validate if condition is met for the current story state
Returns True when diverging Returns True when diverging
@ -542,21 +550,30 @@ class Diversion(object):
direction.msgTo if direction else None, direction.msgTo if direction else None,
direction if direction else None) direction if direction else None)
if r: if r:
if self.type != 'repeat': if self.type != 'repeat' and self.type !='interrupt':
# repeat diversion should be usable infinte times # repeat diversion should be usable infinte times
self.hasHit = True self.hasHit = True
story.addToLog(self) story.addToLog(self)
return r return r
def createReturnDirectionsTo(self, story, startMsg, returnMsg, originalDirection = None, inheritTiming = True): def createReturnDirectionsTo(self, story, startMsg, returnMsg, originalDirection = None, inheritTiming = True, timeoutDuration = .5, replyContainsDurations = None):
""" """
The finishes of this diversion's strand should point to the return message The finishes of this diversion's strand should point to the return message
with the right timeout/timing. If hit, this direction should also notify with the right timeout/timing. If hit, this direction should also notify
this diversion. this diversion.
replyContainsDurations: list formatted as in JSON
[{
"minReplyDuration": "0",
"waitTime": "3"
}]
""" """
self.counter +=1
finishMessageIds = story.getFinishesForMsg(startMsg) finishMessageIds = story.getFinishesForMsg(startMsg)
finalTimeoutDuration = 0.5 finalTimeoutDuration = timeoutDuration
finalContainsDurations = replyContainsDurations
#: :type story: Story #: :type story: Story
#: :type originalDirection: Direction #: :type originalDirection: Direction
# story.directionsPerMsg[story.currentMessage.id] # story.directionsPerMsg[story.currentMessage.id]
@ -568,7 +585,8 @@ class Diversion(object):
for condition in originalDirection.conditions: for condition in originalDirection.conditions:
if condition.type == 'timeout': if condition.type == 'timeout':
finalTimeoutDuration = float(condition.vars['seconds']) finalTimeoutDuration = float(condition.vars['seconds'])
break if condition.type == 'replyContains':
finalContainsDurations = json.loads(condition.originalJsonString)['vars']['delays']
i = 0 i = 0
for msgId in finishMessageIds: for msgId in finishMessageIds:
@ -578,21 +596,41 @@ class Diversion(object):
if not msg: if not msg:
continue continue
direction = Direction(f"{self.id}-{i}", msg, returnMsg) direction = Direction(f"{self.id}-{i}-{self.counter}", msg, returnMsg)
data = json.loads(f""" data = json.loads(f"""
{{ {{
"@id": "{self.id}-c{i}", "@id": "{self.id}-ct{i}-{self.counter}",
"@type": "Condition", "@type": "Condition",
"type": "timeout", "type": "timeout",
"label": "", "label": "Autogenerated Timeout",
"vars": {{ "vars": {{
"seconds": "{finalTimeoutDuration}", "seconds": "{finalTimeoutDuration}"
"onlyIfNoReply": false
}} }}
}} }}
""") """)
data['vars']['onlyIfNoReply'] = finalContainsDurations is not None
# TODO: also at replycontains if it exists, with the same timings
condition = Condition.initFromJson(data, story) condition = Condition.initFromJson(data, story)
direction.addCondition(condition) direction.addCondition(condition)
if finalContainsDurations is not None:
data2 = json.loads(f"""
{{
"@id": "{self.id}-cr{i}-{self.counter}",
"@type": "Condition",
"type": "replyContains",
"label": "Autogenerated Reply Contains",
"vars": {{
"regex": "",
"instantMatch": false
}} }}
""")
data2['vars']['delays'] = finalContainsDurations
condition2 = Condition.initFromJson(data2, story)
direction.addCondition(condition2)
story.add(condition2)
direction.isDiversionReturn = True # will clear the currentDiversion on story direction.isDiversionReturn = True # will clear the currentDiversion on story
story.logger.info(f"Created direction: {direction.id} {condition.id} with timeout {finalTimeoutDuration}s") story.logger.info(f"Created direction: {direction.id} {condition.id} with timeout {finalTimeoutDuration}s")
story.add(condition) story.add(condition)
@ -790,7 +828,7 @@ class Diversion(object):
# no direction is here, as this diversion triggers before a direction is taken # no direction is here, as this diversion triggers before a direction is taken
self.createReturnDirectionsTo(story, msg, self.returnMessage, inheritTiming=False) self.createReturnDirectionsTo(story, msg, self.returnMessage, inheritTiming=False)
await story.setCurrentMessage(msg) await story.setCurrentMessage(msg, allowReplyInterrupt=True)
story.currentDiversion = self story.currentDiversion = self
story.timer.setMark('last_diversion_timeout') story.timer.setMark('last_diversion_timeout')
return True return True
@ -798,8 +836,31 @@ class Diversion(object):
async def _returnAfterTimeout(self, story): async def _returnAfterTimeout(self, story):
story.logger.info(f"Finalise diversion: {self.id}") story.logger.info(f"Finalise diversion: {self.id}")
# if self.params['returnAfterStrand']:
# await story.setCurrentMessage(self.returnMessage) async def _divergeIfInterrupted(self, story, msgFrom, msgTo, direction):
"""
This is here as a placeholder for the interruption diversion.
These will however be triggered differently
"""
if story.currentDiversion or story.allowReplyInterrupt:
return False
msg = story.get(self.params['msgId'])
if msg is None:
story.logger.critical(f"Not a valid message id for diversion: {self.params['msgId']}")
return
self.returnMessage = story.currentMessage
# no direction is here, as this diversion triggers before a direction is taken
self.createReturnDirectionsTo(story, msg, self.returnMessage, inheritTiming=False, timeoutDuration=3, replyContainsDurations = [{
"minReplyDuration": "0",
"waitTime": "2"
}])
await story.setCurrentMessage(msg)
story.currentDiversion = self
return True
storyClasses = { storyClasses = {
@ -875,9 +936,11 @@ class Story(object):
self.currentMessage = None self.currentMessage = None
self.currentDiversion = None self.currentDiversion = None
self.currentReply = None self.currentReply = None
self.allowReplyInterrupt = False
self.timer = Stopwatch() self.timer = Stopwatch()
self.isRunning = False self.isRunning = False
self.diversions = [] self.diversions = []
self.interruptionDiversions = []
self.variables = {} self.variables = {}
def pause(self): def pause(self):
@ -939,6 +1002,7 @@ class Story(object):
self.elements = {} self.elements = {}
self.strands = {} self.strands = {}
self.diversions = [] self.diversions = []
self.interruptionDiversions = []
self.directionsPerMsg = {} self.directionsPerMsg = {}
self.startMessage = None # The entrypoint to the graph self.startMessage = None # The entrypoint to the graph
self.variables = {} self.variables = {}
@ -953,6 +1017,7 @@ class Story(object):
# self.logger.debug(self.directionsPerMsg) # self.logger.debug(self.directionsPerMsg)
self.diversions = [el for el in self.elements.values() if type(el) == Diversion] self.diversions = [el for el in self.elements.values() if type(el) == Diversion]
self.interruptionDiversions = [el for el in self.elements.values() if type(el) == Diversion and el.type == 'interrupt']
if currentId: if currentId:
self.currentMessage = self.get(currentId) self.currentMessage = self.get(currentId)
@ -1107,21 +1172,28 @@ class Story(object):
# messages that come in, in the case google is faster than our playbackFinish event. # messages that come in, in the case google is faster than our playbackFinish event.
# (if this setup doesn't work, try to test on self.lastMsgFinish time anyway) # (if this setup doesn't work, try to test on self.lastMsgFinish time anyway)
# it keeps tricky with all these run conditions # it keeps tricky with all these run conditions
self.logger.info("ignore speech while playing message") if len(self.interruptionDiversions) and not self.currentDiversion and not self.allowReplyInterrupt:
continue self.logger.warn("diverge when speech during playing message")
diversion = random.choice(self.interruptionDiversions)
#: :type diversion: Diversion
r = await diversion.divergeIfNeeded(self, None)
print(r) # is always needed :-)
else:
self.logger.info("ignore speech during playing message")
continue
# DEPRECATED WAY OF DOING IT:
# message is still playing: # message is still playing:
if self.currentMessage and not self.lastMsgFinishTime and self.previousReply and self.previousReply.forMessage.interruptCount < 4: # if self.currentMessage and not self.lastMsgFinishTime and self.previousReply and self.previousReply.forMessage.interruptCount < 4:
timeDiff = self.timer.getElapsed() - self.previousReply.forMessage.getFinishedTime() # timeDiff = self.timer.getElapsed() - self.previousReply.forMessage.getFinishedTime()
if self.previousReply.forMessage.afterrunTime > timeDiff: # if self.previousReply.forMessage.afterrunTime > timeDiff:
#interrupt only in given interval: # #interrupt only in given interval:
self.logger.warn("Interrupt message, replay {}".format(self.previousReply.forMessage.id)) # self.logger.warn("Interrupt message, replay {}".format(self.previousReply.forMessage.id))
self.currentReply = self.previousReply # self.currentReply = self.previousReply
self.previousReply.forMessage.interruptCount += 1 # self.previousReply.forMessage.interruptCount += 1
self.currentMessage = await self.setCurrentMessage(self.previousReply.forMessage, self.previousReply) # self.currentMessage = await self.setCurrentMessage(self.previousReply.forMessage, self.previousReply)
# log if somebody starts speaking # log if somebody starts speaking
# TODO: implement interrupt
if self.currentReply is None: if self.currentReply is None:
self.logger.info("Start speaking") self.logger.info("Start speaking")
self.currentReply= Reply(self.currentMessage) self.currentReply= Reply(self.currentMessage)
@ -1143,6 +1215,7 @@ class Story(object):
async def _processDirections(self, directions): async def _processDirections(self, directions):
':type directions: list(Direction)' ':type directions: list(Direction)'
chosenDirection = None chosenDirection = None
metCondition = None
for direction in directions: for direction in directions:
for condition in direction.conditions: for condition in direction.conditions:
if condition.isMet(self): if condition.isMet(self):
@ -1150,6 +1223,7 @@ class Story(object):
condition.id, direction.msgTo.id)) condition.id, direction.msgTo.id))
self.hugvey.eventLogger.info("condition: {0}".format(condition.id)) self.hugvey.eventLogger.info("condition: {0}".format(condition.id))
self.hugvey.eventLogger.info("direction: {0}".format(direction.id)) self.hugvey.eventLogger.info("direction: {0}".format(direction.id))
metCondition = condition
direction.setMetCondition(condition) direction.setMetCondition(condition)
self.addToLog(condition) self.addToLog(condition)
self.addToLog(direction) self.addToLog(direction)
@ -1158,11 +1232,19 @@ class Story(object):
isDiverging = await self._processDiversions(chosenDirection) isDiverging = await self._processDiversions(chosenDirection)
allowReplyInterrupt = False
# in some cases, conditions should be allowed to interrupt the reply
if metCondition:
if metCondition.type == 'timeout' and not ('onlyIfNoReply' in metCondition.vars and metCondition.vars['onlyIfNoReply']):
allowReplyInterrupt = True
if metCondition.usedContainsDuration is not None and metCondition.usedContainsDuration < 0.1:
allowReplyInterrupt = True
if not isDiverging and chosenDirection: if not isDiverging and chosenDirection:
if chosenDirection.isDiversionReturn and self.currentDiversion: if chosenDirection.isDiversionReturn and self.currentDiversion:
await self.currentDiversion.finalise(self) await self.currentDiversion.finalise(self)
await self.setCurrentMessage(chosenDirection.msgTo) await self.setCurrentMessage(chosenDirection.msgTo, allowReplyInterrupt=allowReplyInterrupt)
return chosenDirection return chosenDirection
@ -1180,7 +1262,8 @@ class Story(object):
activeNoResponseDiv = None activeNoResponseDiv = None
for diversion in self.diversions: for diversion in self.diversions:
#: :type diversion: Diversion #: :type diversion: Diversion
if diversion.disabled or diversion.hasHit: if diversion.disabled or diversion.hasHit or diversion.type == 'interrupt':
# interruptions are triggered somewhere else.
continue continue
if diversion.type == 'timeout': if diversion.type == 'timeout':
@ -1269,7 +1352,7 @@ class Story(object):
self.logger.debug("Stop renderer") self.logger.debug("Stop renderer")
async def setCurrentMessage(self, message, useReply = None): async def setCurrentMessage(self, message, useReply = None, allowReplyInterrupt = False):
""" """
Use Reply allows to pre-initiate a reply to use with the message. This is used eg. when doing an interruption. Use Reply allows to pre-initiate a reply to use with the message. This is used eg. when doing an interruption.
""" """
@ -1288,6 +1371,7 @@ class Story(object):
self.lastMsgTime = time.time() self.lastMsgTime = time.time()
self.lastMsgFinishTime = None # to be filled in by the event self.lastMsgFinishTime = None # to be filled in by the event
self.lastMsgStartTime = None # to be filled in by the event self.lastMsgStartTime = None # to be filled in by the event
self.allowReplyInterrupt = allowReplyInterrupt
# if not reset: # if not reset:
self.previousReply = self.currentReply # we can use this for interrptions self.previousReply = self.currentReply # we can use this for interrptions

View file

@ -367,6 +367,9 @@ class Graph {
div['params']['msgId'] = ""; div['params']['msgId'] = "";
div['params']['notForColor'] = ""; div['params']['notForColor'] = "";
} }
else if(type == 'interrupt') {
div['params']['msgId'] = "";
}
else if(type == 'timeout') { else if(type == 'timeout') {
div['params']['interval'] = 20; div['params']['interval'] = 20;
div['params']['timesOccured'] = 0; div['params']['timesOccured'] = 0;
@ -382,7 +385,7 @@ class Graph {
alert('invalid type for diversion'); alert('invalid type for diversion');
} }
if(type != 'repeat') { if(type != 'repeat' && type != 'interrupt') {
div['params']['notAfterMsgId'] = ""; div['params']['notAfterMsgId'] = "";
} }
@ -403,7 +406,7 @@ class Graph {
let msgEl = document.getElementById( 'msg' ); let msgEl = document.getElementById( 'msg' );
msgEl.innerHTML = ""; msgEl.innerHTML = "";
let divsNoResponse =[], divsRepeat = [], divsReplyContains = [], divsTimeouts = []; let divsNoResponse =[], divsRepeat = [], divsReplyContains = [], divsTimeouts = [], divsInterrupts = [];
for(let div of this.diversions) { for(let div of this.diversions) {
let notAfterMsgIdEl = ""; let notAfterMsgIdEl = "";
@ -691,9 +694,37 @@ class Graph {
) )
)); ));
} }
if(div['type'] == 'interrupt'){
let msgOptions = [crel('option',"")];
let starts = this.messages.filter( m => m.hasOwnProperty('start') && m['start'] == true);
for(let startMsg of starts) {
let optionParams = {};
if(div['params']['msgId'] == startMsg['@id']) {
optionParams['selected'] = 'selected';
}
msgOptions.push(crel('option', optionParams , startMsg['@id']));
}
divsInterrupts.push(crel(
'div', {'class': 'diversion'},
crel('h3', div['@id']),
crel(
'div', {
'class':'btn btn--delete',
'on': {
'click': (e) => this.deleteDiversion(div)
}
}, 'Delete diversion'),
crel('label', 'Go to (start message)',
crel('select', {'on': {
'change': (e) => div['params']['msgId'] = e.target.value
}}, ...msgOptions)
)
));
}
} }
console.log(divsReplyContains, divsNoResponse, divsRepeat, divsTimeouts); console.log(divsReplyContains, divsNoResponse, divsRepeat, divsTimeouts, divsInterrupts);
let divEl = crel( let divEl = crel(
'div', 'div',
@ -752,6 +783,19 @@ class Graph {
}, },
'New case for timeout' 'New case for timeout'
) )
),
crel('div',
crel('h2', 'Interruptions (random pick)'),
...divsInterrupts,
crel('div',
{
'class': 'btn',
'on': {
'click': (e) => this.createDiversion('interrupt')
}
},
'New case for Interrupt'
)
) )
); );