"""Launch the trap worker subprocesses and funnel their logging through one queue."""

import atexit
import logging
from logging.handlers import SocketHandler, QueueHandler, QueueListener
from multiprocessing import Event, Process, Queue
import multiprocessing
import signal
import sys
import time

from trap.config import parser
from trap.cv_renderer import run_cv_renderer
from trap.frame_emitter import run_frame_emitter
from trap.prediction_server import run_prediction_server
from trap.preview_renderer import run_preview_renderer
from trap.animation_renderer import run_animation_renderer
from trap.socket_forwarder import run_ws_forwarder
from trap.tracker import run_tracker

from setproctitle import setproctitle, setthreadtitle


logger = logging.getLogger("trap.plumbing")


class ExceptionHandlingProcess(Process):
    """Process that clears a shared ``is_running`` event when it goes down.

    The process must be created with an ``is_running`` entry in ``kwargs``
    (checked at the top of :meth:`run`). That event is cleared on SIGTERM,
    on SIGINT, through an ``atexit`` hook, and when the target raises, so
    that the parent and sibling processes can notice and shut down.
    """

    def run(self) -> None:
        """Install the exit hooks, set the process title and run the target.

        Any exception raised by the target (including ``KeyboardInterrupt``
        and ``SystemExit``) is logged and swallowed after the ``is_running``
        event has been cleared.
        """
        assert 'is_running' in self._kwargs
        is_running = self._kwargs['is_running']

        # Exit handler to make sure that on many kinds of crashes and kills
        # the subprocess warns parent/siblings.
        # TODO: Does not work with OOM kill. Would need a watchdog process for that
        def exit_handler(*args):
            # Accepts (signum, frame) when used as a signal handler and no
            # arguments when invoked by atexit.
            is_running.clear()

        atexit.register(exit_handler)
        signal.signal(signal.SIGTERM, exit_handler)
        signal.signal(signal.SIGINT, exit_handler)

        setproctitle(f"trap-{self.name}")

        try:
            super().run()
            print("finished ", self.name)
        except BaseException as e:
            logger.critical("Exception in %s", self.name)
            logger.exception(e)
            is_running.clear()


def start() -> None:
    """Parse the CLI arguments, start all worker processes and wait for them.

    Log records of this process are put on a queue by a ``QueueHandler``
    installed on the root logger; a ``QueueListener`` forwards them to the
    console and, when ``--remote_log_addr`` is given, to a socket handler.
    The shared ``is_running`` event is cleared once all workers have
    finished or as soon as anything goes wrong in this function.
    """
    args = parser.parse_args()

    # verbose > 1 -> NOTSET (everything), verbose == 1 -> DEBUG, else INFO
    if args.verbose > 1:
        loglevel = logging.NOTSET
    elif args.verbose > 0:
        loglevel = logging.DEBUG
    else:
        loglevel = logging.INFO

    is_running = Event()
    is_running.set()

    log_queue = multiprocessing.Queue(-1)
    queue_handler = QueueHandler(log_queue)
    log_handlers = [logging.StreamHandler()]

    if args.remote_log_addr:
        logging.captureWarnings(True)
        # NOTSET on the handler so that every record is sent to the remote
        # log viewer (e.g. cutelog).
        socket_handler = SocketHandler(args.remote_log_addr, args.remote_log_port)
        socket_handler.setLevel(logging.NOTSET)
        log_handlers.append(socket_handler)

    queue_listener = QueueListener(log_queue, *log_handlers, respect_handler_level=True)

    logging.basicConfig(
        level=loglevel,
        handlers=[queue_handler]
    )

    def worker(target, name):
        """Build a not-yet-started process sharing ``args`` and ``is_running``."""
        return ExceptionHandlingProcess(
            target=target,
            kwargs={'config': args, 'is_running': is_running},
            name=name,
        )

    procs = [
        # disabled: worker(run_ws_forwarder, 'forwarder'),
        worker(run_frame_emitter, 'frame_emitter'),
        worker(run_tracker, 'tracker'),
    ]

    if args.render_window or args.render_file or args.render_url:
        procs.append(worker(run_cv_renderer, 'preview'))

    if args.render_animation:
        procs.append(worker(run_animation_renderer, 'renderer'))

    if not args.bypass_prediction:
        procs.append(worker(run_prediction_server, 'inference'))

    listener_started = False
    try:
        logger.info("start")
        for proc in procs:
            proc.start()

        # If the listener is started before the subprocesses it becomes a
        # mess, because the running thread is forked too, but cannot easily
        # be stopped in the forks. Thus, only start the queue-listener
        # thread _after_ starting the processes.
        queue_listener.start()
        listener_started = True

        # wait for processes to clean up
        for proc in procs:
            proc.join()

        is_running.clear()
        logger.info('Stop')
    except BaseException as e:
        # Mainly for KeyboardInterrupt, but in any case, on error all
        # processes need to be signalled to shut down.
        logger.critical("Exception in plumber")
        logger.exception(e)
        is_running.clear()

    print('stop listener')
    if not listener_started:
        # Start-up failed before the listener thread existed. Calling stop()
        # on a never-started listener raises AttributeError on older Python
        # versions, and the records queued above would never be emitted.
        # No further processes are started past this point, so running the
        # listener thread now is safe; stop() then drains the queue.
        queue_listener.start()
    queue_listener.stop()
    print('stopped listener')
    print("finished plumber")


if __name__ == "__main__":
    start()