Laser output tweaks

This commit is contained in:
Ruben van de Ven 2025-04-11 22:47:48 +02:00
parent 35b8e77d07
commit dc41f64df9
4 changed files with 83 additions and 7 deletions

View file

@ -206,6 +206,7 @@ class Track:
source: Optional[int] = None # to keep track of processed tracks source: Optional[int] = None # to keep track of processed tracks
lost: bool = False lost: bool = False
created_at: Optional[float] = None created_at: Optional[float] = None
frame_index: int = 0
def __post_init__(self): def __post_init__(self):
if not self.created_at: if not self.created_at:

11
trap/shapes.py Normal file

File diff suppressed because one or more lines are too long

View file

@ -7,13 +7,16 @@ import json
import logging import logging
import time import time
from typing import Dict, List, Optional, Tuple from typing import Dict, List, Optional, Tuple
import numpy as np
from statemachine import Event, State, StateMachine from statemachine import Event, State, StateMachine
from statemachine.exceptions import TransitionNotAllowed from statemachine.exceptions import TransitionNotAllowed
import zmq import zmq
from sgan.sgan import data from sgan.sgan import data
from trap import shapes
from trap.base import DataclassJSONEncoder, Frame, Track from trap.base import DataclassJSONEncoder, Frame, Track
from trap.counter import CounterSender from trap.counter import CounterSender
from trap.laser_renderer import rotateMatrix
from trap.node import Node from trap.node import Node
from trap.timer import Timer from trap.timer import Timer
from trap.utils import exponentialDecay, relativePointToPolar, relativePolarToPoint from trap.utils import exponentialDecay, relativePointToPolar, relativePolarToPoint
@ -44,7 +47,7 @@ class TrackScenario(StateMachine):
mark_lost = lost.from_(detected, substantial, first_prediction, corrected_prediction, loitering, play) mark_lost = lost.from_(detected, substantial, first_prediction, corrected_prediction, loitering, play)
receive_prediction = detected.to(first_prediction) | first_prediction.to(corrected_prediction, cond="prediction_is_stale") | corrected_prediction.to(play, cond="prediction_is_playing") receive_prediction = detected.to(first_prediction) | substantial.to(first_prediction) | first_prediction.to(corrected_prediction, cond="prediction_is_stale") | corrected_prediction.to(play, cond="prediction_is_playing")
def __init__(self): def __init__(self):
self._track = None self._track = None
@ -178,6 +181,9 @@ class DrawnScenario(TrackScenario):
self.drawn_positions: List[Tuple[float,float]] = [] self.drawn_positions: List[Tuple[float,float]] = []
self.drawn_pred_history: List[Tuple[float,float]] = [] self.drawn_pred_history: List[Tuple[float,float]] = []
self.drawn_predictions: List[List[Tuple[float,float]]] = [] self.drawn_predictions: List[List[Tuple[float,float]]] = []
self.drawn_text = ""
self.drawn_text_lines: List[RenderableLine] = []
super().__init__() super().__init__()
def update_drawn_positions(self) -> List: def update_drawn_positions(self) -> List:
@ -198,6 +204,8 @@ class DrawnScenario(TrackScenario):
self.last_update_t = t self.last_update_t = t
# 1. track history, direct update # 1. track history, direct update
MAX_HISTORY = 80
# positions = self._track.get_projected_history(None, self.camera)[-MAX_HISTORY:]
positions = self._track.get_projected_history(None, self.camera) positions = self._track.get_projected_history(None, self.camera)
# TODO)) Limit history to N points, or N lenght # TODO)) Limit history to N points, or N lenght
for i, pos in enumerate(self.drawn_positions): for i, pos in enumerate(self.drawn_positions):
@ -219,6 +227,7 @@ class DrawnScenario(TrackScenario):
if self.prediction_track and self.prediction_track.predictions: if self.prediction_track and self.prediction_track.predictions:
# 3. predictions # 3. predictions
prediction_offset = self._track.frame_index - self.prediction_track.frame_index
if len(self.prediction_track.predictions): if len(self.prediction_track.predictions):
for a, drawn_prediction in enumerate(self.drawn_predictions): for a, drawn_prediction in enumerate(self.drawn_predictions):
for i, pos in enumerate(drawn_prediction): for i, pos in enumerate(drawn_prediction):
@ -227,7 +236,7 @@ class DrawnScenario(TrackScenario):
decay = 16 decay = 16
origin = self.drawn_positions[-1] origin = self.drawn_positions[-1]
drawn_r, drawn_angle = relativePointToPolar( origin, drawn_prediction[i]) drawn_r, drawn_angle = relativePointToPolar( origin, drawn_prediction[i])
pred_r, pred_angle = relativePointToPolar(origin, self.prediction_track.predictions[a][i]) pred_r, pred_angle = relativePointToPolar(origin, self.prediction_track.predictions[a][i+prediction_offset])
r = exponentialDecay(drawn_r, pred_r, decay, dt) r = exponentialDecay(drawn_r, pred_r, decay, dt)
angle = exponentialDecay(drawn_angle, pred_angle, decay, dt) angle = exponentialDecay(drawn_angle, pred_angle, decay, dt)
x, y = relativePolarToPoint(origin, r, angle) x, y = relativePolarToPoint(origin, r, angle)
@ -236,24 +245,75 @@ class DrawnScenario(TrackScenario):
# self.drawn_predictions[i][1] = int(exponentialDecay(self.drawn_predictions[i][1], self.prediction_track.predictions[i][1], decay, dt)) # self.drawn_predictions[i][1] = int(exponentialDecay(self.drawn_predictions[i][1], self.prediction_track.predictions[i][1], decay, dt))
if len(self.prediction_track.predictions) > len(self.drawn_predictions): if len(self.prediction_track.predictions) > len(self.drawn_predictions):
self.drawn_predictions.extend(self.prediction_track.predictions[len(self.drawn_predictions):]) for pred in self.prediction_track.predictions[len(self.drawn_predictions):]:
self.drawn_predictions.append(pred[prediction_offset:])
# for a, drawn_prediction in self.drawn_predictions: # for a, drawn_prediction in self.drawn_predictions:
# if len(self.pred_coords) > len(self.drawn_predictions): # if len(self.pred_coords) > len(self.drawn_predictions):
# self.drawn_predictions.extend(self.pred_coords[len(self.drawn_predictions):]) # self.drawn_predictions.extend(self.pred_coords[len(self.drawn_predictions):])
def to_renderable_lines(self) -> RenderableLines: def to_renderable_lines(self) -> RenderableLines:
lines = [] lines: List[RenderableLine] = []
color = SrgbaColor(1.,0.,0.,1.-self.lost_factor()) color = SrgbaColor(1.,0.,0.,1.-self.lost_factor())
# positions = [RenderablePosition.from_list(pos) for pos in self.drawn_positions] # positions = [RenderablePosition.from_list(pos) for pos in self.drawn_positions]
points = [RenderablePoint(pos, color) for pos in self.drawn_positions] points = [RenderablePoint(pos, color) for pos in self.drawn_positions[::4]]
lines.append(RenderableLine(points)) lines.append(RenderableLine(points))
if len(self.drawn_predictions): if len(self.drawn_predictions):
color = SrgbaColor(0.,0.6,0.,1.-self.lost_factor()) color = SrgbaColor(0.,0.5,0.,1.-self.lost_factor())
# positions = [RenderablePosition.from_list(pos) for pos in self.drawn_positions] # positions = [RenderablePosition.from_list(pos) for pos in self.drawn_positions]
points = [RenderablePoint(pos, color) for pos in self.drawn_predictions[0]] points = [RenderablePoint(pos, color) for pos in self.drawn_predictions[0]]
lines.append(RenderableLine(points)) lines.append(RenderableLine(points))
# # print(self.current_state)
# if self.current_state is self.first_prediction or self.current_state is self.corrected_prediction:
# shape = np.array(shapes.YOUR if time.time() % 2 > 1 else shapes.FUTURE)
# text = "your" if time.time() % 2 > 1 else "future"
# color = SrgbaColor(0.5,0.5,0.,1.-self.lost_factor())
# line = self.get_text_lines(text, shape, color)
# if not line:
# POSITION_INDEX = 50
# draw_pos = self.drawn_predictions[0][POSITION_INDEX-1]
# current_pos = self.drawn_positions[-1]
# angle = np.arctan2(draw_pos[0]-current_pos[0], draw_pos[1]-current_pos[1]) + np.pi
# # for i, line in enumerate(shape):
# # if i != 0:
# # continue
# points = np.array(shape[:,:2])
# avg_x = np.average(points[:,0])
# avg_y = np.average(points[:,1])
# minx, maxx = np.min(points[:,0]), np.max(points[:,0])
# miny, maxy = np.min(points[:,1]), np.max(points[:,1])
# sx = maxx-minx
# sy = maxy-miny
# points[:,0] -= avg_x
# points[:,1] -= avg_y
# points /= (sx * 1.5) # scale to 1
# points @= rotateMatrix(angle)
# points += draw_pos
# points = [RenderablePoint.from_list(pos, color.with_alpha(intensity)) for pos, intensity in zip(points, shape[:,2])]
# self.drawn_text = text
# self.drawn_text_lines = [RenderableLine(points)]
# lines.extend(self.drawn_text_lines)
return lines return lines
def get_text_lines(self, text, shape, color):
if self.drawn_text == text:
return self.drawn_text_lines
return None
# drawn_pred_history # drawn_pred_history
# drawn_predictions # drawn_predictions
@ -277,6 +337,9 @@ class SrgbaColor():
blue: float blue: float
alpha: float alpha: float
def with_alpha(self, alpha: float) -> SrgbaColor:
return SrgbaColor(self.red, self.green, self.blue, alpha)
@dataclass @dataclass
class RenderablePoint(): class RenderablePoint():
position: RenderablePosition position: RenderablePosition

View file

@ -465,6 +465,7 @@ class Tracker:
track = self.tracks[detection.track_id] track = self.tracks[detection.track_id]
track.track_id = detection.track_id # for new tracks track.track_id = detection.track_id # for new tracks
track.fps = frame.camera.fps track.fps = frame.camera.fps
track.frame_index = frame.index
# track.fps = self.config.camera.fps # for new tracks # track.fps = self.config.camera.fps # for new tracks
track.history.append(detection) # add to history track.history.append(detection) # add to history
@ -514,7 +515,7 @@ class Tracker:
end_time = None end_time = None
tracker_dt = None tracker_dt = None
w_time = None w_time = None
displacement_filter = FinalDisplacementFilter(.2) displacement_filter = FinalDisplacementFilter(.8)
while self.is_running.is_set(): while self.is_running.is_set():
with timer_counter.get_lock(): with timer_counter.get_lock():