Animating diff lines

This commit is contained in:
Ruben van de Ven 2025-05-15 18:53:43 +02:00
parent 4415e2dcb6
commit 8ea57efc9f
5 changed files with 313 additions and 69 deletions

View file

@ -315,6 +315,18 @@ class Detection:
def to_ltrb(self):
return (int(self.l), int(self.t), int(self.l+self.w), int(self.t+self.h))
# Proxy'd Track, which caches projected history
class ProjectedTrack(object):
def __init__(self, track: Track, camera: Camera):
self._track = track
self.camera = camera # keep to wrap other calls
self.projected_history = track.get_projected_history(camera=camera)
# TODO wrap functions of Track()
def __getattr__(self, attr):
return getattr(self._track, attr)
@dataclass
class Track:
@ -331,10 +343,13 @@ class Track:
lost: bool = False
created_at: Optional[float] = None
frame_index: int = 0
updated_at: Optional[float] = None
def __post_init__(self):
if not self.created_at:
self.created_at = time.time()
if not self.updated_at:
self.update_at = time.time()
def get_projected_history(self, H: Optional[cv2.Mat] = None, camera: Optional[DistortedCamera]= None) -> np.array:
foot_coordinates = [d.get_foot_coords() for d in self.history]
@ -394,7 +409,8 @@ class Track:
self.source,
self.lost,
self.created_at,
self.frame_index)
self.frame_index,
self.updated_at)
def is_complete(self):
diffs = [(b.frame_nr - a.frame_nr) for a,b in zip(self.history[:-1], self.history[1:])]
@ -417,7 +433,8 @@ class Track:
self.source,
self.lost,
self.created_at,
self.frame_index)
self.frame_index,
self.updated_at)
def get_simplified_history(self, distance: float, camera: Camera) -> list[tuple[float, float]]:
# TODO)) Simplify to get a point every n-th meter
@ -622,6 +639,8 @@ class DataclassJSONEncoder(json.JSONEncoder):
def default(self, o):
if isinstance(o, np.ndarray):
return o.tolist()
# if isinstance(o, np.float32):
# return "float32!{o}"
if dataclasses.is_dataclass(o):
if isinstance(o, Frame):
tracks = {}

View file

@ -4,6 +4,7 @@ from dataclasses import dataclass
from enum import Enum
import math
from typing import List, Tuple
import numpy as np
from simplification.cutil import simplify_coords_idx, simplify_coords_vw_idx
@ -31,6 +32,14 @@ class RenderablePoint():
position: RenderablePosition
color: SrgbaColor
def __post_init__(self):
if type(self.position) is np.ndarray:
# convert if wrong type, so it can be serialised
# print('convert')
self.position = tuple(self.position.tolist())
# self.position = (float(self.position[0]), float(self.position[0]))
# pass
@classmethod
def from_list(cls, l: List[float, float], color: SrgbaColor) -> RenderablePoint:
return cls([float(l[0]), float(l[1])], color)
@ -65,6 +74,14 @@ class RenderableLines():
return RenderableLines(
[line.as_simplified(method) for line in self.lines]
)
def append(self, rl: RenderableLine):
self.lines.append(rl)
def append_lines(self, rls: RenderableLines):
self.lines.extend(rls.lines)
# def merge(self, rl: RenderableLines):

View file

@ -9,6 +9,7 @@ import pickle
import time
from typing import Dict, List, Optional, Tuple
import numpy as np
from shapely import line_locate_point
from statemachine import Event, State, StateMachine
from statemachine.exceptions import TransitionNotAllowed
import zmq
@ -16,17 +17,135 @@ import zmq
from sgan.sgan import data
from trap import shapes
from trap.base import Camera, DataclassJSONEncoder, DistortedCamera, Frame, Track
from trap.base import Camera, DataclassJSONEncoder, DistortedCamera, Frame, ProjectedTrack, Track
from trap.counter import CounterSender
from trap.laser_renderer import circle_points, rotateMatrix
from trap.lines import RenderableLine, RenderableLines, RenderablePoint, SrgbaColor, circle_arc
from trap.lines import RenderableLine, RenderableLines, RenderablePoint, RenderablePosition, SrgbaColor, circle_arc
from trap.node import Node
from trap.timer import Timer
from trap.utils import exponentialDecay, relativePointToPolar, relativePolarToPoint
from trap.utils import exponentialDecay, exponentialDecayRounded, relativePointToPolar, relativePolarToPoint
logger = logging.getLogger('trap.stage')
Coordinate = Tuple[float, float]
# current_fraction = line_locate_point(new_line_string, Point(old_ls.coords[-1]), normalized=True)
# new_fraction = current_fraction + stepsize
# grown_string = shapely.ops.substring(new_line_string, 0, new_fraction, normalized=True)
class ProceduralChain():
link_size = .1 # 10cm
# angle_constraint = 5
def __init__(self, first_joint: Coordinate, auto_grow_from: Optional[Coordinate]):
self.joints: List[Coordinate] = []
self.auto_grow_from = auto_grow_from
pass
def move(self, position):
#
pass
if self.auto_grow_from is not None:
pass
# if distance self.joints[-1] - self.auto_grow_from > link_size:
self.joints.append(self.auto_grow_from)
def arrived(self, point: Coordinate):
[j == point for j in self.joints]
return False
DeltaT = float # delta_t
class DiffSegment():
DRAW_DECAY_SPEED = 5
def __init__(self, prediction: ProjectedTrack):
self.ptrack = prediction
self._last_diff_frame_idx = 0
self.finished = False
self.points: List[Coordinate] = []
self._drawn_points = []
def finish(self):
self.finished = True
# run on each track update received
def update_track(self, track: ProjectedTrack):
# migrate SceneraioScene function
start_frame_idx = max(self.ptrack.frame_index, self._last_diff_frame_idx)
traj_diff_steps_back = track.frame_index - start_frame_idx # positive value
pred_diff_steps_forward = start_frame_idx - self.ptrack.frame_index # positive value
if traj_diff_steps_back < 0 or len(track.history) < traj_diff_steps_back:
logger.warning("Track history doesn't reach prediction start. Should not be possible. Skip")
# elif len(ptrack.predictions[0]) < pred_diff_steps_back:
# logger.warning("Prediction does not reach prediction start. Should not be possible. Skip")
else:
trajectory = track.projected_history
# from start to as far as it gets
trajectory_range = trajectory[-1*traj_diff_steps_back:]
prediction_range = self.ptrack.predictions[0][pred_diff_steps_forward:] # in world coordinate space
line = []
for i, (p1, p2) in enumerate(zip(trajectory_range, prediction_range)):
offset_from_start = (pred_diff_steps_forward + i)
if offset_from_start % 4 == 0:
self.points.extend([p1, p2])
self._last_diff_frame_idx = track.frame_index
# pass
# # rewrite:
# if not self.finished:
# # build the coordinates for the line
# pass
# if not self.chain.arrived(self.points[-1]):
# pass # wait for drawing to complete
# else:
# # Move head towards person stats
# pass
# run each render tick
def update_drawn_positions(self, dt: DeltaT):
if len(self.points) == 0:
# nothing to draw yet
return
# self._drawn_points = self.points
if len(self._drawn_points) == 0:
# create origin
self._drawn_points.append(self.points[0])
# and drawing head
self._drawn_points.append(self.points[0])
idx = len(self._drawn_points) - 1
target = self.points[idx]
if np.isclose(self._drawn_points[-1], target, atol=.05).all():
# TODO: might want to migrate to np.isclose()
if len(self._drawn_points) == len(self.points):
return # done until a new point is added
# add new point as drawing head
self._drawn_points.append(self._drawn_points[-1])
x = exponentialDecayRounded(self._drawn_points[-1][0], target[0], self.DRAW_DECAY_SPEED, dt, .05)
y = exponentialDecayRounded(self._drawn_points[-1][1], target[1], self.DRAW_DECAY_SPEED, dt, .05)
self._drawn_points[-1] = (float(x), float(y))
def as_renderable(self) -> RenderableLines:
# lines = []
color = SrgbaColor(0,0,1,1)
points = [RenderablePoint(p, color) for p in self._drawn_points]
lines = [RenderableLine(points)]
return RenderableLines(lines)
class ScenarioScene(Enum):
DETECTED = 1
FIRST_PREDICTION = 2
@ -68,80 +187,132 @@ class TrackScenario(StateMachine):
receive_prediction = detected.to(first_prediction) | substantial.to(first_prediction) | first_prediction.to(corrected_prediction, cond="prediction_is_stale") | corrected_prediction.to(play, cond="prediction_is_playing")
def __init__(self):
self._track = None
self.track: ProjectedTrack = None
self.camera: Optional[Camera] = None
# self.first_prediction_track: Optional[Track] = None
# self.prediction_track: Optional[Track] = None
self._predictions: List[Track] = []
self.predictions: List[Track] = []
self._last_diff_frame_idx: Optional[int] = 0
self.diffs: List[Tuple[Coordinate, Coordinate]] = []
self.prediction_diffs: List[DiffSegment] = []
super().__init__()
def track_is_long(self, track: Track):
def track_is_long(self, track: ProjectedTrack):
return len(track.history) > 20
def track_is_lost(self, track: Track):
def track_is_lost(self, track: ProjectedTrack):
# return self._track and self._track.created_at < time.time() - 5
return track.lost # Note, for now this is not implemented in the tacker, see check_lost()
def track_is_loitering(self, track: Track):
def track_is_loitering(self, track: ProjectedTrack):
# TODO)) Change to measure displacement over the last n seconds
return len(track.history) > (track.fps * 60) # seconds after which someone is loitering
def prediction_is_stale(self, track: Track):
def prediction_is_stale(self, track: ProjectedTrack):
# TODO use displacement instead of time
return bool(len(self._predictions) and self._predictions[-1].created_at < (time.time() - 2))
return bool(len(self.predictions) and self.predictions[-1].created_at < (time.time() - 2))
def prediction_is_playing(self, Track):
def prediction_is_playing(self, track):
return False
def check_lost(self):
if self.current_state is not self.lost and self._track and self._track.created_at < time.time() - 5:
if self.current_state is not self.lost and self.track and self.track.created_at < time.time() - 5:
self.mark_lost()
def set_track(self, track: Track):
if self._track and self._track.created_at > track.created_at:
def set_track(self, track: ProjectedTrack):
if self.track and self.track.created_at > track.created_at:
# ignore old track
return
self._track = track
self.track = track
self.update_prediction_diff()
# check to change state
try:
self.receive_track(track)
except TransitionNotAllowed as e:
# state change is optional
pass
def add_prediction(self, track: Track):
if not self._track:
def update_prediction_diff(self):
"""
gather the diffs of the trajectory with the most recent prediction
"""
if len(self.prediction_diffs) == 0:
return
self.prediction_diffs[-1].update_track(self.track)
# ptrack = self.predictions[-1]
# start_frame_idx = max(ptrack.frame_index, self._last_diff_frame_idx)
# traj_diff_steps_back = self.track.frame_index - start_frame_idx # positive value
# pred_diff_steps_forward = start_frame_idx - ptrack.frame_index # positive value
# if traj_diff_steps_back < 0 or len(self.track.history) < traj_diff_steps_back:
# logger.warning("Track history doesn't reach prediction start. Should not be possible. Skip")
# # elif len(ptrack.predictions[0]) < pred_diff_steps_back:
# # logger.warning("Prediction does not reach prediction start. Should not be possible. Skip")
# else:
# trajectory = self.track.get_projected_history(camera=self.camera)
# # from start to as far as it gets
# trajectory_range = trajectory[-1*traj_diff_steps_back:]
# prediction_range = ptrack.predictions[0][pred_diff_steps_forward:] # in world coordinate space
# line = []
# for p1, p2 in zip(trajectory_range[::4], prediction_range[::4]):
# self.diffs.append((p1, p2))
# # print(f"Diff for {self.track.frame_index}")
# # print(f"Start at {start_frame_idx=}, which is {traj_diff_steps_back} steps back and {pred_diff_steps_forward}steps forward")
# self._last_diff_frame_idx = self.track.frame_index
def add_prediction(self, track: ProjectedTrack):
if not self.track:
# in case of the unlikely event that prediction was passed sooner
self.set_track(track)
# if not self.first_prediction_track:
# self.first_prediction_track = track
if PREDICTION_INTERVAL is not None and len(self._predictions) and (track.frame_index - self._predictions[-1].frame_index) < PREDICTION_INTERVAL:
if PREDICTION_INTERVAL is not None and len(self.predictions) and (track.frame_index - self.predictions[-1].frame_index) < PREDICTION_INTERVAL:
# just drop tracks if the predictions come to quick
return
self._predictions.append(track)
self.predictions.append(track)
if len(self.prediction_diffs):
self.prediction_diffs[-1].finish() # existing diffing can end
# and create a new one
self.prediction_diffs.append(DiffSegment(track))
# check to change state
try:
self.receive_prediction(track)
except TransitionNotAllowed as e:
# state change is optional
pass
def after_receive_track(self, track: Track):
print('change state')
def after_receive_track(self, track: ProjectedTrack):
print('changed state')
def on_receive_track(self, track: Track):
# on event, because it happens for every receive, despite transition
def on_receive_track(self, track: ProjectedTrack):
# on event, only runs once, upon first track
print('updating track!')
# self.track = track
def on_receive_prediction(self, track: Track):
def on_receive_prediction(self, track: ProjectedTrack):
# on event, because it happens for every receive, despite transition
print('updating prediction!')
# self.track = track
def after_receive_prediction(self, track: Track):
def after_receive_prediction(self, track: ProjectedTrack):
# after
pass
# self.prediction_track = track
@ -198,9 +369,9 @@ class DrawnScenario(TrackScenario):
# self.track_id = track_id
self.last_update_t = time.perf_counter()
self.drawn_positions: List[Tuple[float,float]] = []
self.drawn_pred_history: List[Tuple[float,float]] = []
self.drawn_predictions: List[List[Tuple[float,float]]] = []
self.drawn_positions: List[Coordinate] = []
self.drawn_pred_history: List[Coordinate] = []
self.drawn_predictions: List[List[Coordinate]] = []
self.drawn_text = ""
self.drawn_text_lines: List[RenderableLine] = []
@ -222,13 +393,17 @@ class DrawnScenario(TrackScenario):
# 0. calculate dt
# if dt is None:
t = time.perf_counter()
dt = t - self.last_update_t
dt: DeltaT = t - self.last_update_t
self.last_update_t = t
for diff in self.prediction_diffs:
diff.update_drawn_positions(dt)
# 1. track history, direct update
MAX_HISTORY = 80
# positions = self._track.get_projected_history(None, self.camera)[-MAX_HISTORY:]
self.drawn_positions = self._track.get_projected_history(None, self.camera)
self.drawn_positions = self.track.projected_history
# self.drawn_positions = self.track.get_projected_history(None, self.camera)
# TODO)) Limit history to N points, or N lenght
# for i, pos in enumerate(self.drawn_positions):
# self.drawn_positions[i][0] = positions[i][0]
@ -250,34 +425,44 @@ class DrawnScenario(TrackScenario):
# 3. predictions
self.drawn_predictions = []
self.drawn_diffs = []
for a, (ptrack, next_ptrack) in enumerate(zip(self._predictions, [*self._predictions[1:], None])):
for a, (ptrack, next_ptrack) in enumerate(zip(self.predictions, [*self.predictions[1:], None])):
prediction = ptrack.predictions[0] # only use one prediction per timestep/frame/track
if next_ptrack is not None:
# not the last one, cut off
next_ptrack: Track = self._predictions[a+1]
next_ptrack: ProjectedTrack = self.predictions[a+1]
end_step = next_ptrack.frame_index - ptrack.frame_index
# diff
diff_steps_back = ptrack.frame_index - self._track.frame_index
if len(self.drawn_positions) < -1*diff_steps_back:
logger.warning("Track history doesn't reach prediction start. Should not be possible. Skip")
pass
else:
# trajectory_range = self.camera.[d.get_foot_coords() for d in trajectory_det_range] # in frame coordinate space
trajectory_range = self.drawn_positions[diff_steps_back:diff_steps_back+end_step]
prediction_range = ptrack.predictions[0][:end_step] # in world coordinate space
line = []
for p1, p2 in zip(trajectory_range[::4], prediction_range[::4]):
line.extend([
p1, p2
])
if len(line):
self.drawn_diffs.append(line)
else:
end_step = None # not last item; show all
self.drawn_predictions.append(ptrack.predictions[0][:end_step])
# # diff
# diff_steps_back = ptrack.frame_index - self.track.frame_index
# if len(self.drawn_positions) < -1*diff_steps_back:
# logger.warning("Track history doesn't reach prediction start. Should not be possible. Skip")
# pass
# else:
# # trajectory_range = self.camera.[d.get_foot_coords() for d in trajectory_det_range] # in frame coordinate space
# trajectory_range = self.drawn_positions[diff_steps_back:diff_steps_back+end_step]
# prediction_range = ptrack.predictions[0][:end_step] # in world coordinate space
# line = []
# for p1, p2 in zip(trajectory_range[::4], prediction_range[::4]):
# line.extend([
# p1, p2
# ])
# if len(line):
# self.drawn_diffs.append(line)
diff_line = []
for p1, p2 in self.diffs[::4]:
diff_line.extend([p1, p2])
# self.drawn_diffs.append(diff_line)
self.drawn_diffs = [diff_line]
# Animate line as procedural chain https://www.youtube.com/watch?v=qlfh_rv6khY&t=183s
@ -320,26 +505,28 @@ class DrawnScenario(TrackScenario):
def to_renderable_lines(self) -> RenderableLines:
t = time.time()
track_age = t - self._track.created_at
lines: List[RenderableLine] = []
track_age = t - self.track.created_at
lines = RenderableLines([])
drawable_points, alphas = points_fade_out_alpha_mask(self.drawn_positions, track_age, TRACK_FADE_AFTER_DURATION, TRACK_END_FADE)
# track_age_in_frames = int(track_age * TRACK_FADE_ASSUME_FPS)
# track_max_points = TRACK_FADE_AFTER_DURATION * TRACK_FADE_ASSUME_FPS - track_age_in_frames
# 1. Trajectory history
drawable_points, alphas = points_fade_out_alpha_mask(self.drawn_positions, track_age, TRACK_FADE_AFTER_DURATION, TRACK_END_FADE)
color = SrgbaColor(1.,0.,0.,1.-self.lost_factor())
points = [RenderablePoint(pos, color.as_faded(a)) for pos, a in zip(drawable_points, alphas)]
lines.append(RenderableLine(points))
# 2. Position Marker
anomaly_marker_color = SrgbaColor(0.,0.,1, 1.-self.lost_factor()) # fadeout
lines.append(circle_arc(self.drawn_positions[-1][0], self.drawn_positions[-1][1], 1, t, self.anomly_score, anomaly_marker_color))
# 3. Predictions
if len(self.drawn_predictions):
color = SrgbaColor(0.,1,0.,1.-self.lost_factor())
prediction_track_age = time.time() - self._predictions[0].created_at
prediction_track_age = time.time() - self.predictions[0].created_at
t_factor = prediction_track_age / PREDICTION_FADE_IN
# positions = [RenderablePosition.from_list(pos) for pos in self.drawn_positions]
for drawn_prediction in self.drawn_predictions:
@ -362,11 +549,16 @@ class DrawnScenario(TrackScenario):
# points = [RenderablePoint(pos, pos_color) for pos, pos_color in zip(drawn_prediction[PREDICTION_OFFSET:], colors[PREDICTION_OFFSET:])]
points = [RenderablePoint(pos, pos_color) for pos, pos_color in zip(drawn_prediction, colors)]
lines.append(RenderableLine(points))
for drawn_diff in self.drawn_diffs:
color = SrgbaColor(0.,1,1.,1.-self.lost_factor())
colors = [color.as_faded(1) for a2 in range(len(drawn_diff))]
points = [RenderablePoint(pos, pos_color) for pos, pos_color in zip(drawn_diff, colors)]
lines.append(RenderableLine(points))
# 4. Diffs
# for drawn_diff in self.drawn_diffs:
# color = SrgbaColor(0.,1,1.,1.-self.lost_factor())
# colors = [color.as_faded(1) for a2 in range(len(drawn_diff))]
# points = [RenderablePoint(pos, pos_color) for pos, pos_color in zip(drawn_diff, colors)]
# lines.append(RenderableLine(points))
for diff in self.prediction_diffs:
lines.append_lines(diff.as_renderable())
# # print(self.current_state)
# if self.current_state is self.first_prediction or self.current_state is self.corrected_prediction:
@ -516,7 +708,8 @@ class Stage(Node):
try:
prediction_frame: Frame = self.prediction_sock.recv_pyobj(zmq.NOBLOCK)
for track_id, track in prediction_frame.tracks.items():
self.scenarios[track_id].add_prediction(track)
proj_track = ProjectedTrack(track, prediction_frame.camera)
self.scenarios[track_id].add_prediction(proj_track)
except zmq.ZMQError as e:
self.logger.debug(f'reuse prediction')
@ -524,8 +717,10 @@ class Stage(Node):
try:
trajectory_frame: Frame = self.trajectory_sock.recv_pyobj(zmq.NOBLOCK)
for track_id, track in trajectory_frame.tracks.items():
self.scenarios[track_id].set_track(track)
self.scenarios[track_id].camera = trajectory_frame.camera # little hack to pass camera!
proj_track = ProjectedTrack(track, trajectory_frame.camera)
# if not self.scenarios[track_id].camera:
# self.scenarios[track_id].camera = trajectory_frame.camera # little hack to pass camera!
self.scenarios[track_id].set_track(proj_track)
except zmq.ZMQError as e:
self.logger.debug(f'reuse tracks')
@ -539,18 +734,21 @@ class Stage(Node):
del self.scenarios[track_id]
def loop_render(self):
lines: RenderableLine = []
lines = RenderableLines([])
for track_id, scenario in self.scenarios.items():
scenario.update_drawn_positions()
lines.extend(scenario.to_renderable_lines())
lines.append_lines(scenario.to_renderable_lines())
# print(lines)
rl = RenderableLines(lines)
# rl = RenderableLines(lines)
# with open('/tmp/lines.pcl', 'wb') as fp:
# pickle.dump(rl, fp)
rl = rl.as_simplified() # or segmentise (see shapely)
self.counter.set("stage.lines", len(lines))
rl = lines.as_simplified() # or segmentise (see shapely)
self.counter.set("stage.lines", len(lines.lines))
# print(rl.__dict__)
self.stage_sock.send_json(rl, cls=DataclassJSONEncoder)
# print(json.dumps(rl, cls=DataclassJSONEncoder))

View file

@ -466,6 +466,7 @@ class Tracker:
track.track_id = detection.track_id # for new tracks
track.fps = frame.camera.fps
track.frame_index = frame.index
track.updated_at = time.time()
# track.fps = self.config.camera.fps # for new tracks
track.history.append(detection) # add to history

View file

@ -31,6 +31,15 @@ def inv_lerp(a: float, b: float, v: float) -> float:
def exponentialDecayRounded(a, b, decay, dt, abs_tolerance):
"""Exponential decay as alternative to Lerp
Introduced by Freya Holmér: https://www.youtube.com/watch?v=LSNQuFEDOyQ
"""
c = b + (a-b) * math.exp(-decay * dt)
if abs(b-c) < abs_tolerance:
return b
return c
def exponentialDecay(a, b, decay, dt):
"""Exponential decay as alternative to Lerp
Introduced by Freya Holmér: https://www.youtube.com/watch?v=LSNQuFEDOyQ