From 8bbe75b1ea942e09408434d4be3d22d43a9b12ec Mon Sep 17 00:00:00 2001 From: Ruben van de Ven Date: Mon, 1 Apr 2019 16:36:34 +0200 Subject: [PATCH] Remove httpclient for call to voice storage. Attempt to fix 'Too many open files' error --- README.md | 9 ++++++ hugvey/central_command.py | 25 +++++++++++++++- hugvey/panopticon.py | 8 ++--- hugvey/story.py | 62 +++++++++++++++++++++++++++------------ hugvey/voice.py | 3 ++ 5 files changed, 84 insertions(+), 23 deletions(-) diff --git a/README.md b/README.md index 5193e21..a79f4c1 100644 --- a/README.md +++ b/README.md @@ -94,3 +94,12 @@ for i in {1..6}; do ssh pi@hugvey$i.local "sudo shutdown -h now"; done ``` +```bash +lsof -p $(ps aux|grep "[h]ugvey_server.py" |awk '{print $2}')| awk '{print $9}'|sort -rn|uniq -c|sort -rn|head -20 +``` + +or + +```bash +lsof | grep $(ps aux|grep "[h]ugvey_server.py" |awk '{print $2}')| awk '{print $11}'|sort -rn|uniq -c|sort -rn|head -20 +``` diff --git a/hugvey/central_command.py b/hugvey/central_command.py index 201b40c..ad959ec 100644 --- a/hugvey/central_command.py +++ b/hugvey/central_command.py @@ -83,7 +83,11 @@ class CentralCommand(object): self.loadLanguages() - self.panopticon = Panopticon(self, self.config) + voice_dir = os.path.join(self.config['web']['files_dir'], 'voices') + self.voiceStorage = VoiceStorage(voice_dir, self.config['voice']['token']) + + self.panopticon = Panopticon(self, self.config, self.voiceStorage) + def loadLanguages(self): logger.debug('load language files') @@ -231,6 +235,22 @@ class CentralCommand(object): logger.critical(f"Exception while running event loop:") logger.exception(e) + async def voiceListener(self, hugvey_id): + s = self.ctx.socket(zmq.REP) #: :type s: zmq.sugar.Socket + voiceAddr = f"ipc://voice{hugvey_id}" + s.bind(voiceAddr) + logger.debug("Listen for voice requests on: {}".format(voiceAddr)) + + while self.isRunning.is_set(): + try: + r = await s.recv_json() + isVariable = bool(r['variable']) + text = r['text'] + fn = await self.voiceStorage.requestFile(text, isVariable) + await s.send_string(fn) + except Exception as e: + logger.critical(f"Exception while running voice loop:") + logger.exception(e) def start(self): self.isRunning.set() @@ -242,6 +262,9 @@ class CentralCommand(object): self.catchException(self.eventListener())) self.tasks['commandSender'] = self.loop.create_task( self.catchException(self.commandSender())) + for hid in self.hugvey_ids: + self.tasks['voiceListener'] = self.loop.create_task( + self.catchException(self.voiceListener(hid))) # we want the web interface in a separate thread self.panopticon_thread = threading.Thread( diff --git a/hugvey/panopticon.py b/hugvey/panopticon.py index 17a4086..8a48569 100644 --- a/hugvey/panopticon.py +++ b/hugvey/panopticon.py @@ -182,6 +182,7 @@ def getVoiceHandler(voiceStorage): # TODO: we should be using ZMQ here... text = self.get_argument('text') isVariable = True if int(self.get_argument('variable')) >0 else False + # TODO: make zmq socket request/reply pattern: fn = await voiceStorage.requestFile(text, isVariable) if not fn: raise Exception(f"No Filename for text: {text}") @@ -198,12 +199,11 @@ def getVoiceHandler(voiceStorage): class Panopticon(object): - def __init__(self, central_command, config): + def __init__(self, central_command, config, voiceStorage): self.command = central_command self.config = config - voice_dir = os.path.join(self.config['web']['files_dir'], 'voices') - self.voiceStorage = VoiceStorage(voice_dir, self.config['voice']['token']) + self.voiceStorage = voiceStorage self.wsHandler = getWebSocketHandler(self.command) @@ -215,7 +215,7 @@ class Panopticon(object): (r"/voice", getVoiceHandler(self.voiceStorage)), (r"/(.*)", tornado.web.StaticFileHandler, {"path": web_dir, "default_filename": 'index.html'}), - ], debug=True) + ], debug=False) self.application.listen(config['web']['port']) # self.loop.configure(evt_loop) diff --git a/hugvey/story.py b/hugvey/story.py index e59bfb8..6ce12d0 100644 --- a/hugvey/story.py +++ b/hugvey/story.py @@ -8,6 +8,10 @@ from .communication import LOG_BS from tornado.httpclient import AsyncHTTPClient, HTTPRequest import uuid import shortuuid +import threading +import faulthandler +from zmq.asyncio import Context +import zmq mainLogger = logging.getLogger("hugvey") logger = mainLogger.getChild("narrative") @@ -139,28 +143,50 @@ class Message(object): if self.audioFile is not None: return self.audioFile - self.logger.debug(f"Fetching audio for {self.getText()}") + text = self.getText() + self.logger.debug(f"Fetching audio for {text}") + # return "test"; async with self.filenameFetchLock: - client = AsyncHTTPClient() - queryString = urllib.parse.urlencode({ - 'text': self.getText(), - 'filename': 1, - 'variable': 1 if self.hasVariables() else 0 - }) - request = HTTPRequest( - url = f"http://localhost:{self.story.panopticon_port}/voice?{queryString}", - method="GET" - ) - self.logger.log(LOG_BS, request.url) - response = await client.fetch(request) + print(threading.enumerate()) + +# client = AsyncHTTPClient() + info = { + 'text': text, + 'variable': True if self.hasVariables() else False + } +# queryString = urllib.parse.urlencode(info) +# request = HTTPRequest( +# url = f"http://localhost:{self.story.panopticon_port}/voice?{queryString}", +# # url = f"http://rubenvandeven.com/", +# method="GET" +# ) +# self.logger.log(LOG_BS, request.url) +# response = await client.fetch(request) + s = Context.instance().socket(zmq.REQ) #: :type s: zmq.sugar.Socket + voiceAddr = f"ipc://voice{self.story.hugvey.id}" + s.connect(voiceAddr) + await s.send_json(info) + print('sent now wait') + filename = await s.recv_string() + print('reply', filename) + s.close() + - if response.code != 200: - self.logger.critical(f"Error when fetching filename: {response.code} for {queryString}") - return None + print(threading.enumerate()) +# for t in threading.enumerate(): #: :type t: threading.Thread +# if t.name.startswith('ThreadPoolExecutor'): +# faulthandler.dump_traceback() +# exit() + +# return "local/voices/static/64/64a016ce38faaac6cc59f6dc8e64cfa6bb81005b.wav" + +# if response.code != 200: +# self.logger.critical(f"Error when fetching filename: {response.code} for {queryString}") +# return None - self.logger.debug(f"Fetched audio for {self.getText()}") - return response.body.decode().strip() + self.logger.debug(f"Fetched audio for {text}") + return filename class Reply(object): diff --git a/hugvey/voice.py b/hugvey/voice.py index d8c4b1b..b40143b 100644 --- a/hugvey/voice.py +++ b/hugvey/voice.py @@ -71,11 +71,13 @@ class VoiceStorage(object): except Exception as e: logger.exception(e) self.pendingRequests[id].set() + http_client.close() return None else: if response.code != 200: logger.critical(f"No proper response! {response.code}") self.pendingRequests[id].set() + http_client.close() return None logger.debug(f"Wrote body: {response.code}") @@ -83,4 +85,5 @@ class VoiceStorage(object): f.write(response.body) self.pendingRequests[id].set() print(type(fn), fn) + http_client.close() return fn