From 30b2f9e8af16876144ba75b8c35094b64c7f7aef Mon Sep 17 00:00:00 2001 From: Ruben van de Ven Date: Fri, 18 Jan 2019 15:40:43 +0100 Subject: [PATCH] Basic speaking interaction now working with google + espeak --- .gitignore | 1 + hugvey/central_command.py | 17 ++++++++++- hugvey/client.py | 60 +++++++++++++++++++++++---------------- requirements.server.txt | 3 +- requirements.txt | 1 - 5 files changed, 53 insertions(+), 29 deletions(-) diff --git a/.gitignore b/.gitignore index 2932f58..1b2ac94 100644 --- a/.gitignore +++ b/.gitignore @@ -2,4 +2,5 @@ venv *.pyc .project .pydevproject +venv3 diff --git a/hugvey/central_command.py b/hugvey/central_command.py index 735b273..9b357d0 100644 --- a/hugvey/central_command.py +++ b/hugvey/central_command.py @@ -56,7 +56,21 @@ class CentralCommand(object): """ prepare command to be picked up by the sender """ + if threading.current_thread().getName() != 'MainThread': + # Threading nightmares! Adding to queue from other thread/loop (not sure which is the isse) + # won't trigger asyncios queue.get() so we have to do this thread safe, in the right loop + self.loop.call_soon_threadsafe( self._queueCommand, hv_id, msg ) + else: + self._queueCommand(hv_id, msg) + + def _queueCommand(self, hv_id, msg): self.commandQueue.put_nowait((hv_id, msg)) +# if msg['action'] == 'play': +# self.commandQueue.put_nowait((hv_id, { +# 'action': 'play', +# 'msg': "This is an interrption", +# 'id': 'test', +# })) def commandAllHugveys(self, msg): for hv_id in self.hugvey_ids: @@ -70,7 +84,6 @@ class CentralCommand(object): s = self.ctx.socket(zmq.PUB) s.bind(self.config['events']['cmd_address']) - self.commandAllHugveys({'action': 'show_yourself'}) # sleep to allow pending connections to connect @@ -80,8 +93,10 @@ class CentralCommand(object): while self.isRunning.is_set(): hv_id, cmd = await self.commandQueue.get() + logger.info('Got command to send: {} {}'.format(hv_id, cmd)) zmqSend(s, hv_id, cmd) + logger.warn('Stopping command sender') s.close() async def instantiateHugvey(self, hugvey_id, msg): diff --git a/hugvey/client.py b/hugvey/client.py index da3a0a3..557858f 100644 --- a/hugvey/client.py +++ b/hugvey/client.py @@ -1,14 +1,18 @@ -import pyaudio -import socket +import asyncio import audioop import logging -import time -import zmq -import asyncio -from zmq.asyncio import Context -import yaml +import pyaudio import re +import socket +import threading +import time +import yaml +import zmq +from zmq.asyncio import Context + from .communication import zmqReceive, zmqSend, getTopic +import subprocess + logger = logging.getLogger("client") @@ -83,22 +87,11 @@ class VoiceServer(object): try: address = "tcp://*:{}".format(self.voice_port) self.voice_socket = self.ctx.socket(zmq.PUB) -# self.voice_socket.setsockopt(zmq.CONFLATE, 1) self.voice_socket.bind(address) -# self.voice_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) -# self.voice_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) -# address = ('', self.voice_port) -# self.voice_socket.bind(address) -# self.voice_socket.listen(5) - -# read_list = [self.voice_socket] logger.info( "Waiting for voice connections on {}".format(address) ) while not self.stopped: await asyncio.sleep(1) -# (clientsocket, address) = self.voice_socket.accept() -# logger.info( "Got voice connection from {}".format(address)) -# self.clients.append(clientsocket) logger.info( "Stop recording & streaming") self.voice_socket.close() @@ -117,7 +110,6 @@ class VoiceServer(object): async def asyncStart(self, loop): future = loop.run_in_executor(None, self.start) r = await future - # await self.start() class CommandHandler(object): def __init__(self, hugvey_id, cmd_address = "tcp://127.0.0.1:5555", publish_address = "tcp://0.0.0.0:5555"): @@ -126,9 +118,11 @@ class CommandHandler(object): self.hugvey_id = hugvey_id self.cmd_address = cmd_address self.publish_address = publish_address + self.playPopen = None # self.showMyself() # queue message for connection request def handle(self, cmd): + print('handle', cmd) # self.sendMessage({'reply':'test'}) if not 'action' in cmd: logger.critical("Invalid command: {}".format(cmd)) @@ -141,16 +135,30 @@ class CommandHandler(object): self.cmdPlay(cmd['id'], cmd['msg']) - def cmdPlay(self, msgId, msgText): - # espeak(msgText) - # TODO kill if playing & play wave file - # preferably a cat (local)/curl (remote) pipe into player + def cmdPlay(self, msgId, msgText, pitch=50): logger.info("Play: {}".format(msgText)) - time.sleep(2) + self.playPopen = subprocess.Popen(['espeak', '-p','{0}'.format(pitch), msgText], stdout=subprocess.PIPE) + returnCode = self.playPopen.wait() + self.playPopen = None + + if returnCode: + logger.warn("Had returncode on play: {}".format(returnCode)) + else: + logger.debug("Finished playback. Return code: {}".format(returnCode)) + + self.sendMessage({ 'event': 'playbackFinish', 'msgId': msgId }) + + def cmdStop(self, msgId): + if self.playPopen: + logger.info("Interrupting playback") + try: + self.playPopen.terminate() + except Exception as e: + logger.critical("Could not stop playback: {}".format(e)) def showMyself(self): """Publish about this hugvey to central command @@ -180,7 +188,9 @@ class CommandHandler(object): logger.info("Subscribed to commands for {} on {}".format(topic, self.cmd_address)) while True: hugvey_id, cmd = await zmqReceive(s) - self.handle(cmd) +# print("GOGOG", hugvey_id, cmd) + t = threading.Thread(target=self.handle, args=(cmd,)) + t.start() # topic, msg = await s.recv_multipart() # print('received', msg, time.time()) s.close() diff --git a/requirements.server.txt b/requirements.server.txt index 667b814..931fba4 100644 --- a/requirements.server.txt +++ b/requirements.server.txt @@ -2,5 +2,4 @@ pyzmq pyaudio coloredlogs pyyaml -audioop -google-cloud-speech \ No newline at end of file +google-cloud-speech diff --git a/requirements.txt b/requirements.txt index 5aa5e19..c40d64f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,4 +2,3 @@ pyzmq pyaudio coloredlogs pyyaml -audioop \ No newline at end of file