trap/trap/settings.py
2025-11-07 23:57:54 +01:00

163 lines
8.2 KiB
Python

from argparse import ArgumentParser
import json
import math
from pathlib import Path
from typing import Any, Dict
import zmq
from trap.node import Node
import dearpygui.dearpygui as dpg
class Settings(Node):
"""
Quickndirty gui to change some settings ad-hoc
no storage of values, no defaults. No detection of lost nodes, or sending config on them starting
"""
def setup(self):
self.config_sock.close() # setup by default for all nodes, but we want to publish
self.config_sock = self.pub(self.config.zmq_config_addr)
self.config_init_sock.close() # setup by default for all nodes, but we want to publish
self.config_init_sock = self.pull(self.config.zmq_config_init_addr)
self.settings_fields = {}
self.settings: Dict[str, Any] = {}
self.load()
dpg.create_context()
dpg.create_viewport(title='Trap settings', width=600, height=1200)
dpg.setup_dearpygui()
with dpg.window(label="General", pos=(0, 0)):
dpg.add_text(f"Settings from {self.config.settings_file}")
dpg.add_button(label="Save", callback=self.save)
with dpg.window(label="Stage", pos=(150, 0)):
self.register_setting(f'stage.fps', dpg.add_slider_int(label="FPS cap", default_value=self.get_setting(f'stage.fps', 30), callback=self.on_change))
self.register_setting(f'stage.prediction_interval', dpg.add_slider_int(label="prediction interval", default_value=self.get_setting('stage.prediction_interval', 18), callback=self.on_change))
with dpg.window(label="Lidar", pos=(0, 100), autosize=True):
self.register_setting(f'lidar.crop_map_boundaries', dpg.add_checkbox(label="crop_map_boundaries", default_value=self.get_setting(f'lidar.crop_map_boundaries', True), callback=self.on_change))
self.register_setting(f'lidar.viz_cropping', dpg.add_checkbox(label="viz_cropping", default_value=self.get_setting(f'lidar.viz_cropping', True), callback=self.on_change))
self.register_setting(f'lidar.tracking_enabled', dpg.add_checkbox(label="tracking_enabled", default_value=self.get_setting(f'lidar.tracking_enabled', True), callback=self.on_change))
dpg.add_separator(label="Clustering")
cluster_methods = ("birch", "optics", "dbscan")
self.register_setting('lidar.cluster.method', dpg.add_combo(label="Method", items=cluster_methods, default_value=self.get_setting('lidar.cluster.method', default='dbscan'), callback=self.on_change))
self.register_setting(f'lidar.eps', dpg.add_slider_float(label="DBSCAN epsilon", default_value=self.get_setting(f'lidar.eps', 0.3), max_value=1, callback=self.on_change))
self.register_setting(f'lidar.min_samples', dpg.add_slider_int(label="DBSCAN min_samples", default_value=self.get_setting(f'lidar.min_samples', 8), max_value=30, callback=self.on_change))
dpg.add_text("When using BIRCH, the resulting subclusters can be postprocessed by DBSCAN:")
self.register_setting('lidar.birch_process_subclusters', dpg.add_checkbox(label="Process subclusters", default_value=self.get_setting('lidar.birch_process_subclusters', True), callback=self.on_change))
self.register_setting('lidar.birch_threshold', dpg.add_slider_float(label="Threshold", default_value=self.get_setting('lidar.birch_threshold', 1), max_value=2.5, callback=self.on_change))
self.register_setting('lidar.birch_branching_factor', dpg.add_slider_int(label="Branching factor", default_value=self.get_setting('lidar.birch_branching_factor', 50), max_value=100, callback=self.on_change))
dpg.add_separator(label="Cluster filter")
self.register_setting(f'lidar.min_box_area', dpg.add_slider_float(label="min_box_area", default_value=self.get_setting(f'lidar.min_box_area', .1), min_value=0, max_value=1, callback=self.on_change))
for i, lidar in enumerate(["192.168.0.16", "192.168.0.10"]):
name = lidar.replace(".", "_")
with dpg.window(label=f"Lidar {lidar}", pos=(i * 300, 450),autosize=True):
# dpg.add_text("test")
# dpg.add_input_text(label="string", default_value="Quick brown fox")
self.register_setting(f'lidar.{name}.enabled', dpg.add_checkbox(label="enabled", default_value=self.get_setting(f'lidar.{name}.enabled', True), callback=self.on_change))
self.register_setting(f'lidar.{name}.rot_x', dpg.add_slider_float(label="rot_x", default_value=self.get_setting(f'lidar.{name}.rot_x', 0), max_value=math.pi * 2, callback=self.on_change))
self.register_setting(f'lidar.{name}.rot_y', dpg.add_slider_float(label="rot_y", default_value=self.get_setting(f'lidar.{name}.rot_y', 0), max_value=math.pi * 2, callback=self.on_change))
self.register_setting(f'lidar.{name}.rot_z', dpg.add_slider_float(label="rot_z", default_value=self.get_setting(f'lidar.{name}.rot_z', 0), max_value=math.pi * 2, callback=self.on_change))
self.register_setting(f'lidar.{name}.trans_x', dpg.add_slider_float(label="trans_x", default_value=self.get_setting(f'lidar.{name}.trans_x', 0), min_value=-15, max_value=15, callback=self.on_change))
self.register_setting(f'lidar.{name}.trans_y', dpg.add_slider_float(label="trans_y", default_value=self.get_setting(f'lidar.{name}.trans_y', 0), min_value=-15, max_value=15, callback=self.on_change))
self.register_setting(f'lidar.{name}.trans_z', dpg.add_slider_float(label="trans_z", default_value=self.get_setting(f'lidar.{name}.trans_z', 0), min_value=-15, max_value=15, callback=self.on_change))
self.send_for_prefix("") # spread the defaults
dpg.show_viewport()
def stop(self):
dpg.destroy_context()
def check_config(self):
# override node function to disable it
pass
def refresh_settings(self):
# override node function to disable it
pass
def get_setting(self, name: str, default: Any):
"""
Automatically configure the value with the default when requesting it
"""
r = super().get_setting(name, default)
self.settings[name] = r
return r
def register_setting(self, name: str, field: int):
self.settings_fields[field] = name
def on_change(self, sender, value, user_data = None):
# print(sender, app_data, user_data)
setting = self.settings_fields[sender]
print(setting, value)
self.settings[setting] = value
self.config_sock.send_json({setting: value})
def send_for_prefix(self, prefix: str):
self.config_sock.send_json(self.get_by_prefix(prefix))
def save(self):
with self.config.settings_file.open('w') as fp:
self.logger.info(f"Save to {self.config.settings_file}")
json.dump(self.settings, fp)
def get_by_prefix(self, prefix: str) -> Dict[str, Any]:
return {key: value for key, value in self.settings.items() if key.startswith(prefix)}
def load(self) -> Dict[str, Any]:
if not self.config.settings_file.exists():
self.logger.info(f"No config at {self.config.settings_file}")
return {}
self.logger.info(f"Loading from {self.config.settings_file}")
with self.config.settings_file.open('r') as fp:
self.settings = json.load(fp)
def run(self):
# below replaces, start_dearpygui()
while self.run_loop() and dpg.is_dearpygui_running():
# 1) receive init requests
try:
init_msg = self.config_init_sock.recv_string(zmq.NOBLOCK)
self.logger.info(f"Send init for {init_msg}")
print('init', init_msg)
self.send_for_prefix(init_msg)
except zmq.ZMQError as e:
# no msgs
pass
dpg.render_dearpygui_frame()
@classmethod
def arg_parser(cls):
argparser = ArgumentParser()
argparser.add_argument('--settings-file',
help='Where to store settings',
type=Path,
default=Path("./settings.json"))
return argparser