# trap/trap/plumber.py (98 lines, 3.0 KiB, Python)

import atexit
import logging
from logging.handlers import SocketHandler
from multiprocessing import Event, Process, Queue
import multiprocessing
import signal
import sys
import time
from trap.config import parser
from trap.frame_emitter import run_frame_emitter
from trap.prediction_server import run_prediction_server
from trap.renderer import run_renderer
from trap.socket_forwarder import run_ws_forwarder
from trap.tracker import run_tracker
# Module-level logger; records from this file are emitted under "trap.plumbing".
logger = logging.getLogger("trap.plumbing")
class ExceptionHandlingProcess(Process):
    """Process that clears a shared ``is_running`` event whenever it stops.

    The process must be created with an ``is_running`` entry in ``kwargs``;
    that object needs a ``clear()`` method (e.g. a ``multiprocessing.Event``).
    The event is cleared when the target returns, raises, or when the process
    receives SIGTERM/SIGINT, so whoever shares the event can observe that
    this process is no longer doing its work.
    """

    def run(self):
        """Run the target and clear ``is_running`` however the run ends.

        Exceptions derived from ``Exception`` raised by the target are logged
        and swallowed; ``is_running`` is cleared in every case.

        Raises:
            ValueError: if no ``is_running`` keyword argument was supplied.
        """
        # Explicit check instead of ``assert``: asserts are stripped under -O.
        if 'is_running' not in self._kwargs:
            raise ValueError(
                f"{type(self).__name__} requires an 'is_running' keyword argument"
            )
        is_running = self._kwargs['is_running']

        # Exit handler to make sure that on many kinds of crashes and kills the
        # subprocess warns parent/siblings.
        # TODO: Does not work with OOM kill. Would need a watchdog process for that
        def exit_handler(*args):
            is_running.clear()

        # NOTE: atexit handlers are skipped when a process leaves through
        # os._exit(), which is how fork-started multiprocessing children
        # terminate; it is kept as an extra safety net only. The ``finally``
        # below is what guarantees the event gets cleared.
        atexit.register(exit_handler)
        # Installing a handler replaces the default action: SIGTERM/SIGINT no
        # longer kill the process directly but only clear the event.
        signal.signal(signal.SIGTERM, exit_handler)
        signal.signal(signal.SIGINT, exit_handler)

        try:
            super().run()
        except Exception as e:
            logger.critical(f"Exception in {self.name}")
            logger.exception(e)
        finally:
            # Covers normal return, logged exceptions and SystemExit alike.
            is_running.clear()
def start():
    """Parse the configuration, set up logging and run all worker processes.

    Every worker is an ``ExceptionHandlingProcess`` that receives the parsed
    arguments as ``config`` and one shared event as ``is_running``. The
    function blocks until all workers have been joined.
    """
    args = parser.parse_args()

    # verbosity 0 -> INFO, 1 -> DEBUG, 2 or more -> NOTSET (everything)
    if args.verbose > 1:
        loglevel = logging.NOTSET
    elif args.verbose > 0:
        loglevel = logging.DEBUG
    else:
        loglevel = logging.INFO

    logging.basicConfig(level=loglevel)

    # The level is set per handler, so the root logger itself can be lowered
    # further when remote logging is enabled.
    root_logger = logging.getLogger()
    for handler in root_logger.handlers:
        handler.setLevel(loglevel)

    is_running = Event()
    is_running.set()

    if args.remote_log_addr:
        logging.captureWarnings(True)
        # let every record through so all of them reach cutelog
        root_logger.setLevel(logging.NOTSET)
        root_logger.addHandler(
            SocketHandler(args.remote_log_addr, args.remote_log_port)
        )

    # (target, process name) pairs, in start order
    workers = [
        (run_ws_forwarder, 'forwarder'),
        (run_frame_emitter, 'frame_emitter'),
        (run_tracker, 'tracker'),
    ]
    if args.render_file or args.render_url:
        workers.append((run_renderer, 'renderer'))
    if not args.bypass_prediction:
        workers.append((run_prediction_server, 'inference'))

    procs = [
        ExceptionHandlingProcess(
            target=target,
            kwargs={'config': args, 'is_running': is_running},
            name=name,
        )
        for target, name in workers
    ]

    logger.info("start")
    for worker in procs:
        worker.start()

    # wait for processes to clean up
    for worker in procs:
        worker.join()

    logger.info('Stop')
# Launch the pipeline only when this file is executed as a script, not on import.
if __name__ == "__main__":
    start()