laser drawing WIP

This commit is contained in:
Ruben van de Ven 2025-05-12 18:56:35 +02:00
parent 4c126876b1
commit d1703a7a86
5 changed files with 145 additions and 61 deletions

View file

@ -381,12 +381,20 @@ class Track:
frame_nr = a.frame_nr + g frame_nr = a.frame_nr + g
new_history.append(Detection(a.track_id, l, t, w, h, conf, state, frame_nr, a.det_class)) new_history.append(Detection(a.track_id, l, t, w, h, conf, state, frame_nr, a.det_class))
return self.get_with_new_history(new_history)
def get_with_new_history(self, new_history: List[Detection]):
return Track( return Track(
self.track_id, self.track_id,
new_history, new_history,
self.predictor_history, self.predictor_history,
self.predictions, self.predictions,
self.fps) self.fps,
self.source,
self.lost,
self.created_at,
self.frame_index)
def is_complete(self): def is_complete(self):
diffs = [(b.frame_nr - a.frame_nr) for a,b in zip(self.history[:-1], self.history[1:])] diffs = [(b.frame_nr - a.frame_nr) for a,b in zip(self.history[:-1], self.history[1:])]
@ -405,7 +413,11 @@ class Track:
t.history[offset::step_size], t.history[offset::step_size],
t.predictor_history, t.predictor_history,
t.predictions, t.predictions,
t.fps/step_size) t.fps/step_size,
self.source,
self.lost,
self.created_at,
self.frame_index)
def get_simplified_history(self, distance: float, camera: Camera) -> list[tuple[float, float]]: def get_simplified_history(self, distance: float, camera: Camera) -> list[tuple[float, float]]:
# TODO)) Simplify to get a point every n-th meter # TODO)) Simplify to get a point every n-th meter

View file

@ -2,6 +2,7 @@ from __future__ import annotations
from dataclasses import dataclass from dataclasses import dataclass
from enum import Enum from enum import Enum
import math
from typing import List, Tuple from typing import List, Tuple
from simplification.cutil import simplify_coords_idx, simplify_coords_vw_idx from simplification.cutil import simplify_coords_idx, simplify_coords_vw_idx
@ -65,3 +66,23 @@ class RenderableLines():
[line.as_simplified(method) for line in self.lines] [line.as_simplified(method) for line in self.lines]
) )
def circle_arc(cx, cy, r, t, l, c: SrgbaColor):
"""
draw an cirlce arc, around point cx,cy, with radius r
for l*2pi, offset by t. Both t and l are 0<= [t,l] <= 1
"""
resolution = 40
steps = int(resolution * l)
offset = int(resolution * t)
pointlist: list[RenderablePoint] = []
for i in range(offset, offset+steps):
x = cx + math.cos(i * (2*math.pi)/resolution) * r
y = cy + math.sin(i * (2*math.pi)/resolution)* r
pointlist.append(RenderablePoint((x, y), c))
return RenderableLine(pointlist)

View file

@ -294,6 +294,8 @@ class PredictionServer:
data = self.trajectory_socket.recv() data = self.trajectory_socket.recv()
# print('recv tracker frame') # print('recv tracker frame')
frame: Frame = pickle.loads(data) frame: Frame = pickle.loads(data)
# print('indexrecv', [frame.tracks[t].frame_index for t in frame.tracks])
# trajectory_data = {t.track_id: t.get_projected_history_as_dict(frame.H) for t in frame.tracks.values()} # trajectory_data = {t.track_id: t.get_projected_history_as_dict(frame.H) for t in frame.tracks.values()}
# trajectory_data = json.loads(data) # trajectory_data = json.loads(data)
# logger.debug(f"Receive {frame.index}") # logger.debug(f"Receive {frame.index}")
@ -482,6 +484,8 @@ class PredictionServer:
frame.maps = list([m.cpu().numpy() for m in maps.values()]) if maps else None frame.maps = list([m.cpu().numpy() for m in maps.values()]) if maps else None
# print('index', [frame.tracks[t].frame_index for t in frame.tracks])
self.send_frame(frame) self.send_frame(frame)
logger.info('Stopping') logger.info('Stopping')

View file

@ -18,8 +18,8 @@ from sgan.sgan import data
from trap import shapes from trap import shapes
from trap.base import DataclassJSONEncoder, Frame, Track from trap.base import DataclassJSONEncoder, Frame, Track
from trap.counter import CounterSender from trap.counter import CounterSender
from trap.laser_renderer import rotateMatrix from trap.laser_renderer import circle_points, rotateMatrix
from trap.lines import RenderableLine, RenderableLines, RenderablePoint, SrgbaColor from trap.lines import RenderableLine, RenderableLines, RenderablePoint, SrgbaColor, circle_arc
from trap.node import Node from trap.node import Node
from trap.timer import Timer from trap.timer import Timer
from trap.utils import exponentialDecay, relativePointToPolar, relativePolarToPoint from trap.utils import exponentialDecay, relativePointToPolar, relativePolarToPoint
@ -34,7 +34,7 @@ class ScenarioScene(Enum):
LOST = -1 LOST = -1
LOST_FADEOUT = 3 LOST_FADEOUT = 3
PREDICTION_INTERVAL: float|None = 1 PREDICTION_INTERVAL: float|None = 15 # frames
PREDICTION_FADE_IN: float = 3 PREDICTION_FADE_IN: float = 3
PREDICTION_FADE_SLOPE: float = -10 PREDICTION_FADE_SLOPE: float = -10
PREDICTION_FADE_AFTER_DURATION: float = 10 # seconds PREDICTION_FADE_AFTER_DURATION: float = 10 # seconds
@ -46,7 +46,7 @@ TRACK_FADE_ASSUME_FPS = 12
# Don't render the first n points of the prediction, # Don't render the first n points of the prediction,
# helps to avoid jitter in the line transition # helps to avoid jitter in the line transition
PREDICTION_OFFSET = int(TRACK_FADE_ASSUME_FPS * PREDICTION_INTERVAL * .8) # PREDICTION_OFFSET = int(TRACK_FADE_ASSUME_FPS * PREDICTION_INTERVAL * .8)
class TrackScenario(StateMachine): class TrackScenario(StateMachine):
detected = State(initial=True) detected = State(initial=True)
@ -67,8 +67,9 @@ class TrackScenario(StateMachine):
def __init__(self): def __init__(self):
self._track = None self._track = None
self.first_prediction_track: Optional[Track] = None # self.first_prediction_track: Optional[Track] = None
self.prediction_track: Optional[Track] = None # self.prediction_track: Optional[Track] = None
self._predictions: List[Track] = []
super().__init__() super().__init__()
def track_is_long(self, track: Track): def track_is_long(self, track: Track):
@ -84,7 +85,7 @@ class TrackScenario(StateMachine):
def prediction_is_stale(self, track: Track): def prediction_is_stale(self, track: Track):
# TODO use displacement instead of time # TODO use displacement instead of time
return bool(self.prediction_track and self.prediction_track.created_at < (time.time() - 2)) return bool(len(self._predictions) and self._predictions[-1].created_at < (time.time() - 2))
def prediction_is_playing(self, Track): def prediction_is_playing(self, Track):
return False return False
@ -105,20 +106,20 @@ class TrackScenario(StateMachine):
# state change is optional # state change is optional
pass pass
def set_prediction(self, track: Track): def add_prediction(self, track: Track):
if not self._track: if not self._track:
# in case of the unlikely event that prediction was passed sooner # in case of the unlikely event that prediction was passed sooner
self.set_track(track) self.set_track(track)
if not self.first_prediction_track: # if not self.first_prediction_track:
self.first_prediction_track = track # self.first_prediction_track = track
if PREDICTION_INTERVAL is not None and self.prediction_track and (track.created_at - self.prediction_track.created_at) < PREDICTION_INTERVAL: if PREDICTION_INTERVAL is not None and len(self._predictions) and (track.frame_index - self._predictions[-1].frame_index) < PREDICTION_INTERVAL:
# just drop tracks if the predictions come to quick # just drop tracks if the predictions come to quick
return return
self.prediction_track = track self._predictions.append(track)
try: try:
self.receive_prediction(track) self.receive_prediction(track)
except TransitionNotAllowed as e: except TransitionNotAllowed as e:
@ -140,9 +141,10 @@ class TrackScenario(StateMachine):
def after_receive_prediction(self, track: Track): def after_receive_prediction(self, track: Track):
# after # after
self.prediction_track = track pass
if not self.first_prediction_track: # self.prediction_track = track
self.first_prediction_track = track # if not self.first_prediction_track:
# self.first_prediction_track = track
def on_enter_corrected_prediction(self): def on_enter_corrected_prediction(self):
print('corrected!') print('corrected!')
@ -200,6 +202,8 @@ class DrawnScenario(TrackScenario):
self.drawn_text = "" self.drawn_text = ""
self.drawn_text_lines: List[RenderableLine] = [] self.drawn_text_lines: List[RenderableLine] = []
self.anomly_score = .3 # TODO: variable
super().__init__() super().__init__()
def update_drawn_positions(self) -> List: def update_drawn_positions(self) -> List:
@ -222,67 +226,98 @@ class DrawnScenario(TrackScenario):
# 1. track history, direct update # 1. track history, direct update
MAX_HISTORY = 80 MAX_HISTORY = 80
# positions = self._track.get_projected_history(None, self.camera)[-MAX_HISTORY:] # positions = self._track.get_projected_history(None, self.camera)[-MAX_HISTORY:]
positions = self._track.get_projected_history(None, self.camera) self.drawn_positions = self._track.get_projected_history(None, self.camera)
# TODO)) Limit history to N points, or N lenght # TODO)) Limit history to N points, or N lenght
for i, pos in enumerate(self.drawn_positions): # for i, pos in enumerate(self.drawn_positions):
self.drawn_positions[i][0] = positions[i][0] # self.drawn_positions[i][0] = positions[i][0]
self.drawn_positions[i][1] = positions[i][1] # self.drawn_positions[i][1] = positions[i][1]
if len(positions) > len(self.drawn_positions): # if len(positions) > len(self.drawn_positions):
self.drawn_positions.extend(positions[len(self.drawn_positions):]) # self.drawn_positions.extend(positions[len(self.drawn_positions):])
if self.prediction_track and self.prediction_track.predictor_history:
# 2. history as seen by predictor (Trajectron) # 2. history as seen by predictor (Trajectron)
for i, pos in enumerate(self.drawn_pred_history): # if self.prediction_track and self.prediction_track.predictor_history:
if len(self.prediction_track.predictor_history) > i: # for i, pos in enumerate(self.drawn_pred_history):
self.drawn_pred_history[i][0] = int_or_not(exponentialDecay(self.drawn_pred_history[i][0], self.prediction_track.predictor_history[i][0], 16, dt)) # if len(self.prediction_track.predictor_history) > i:
self.drawn_pred_history[i][1] = int_or_not(exponentialDecay(self.drawn_pred_history[i][1], self.prediction_track.predictor_history[i][1], 16, dt)) # self.drawn_pred_history[i][0] = int_or_not(exponentialDecay(self.drawn_pred_history[i][0], self.prediction_track.predictor_history[i][0], 16, dt))
# self.drawn_pred_history[i][1] = int_or_not(exponentialDecay(self.drawn_pred_history[i][1], self.prediction_track.predictor_history[i][1], 16, dt))
if len(self.prediction_track.predictor_history) > len(self.drawn_pred_history): # if len(self.prediction_track.predictor_history) > len(self.drawn_pred_history):
self.drawn_pred_history.extend(positions[len(self.drawn_pred_history):]) # self.drawn_pred_history.extend(positions[len(self.drawn_pred_history):])
if self.prediction_track and self.prediction_track.predictions:
# 3. predictions # 3. predictions
prediction_offset = self._track.frame_index - self.prediction_track.frame_index self.drawn_predictions = []
if len(self.prediction_track.predictions): for a, (ptrack, next_ptrack) in enumerate(zip(self._predictions, [*self._predictions[1:], None])):
for a, drawn_prediction in enumerate(self.drawn_predictions):
for i, pos in enumerate(drawn_prediction): prediction = ptrack.predictions[0] # only use one prediction per timestep/frame/track
# TODO: this should be done in polar space starting from origin (i.e. self.drawn_posision[-1]) if next_ptrack is not None:
decay = max(3, (18/i) if i else 10) # points further away move with more delay # not the last one, cut off
decay = 16 next_ptrack: Track = self._predictions[a+1]
origin = self.drawn_positions[-1] end_step = next_ptrack.frame_index - ptrack.frame_index
drawn_r, drawn_angle = relativePointToPolar( origin, drawn_prediction[i]) else:
pred_r, pred_angle = relativePointToPolar(origin, self.prediction_track.predictions[a][i+prediction_offset]) end_step = None # not last item; show all
r = exponentialDecay(drawn_r, pred_r, decay, dt) self.drawn_predictions.append(ptrack.predictions[0][:end_step])
angle = exponentialDecay(drawn_angle, pred_angle, decay, dt)
x, y = relativePolarToPoint(origin, r, angle)
self.drawn_predictions[a][i] = int_or_not(x), int_or_not(y)
# self.drawn_predictions[i][0] = int(exponentialDecay(self.drawn_predictions[i][0], self.prediction_track.predictions[i][0], decay, dt))
# self.drawn_predictions[i][1] = int(exponentialDecay(self.drawn_predictions[i][1], self.prediction_track.predictions[i][1], decay, dt)) # print(self.drawn_predictions)
# line = []
# for i, pos in enumerate(ptrack.predictions):
# line.append((ptrack.predictions[i][0], ptrack.predictions[i][1]))
# print(line)
# if len(self.drawn_predictions) <= a:
# self.drawn_predictions.append(line)
# else:
# self.drawn_predictions[a] = line
# if self.prediction_track and self.prediction_track.predictions:
# prediction_offset = self._track.frame_index - self.prediction_track.frame_index
# if len(self.prediction_track.predictions):
# for a, drawn_prediction in enumerate(self.drawn_predictions):
# for i, pos in enumerate(drawn_prediction):
# # TODO: this should be done in polar space starting from origin (i.e. self.drawn_posision[-1])
# decay = max(3, (18/i) if i else 10) # points further away move with more delay
# decay = 16
# origin = self.drawn_positions[-1]
# drawn_r, drawn_angle = relativePointToPolar( origin, drawn_prediction[i])
# pred_r, pred_angle = relativePointToPolar(origin, self.prediction_track.predictions[a][i+prediction_offset])
# r = exponentialDecay(drawn_r, pred_r, decay, dt)
# angle = exponentialDecay(drawn_angle, pred_angle, decay, dt)
# x, y = relativePolarToPoint(origin, r, angle)
# self.drawn_predictions[a][i] = int_or_not(x), int_or_not(y)
# # self.drawn_predictions[i][0] = int(exponentialDecay(self.drawn_predictions[i][0], self.prediction_track.predictions[i][0], decay, dt))
# # self.drawn_predictions[i][1] = int(exponentialDecay(self.drawn_predictions[i][1], self.prediction_track.predictions[i][1], decay, dt))
# if len(self.prediction_track.predictions) > len(self.drawn_predictions):
# for pred in self.prediction_track.predictions[len(self.drawn_predictions):]:
# self.drawn_predictions.append(pred[prediction_offset:])
if len(self.prediction_track.predictions) > len(self.drawn_predictions):
for pred in self.prediction_track.predictions[len(self.drawn_predictions):]:
self.drawn_predictions.append(pred[prediction_offset:])
# for a, drawn_prediction in self.drawn_predictions:
# if len(self.pred_coords) > len(self.drawn_predictions):
# self.drawn_predictions.extend(self.pred_coords[len(self.drawn_predictions):])
def to_renderable_lines(self) -> RenderableLines: def to_renderable_lines(self) -> RenderableLines:
track_age = time.time() - self._track.created_at t = time.time()
track_age = t - self._track.created_at
lines: List[RenderableLine] = []
drawable_points, alphas = points_fade_out_alpha_mask(self.drawn_positions, track_age, TRACK_FADE_AFTER_DURATION, TRACK_END_FADE) drawable_points, alphas = points_fade_out_alpha_mask(self.drawn_positions, track_age, TRACK_FADE_AFTER_DURATION, TRACK_END_FADE)
# track_age_in_frames = int(track_age * TRACK_FADE_ASSUME_FPS) # track_age_in_frames = int(track_age * TRACK_FADE_ASSUME_FPS)
# track_max_points = TRACK_FADE_AFTER_DURATION * TRACK_FADE_ASSUME_FPS - track_age_in_frames # track_max_points = TRACK_FADE_AFTER_DURATION * TRACK_FADE_ASSUME_FPS - track_age_in_frames
lines: List[RenderableLine] = []
color = SrgbaColor(1.,0.,0.,1.-self.lost_factor()) color = SrgbaColor(1.,0.,0.,1.-self.lost_factor())
points = [RenderablePoint(pos, color.as_faded(a)) for pos, a in zip(drawable_points, alphas)] points = [RenderablePoint(pos, color.as_faded(a)) for pos, a in zip(drawable_points, alphas)]
lines.append(RenderableLine(points)) lines.append(RenderableLine(points))
anomaly_marker_color = SrgbaColor(0.,0.,1, 1.-self.lost_factor()) # fadeout
lines.append(circle_arc(self.drawn_positions[-1][0], self.drawn_positions[-1][1], 1, t, self.anomly_score, anomaly_marker_color))
if len(self.drawn_predictions): if len(self.drawn_predictions):
color = SrgbaColor(0.,1,0.,1.-self.lost_factor()) color = SrgbaColor(0.,1,0.,1.-self.lost_factor())
prediction_track_age = time.time() - self.first_prediction_track.created_at prediction_track_age = time.time() - self._predictions[0].created_at
t_factor = prediction_track_age / PREDICTION_FADE_IN t_factor = prediction_track_age / PREDICTION_FADE_IN
# positions = [RenderablePosition.from_list(pos) for pos in self.drawn_positions] # positions = [RenderablePosition.from_list(pos) for pos in self.drawn_positions]
for drawn_prediction in self.drawn_predictions: for drawn_prediction in self.drawn_predictions:
@ -302,7 +337,8 @@ class DrawnScenario(TrackScenario):
# colors = [color.as_faded(a1*a2) for a1, a2 in zip(alphas1, alphas2)] # colors = [color.as_faded(a1*a2) for a1, a2 in zip(alphas1, alphas2)]
points = [RenderablePoint(pos, pos_color) for pos, pos_color in zip(drawn_prediction[PREDICTION_OFFSET:], colors[PREDICTION_OFFSET:])] # points = [RenderablePoint(pos, pos_color) for pos, pos_color in zip(drawn_prediction[PREDICTION_OFFSET:], colors[PREDICTION_OFFSET:])]
points = [RenderablePoint(pos, pos_color) for pos, pos_color in zip(drawn_prediction, colors)]
lines.append(RenderableLine(points)) lines.append(RenderableLine(points))
# # print(self.current_state) # # print(self.current_state)
@ -353,6 +389,16 @@ class DrawnScenario(TrackScenario):
return self.drawn_text_lines return self.drawn_text_lines
return None return None
# def circle_points(cx, cy, r, c: Color): PointList
# # r = r
# steps = 30
# pointlist: list[LaserPoint] = []
# for i in range(steps):
# x = int(cx + math.cos(i * (2*math.pi)/steps) * r)
# y = int(cy + math.sin(i * (2*math.pi)/steps)* r)
# pointlist.append(LaserPoint(x, y, c, blank=(i==(steps-1)or i==0)))
# return pointlist
def points_fade_out_alpha_mask(positions: List, track_age_seconds: float, fade_after_duration: float, fade_frames: int, no_frame_max=False): def points_fade_out_alpha_mask(positions: List, track_age_seconds: float, fade_after_duration: float, fade_frames: int, no_frame_max=False):
@ -442,7 +488,7 @@ class Stage(Node):
try: try:
prediction_frame: Frame = self.prediction_sock.recv_pyobj(zmq.NOBLOCK) prediction_frame: Frame = self.prediction_sock.recv_pyobj(zmq.NOBLOCK)
for track_id, track in prediction_frame.tracks.items(): for track_id, track in prediction_frame.tracks.items():
self.scenarios[track_id].set_prediction(track) self.scenarios[track_id].add_prediction(track)
except zmq.ZMQError as e: except zmq.ZMQError as e:
self.logger.debug(f'reuse prediction') self.logger.debug(f'reuse prediction')

View file

@ -772,7 +772,8 @@ class Smoother:
self.smoother.smooth(hs) self.smoother.smooth(hs)
hs = self.smoother.smooth_data[0] hs = self.smoother.smooth_data[0]
new_history = [Detection(d.track_id, l, t, w, h, d.conf, d.state, d.frame_nr, d.det_class) for l, t, w, h, d in zip(ls,ts,ws,hs, track.history)] new_history = [Detection(d.track_id, l, t, w, h, d.conf, d.state, d.frame_nr, d.det_class) for l, t, w, h, d in zip(ls,ts,ws,hs, track.history)]
return Track(track.track_id, new_history, track.predictor_history, track.predictions, track.fps) return track.get_with_new_history(new_history)
# return Track(track.track_id, new_history, track.predictor_history, track.predictions, track.fps)
def smooth_frame_tracks(self, frame: Frame) -> Frame: def smooth_frame_tracks(self, frame: Frame) -> Frame:
new_tracks = [] new_tracks = []