From ad9a4b8f1343caf80020103f62fdef4569edc29b Mon Sep 17 00:00:00 2001 From: Ruben van de Ven Date: Tue, 15 Jan 2019 23:34:59 +0100 Subject: [PATCH] First client version --- client_config.yml | 8 ++++ hugvey/client.py | 86 +++++++++++++++++++++++++++-------------- hugvey/communication.py | 4 +- hugvey_client.py | 35 ++++------------- test_pub.py | 22 +++++++++++ 5 files changed, 98 insertions(+), 57 deletions(-) create mode 100644 client_config.yml create mode 100644 test_pub.py diff --git a/client_config.yml b/client_config.yml new file mode 100644 index 0000000..611621d --- /dev/null +++ b/client_config.yml @@ -0,0 +1,8 @@ +events: + cmd_address: "tcp://127.0.0.1:5555" + publish_address: "tcp://0.0.0.0:5556" +voice: + input_rate: 44100 + target_rate: 16000 + port: 4444 + input_name: null diff --git a/hugvey/client.py b/hugvey/client.py index 4cc907a..b740f11 100644 --- a/hugvey/client.py +++ b/hugvey/client.py @@ -8,13 +8,15 @@ import time import zmq import asyncio from zmq.asyncio import Context +import yaml +import re from .communication import zmqReceive, zmqSend, getTopic logger = logging.getLogger("client") class VoiceServer(object): """A UDP server, providing mic data at 16 kHz""" - def __init__(self, voice_port, input_rate, input_name = None, target_rate = 16000): + def __init__(self, voice_port: int, input_rate: int, input_name: str = None, target_rate: int = 16000): self.voice_port = voice_port self.input_rate = input_rate self.target_rate = target_rate @@ -54,7 +56,7 @@ class VoiceServer(object): pass return (None, pyaudio.paContinue) - async def start(self): + def start(self): FORMAT = pyaudio.paInt16 CHANNELS = 1 CHUNK = 4096 @@ -99,14 +101,21 @@ class VoiceServer(object): def stop(self): self.stopped = True + async def asyncStart(self, loop): + future = loop.run_in_executor(None, self.start) + r = await future + # await self.start() + class CommandHandler(object): - def __init__(self, hugvey_id, event_address = "tcp://127.0.0.1:5555"): + def __init__(self, hugvey_id, cmd_address = "tcp://127.0.0.1:5555", publish_address = "tcp://0.0.0.0:5555"): self.eventQueue = [] self.ctx = Context.instance() self.hugvey_id = hugvey_id - self.event_address = event_address + self.cmd_address = cmd_address + self.publish_address = publish_address def handle(self, cmd): + # self.sendMessage({'reply':'test'}) if not 'action' in cmd: logger.critical("Invalid command: {}".format(cmd)) return @@ -117,9 +126,11 @@ class CommandHandler(object): def cmdPlay(self, msgId, msgText): # espeak(msgText) - logger.inof("Play: {}".format(msgText)) + # TODO kill if playing & play wave file + # preferably a cat (local)/curl (remote) pipe into player + logger.info("Play: {}".format(msgText)) time.sleep(2) - sendMessage({ + self.sendMessage({ 'event': 'playbackFinish', 'msgId': msgId }) @@ -129,26 +140,25 @@ class CommandHandler(object): async def command_listener(self): s = self.ctx.socket(zmq.SUB) - s.connect(self.event_address) - queueName = 'hv{}'.format(self.hugvey_id) - s.subscribe(queueName) - logger.info("Subscribed to commands on {}".format(queueName)) - while True: - hugvey_id, msg = await zmqReceive(s) - # topic, msg = await s.recv_multipart() - print('received', msg) - s.close() - - async def event_sender(port): - s = self.ctx.socket(zmq.PUB) - s.connect(self.event_send_address) + s.connect(self.cmd_address) topic = getTopic(self.hugvey_id) s.subscribe(topic) + logger.info("Subscribed to commands for {} on {}".format(topic, self.cmd_address)) + while True: + hugvey_id, cmd = await zmqReceive(s) + self.handle(cmd) + # topic, msg = await s.recv_multipart() + # print('received', msg, time.time()) + s.close() - logger.info("Subscribed to commands on {}".format(topic)) + async def event_sender(self): + s = self.ctx.socket(zmq.PUB) + # TODO: see if we can connect() here. So we can PUSH(??) the ip + s.bind(self.publish_address) + logger.info("Publish on: {}".format(self.publish_address)) while True: for i in range(len(self.eventQueue)): - hugvey_id, msg = await zmqSend(s, self.hugvey_id, self.eventQueue.pop(0)) + await zmqSend(s, self.hugvey_id, self.eventQueue.pop(0)) if len(self.eventQueue) == 0: await asyncio.sleep(0.05) @@ -158,21 +168,41 @@ class Hugvey(object): """The Hugvey client, to be ran on the Raspberry Pi's """ def __init__(self): + self.id = self.getId() pass + def getId(self) -> int: + """Get Hugvey ID from hostname""" + h = socket.gethostname() + return int(re.findall('\d+', h )[0]) + def loadConfig(self, filename): - # filename - pass + with open(filename, 'r') as fp: + logger.debug('Load config from {}'.format(filename)) + self.config = yaml.safe_load(fp) async def startCommandListener(): return await self.cmd_server.command_listener() def start(self): - self.voice_server = VoiceServer(4444, 44100) - self.cmd_server = CommandHandler(1) + self.voice_server = VoiceServer( + voice_port = int(self.config['voice']['port']), + input_rate = int(self.config['voice']['input_rate']), + input_name = self.config['voice']['input_name'], + target_rate = int(self.config['voice']['target_rate']), + ) + self.cmd_server = CommandHandler( + hugvey_id = self.id, + cmd_address = self.config['events']['cmd_address'], + publish_address = self.config['events']['publish_address'], + ) loop = asyncio.get_event_loop() logger.info('start') - loop.run_until_complete(self.voice_server.start()) - loop.run_until_complete(self.cmd_server.command_listener()) - loop.run_until_complete(self.cmd_server.command_listener()) + # self.voice_server.asyncStart(loop) + # loop.run_until_complete(self.voice_server.start()) + asyncio.ensure_future(self.voice_server.asyncStart(loop)) + asyncio.ensure_future(self.cmd_server.command_listener()) + asyncio.ensure_future(self.cmd_server.event_sender()) + + loop.run_forever() logger.info('done') diff --git a/hugvey/communication.py b/hugvey/communication.py index d9a3eed..cc8bcac 100644 --- a/hugvey/communication.py +++ b/hugvey/communication.py @@ -7,11 +7,11 @@ def getTopic(hugvey_id): return "hv{}".format(hugvey_id) -def zmqSend(socket, hugvey_id, msg): +async def zmqSend(socket, hugvey_id, msg): msgData = json.dumps(msg) topic = getTopic(hugvey_id) logger.info("Send 0mq to {} containing {}".format(topic, msg)) - return socket.send_multipart([topic.encode(), msgData.encode()]) + await socket.send_multipart([topic.encode(), msgData.encode()]) async def zmqReceive(socket): topic, msg = await socket.recv_multipart() diff --git a/hugvey_client.py b/hugvey_client.py index 4a3ca89..282309c 100644 --- a/hugvey_client.py +++ b/hugvey_client.py @@ -2,34 +2,15 @@ from hugvey.client import Hugvey import coloredlogs, logging import argparse - - - if __name__ == '__main__': argParser = argparse.ArgumentParser(description='Start up a Hugvey pillow. Mic stream becomes available on TCP Socket, and starts listening + emitting events') - # argParser.add_argument( - # '--voice-port', - # required=True, - # type=int, - # help='The port on which to listen for TCP connections (listens on 0.0.0.0) for audio receivers' - # ) - # argParser.add_argument( - # '--event-address', - # type=str, - # default="127.0.0.1", - # help='The ip to which to set up the TCP connection for sending events. Can also be an existing unix file socket.' - # ) - # argParser.add_argument( - # '--event-port', - # type=int, - # help='The port on which to set up the TCP connection for sending events. Ignored if --event-address points to a file socket' - # ) - # argParser.add_argument( - # '--language-code', - # default="en-US", - # type=str, - # help='Language code for Speech to Text (BCP-47 language tag)' - # ) + argParser.add_argument( + '--config', + '-c', + required=True, + type=str, + help='The yaml config file to load' + ) argParser.add_argument( '--verbose', '-v', @@ -42,6 +23,6 @@ if __name__ == '__main__': level=logging.DEBUG if args.verbose else logging.INFO, ) - # server = VoiceServer(voice_port=4444, input_rate=44100) hv = Hugvey() + hv.loadConfig(args.config) hv.start() diff --git a/test_pub.py b/test_pub.py new file mode 100644 index 0000000..d9b5f7a --- /dev/null +++ b/test_pub.py @@ -0,0 +1,22 @@ +import zmq +import random +import sys +import time +import hugvey.communication + +port = "5555" +if len(sys.argv) > 1: + port = sys.argv[1] + int(port) + +context = zmq.Context() +socket = context.socket(zmq.PUB) +socket.bind("tcp://*:%s" % port) + +while True: + # topic = random.randrange(9999,10005) + # topic = "hv" + str(random.randrange(1,3)) + messagedata = str(time.time()) + # print ("{} {}".format(topic, messagedata)) + hugvey.communication.zmqSend(socket, random.randrange(1,3), messagedata) + time.sleep(1)