diff --git a/.gitignore b/.gitignore index 339feb3..2932f58 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ venv *.pyc +.project +.pydevproject diff --git a/hugvey/central_command.py b/hugvey/central_command.py new file mode 100644 index 0000000..1fcdce2 --- /dev/null +++ b/hugvey/central_command.py @@ -0,0 +1,166 @@ +""" +"Conscript reporting" +This server controls all hugveys and the processing of their narratives. It exposes itself for control to the panopticon server. +""" + + +import asyncio +import logging +import yaml +import zmq + +from hugvey import panopticon +from hugvey.voice.streamer import AudioStreamer +from hugvey.voice.player import Player +from hugvey.communication import getTopic, zmqSend, zmqReceive +from pandas.conftest import ip +from zmq.asyncio import Context +import threading + + +logger = logging.getLogger("command") + +class CentralCommand(object): + """docstring for CentralCommand.""" + def __init__(self, debug_mode = False): + self.debug = debug_mode + self.eventQueue = asyncio.Queue() + self.commandQueue = asyncio.Queue() + self.isRunning = threading.Event() + self.hugveys = {} + self.ctx = Context.instance() + + + def loadConfig(self, filename): + with open(filename, 'r') as fp: + logger.debug('Load config from {}'.format(filename)) + self.config = yaml.safe_load(fp) + + + + self.hugvey_ids = [i+1 for i in range(self.config['hugveys'])] + + def commandHugvey(self, hv_id, msg): + self.commandQueue.put_nowait((hv_id, msg)) + + def commandAllHugveys(self, msg): + for hv_id in self.hugvey_ids: + self.commandHugvey(hv_id, msg) + + def commandAllActiveHugveys(self, msg): + for hv_id in self.hugveys: + self.commandHugvey(hv_id, msg) + + async def commandSender(self): + s = self.ctx.socket(zmq.PUB) + s.bind(self.config['events']['cmd_address']) + + + self.commandAllHugveys({'action': 'show_yourself'}) + + # sleep to allow pending connections to connect + await asyncio.sleep(1) + logger.info("Ready to publish commands on: {}".format(self.config['events']['cmd_address'])) + logger.debug('Already {} items in queue'.format(self.commandQueue.qsize())) + + while self.isRunning.is_set(): + hv_id, cmd = await self.commandQueue.get() + zmqSend(s, hv_id, cmd) + + s.close() + + def instantiateHugvey(self, hugvey_id, msg): + ''' + Start a HugveyState, according to a show_yourself reply + + 'event': 'connection', + 'id': self.hugvey_id, + 'host': socket.gethostname(), + 'ip': self.getIp(), + ''' +# def startHugvey(): + h = HugveyState(hugvey_id, self) + h.config(msg['host'],msg['ip']) + + thread = threading.Thread(target=h.start) + thread.start() +# self.tasks['hv{}'.format(hugvey_id)] = self.loop.create_task(h.start()) + + async def eventListener(self): + s = self.ctx.socket(zmq.SUB) + s.bind(self.config['events']['listen_address']) + logger.info("Listen for events on: {}".format(self.config['events']['listen_address'])) + + for id in self.hugvey_ids: + s.subscribe(getTopic(id)) + + while self.isRunning.is_set(): + hugvey_id, msg = await zmqReceive(s) + + if hugvey_id not in self.hugvey_ids: + logger.critical("Message from alien Hugvey: {}".format(hugvey_id)) + continue + elif hugvey_id not in self.hugveys: + if msg['event'] == 'connection': + # Create a hugvey + self.instantiateHugvey(hugvey_id, msg) + else: + logger.warning("Message from uninstantiated Hugvey {}".format(hugvey_id)) + logger.debug("Message contains: {}".format(msg)) + continue + else: + pass + + def start(self): + self.isRunning.set() + self.loop = asyncio.get_event_loop() + self.tasks = {} # collect tasks so we can cancel in case of error + self.tasks['eventListener'] = self.loop.create_task(self.eventListener()) + self.tasks['commandSender'] = self.loop.create_task(self.commandSender()) +# self.tasks['commandSender'] = self.loop.create_task(self.commandSender()) + self.loop.run_forever() + + def stop(self): + self.isRunning.clear() + + + +class HugveyState(object): + """Represents the state of a Hugvey client on the server. + Manages server connections & voice parsing etc. + """ + def __init__(self, id: int, command: CentralCommand): + super(HugveyState, self).__init__() + self.id = id + self.command = command + self.logger = logging.getLogger(f"hugvey{self.id}") + self.loop = asyncio.new_event_loop() + + def config(self, hostname, ip): + self.ip = ip + self.hostname = hostname + self.logger.info(f"Hugvey {self.id} at {self.ip}, host: {self.hostname}") + + def start(self): + # stop on isRunning.is_set() or wait() +# self.loop.create_task(self.startAudio()) + tasks = asyncio.gather(self.startAudio(), loop=self.loop) + self.loop.run_until_complete(tasks) +# asyncio.run_coroutine_threadsafe(self._start(), self.loop) + + async def startAudio(self): + ''' + Start the audio streamer service + ''' + self.logger.info("Start audio stream") + streamer = AudioStreamer( + self.command.config['voice']['chunk'], + self.ip, + int(self.command.config['voice']['port'])) + + if self.command.debug: + self.logger.warn("Debug on: Connecting Audio player") + player = Player(self.command.config['voice']['src_rate'], self.command.config['voice']['out_rate']) + streamer.addConsumer(player) + + await streamer.run() diff --git a/hugvey/client.py b/hugvey/client.py index a31a005..dd54c83 100644 --- a/hugvey/client.py +++ b/hugvey/client.py @@ -1,8 +1,6 @@ import pyaudio import socket -import select import audioop -import threading import logging import time import zmq @@ -31,13 +29,13 @@ class VoiceServer(object): # input_device_idx = 0 devices_count = self.p.get_device_count() for i in range(devices_count): - dev = self.p.get_device_info_by_index(i) - if input_device_idx is None and dev['maxInputChannels'] > 0: - if (self.input_name and self.input_name in dev['name']) or \ - (not self.input_name and dev['name'] != 'default'): - input_device_idx = dev['index'] - logger.info("Use device {0}: {1}".format(dev['index'],dev['name'])) - logger.debug("{} {:0d} {}".format("* " if input_device_idx == i else "- ", i, dev['name'])) + dev = self.p.get_device_info_by_index(i) + if input_device_idx is None and dev['maxInputChannels'] > 0: + if (self.input_name and self.input_name in dev['name']) or \ + (not self.input_name and dev['name'] != 'default'): + input_device_idx = dev['index'] + logger.info("Use device {0}: {1}".format(dev['index'],dev['name'])) + logger.debug("{} {:0d} {}".format("* " if input_device_idx == i else "- ", i, dev['name'])) return input_device_idx @@ -45,6 +43,8 @@ class VoiceServer(object): if self.input_rate == self.target_rate: f = in_data else: + # chunk 4096, with 2 bytes per frame gives len(in_data) of 8192 + # rate converted 44k1 -> 16k gives len(f) == 2972 (16/44.1 * 8192) f, self.laststate = audioop.ratecv(in_data, 2, 1, self.input_rate, self.target_rate, self.laststate) for s in self.clients: @@ -52,7 +52,7 @@ class VoiceServer(object): s.send(f) except Exception as e: self.clients.remove(s) - logger.warn("Error sending to {}".format(s.getsockname())) + logger.warn("Error sending to {}, {}".format(s.getsockname(), e)) pass return (None, pyaudio.paContinue) @@ -78,13 +78,15 @@ class VoiceServer(object): try: self.voice_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.voice_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - self.voice_socket.bind(('', self.voice_port)) + address = ('', self.voice_port) + self.voice_socket.bind(address) self.voice_socket.listen(5) read_list = [self.voice_socket] - logger.info( "Waiting for connections") + logger.info( "Waiting for voice connections on {}".format(address) ) while not self.stopped: (clientsocket, address) = self.voice_socket.accept() + logger.info( "Got voice connection from {}".format(address)) self.clients.append(clientsocket) logger.info( "Stop recording & streaming") @@ -178,7 +180,11 @@ class CommandHandler(object): logger.info("Publish on: {}".format(self.publish_address)) # For some reason, sending only one message is lost, perhaps due # to connect() rather than bind() ?? + + await asyncio.sleep(1) # wait for connection to be proper set + self.showMyself() + while True: for i in range(len(self.eventQueue)): zmqSend(s, self.hugvey_id, self.eventQueue.pop(0)) @@ -204,7 +210,7 @@ class Hugvey(object): logger.debug('Load config from {}'.format(filename)) self.config = yaml.safe_load(fp) - async def startCommandListener(): + async def startCommandListener(self): return await self.cmd_server.command_listener() def start(self): diff --git a/hugvey/communication.py b/hugvey/communication.py index 5094cb6..a0b51b8 100644 --- a/hugvey/communication.py +++ b/hugvey/communication.py @@ -17,4 +17,4 @@ async def zmqReceive(socket): topic, msg = await socket.recv_multipart() hugvey_id = topic.decode()[2:] logger.info("Received 0mq messages for Hugvey #{} containing {}".format(hugvey_id, msg.decode())) - return hugvey_id, json.loads(msg) + return int(hugvey_id), json.loads(msg.decode()) diff --git a/hugvey/narrative.py b/hugvey/narrative.py new file mode 100644 index 0000000..e69de29 diff --git a/hugvey/panopticon.py b/hugvey/panopticon.py new file mode 100644 index 0000000..ef8b73a --- /dev/null +++ b/hugvey/panopticon.py @@ -0,0 +1,13 @@ +""" +The panopticon provides a way to observe (& control) all running Hugveys trough a web interface +""" + +import logging +import tornado + + +logger = logging.getLogger("panopticon") + +class Panopticon(object): + def __init__(self, config, command): + pass \ No newline at end of file diff --git a/hugvey/voice/google.py b/hugvey/voice/google.py new file mode 100644 index 0000000..e69de29 diff --git a/hugvey/voice/player.py b/hugvey/voice/player.py new file mode 100644 index 0000000..e10cab3 --- /dev/null +++ b/hugvey/voice/player.py @@ -0,0 +1,62 @@ +import pyaudio +import logging +import audioop + + +logger = logging.getLogger("player") + +class Player: + """ + Play the streamed audio + """ + + def __init__(self, src_rate, out_rate): + self.p = None + self.src_rate = src_rate + self.out_rate = out_rate # unfortunately not every device plays 16kHz audio streams + self.cv_laststate = None + + try: + self.p = p = pyaudio.PyAudio() + + self.stream = p.open(format=pyaudio.paInt16, + channels=1, + rate=out_rate, + output=True, + output_device_index=self.get_output_idx() + ) + except Exception as e: + logger.critical(f"Player not instatiated: {e}") + if self.p: + self.p.terminate() + self.p = None + + def get_output_idx(self): + output_device_idx = None + devices_count = self.p.get_device_count() + for i in range(devices_count): + dev = self.p.get_device_info_by_index(i) +# print(dev) + if output_device_idx is None and dev['maxOutputChannels'] > 0: + output_device_idx = dev['index'] + logger.info("Use device {0}: {1} {2}".format(dev['index'],dev['name'], dev['maxOutputChannels'])) + logger.debug("{} {:0d} {}".format("* " if output_device_idx == i else "- ", i, dev['name'])) + return output_device_idx + + def receive(self, chunk): + if not self.p: + return + +# logger.debug('receive {}'.format(len(chunk))) + if self.src_rate == self.out_rate: + data = chunk + else: + data, self.cv_laststate = audioop.ratecv(chunk, 2, 1, self.src_rate, self.out_rate, self.cv_laststate) + self.stream.write(data) + + def shutdown(self): + if not self.p: + return + + self.stream.close() + self.p.terminate() diff --git a/hugvey/voice/streamer.py b/hugvey/voice/streamer.py new file mode 100644 index 0000000..ea2a159 --- /dev/null +++ b/hugvey/voice/streamer.py @@ -0,0 +1,49 @@ +""" +Consume a given Hugvey audio socket, and stream into the given services to emit events to the given server +""" +import socket +import logging + +logger = logging.getLogger("streamer") + +class AudioStreamer(object): + def __init__(self, chunk, address: str, port: int): + self.consumers = [] + + self.chunk = chunk + self.address = address + self.port = port + + self.isRunning = False + + def addConsumer(self, consumer): + self.consumers.append(consumer) + + async def run(self): + self.isRunning = True + + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + logger.info("Attempt connection on {}:{}".format(self.address, self.port)) + s.connect((self.address, self.port)) + + while self.isRunning: + data = s.recv(self.chunk) +# logger.debug('chunk received') + self.process(data) + + logger.info("Close socket on {}:{}".format(self.address, self.port)) + s.close() + + def stop(self): + self.isRunning = False + + for consumer in self.consumers: + consumer.shutdown() + + + def process(self, chunk): +# logger.debug("Received chunk") + for consumer in self.consumers: + consumer.receive(chunk) + + \ No newline at end of file diff --git a/hugvey_server.py b/hugvey_server.py new file mode 100644 index 0000000..61a3d8b --- /dev/null +++ b/hugvey_server.py @@ -0,0 +1,36 @@ +import argparse +import logging + +import coloredlogs + +from hugvey.central_command import CentralCommand + + +if __name__ == '__main__': + argParser = argparse.ArgumentParser( + description='Start up a Hugvey Central Command. Ready to receive voices and other events, and eager to send commands') + argParser.add_argument( + '--config', + '-c', + required=True, + type=str, + help='The yaml config file to load' + ) + argParser.add_argument( + '--verbose', + '-v', + action="store_true", + ) + + args = argParser.parse_args() +# print(coloredlogs.DEFAULT_LOG_FORMAT) +# exit() + coloredlogs.install( + level=logging.DEBUG if args.verbose else logging.INFO, +# default: "%(asctime)s %(hostname)s %(name)s[%(process)d] %(levelname)s %(message)s" + fmt="%(asctime)s %(hostname)s %(name)s[%(process)d,%(threadName)s] %(levelname)s %(message)s" + ) + + command = CentralCommand(debug_mode=args.verbose) + command.loadConfig(args.config) + command.start() diff --git a/requirements.txt b/requirements.txt index d4002e1..5aa5e19 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,5 @@ pyzmq pyaudio coloredlogs +pyyaml +audioop \ No newline at end of file diff --git a/server_config.yml b/server_config.yml new file mode 100644 index 0000000..e099b51 --- /dev/null +++ b/server_config.yml @@ -0,0 +1,9 @@ +events: + cmd_address: "tcp://0.0.0.0:5555" + listen_address: "tcp://0.0.0.0:5556" +voice: + src_rate: 16000 + out_rate: 44100 + port: 4444 + chunk: 2972 +hugveys: 3 \ No newline at end of file diff --git a/test_sub.py b/test_sub.py new file mode 100644 index 0000000..b81f97a --- /dev/null +++ b/test_sub.py @@ -0,0 +1,58 @@ +import zmq +import random +import sys +import time +import asyncio +import hugvey.communication + +def main(): + if len(sys.argv) > 1: + port = sys.argv[1] + int(port) + + context = zmq.Context() + socket = context.socket(zmq.SUB) + socket.bind("tcp://*:5556") + for i in range(0, 25): + socket.setsockopt(zmq.SUBSCRIBE, "hv{}".format(i).encode()) + + + while True: + # a = await hugvey.communication.zmqReceive(socket) + # print(a) + print('received') + r = socket.recv_multipart() + print(r) + +main() +# loop = asyncio.get_event_loop() +# loop.run_until_complete(main()) +# +# +# import sys +# import zmq +# +# port = "5555" +# +# # Socket to talk to server +# context = zmq.Context() +# socket = context.socket(zmq.SUB) +# +# # print "Collecting updates from weather server..." +# socket.bind ("tcp://*:%s" % port) +# +# # Subscribe to zipcode, default is NYC, 10001 +# # topicfilter = "10001" +# for i in range(0, 25): +# socket.setsockopt(zmq.SUBSCRIBE, "hv{}".format(i).encode()) +# +# # Process 5 updates +# # total_value = 0 +# for update_nbr in range (5): +# string = socket.recv_multipart() +# print(string) +# # topic, messagedata = string.split() +# # total_value += int(messagedata) +# # print topic, messagedata +# +# # print "Average messagedata value for topic '%s' was %dF" % (topicfilter, total_value / update_nbr)