Remove httpclient for call to voice storage. Attempt to fix 'Too many open files' error

This commit is contained in:
Ruben van de Ven 2019-04-01 16:36:34 +02:00
parent de3c42a329
commit 8bbe75b1ea
5 changed files with 84 additions and 23 deletions

View File

@ -94,3 +94,12 @@ for i in {1..6}; do ssh pi@hugvey$i.local "sudo shutdown -h now"; done
```
```bash
lsof -p $(ps aux|grep "[h]ugvey_server.py" |awk '{print $2}')| awk '{print $9}'|sort -rn|uniq -c|sort -rn|head -20
```
or
```bash
lsof | grep $(ps aux|grep "[h]ugvey_server.py" |awk '{print $2}')| awk '{print $11}'|sort -rn|uniq -c|sort -rn|head -20
```

View File

@ -83,7 +83,11 @@ class CentralCommand(object):
self.loadLanguages()
self.panopticon = Panopticon(self, self.config)
voice_dir = os.path.join(self.config['web']['files_dir'], 'voices')
self.voiceStorage = VoiceStorage(voice_dir, self.config['voice']['token'])
self.panopticon = Panopticon(self, self.config, self.voiceStorage)
def loadLanguages(self):
logger.debug('load language files')
@ -231,6 +235,22 @@ class CentralCommand(object):
logger.critical(f"Exception while running event loop:")
logger.exception(e)
async def voiceListener(self, hugvey_id):
s = self.ctx.socket(zmq.REP) #: :type s: zmq.sugar.Socket
voiceAddr = f"ipc://voice{hugvey_id}"
s.bind(voiceAddr)
logger.debug("Listen for voice requests on: {}".format(voiceAddr))
while self.isRunning.is_set():
try:
r = await s.recv_json()
isVariable = bool(r['variable'])
text = r['text']
fn = await self.voiceStorage.requestFile(text, isVariable)
await s.send_string(fn)
except Exception as e:
logger.critical(f"Exception while running voice loop:")
logger.exception(e)
def start(self):
self.isRunning.set()
@ -242,6 +262,9 @@ class CentralCommand(object):
self.catchException(self.eventListener()))
self.tasks['commandSender'] = self.loop.create_task(
self.catchException(self.commandSender()))
for hid in self.hugvey_ids:
self.tasks['voiceListener'] = self.loop.create_task(
self.catchException(self.voiceListener(hid)))
# we want the web interface in a separate thread
self.panopticon_thread = threading.Thread(

View File

@ -182,6 +182,7 @@ def getVoiceHandler(voiceStorage):
# TODO: we should be using ZMQ here...
text = self.get_argument('text')
isVariable = True if int(self.get_argument('variable')) >0 else False
# TODO: make zmq socket request/reply pattern:
fn = await voiceStorage.requestFile(text, isVariable)
if not fn:
raise Exception(f"No Filename for text: {text}")
@ -198,12 +199,11 @@ def getVoiceHandler(voiceStorage):
class Panopticon(object):
def __init__(self, central_command, config):
def __init__(self, central_command, config, voiceStorage):
self.command = central_command
self.config = config
voice_dir = os.path.join(self.config['web']['files_dir'], 'voices')
self.voiceStorage = VoiceStorage(voice_dir, self.config['voice']['token'])
self.voiceStorage = voiceStorage
self.wsHandler = getWebSocketHandler(self.command)
@ -215,7 +215,7 @@ class Panopticon(object):
(r"/voice", getVoiceHandler(self.voiceStorage)),
(r"/(.*)", tornado.web.StaticFileHandler,
{"path": web_dir, "default_filename": 'index.html'}),
], debug=True)
], debug=False)
self.application.listen(config['web']['port'])
# self.loop.configure(evt_loop)

View File

@ -8,6 +8,10 @@ from .communication import LOG_BS
from tornado.httpclient import AsyncHTTPClient, HTTPRequest
import uuid
import shortuuid
import threading
import faulthandler
from zmq.asyncio import Context
import zmq
mainLogger = logging.getLogger("hugvey")
logger = mainLogger.getChild("narrative")
@ -139,28 +143,50 @@ class Message(object):
if self.audioFile is not None:
return self.audioFile
self.logger.debug(f"Fetching audio for {self.getText()}")
text = self.getText()
self.logger.debug(f"Fetching audio for {text}")
# return "test";
async with self.filenameFetchLock:
client = AsyncHTTPClient()
queryString = urllib.parse.urlencode({
'text': self.getText(),
'filename': 1,
'variable': 1 if self.hasVariables() else 0
})
request = HTTPRequest(
url = f"http://localhost:{self.story.panopticon_port}/voice?{queryString}",
method="GET"
)
self.logger.log(LOG_BS, request.url)
response = await client.fetch(request)
print(threading.enumerate())
# client = AsyncHTTPClient()
info = {
'text': text,
'variable': True if self.hasVariables() else False
}
# queryString = urllib.parse.urlencode(info)
# request = HTTPRequest(
# url = f"http://localhost:{self.story.panopticon_port}/voice?{queryString}",
# # url = f"http://rubenvandeven.com/",
# method="GET"
# )
# self.logger.log(LOG_BS, request.url)
# response = await client.fetch(request)
s = Context.instance().socket(zmq.REQ) #: :type s: zmq.sugar.Socket
voiceAddr = f"ipc://voice{self.story.hugvey.id}"
s.connect(voiceAddr)
await s.send_json(info)
print('sent now wait')
filename = await s.recv_string()
print('reply', filename)
s.close()
if response.code != 200:
self.logger.critical(f"Error when fetching filename: {response.code} for {queryString}")
return None
print(threading.enumerate())
# for t in threading.enumerate(): #: :type t: threading.Thread
# if t.name.startswith('ThreadPoolExecutor'):
# faulthandler.dump_traceback()
# exit()
# return "local/voices/static/64/64a016ce38faaac6cc59f6dc8e64cfa6bb81005b.wav"
# if response.code != 200:
# self.logger.critical(f"Error when fetching filename: {response.code} for {queryString}")
# return None
self.logger.debug(f"Fetched audio for {self.getText()}")
return response.body.decode().strip()
self.logger.debug(f"Fetched audio for {text}")
return filename
class Reply(object):

View File

@ -71,11 +71,13 @@ class VoiceStorage(object):
except Exception as e:
logger.exception(e)
self.pendingRequests[id].set()
http_client.close()
return None
else:
if response.code != 200:
logger.critical(f"No proper response! {response.code}")
self.pendingRequests[id].set()
http_client.close()
return None
logger.debug(f"Wrote body: {response.code}")
@ -83,4 +85,5 @@ class VoiceStorage(object):
f.write(response.body)
self.pendingRequests[id].set()
print(type(fn), fn)
http_client.close()
return fn