trap/trap/stage.py
2025-10-14 17:24:35 +02:00

1140 lines
No EOL
46 KiB
Python

from __future__ import annotations
from abc import ABC, abstractmethod
from argparse import ArgumentParser
from collections import defaultdict
from dataclasses import dataclass
from enum import Enum
import json
import logging
import math
import pickle
import time
from typing import Dict, List, Optional, Tuple
from matplotlib.pyplot import isinteractive
import numpy as np
from shapely import LineString, MultiLineString, line_locate_point, linestrings
from shapely.ops import substring
from statemachine import Event, State, StateMachine
from statemachine.exceptions import TransitionNotAllowed
import zmq
from sgan.sgan import data
from trap import shapes
from trap.base import Camera, DataclassJSONEncoder, DistortedCamera, Frame, ProjectedTrack, Track
from trap.counter import CounterSender
from trap.laser_renderer import circle_points, rotateMatrix
from trap.lines import RenderableLine, RenderableLines, RenderablePoint, RenderablePosition, SimplifyMethod, SrgbaColor, circle_arc
from trap.node import Node
from trap.timer import Timer
from trap.utils import exponentialDecay, exponentialDecayRounded, relativePointToPolar, relativePolarToPoint
from noise import snoise2
logger = logging.getLogger('trap.stage')
Coordinate = Tuple[float, float]
DeltaT = float # delta_t in seconds
OPTION_POSITION_MARKER = False
OPTION_GROW_ANOMALY_CIRCLE = False
# OPTION_RENDER_DIFF_SEGMENT = True
class LineGenerator(ABC):
@abstractmethod
def update_drawn_positions(self, dt: DeltaT):
pass
def as_renderable(self, color: SrgbaColor) -> RenderableLines:
points = [RenderablePoint(p, color) for p in self._drawn_points]
lines = [RenderableLine(points)]
return RenderableLines(lines)
class AppendableLine(LineGenerator):
"""
A line generator that allows for points to be added over time.
Simply use `line.points.extend([p1, p2])`
"""
def __init__(self, points: Optional[List[Coordinate]] = None, draw_decay_speed = 25.):
self.points: List[Coordinate] = points if points is not None else [] # when providing [] as default, it messes up instancing by reusing the same list
self._drawn_points = []
self.ready = len(self.points) == 0
self.draw_decay_speed = draw_decay_speed
def nr_of_passed_points(self):
"""The number of points passed in the animation"""
return len(self._drawn_points) - 1
def update_drawn_positions(self, dt: DeltaT):
if len(self.points) == 0:
# nothing to draw yet
return
# self._drawn_points = self.points
if len(self._drawn_points) == 0:
# create origin
self._drawn_points.append(self.points[0])
# and drawing head
self._drawn_points.append(self.points[0])
idx = len(self._drawn_points) - 1
target = self.points[idx]
if np.isclose(self._drawn_points[-1], target, atol=.05).all():
# TODO: might want to migrate to np.isclose()
if len(self._drawn_points) == len(self.points):
self.ready = True
return # done until a new point is added
# add new point as drawing head
self._drawn_points.append(self._drawn_points[-1])
self.ready = False
x = exponentialDecayRounded(self._drawn_points[-1][0], target[0], self.draw_decay_speed, dt, .05)
y = exponentialDecayRounded(self._drawn_points[-1][1], target[1], self.draw_decay_speed, dt, .05)
self._drawn_points[-1] = (float(x), float(y))
class ProceduralChain(LineGenerator):
"""A line that can be 'dragged' to a target. In which
it disappears."""
MOVE_DECAY_SPEED = 80 # speed at which the drawing head should approach the next point
VELOCITY_DAMPING = 10
VELOCITY_FACTOR = 2
link_size = .1 # 10cm
# angle_constraint = 5
def __init__(self, joints: List[Coordinate], scenario: DrawnScenario, use_velocity = False):
self.joints: List[Coordinate] = joints
self.target: Coordinate = joints[-1]
self.ready = False
self.move_decay_speed = self.MOVE_DECAY_SPEED
self.scenario = scenario
self.use_velocity = use_velocity
if self.use_velocity:
if len(self.joints) > 1:
self.v = np.array(self.joints[-2]) - np.array(self.joints[-1])
self.v /= np.linalg.norm(self.v) / 10
else:
self.v = np.array([0,0])
@classmethod
def from_appendable_line(cls, al: AppendableLine, scenario: DrawnScenario) -> ProceduralChain:
# TODO: create more segments:
# last added points becomes the head of the chain
points = list(reversed(al.points))
linestring = LineString(points)
linestring = linestring.segmentize(cls.link_size)
joints = list(linestring.coords)
return cls(joints, scenario)
def update_drawn_positions(self, dt: DeltaT):
if self.ready:
return
# direction = np.array(self.joints[-1] - self.target)
# TODO: check self.joints empty, and stop then
if self.use_velocity:
vx = exponentialDecayRounded(self.v[0], self.target[0] - self.joints[0][0], self.VELOCITY_DAMPING, dt, .05)
vy = exponentialDecayRounded(self.v[1], self.target[1] - self.joints[0][1], self.VELOCITY_DAMPING, dt, .05)
self.v = np.array([vx, vy])
self.joints[0] = (float(self.joints[0][0] + self.v[0] * dt * self.VELOCITY_FACTOR), float(self.joints[0][1] + self.v[1] * dt * self.VELOCITY_FACTOR))
else:
x = exponentialDecayRounded(self.joints[0][0], self.target[0], self.move_decay_speed, dt, .05)
y = exponentialDecayRounded(self.joints[0][1], self.target[1], self.move_decay_speed, dt, .05)
self.joints[0] = (float(x), float(y))
# Loop inspired by: https://github.com/argonautcode/animal-proc-anim/blob/main/Chain.pde
# see that code for angle constrains.
for i, (joint, prev_joint) in enumerate(zip(self.joints[1:], self.joints), start=1):
diff = np.array(prev_joint) - np.array(joint)
direction = diff / np.linalg.norm(diff)
self.joints[i] = prev_joint - direction * self.link_size
if np.isclose(self.joints[0], self.target, atol=.05).all():
# self.ready = True
# TODO: smooth transition instead of cutting off
self.joints.pop(0)
self.scenario.add_anomaly_length(self.link_size)
if len(self.joints) == 0:
self.ready = True
self._drawn_points = self.joints
class DiffSegment():
"""
A segment of a prediction track, that can be diffed
with a track. The track is continously update.
If a new prediction comes in, the diff is marked as
finished. After which it is animated and added to the
Scenario's anomaly score.
"""
DRAW_DECAY_SPEED = 25
POINT_INTERVAL = 4
def __init__(self, prediction: ProjectedTrack):
self.ptrack = prediction
self._last_diff_frame_idx = 0
self.finished = False
self.line = AppendableLine( draw_decay_speed=self.DRAW_DECAY_SPEED)
self.points: List[Coordinate] = []
self._drawn_points = []
self._target_track = prediction
def finish(self):
self.finished = True
def nr_of_passed_points(self):
if isinstance(self.line, AppendableLine):
return self.line.nr_of_passed_points() * self.POINT_INTERVAL
else:
return len(self.points) * self.POINT_INTERVAL
# run on each track update received
def update_track(self, track: ProjectedTrack):
self._target_track = track
if self.finished:
# don't add new points if finished
return
# migrate SceneraioScene function
start_frame_idx = max(self.ptrack.frame_index, self._last_diff_frame_idx)
traj_diff_steps_back = track.frame_index - start_frame_idx # positive value
pred_diff_steps_forward = start_frame_idx - self.ptrack.frame_index # positive value
if traj_diff_steps_back < 0 or len(track.history) < traj_diff_steps_back:
logger.warning("Track history doesn't reach prediction start. Should not be possible. Skip")
# elif len(ptrack.predictions[0]) < pred_diff_steps_back:
# logger.warning("Prediction does not reach prediction start. Should not be possible. Skip")
else:
trajectory = track.projected_history
# from start to as far as it gets
trajectory_range = trajectory[-1*traj_diff_steps_back:]
prediction_range = self.ptrack.predictions[0][pred_diff_steps_forward:] # in world coordinate space
line = []
for i, (p1, p2) in enumerate(zip(trajectory_range, prediction_range)):
offset_from_start = (pred_diff_steps_forward + i)
if offset_from_start % self.POINT_INTERVAL == 0:
self.line.points.extend([p1, p2])
self.points.extend([p1, p2])
self._last_diff_frame_idx = track.frame_index
# run each render tick
def update_drawn_positions(self, dt: DeltaT, scenario: DrawnScenario):
if isinstance(self.line, AppendableLine):
if self.finished and self.line.ready:
# convert when fully drawn
# print(self, "CONVERT LINE")
self.line = ProceduralChain.from_appendable_line(self.line, scenario)
if isinstance(self.line, ProceduralChain):
self.line.target = self._target_track.projected_history[-1]
# if not self.finished or not self.line.ready:
self.line.update_drawn_positions(dt)
def as_renderable(self) -> RenderableLines:
color = SrgbaColor(0,0,1,1)
# lines = []
# points = [RenderablePoint(p, color) for p in self._drawn_points]
# lines = [RenderableLine(points)]
# return RenderableLines(lines)
if not self.finished or not self.line.ready:
return self.line.as_renderable(color)
return self.line.as_renderable(color)
# points = [RenderablePoint(p, color) for p in self._drawn_points]
# lines = [RenderableLine(points)]
return RenderableLines([])
class DiffSegmentScan(DiffSegment):
"""
Provide alternative diffing, in the form of a sort of scan line
Should be faster with the laser
TODO: This is work in progress, does not work yet!
"""
def __init__(self, prediction: ProjectedTrack):
self.ptrack = prediction
self._target_track = prediction
self.finished = False
self._last_diff_frame_idx = 0
def finish(self):
self.finished = True
def prediction_offset(self):
"""Difference is starting moment between track and prediction"""
return self.ptrack.frame_index - self._target_track.frame_index
def nr_of_passed_points(self):
"""Number of points of the given ptrack that have passed"""
return len(self._target_track.projected_history) - 1 - self.prediction_offset()
# len(self.points) * self.POINT_INTERVAL
# run on each track update received
def update_track(self, track: ProjectedTrack):
self._target_track = track
if self.finished:
# don't add new points if finished
return
start_frame_idx = max(self.ptrack.frame_index, self._last_diff_frame_idx)
traj_diff_steps_back = track.frame_index - start_frame_idx # positive value
pred_diff_steps_forward = start_frame_idx - self.ptrack.frame_index # positive value
self._last_diff_frame_idx = track.frame_index
# run each render tick
def update_drawn_positions(self, dt: DeltaT, scenario: DrawnScenario):
# if not self.finished or not self.line.ready:
# self.line.update_drawn_positions(dt)
pass # TODO: use easing
def as_renderable(self) -> RenderableLines:
if self.finished:
return RenderableLines([])
color = SrgbaColor(0,0,1,1)
# steps_diff = self.nr_of_passed_points()
idx = self.nr_of_passed_points()
if len(self.ptrack.predictions[0]) < idx+1:
self.finish()
return RenderableLines([])
points = [self._target_track.projected_history[-1], self.ptrack.predictions[0][idx]]
points = [RenderablePoint(pos, color) for pos in points]
line = RenderableLine(points)
return RenderableLines([line])
class ScenarioScene(Enum):
DETECTED = 1
FIRST_PREDICTION = 2
CORRECTED_PREDICTION = 3
LOITERING = 4
PLAY = 4
LOST = -1
LOST_FADEOUT = 3
PREDICTION_INTERVAL: float|None = 10 # frames
PREDICTION_FADE_IN: float = 3
PREDICTION_FADE_SLOPE: float = -10
PREDICTION_FADE_AFTER_DURATION: float = 10 # seconds
PREDICTION_END_FADE = 2 #frames
# TRACK_MAX_POINTS = 100
TRACK_FADE_AFTER_DURATION = 8. # seconds
TRACK_END_FADE = 30 # points
TRACK_FADE_ASSUME_FPS = 12
# Don't render the first n points of the prediction,
# helps to avoid jitter in the line transition
# PREDICTION_OFFSET = int(TRACK_FADE_ASSUME_FPS * PREDICTION_INTERVAL * .8)
class TrackScenario(StateMachine):
detected = State(initial=True)
substantial = State()
first_prediction = State()
corrected_prediction = State()
loitering = State()
play = State()
lost = State(final=True)
receive_track = lost.from_(
detected, first_prediction, corrected_prediction, loitering, play, substantial, cond="track_is_lost"
) | corrected_prediction.to(loitering, cond="track_is_loitering") | detected.to(substantial, cond="track_is_long")
mark_lost = lost.from_(detected, substantial, first_prediction, corrected_prediction, loitering, play)
receive_prediction = detected.to(first_prediction) | substantial.to(first_prediction) | first_prediction.to(corrected_prediction, cond="prediction_is_stale") | corrected_prediction.to(play, cond="prediction_is_playing")
def __init__(self):
self.track: ProjectedTrack = None
self.camera: Optional[Camera] = None
# self.first_prediction_track: Optional[Track] = None
# self.prediction_track: Optional[Track] = None
self.predictions: List[Track] = []
self._last_diff_frame_idx: Optional[int] = 0
self.diffs: List[Tuple[Coordinate, Coordinate]] = []
self.prediction_diffs: List[DiffSegment] = []
super().__init__()
def track_is_long(self, track: ProjectedTrack):
return len(track.history) > 20
def track_is_lost(self, track: ProjectedTrack):
# return self._track and self._track.created_at < time.time() - 5
return track.lost # Note, for now this is not implemented in the tacker, see check_lost()
def track_is_loitering(self, track: ProjectedTrack):
# TODO)) Change to measure displacement over the last n seconds
return len(track.history) > (track.fps * 60) # seconds after which someone is loitering
def prediction_is_stale(self, track: ProjectedTrack):
# TODO use displacement instead of time
return bool(len(self.predictions) and self.predictions[-1].created_at < (time.time() - 2))
def prediction_is_playing(self, track):
return False
def check_lost(self):
if self.current_state is not self.lost and self.track and self.track.updated_at < time.time() - 5:
self.mark_lost()
def set_track(self, track: ProjectedTrack):
if self.track and self.track.created_at > track.created_at:
# ignore old track
return
self.track = track
self.update_prediction_diff()
# check to change state
try:
self.receive_track(track)
except TransitionNotAllowed as e:
# state change is optional
pass
def update_prediction_diff(self):
"""
gather the diffs of the trajectory with the most recent prediction
"""
if len(self.prediction_diffs) == 0:
return
for diff in self.prediction_diffs:
diff.update_track(self.track)
def add_prediction(self, track: ProjectedTrack):
if not self.track:
# in case of the unlikely event that prediction was passed sooner
self.set_track(track)
# if not self.first_prediction_track:
# self.first_prediction_track = track
if PREDICTION_INTERVAL is not None and len(self.predictions) and (track.frame_index - self.predictions[-1].frame_index) < PREDICTION_INTERVAL:
# just drop tracks if the predictions come to quick
return
if track._track.predictions is None or not len(track._track.predictions):
# don't count to predictions if no prediction is set of given track (e.g. young tracks)
return
self.predictions.append(track)
if len(self.prediction_diffs):
self.prediction_diffs[-1].finish() # existing diffing can end
# and create a new one
self.prediction_diffs.append(DiffSegment(track))
# self.prediction_diffs.append(DiffSegmentScan(track))
# check to change state
try:
self.receive_prediction(track)
except TransitionNotAllowed as e:
# state change is optional
pass
def after_receive_track(self, track: ProjectedTrack):
print('changed state')
def on_receive_track(self, track: ProjectedTrack):
# on event, only runs once, upon first track
print('updating track!')
def on_receive_prediction(self, track: ProjectedTrack):
# on event, because it happens for every receive, despite transition
print('updating prediction!')
# self.track = track
def after_receive_prediction(self, track: ProjectedTrack):
# after
pass
# self.prediction_track = track
# if not self.first_prediction_track:
# self.first_prediction_track = track
def on_enter_corrected_prediction(self):
print('corrected!')
def on_enter_detected(self):
print("DETECTED!")
def on_enter_first_prediction(self):
print("Hello!")
def on_enter_detected(self):
print(f"enter {self.current_state.id}")
def on_enter_substantial(self):
print(f"enter {self.current_state.id}")
def on_enter_first_prediction(self):
print(f"enter {self.current_state.id}")
def on_enter_corrected_prediction(self):
print(f"enter {self.current_state.id}")
def on_enter_loitering(self):
print(f"enter {self.current_state.id}")
def on_enter_play(self):
print(f"enter {self.current_state.id}")
def on_enter_lost(self):
print(f"enter {self.current_state.id}")
self.lost_at = time.time()
def lost_for(self):
if self.current_state is self.lost:
return time.time() - self.lost_at
return None
def lost_factor(self):
l = self.lost_for()
if not l:
return 0
return l/LOST_FADEOUT
def to_lines(self) -> List[RenderableLine]:
raise RuntimeError("Not implemented yet")
class DrawnScenario(TrackScenario):
"""
Scenario contains the controls (scene, target positions)
DrawnScenario class does the actual drawing of points incl. transitions
"""
ANOMALY_DECAY = .2 # speed with which the cirlce shrinks over time
DISTANCE_ANOMALY_FACTOR = .03 # the ammount to which the difference counts to the anomaly score
MAX_HISTORY = 100 # points of history of trajectory to display (preventing too long lines)
CUT_GAP = 5 # when adding a new prediction, keep the existing prediction until that point + this CUT_GAP margin
def __init__(self):
# self.created_at = time.time()
# self.track_id = track_id
self.last_update_t = time.perf_counter()
self.drawn_position: Optional[Coordinate] = None
self.drawn_positions: List[Coordinate] = []
self.drawn_pred_history: List[Coordinate] = []
self.drawn_predictions: List[List[Coordinate]] = []
self._current_drawn_prediction: List[Coordinate] = []
self.drawn_text = ""
self.drawn_text_lines: List[RenderableLine] = []
self.anomaly_score = 0 # TODO: variable
self._drawn_anomaly_score = 0
super().__init__()
def add_anomaly_length(self, length: float):
"""
append a difference in meters between point
"""
self.anomaly_score += length * self.DISTANCE_ANOMALY_FACTOR
if self.anomaly_score > 1:
self.anomaly_score = 1.
def decay_anomaly_score(self, dt: DeltaT):
if self.anomaly_score == 0:
return
self.anomaly_score = exponentialDecay(self.anomaly_score, 0, self.ANOMALY_DECAY, dt)
def update_drawn_positions(self) -> List:
'''
use dt to lerp the drawn positions in the direction of current prediction
'''
# TODO: make lerp, currently quick way to get results
def int_or_not(v):
"""quick wrapper to toggle int'ing"""
return v
# return int(v)
# 0. calculate dt
# if dt is None:
t = time.perf_counter()
dt: DeltaT = t - self.last_update_t
self.last_update_t = t
# 0. Update anomaly, slowly decreasing it over time
self.decay_anomaly_score(dt)
# for diff in self.prediction_diffs:
# diff.update_drawn_positions(dt, self)
# 1. track history, direct update
# positions = self._track.get_projected_history(None, self.camera)[-MAX_HISTORY:]
# self.drawn_positions = self.track.projected_history[-self.MAX_HISTORY:]
self.drawn_positions = self.track.projected_history
if self.drawn_position is None:
self.drawn_position = self.drawn_positions[-1]
else:
self.drawn_position[0] = exponentialDecay(self.drawn_position[0], self.drawn_positions[-1][0], 13, dt)
self.drawn_position[1] = exponentialDecay(self.drawn_position[1], self.drawn_positions[-1][1], 13, dt)
# 3. predictions
if len(self.drawn_predictions) < len(self.predictions):
# first prediction
if len(self.drawn_predictions) == 0:
last_pred = self.predictions[-1]
self.drawn_predictions.append(last_pred.predictions[0])
else:
# if a new prediction has arised, transition from existing one.
# First, cut existing prediction
# CUT_GAP indicates that some is lost in the transition, to prevent glitches when velocity of person changes
end_step = self.predictions[-1].frame_index - self.predictions[-2].frame_index + self.CUT_GAP
keep = self.drawn_predictions[-1][end_step:]
last_item: Coordinate = (keep)[-1]
self.drawn_predictions[-1] = self.drawn_predictions[-1][:end_step] # cut the old part
# print(self.predictions[-1].frame_index, self.predictions[-2].frame_index, end_step, len(keep))
# duplicate last item, so the new one has the same nr. of points as the incoming prediction (so it can actually transition)
ext = [last_item] * (len(self.predictions[-1].predictions[0]) - len(keep))
keep.extend(ext)
self.drawn_predictions.append(keep)
for a, drawn_prediction in enumerate(self.drawn_predictions):
# origin = self.predictions[a].predictions[0][0]
origin = self.predictions[a].predictions[0][0]
# associated_diff = self.prediction_diffs[a]
# progress = associated_diff.nr_of_passed_points()
for i, pos in enumerate(drawn_prediction):
# TODO: this should be done in polar space starting from origin (i.e. self.drawn_posision[-1])
decay = max(3, (18/i) if i else 10) # points further away move with more delay
decay = 16
drawn_r, drawn_angle = relativePointToPolar( origin, drawn_prediction[i])
pred_r, pred_angle = relativePointToPolar(origin, self.predictions[a].predictions[0][i])
r = exponentialDecay(drawn_r, pred_r, decay, dt)
# make circular coordinates transition through the smaller arc
if abs(drawn_angle - pred_angle) > math.pi:
pred_angle -= math.pi * 2
angle = exponentialDecay(drawn_angle, pred_angle, decay, dt)
x, y = relativePolarToPoint(origin, r, angle)
self.drawn_predictions[a][i] = int_or_not(x), int_or_not(y)
# self.drawn_predictions[i][0] = int(exponentialDecay(self.drawn_predictions[i][0], self.prediction_track.predictions[i][0], decay, dt))
# self.drawn_predictions[i][1] = int(exponentialDecay(self.drawn_predictions[i][1], self.prediction_track.predictions[i][1], decay, dt))
# self.drawn_predictions = []
# for a, (ptrack, next_ptrack) in enumerate(zip(self.predictions, [*self.predictions[1:], None])):
# prediction = ptrack.predictions[0] # only use one prediction per timestep/frame/track
# if next_ptrack is not None:
# # not the last one, cut off
# next_ptrack: ProjectedTrack = self.predictions[a+1]
# end_step = next_ptrack.frame_index - ptrack.frame_index
# else:
# end_step = None # not last item; show all
# self.drawn_predictions.append(ptrack.predictions[0][:end_step])
# Animate line as procedural chain https://www.youtube.com/watch?v=qlfh_rv6khY&t=183s
self._drawn_anomaly_score = exponentialDecay(self._drawn_anomaly_score, self.anomaly_score, 3, dt)
# print(self.drawn_predictions)
# line = []
# for i, pos in enumerate(ptrack.predictions):
# line.append((ptrack.predictions[i][0], ptrack.predictions[i][1]))
# print(line)
# if len(self.drawn_predictions) <= a:
# self.drawn_predictions.append(line)
# else:
# self.drawn_predictions[a] = line
# if self.prediction_track and self.prediction_track.predictions:
# prediction_offset = self._track.frame_index - self.prediction_track.frame_index
# if len(self.prediction_track.predictions):
# for a, drawn_prediction in enumerate(self.drawn_predictions):
# for i, pos in enumerate(drawn_prediction):
# # TODO: this should be done in polar space starting from origin (i.e. self.drawn_posision[-1])
# decay = max(3, (18/i) if i else 10) # points further away move with more delay
# decay = 16
# origin = self.drawn_positions[-1]
# drawn_r, drawn_angle = relativePointToPolar( origin, drawn_prediction[i])
# pred_r, pred_angle = relativePointToPolar(origin, self.prediction_track.predictions[a][i+prediction_offset])
# r = exponentialDecay(drawn_r, pred_r, decay, dt)
# angle = exponentialDecay(drawn_angle, pred_angle, decay, dt)
# x, y = relativePolarToPoint(origin, r, angle)
# self.drawn_predictions[a][i] = int_or_not(x), int_or_not(y)
# # self.drawn_predictions[i][0] = int(exponentialDecay(self.drawn_predictions[i][0], self.prediction_track.predictions[i][0], decay, dt))
# # self.drawn_predictions[i][1] = int(exponentialDecay(self.drawn_predictions[i][1], self.prediction_track.predictions[i][1], decay, dt))
# if len(self.prediction_track.predictions) > len(self.drawn_predictions):
# for pred in self.prediction_track.predictions[len(self.drawn_predictions):]:
# self.drawn_predictions.append(pred[prediction_offset:])
def to_renderable_lines(self) -> RenderableLines:
t = time.time()
track_age = t - self.track.updated_at # Should be beginning
lines = RenderableLines([])
# track_age_in_frames = int(track_age * TRACK_FADE_ASSUME_FPS)
# track_max_points = TRACK_FADE_AFTER_DURATION * TRACK_FADE_ASSUME_FPS - track_age_in_frames
# 1. Trajectory history
# drawable_points, alphas = self.drawn_positions[:self.MAX_HISTORY], [1]*len(self.drawn_positions)
# perlin/simplex noise
# dt: change speed. Divide to make slower
# amp: amplitude of noise
# frequency: make smaller to make longer waves
noisy_points = apply_perlin_noise_to_line_normal(self.drawn_positions, t/5, .3, .02)
drawable_points, alphas = points_fade_out_alpha_mask(noisy_points, track_age, TRACK_FADE_AFTER_DURATION, TRACK_END_FADE)
color = SrgbaColor(1.,0.,1.,1.-self.lost_factor())
# TODO: effect configuration
points = [RenderablePoint(pos, color.as_faded(a)) for pos, a in zip(drawable_points, alphas)]
# points = [RenderablePoint(pos, color.as_faded(a)) for pos, a in zip(drawable_points, alphas)]
lines.append(RenderableLine(points))
# 2. Position Marker / anomaly score
if OPTION_POSITION_MARKER:
anomaly_marker_color = SrgbaColor(0.,0.,1, 1.-self.lost_factor()) # fadeout
# lines.append(circle_arc(self.drawn_positions[-1][0], self.drawn_positions[-1][1], 1, t, self.anomaly_score, anomaly_marker_color))
# last point, (but this draws line in circle, requiring a 'jump back' for the laser)
cx, cy = self.drawn_position[0], self.drawn_position[1],
radius = max(.1, self._drawn_anomaly_score * 1.) if OPTION_GROW_ANOMALY_CIRCLE else .1
steps=0
if len(self.drawn_positions) >= steps:
dx, dy = self.drawn_positions[-1][0] - self.drawn_positions[-steps][0], self.drawn_positions[-1][1] - self.drawn_positions[-steps][1],
diff = np.array([dx,dy])
diff = diff/np.linalg.norm(diff) * radius * 1.1
cx += diff[0]
cy += diff[1]
lines.append(circle_arc(
cx, cy,
radius,
0, 1,
anomaly_marker_color)
)
# 3. Predictions
if len(self.drawn_predictions):
color = SrgbaColor(0.,1,0.,1.-self.lost_factor())
# positions = [RenderablePosition.from_list(pos) for pos in self.drawn_positions]
for a, drawn_prediction in enumerate(self.drawn_predictions):
if a < (len(self.drawn_predictions) - 1):
# not the newest: fade out:
deprecation_age = t - self.predictions[a+1].created_at
if deprecation_age > PREDICTION_FADE_IN:
# old: skip drawing.
continue
else:
fade_factor = 1 - (deprecation_age / PREDICTION_FADE_IN)
color = color.as_faded(fade_factor)
prediction_track_age = time.time() - self.predictions[a].created_at
t_factor = prediction_track_age / PREDICTION_FADE_IN
associated_diff = self.prediction_diffs[a]
progress = associated_diff.nr_of_passed_points()
# drawn_prediction, alphas1 = points_fade_out_alpha_mask(drawn_prediction, prediction_track_age, TRACK_FADE_AFTER_DURATION, TRACK_END_FADE, no_frame_max=True)
gradient = np.linspace(0, PREDICTION_FADE_SLOPE, len(drawn_prediction))
alphas2 = np.clip(gradient + t_factor * (-1*PREDICTION_FADE_SLOPE), 0, 1)
# print(alphas)
# colors = [color.with_alpha(np.clip(t_factor*3-(p_index/len(drawn_prediction)), 0, 1)) for p_index in range(len(drawn_prediction))]
# colors = [color.with_alpha(np.clip(t_factor*2-(p_index/len(drawn_prediction)), 0, 1)) for p_index in range(len(drawn_prediction))]
# apply both fade in and fade out mask:
colors = [color.as_faded(a2) for a2 in alphas2]
# colors = [color.as_faded(a1*a2) for a1, a2 in zip(alphas1, alphas2)]
# points = [RenderablePoint(pos, pos_color) for pos, pos_color in zip(drawn_prediction[PREDICTION_OFFSET:], colors[PREDICTION_OFFSET:])]
points = [RenderablePoint(pos, pos_color) for pos, pos_color in zip(drawn_prediction, colors)]
points = points[progress//2:]
ls = LineString(drawn_prediction)
if t_factor < 1:
ls = substring(ls, 0, t_factor*ls.length, ls.length)
dashed = dashed_line(ls, 1, .5, t)
# print(dashed)
for line in dashed.geoms:
dash_points = [RenderablePoint(point, color) for point in line.coords]
lines.append(RenderableLine(dash_points))
# lines.append(RenderableLine(points))
# 4. Diffs
# for drawn_diff in self.drawn_diffs:
# color = SrgbaColor(0.,1,1.,1.-self.lost_factor())
# colors = [color.as_faded(1) for a2 in range(len(drawn_diff))]
# points = [RenderablePoint(pos, pos_color) for pos, pos_color in zip(drawn_diff, colors)]
# lines.append(RenderableLine(points))
# if OPTION_RENDER_DIFF_SEGMENT:
# for diff in self.prediction_diffs:
# lines.append_lines(diff.as_renderable())
# pass
# # print(self.current_state)
# if self.current_state is self.first_prediction or self.current_state is self.corrected_prediction:
# shape = np.array(shapes.YOUR if time.time() % 2 > 1 else shapes.FUTURE)
# text = "your" if time.time() % 2 > 1 else "future"
# color = SrgbaColor(0.5,0.5,0.5,1.-self.lost_factor())
# line = self.get_text_lines(text, shape, color)
# if not line:
# POSITION_INDEX = 50
# draw_pos = self.drawn_predictions[0][POSITION_INDEX-1]
# current_pos = self.drawn_positions[-1]
# angle = np.arctan2(draw_pos[0]-current_pos[0], draw_pos[1]-current_pos[1]) + np.pi
# # for i, line in enumerate(shape):
# # if i != 0:
# # continue
# points = np.array(shape[:,:2])
# avg_x = np.average(points[:,0])
# avg_y = np.average(points[:,1])
# minx, maxx = np.min(points[:,0]), np.max(points[:,0])
# miny, maxy = np.min(points[:,1]), np.max(points[:,1])
# sx = maxx-minx
# sy = maxy-miny
# points[:,0] -= avg_x
# points[:,1] -= avg_y
# points /= (sx * 1.5) # scale to 1
# points @= rotateMatrix(angle)
# points += draw_pos
# points = [RenderablePoint.from_list(pos, color.with_alpha(intensity)) for pos, intensity in zip(points, shape[:,2])]
# self.drawn_text = text
# self.drawn_text_lines = [RenderableLine(points)]
# lines.extend(self.drawn_text_lines)
return lines
def get_text_lines(self, text, shape, color):
if self.drawn_text == text:
return self.drawn_text_lines
return None
# def circle_points(cx, cy, r, c: Color): PointList
# # r = r
# steps = 30
# pointlist: list[LaserPoint] = []
# for i in range(steps):
# x = int(cx + math.cos(i * (2*math.pi)/steps) * r)
# y = int(cy + math.sin(i * (2*math.pi)/steps)* r)
# pointlist.append(LaserPoint(x, y, c, blank=(i==(steps-1)or i==0)))
# return pointlist
def points_fade_out_alpha_mask(positions: List, track_age_seconds: float, fade_after_duration: float, fade_frames: int, no_frame_max=False):
track_age_in_frames = int(track_age_seconds * TRACK_FADE_ASSUME_FPS)
if not no_frame_max:
track_max_points = int(fade_after_duration * TRACK_FADE_ASSUME_FPS) - track_age_in_frames
else:
if track_age_seconds < fade_after_duration:
track_max_points = len(positions) #+ fade_frames
else:
FADE_DURATION = 2
t_fade = max(0, track_age_seconds - fade_after_duration) / FADE_DURATION
track_max_points = int(t_fade * len(positions) / FADE_DURATION)
drawable_points = positions[-track_max_points:]
alphas = []
for i, point in enumerate(drawable_points):
reverse_i = len(drawable_points) - i
fade_i = reverse_i - (track_max_points - fade_frames) # -90
fade_i /= fade_frames # / 10
fade_i = np.clip(fade_i, 0, 1)
alpha = 1 - fade_i
if alpha > 0:
alphas.append(alpha)
return drawable_points, alphas
# drawn_pred_history
# drawn_predictions
# @dataclass
# class RenderablePosition():
# x: float
# y: float
# @classmethod
# def from_list(cls, l: List[float, float]) -> RenderablePosition:
# return cls(x = float(l[0]), y=float(l[1]))
# TODO)) Or Shapely point?
class Stage(Node):
"""
Render a stage, on which different TrackScenarios take place to a
single image of lines. Which can be passed to different renderers
E.g. the laser or image renderers.
"""
FPS = 60
def setup(self):
# self.scenarios: List[DrawnScenario] = []
self.scenarios: Dict[str, DrawnScenario] = defaultdict(lambda: DrawnScenario())
self.trajectory_sock = self.sub(self.config.zmq_trajectory_addr)
self.prediction_sock = self.sub(self.config.zmq_prediction_addr)
self.stage_sock = self.pub(self.config.zmq_stage_addr)
self.counter = CounterSender()
self.camera: Optional[DistortedCamera] = None
def run(self):
prev_time = time.perf_counter()
while self.is_running.is_set():
self.tick()
# 1) poll & update
self.loop_receive()
# 2) render
self.loop_render()
# 3) calculate latency for desired FPS
now = time.perf_counter()
time_diff = (now - prev_time)
if time_diff < 1/self.FPS:
# print(f"sleep {1/self.FPS - time_diff}")
time.sleep(1/self.FPS - time_diff)
now += 1/self.FPS - time_diff
prev_time = now
def loop_receive(self):
# 1) receive predictions
try:
prediction_frame: Frame = self.prediction_sock.recv_pyobj(zmq.NOBLOCK)
for track_id, track in prediction_frame.tracks.items():
proj_track = ProjectedTrack(track, prediction_frame.camera)
self.scenarios[track_id].add_prediction(proj_track)
except zmq.ZMQError as e:
self.logger.debug(f'reuse prediction')
# 2) receive tracker tracks
try:
trajectory_frame: Frame = self.trajectory_sock.recv_pyobj(zmq.NOBLOCK)
for track_id, track in trajectory_frame.tracks.items():
proj_track = ProjectedTrack(track, trajectory_frame.camera)
# if not self.scenarios[track_id].camera:
# self.scenarios[track_id].camera = trajectory_frame.camera # little hack to pass camera!
self.scenarios[track_id].set_track(proj_track)
except zmq.ZMQError as e:
self.logger.debug(f'reuse tracks')
# 3) Remove stale tracks
for track_id, scenario in list(self.scenarios.items()):
# check when last tracker update was received
scenario.check_lost()
if scenario.lost_factor() > 1:
self.logger.info(f"rm track {track_id}")
del self.scenarios[track_id]
def loop_render(self):
lines = RenderableLines([])
for track_id, scenario in self.scenarios.items():
scenario.update_drawn_positions()
lines.append_lines(scenario.to_renderable_lines())
# print(lines)
# rl = RenderableLines(lines)
# with open('/tmp/lines.pcl', 'wb') as fp:
# pickle.dump(rl, fp)
rl = lines.as_simplified(SimplifyMethod.RDP, .003) # or segmentise (see shapely)
self.counter.set("stage.lines", len(lines.lines))
self.counter.set("stage.points_orig", lines.point_count())
self.counter.set("stage.points", rl.point_count())
# print(rl.__dict__)
self.stage_sock.send_json(obj=rl, cls=DataclassJSONEncoder)
# print(json.dumps(rl, cls=DataclassJSONEncoder))
@classmethod
def arg_parser(cls) -> ArgumentParser:
argparser = ArgumentParser()
argparser.add_argument('--zmq-trajectory-addr',
help='Manually specity communication addr for the trajectory messages',
type=str,
default="ipc:///tmp/feeds_traj")
argparser.add_argument('--zmq-prediction-addr',
help='Manually specity communication addr for the prediction messages',
type=str,
default="ipc:///tmp/feeds_preds")
argparser.add_argument('--zmq-stage-addr',
help='Manually specity communication addr for the stage messages (the rendered lines)',
type=str,
default="tcp://0.0.0.0:99174")
return argparser
# TODO place somewhere else:
# Gemma3:27b prompt: "python. Given a list of coordinates, that describes a line: `drawable_points: List[Tuple[float,float]]` apply perlin noise over the normal of the line, that changes over time `dt`."
def apply_perlin_noise_to_line_normal(drawable_points: np.ndarray, dt: float, amplitude: float = 1.0, frequency: float = 1.0, fade_over_n_points = 8) -> np.ndarray:
"""
Applies Perlin noise to the normals of a line described by a list of coordinates, changing over time.
Args:
drawable_points: A list of (x, y) tuples representing the points of the line.
dt: The time delta, used to animate the Perlin noise.
amplitude: The strength of the Perlin noise effect.
frequency: The frequency of the Perlin noise (how many waves per unit).
Returns:
A new list of (x, y) tuples representing the line with Perlin noise applied to the normals. If drawable_points
has fewer than 2 points, it returns the original list unchanged.
Raises:
TypeError: If drawable_points is not a list or dt is not a float.
ValueError: If the input points are not tuples of length 2.
"""
# if not isinstance(drawable_points, list):
# print(drawable_points, type(drawable_points))
# raise TypeError("drawable_points must be a list.")
if not isinstance(dt, float):
raise TypeError("dt must be a float.")
if len(drawable_points) < 2:
return drawable_points # Nothing to do with fewer than 2 points
# for point in drawable_points:
# if not isinstance(point, tuple) or len(point) != 2:
# raise ValueError("Each point in drawable_points must be a tuple of length 2.")
# noise = PerlinNoise(octaves=4) # You can adjust octaves for different noise patterns
new_points = []
for i in range(len(drawable_points)):
x, y = drawable_points[i]
# Calculate the normal vector. We'll approximate it using the previous and next points.
if i == 0:
# For the first point, use the next point to estimate the normal
next_x, next_y = drawable_points[i + 1]
normal_x = next_y - y
normal_y = -(next_x - x)
elif i == len(drawable_points) - 1:
# For the last point, use the previous point
prev_x, prev_y = drawable_points[i - 1]
normal_x = y - prev_y
normal_y = -(x - prev_x)
else:
prev_x, prev_y = drawable_points[i - 1]
next_x, next_y = drawable_points[i + 1]
normal_x = next_y - prev_y
normal_y = -(next_x - prev_x)
# Normalize the normal vector
norm = np.sqrt(normal_x**2 + normal_y**2)
if norm > 0:
normal_x /= norm
normal_y /= norm
# Apply Perlin noise to the normal
# noise_x = noise([x * frequency, (y + dt) * frequency]) * amplitude * normal_x
# noise_y = noise([x * frequency, (y + dt) * frequency]) * amplitude * normal_y
noise = snoise2(i * frequency, dt % 1000, octaves=4)
use_amp = amplitude
if fade_over_n_points > 0:
rev_step = len(drawable_points) - i
amp_factor = rev_step / fade_over_n_points
if amp_factor < 1:
use_amp *= amp_factor
noise_x = noise * use_amp * normal_x
noise_y = noise * use_amp * normal_y
# print(noise_x, noise_y, dt, frequency, i, dt, snoise2(i * frequency, dt % 1000, octaves=4))
# Add the noise to the point's coordinates
new_x = x + noise_x
new_y = y + noise_y
new_points.append((new_x, new_y))
# print(drawable_points, new_points)
return np.array(new_points)
import math
def distance(p1, p2):
return math.hypot(p2[0] - p1[0], p2[1] - p1[1])
def dashed_line(line: LineString, dash_len: float, gap_len: float, offset: float = 0) -> MultiLineString:
total_length = line.length
segments = []
pos = offset % (dash_len + gap_len)
if pos > gap_len:
segments.append(substring(line, 0, pos - gap_len))
while pos < total_length:
end = min(pos + dash_len, total_length)
if pos < end:
dash = substring(line, pos, end)
segments.append(dash)
pos += dash_len + gap_len
return MultiLineString(segments)