Add highwater mark for client zmq to lower level

This commit is contained in:
Hugvey Central Command 2019-05-13 19:48:13 +02:00
parent 2acfb1c321
commit 019479d916

View file

@ -38,7 +38,7 @@ class VoiceServer(object):
def __init__(self, loop, hugvey, config): def __init__(self, loop, hugvey, config):
self.config = config self.config = config
self.input_rate = self.config['voice']['input_rate'] self.input_rate = self.config['voice']['input_rate']
self.target_rate = self.config['voice']['target_rate'] self.target_rate = self.config['voice']['target_rate']
self.stopped = True self.stopped = True
@ -47,28 +47,28 @@ class VoiceServer(object):
self.ctx = Context.instance() self.ctx = Context.instance()
self.loop = loop self.loop = loop
self.hugvey = hugvey self.hugvey = hugvey
self.chunk = 4096 self.chunk = 4096
self.mic_prerol_sec = .2 self.mic_prerol_sec = .2
self.prerol_frame_count = math.ceil((self.input_rate / self.chunk) * self.mic_prerol_sec) self.prerol_frame_count = math.ceil((self.input_rate / self.chunk) * self.mic_prerol_sec)
self.prerol_frames = collections.deque(maxlen = self.prerol_frame_count) self.prerol_frames = collections.deque(maxlen = self.prerol_frame_count)
self.p = pyaudio.PyAudio() self.p = pyaudio.PyAudio()
# wait a sec for the input devices to come up # wait a sec for the input devices to come up
logger.debug("Use a mic prerol of {} frames".format(self.prerol_frame_count)) logger.debug("Use a mic prerol of {} frames".format(self.prerol_frame_count))
logger.debug('wait for mic') logger.debug('wait for mic')
time.sleep(3) time.sleep(3)
logger.debug('done waiting for mic') logger.debug('done waiting for mic')
self.info = self.get_card_info() self.info = self.get_card_info()
def get_card_info(self): def get_card_info(self):
output_device_idx = None output_device_idx = None
input_device_idx = None input_device_idx = None
devices_count = self.p.get_device_count() devices_count = self.p.get_device_count()
for i in range(devices_count): for i in range(devices_count):
dev = self.p.get_device_info_by_index(i) dev = self.p.get_device_info_by_index(i)
@ -88,13 +88,13 @@ class VoiceServer(object):
"< " if output_device_idx == i else "> " if input_device_idx == i else "- ", i, dev['name'], "< " if output_device_idx == i else "> " if input_device_idx == i else "- ", i, dev['name'],
dev['maxInputChannels'], dev['maxInputChannels'],
dev['maxOutputChannels'])) dev['maxOutputChannels']))
# Don't continue without pyAudio indexes # Don't continue without pyAudio indexes
if input_device_idx is None: if input_device_idx is None:
raise Exception("Input device is not found: {}".format(self.config['voice']['input_name'])) raise Exception("Input device is not found: {}".format(self.config['voice']['input_name']))
if output_device_idx is None: if output_device_idx is None:
raise Exception("Output device is not found: {}".format(self.config['voice']['output_name'])) raise Exception("Output device is not found: {}".format(self.config['voice']['output_name']))
try: try:
# get eg: "hw:1,0" or "hw:0,3" -> used by Sox' play # get eg: "hw:1,0" or "hw:0,3" -> used by Sox' play
output_device_name = self.p.get_device_info_by_index(output_device_idx)['name'].split("(",1)[1][:-1] output_device_name = self.p.get_device_info_by_index(output_device_idx)['name'].split("(",1)[1][:-1]
@ -109,28 +109,28 @@ class VoiceServer(object):
except IndexError as e: except IndexError as e:
input_device_name = None input_device_name = None
input_card_name = None input_card_name = None
logger.debug("Output: {}, Input: {}".format(output_device_name, input_device_name)) logger.debug("Output: {}, Input: {}".format(output_device_name, input_device_name))
return { return {
'input': { 'input': {
'idx': input_device_idx, 'idx': input_device_idx,
'device': input_device_name, 'device': input_device_name,
'card': input_card_name 'card': input_card_name
}, },
'output': { 'output': {
'idx': output_device_idx, 'idx': output_device_idx,
'device': output_device_name, 'device': output_device_name,
'card': output_card_name 'card': output_card_name
} }
} }
# #
# def get_output_idxs(self): # def get_output_idxs(self):
# pass # pass
# #
# def get_input_idx(self): # def get_input_idx(self):
# input_device_idx = None # input_device_idx = None
# # input_device_idx = 6 # # input_device_idx = 6
@ -159,14 +159,14 @@ class VoiceServer(object):
try: try:
if self.hugvey.cmd_server.muteMic: if self.hugvey.cmd_server.muteMic:
logger.log(LOG_BS, 'block recording {}' .format( # logger.log(LOG_BS, 'block recording {}' .format(
self.hugvey.cmd_server.muteMic)) # self.hugvey.cmd_server.muteMic))
# multiply by 0 to disable audio recording while playback # multiply by 0 to disable audio recording while playback
f = audioop.mul(f, 2, 0) f = audioop.mul(f, 2, 0)
self.loop.call_soon_threadsafe(self.voice_socket.send, f)
self.loop.call_soon_threadsafe(self.voice_socket.send, f)
except Exception as e: except Exception as e:
logger.warn("Error sending to {}".format(e)) logger.warn("Error sending to {}".format(e))
pass pass
@ -177,10 +177,10 @@ class VoiceServer(object):
CHANNELS = 1 CHANNELS = 1
CHUNK = 4096 CHUNK = 4096
self.stopped = False self.stopped = False
if 'alsaaudio' in sys.modules: if 'alsaaudio' in sys.modules:
if self.config['voice']['input_mixer'] and self.config['voice']['input_volume'] and self.info['input']['card']: if self.config['voice']['input_mixer'] and self.config['voice']['input_volume'] and self.info['input']['card']:
logger.info("Set input volume on {}/{} to {}".format( logger.info("Set input volume on {}/{} to {}".format(
@ -190,7 +190,7 @@ class VoiceServer(object):
)) ))
alsaaudio.Mixer(self.config['voice']['input_mixer'], device=self.info['input']['card']).setvolume( alsaaudio.Mixer(self.config['voice']['input_mixer'], device=self.info['input']['card']).setvolume(
self.config['voice']['input_volume']) self.config['voice']['input_volume'])
if self.config['voice']['output_mixer'] and self.config['voice']['output_volume'] and self.info['output']['card']: if self.config['voice']['output_mixer'] and self.config['voice']['output_volume'] and self.info['output']['card']:
logger.info("Set output volume on {}/{} to {}".format( logger.info("Set output volume on {}/{} to {}".format(
self.config['voice']['output_mixer'], self.config['voice']['output_mixer'],
@ -214,6 +214,7 @@ class VoiceServer(object):
try: try:
address = "tcp://*:{}".format(self.config['voice']['port'] + self.hugvey.id) address = "tcp://*:{}".format(self.config['voice']['port'] + self.hugvey.id)
self.voice_socket = self.ctx.socket(zmq.PUB) self.voice_socket = self.ctx.socket(zmq.PUB)
self.voice_socket.set_hwm(100)
self.voice_socket.bind(address) self.voice_socket.bind(address)
logger.info( logger.info(
@ -263,7 +264,7 @@ class CommandHandler(object):
return return
logger.info("Received {}".format(cmd)) logger.info("Received {}".format(cmd))
if cmd['action'] == 'show_yourself': if cmd['action'] == 'show_yourself':
self.showMyself() self.showMyself()
if cmd['action'] == 'prepare': if cmd['action'] == 'prepare':
@ -275,7 +276,7 @@ class CommandHandler(object):
def cmdPlay(self, cmd): def cmdPlay(self, cmd):
self.muteMic = True self.muteMic = True
msgId = cmd['id'] msgId = cmd['id']
pitch = cmd['pitch'] if 'pitch' in cmd else 50 pitch = cmd['pitch'] if 'pitch' in cmd else 50
file = cmd['file'] if 'file' in cmd else None file = cmd['file'] if 'file' in cmd else None
@ -284,7 +285,7 @@ class CommandHandler(object):
# use duration for timing the popen duration (and redo it if needed) # use duration for timing the popen duration (and redo it if needed)
duration = cmd['duration'] if 'duration' in cmd else None duration = cmd['duration'] if 'duration' in cmd else None
self.playingMsgId = msgId self.playingMsgId = msgId
if self.playPopen: if self.playPopen:
logger.info("Interrupting playback of {}".format(self.playingMsgId)) logger.info("Interrupting playback of {}".format(self.playingMsgId))
self.playPopen.terminate() self.playPopen.terminate()
@ -298,7 +299,7 @@ class CommandHandler(object):
file = self.file_address + "/" + file file = self.file_address + "/" + file
# logger.debug(['play', file]) # logger.debug(['play', file])
playCmd = ['play', file] playCmd = ['play', file]
for param, value in params.items(): for param, value in params.items():
if not value: if not value:
continue continue
@ -312,9 +313,9 @@ class CommandHandler(object):
environment_vars['AUDIODRIVER'] = self.play_audiodriver environment_vars['AUDIODRIVER'] = self.play_audiodriver
elif self.play_audiodev is not None: elif self.play_audiodev is not None:
environment_vars['AUDIODEV'] = self.play_audiodev environment_vars['AUDIODEV'] = self.play_audiodev
logger.debug(playCmd) logger.debug(playCmd)
t = None t = None
if duration is not None: if duration is not None:
t = threading.Timer(duration+3, self.checkPopen, (msgId,)) t = threading.Timer(duration+3, self.checkPopen, (msgId,))
@ -330,10 +331,10 @@ class CommandHandler(object):
returnCode = self.playPopen.returncode if self.playPopen else 0 returnCode = self.playPopen.returncode if self.playPopen else 0
logger.debug('finished') logger.debug('finished')
self.playPopen = None self.playPopen = None
if t is not None: if t is not None:
t.cancel() t.cancel()
else: else:
logger.info("Speak: {}".format(text)) logger.info("Speak: {}".format(text))
playCmd = ['espeak', '-p', '{0}'.format(pitch), text] playCmd = ['espeak', '-p', '{0}'.format(pitch), text]
@ -354,14 +355,14 @@ class CommandHandler(object):
'event': 'playbackFinish', 'event': 'playbackFinish',
'msgId': msgId 'msgId': msgId
}) })
def checkPopen(self, msgId): def checkPopen(self, msgId):
if self.playingMsgId != msgId: if self.playingMsgId != msgId:
return return
if self.playPopen is None: if self.playPopen is None:
return return
# prevent a lock of the story, no repeat or anything for now # prevent a lock of the story, no repeat or anything for now
logger.critical("Interrupting playback after timeout") logger.critical("Interrupting playback after timeout")
self.playPopen.terminate() self.playPopen.terminate()
@ -430,7 +431,7 @@ class CommandHandler(object):
await asyncio.sleep(0.05) await asyncio.sleep(0.05)
s.close() s.close()
async def heartbeat(self): async def heartbeat(self):
while True: while True:
self.showMyself() self.showMyself()
@ -466,15 +467,15 @@ class Hugvey(object):
def start(self): def start(self):
logger.debug('Hugvey {}, reporting'.format(self.id)) logger.debug('Hugvey {}, reporting'.format(self.id))
self.loop = asyncio.get_event_loop() self.loop = asyncio.get_event_loop()
self.voice_server = VoiceServer( self.voice_server = VoiceServer(
loop=self.loop, loop=self.loop,
hugvey=self, hugvey=self,
config=self.config config=self.config
) )
self.cmd_server = CommandHandler( self.cmd_server = CommandHandler(
hugvey_id=self.id, hugvey_id=self.id,
cmd_address=self.config['events']['cmd_address'], cmd_address=self.config['events']['cmd_address'],
@ -483,7 +484,7 @@ class Hugvey(object):
play_audiodev=self.voice_server.info['output']['device'], play_audiodev=self.voice_server.info['output']['device'],
play_audiodriver=self.config['voice']['output_driver'] if 'output_driver' in self.config['voice'] else None, play_audiodriver=self.config['voice']['output_driver'] if 'output_driver' in self.config['voice'] else None,
) )
logger.info('start') logger.info('start')
# self.voice_server.asyncStart(loop) # self.voice_server.asyncStart(loop)
# loop.run_until_complete(self.voice_server.start()) # loop.run_until_complete(self.voice_server.start())