import atexit import logging from logging.handlers import SocketHandler, QueueHandler, QueueListener from multiprocessing import Event, Process, Queue import multiprocessing import signal import sys import time from trap.config import parser from trap.frame_emitter import run_frame_emitter from trap.prediction_server import run_prediction_server from trap.preview_renderer import run_preview_renderer from trap.animation_renderer import run_animation_renderer from trap.socket_forwarder import run_ws_forwarder from trap.tracker import run_tracker logger = logging.getLogger("trap.plumbing") class ExceptionHandlingProcess(Process): def run(self): assert 'is_running' in self._kwargs # exit handler to make sure that on many kinds of crashes and kills the # suprocess warns parent/siblings # TODO: Does not work with OOM kill. Would need a watchdog process for that def exit_handler(*args): self._kwargs['is_running'].clear() atexit.register(exit_handler) signal.signal(signal.SIGTERM, exit_handler) signal.signal(signal.SIGINT, exit_handler) try: super(Process, self).run() except Exception as e: logger.critical(f"Exception in {self.name}") logger.exception(e) self._kwargs['is_running'].clear() def start(): args = parser.parse_args() loglevel = logging.NOTSET if args.verbose > 1 else logging.DEBUG if args.verbose > 0 else logging.INFO # print(args) # exit() isRunning = Event() isRunning.set() q = multiprocessing.Queue(-1) queue_handler = QueueHandler(q) stream_handler = logging.StreamHandler() log_handlers = [stream_handler] if args.remote_log_addr: logging.captureWarnings(True) # root_logger.setLevel(logging.NOTSET) # to send all records to cutelog socket_handler = SocketHandler(args.remote_log_addr, args.remote_log_port) socket_handler.setLevel(logging.NOTSET) log_handlers.append(socket_handler) queue_listener = QueueListener(q, *log_handlers, respect_handler_level=True) queue_listener.start() # root = logging.getLogger() logging.basicConfig( level=loglevel, handlers=[queue_handler] ) # root_logger = logging.getLogger() # # set per handler, so we can set it lower for the root logger if remote logging is enabled # [h.setLevel(loglevel) for h in root_logger.handlers] # queue_listener.handlers.append(socket_handler) # instantiating process with arguments procs = [ ExceptionHandlingProcess(target=run_ws_forwarder, kwargs={'config': args, 'is_running': isRunning}, name='forwarder'), ExceptionHandlingProcess(target=run_frame_emitter, kwargs={'config': args, 'is_running': isRunning}, name='frame_emitter'), ExceptionHandlingProcess(target=run_tracker, kwargs={'config': args, 'is_running': isRunning}, name='tracker'), ] if args.render_file or args.render_url or args.render_window: if not args.render_no_preview or args.render_file or args.render_url: procs.append( ExceptionHandlingProcess(target=run_preview_renderer, kwargs={'config': args, 'is_running': isRunning}, name='renderer') ) procs.append( ExceptionHandlingProcess(target=run_animation_renderer, kwargs={'config': args, 'is_running': isRunning}, name='map_renderer') ) if not args.bypass_prediction: procs.append( ExceptionHandlingProcess(target=run_prediction_server, kwargs={'config': args, 'is_running':isRunning}, name='inference'), ) logger.info("start") for proc in procs: proc.start() # wait for processes to clean up for proc in procs: proc.join() logger.info('Stop') if __name__ == "__main__": start()