trap/trap/node.py
2025-05-26 16:14:47 +02:00

144 lines
No EOL
4.7 KiB
Python

import logging
from logging.handlers import QueueHandler, QueueListener, SocketHandler
import multiprocessing
from multiprocessing.synchronize import Event as BaseEvent
from argparse import ArgumentParser, Namespace
import time
from typing import Optional
import zmq
from trap.counter import CounterFpsSender, CounterSender
from trap.timer import Timer
class Node():
def __init__(self, config: Namespace, is_running: BaseEvent, fps_counter: CounterFpsSender):
self.config = config
self.is_running = is_running
self.fps_counter = fps_counter
self.zmq_context = zmq.Context()
self.logger = self._logger()
self._prev_loop_time = 0
self.setup()
@classmethod
def _logger(cls):
return logging.getLogger(f"trap.{cls.__name__}")
def tick(self):
self.fps_counter.tick()
# with self.fps_counter.get_lock():
# self.fps_counter.value+=1
def setup(self):
raise RuntimeError("Not implemented setup()")
def run(self):
raise RuntimeError("Not implemented run()")
def run_loop(self):
"""Use in run(), to check if it should keep looping
Takes care of tick()'ing the iterations/second counter
"""
self.tick()
return self.is_running.is_set()
def run_loop_capped_fps(self, max_fps: float):
"""Use in run(), to check if it should keep looping
Takes care of tick()'ing the iterations/second counter
"""
now = time.perf_counter()
time_diff = (now - self._prev_loop_time)
if time_diff < 1/max_fps:
# print(f"sleep {1/max_fps - time_diff}")
time.sleep(1/max_fps - time_diff)
now += 1/max_fps - time_diff
self._prev_loop_time = now
return self.run_loop()
@classmethod
def arg_parser(cls) -> ArgumentParser:
raise RuntimeError("Not implemented arg_parser()")
@classmethod
def _get_arg_parser(cls) -> ArgumentParser:
parser = cls.arg_parser()
# add some defaults
parser.add_argument(
'--verbose',
'-v',
help="Increase verbosity. Add multiple times to increase further.",
action='count', default=0
)
parser.add_argument(
'--remote-log-addr',
help="Connect to a remote logger like cutelog. Specify the ip",
type=str,
default="100.72.38.82"
)
parser.add_argument(
'--remote-log-port',
help="Connect to a remote logger like cutelog. Specify the port",
type=int,
default=19996
)
return parser
def sub(self, addr: str):
"Default zmq sub configuration"
sock = self.zmq_context.socket(zmq.SUB)
sock.setsockopt(zmq.CONFLATE, 1) # only keep latest frame. NB. make sure this comes BEFORE connect, otherwise it's ignored!!
sock.setsockopt(zmq.SUBSCRIBE, b'')
sock.connect(addr)
return sock
def pub(self, addr: str):
"Default zmq pub configuration"
sock = self.zmq_context.socket(zmq.PUB)
sock.setsockopt(zmq.CONFLATE, 1) # only keep latest frame
sock.bind(addr)
return sock
@classmethod
def start(cls, config: Namespace, is_running: BaseEvent, timer_counter: Optional[Timer]):
instance = cls(config, is_running, timer_counter)
instance.run()
instance.logger.info("Stopping")
@classmethod
def parse_and_start(cls):
"""To start the node from CLI/supervisor"""
config = cls._get_arg_parser().parse_args()
setup_logging(config) # running from cli, we need to setup logging
is_running = multiprocessing.Event()
is_running.set()
statsender = CounterSender()
counter = CounterFpsSender(f"trap.{cls.__name__}", statsender)
# timer_counter = Timer(cls.__name__)
cls.start(config, is_running, counter)
def setup_logging(config: Namespace):
loglevel = logging.NOTSET if config.verbose > 1 else logging.DEBUG if config.verbose > 0 else logging.INFO
stream_handler = logging.StreamHandler()
log_handlers = [stream_handler]
if config.remote_log_addr:
logging.captureWarnings(True)
# root_logger.setLevel(logging.NOTSET) # to send all records to cutelog
socket_handler = SocketHandler(config.remote_log_addr, config.remote_log_port)
print(socket_handler.host, socket_handler.port)
socket_handler.setLevel(logging.NOTSET)
log_handlers.append(socket_handler)
logging.basicConfig(
level=loglevel,
handlers=log_handlers # [queue_handler]
)