anomaly animation

This commit is contained in:
Ruben van de Ven 2025-05-16 12:57:08 +02:00
parent 68e785f0fa
commit ad7328f7c0

View file

@ -1,5 +1,6 @@
from __future__ import annotations from __future__ import annotations
from abc import ABC, abstractmethod
from collections import defaultdict from collections import defaultdict
from dataclasses import dataclass from dataclasses import dataclass
from enum import Enum from enum import Enum
@ -8,8 +9,9 @@ import logging
import pickle import pickle
import time import time
from typing import Dict, List, Optional, Tuple from typing import Dict, List, Optional, Tuple
from matplotlib.pyplot import isinteractive
import numpy as np import numpy as np
from shapely import line_locate_point from shapely import LineString, line_locate_point, linestrings
from statemachine import Event, State, StateMachine from statemachine import Event, State, StateMachine
from statemachine.exceptions import TransitionNotAllowed from statemachine.exceptions import TransitionNotAllowed
import zmq import zmq
@ -29,89 +31,33 @@ from trap.utils import exponentialDecay, exponentialDecayRounded, relativePointT
logger = logging.getLogger('trap.stage') logger = logging.getLogger('trap.stage')
Coordinate = Tuple[float, float] Coordinate = Tuple[float, float]
DeltaT = float # delta_t in seconds
# current_fraction = line_locate_point(new_line_string, Point(old_ls.coords[-1]), normalized=True) # current_fraction = line_locate_point(new_line_string, Point(old_ls.coords[-1]), normalized=True)
# new_fraction = current_fraction + stepsize # new_fraction = current_fraction + stepsize
# grown_string = shapely.ops.substring(new_line_string, 0, new_fraction, normalized=True) # grown_string = shapely.ops.substring(new_line_string, 0, new_fraction, normalized=True)
class ProceduralChain(): class LineGenerator(ABC):
link_size = .1 # 10cm @abstractmethod
# angle_constraint = 5 def update_drawn_positions(self, dt: DeltaT):
def __init__(self, first_joint: Coordinate, auto_grow_from: Optional[Coordinate]):
self.joints: List[Coordinate] = []
self.auto_grow_from = auto_grow_from
pass pass
def move(self, position): def as_renderable(self, color: SrgbaColor) -> RenderableLines:
# points = [RenderablePoint(p, color) for p in self._drawn_points]
pass lines = [RenderableLine(points)]
return RenderableLines(lines)
if self.auto_grow_from is not None: class AppendableLine(LineGenerator):
pass """
# if distance self.joints[-1] - self.auto_grow_from > link_size: A line generator that allows for points to be added over time.
self.joints.append(self.auto_grow_from) Simply use `line.points.extend([p1, p2])`
"""
def arrived(self, point: Coordinate): def __init__(self, points: Optional[List[Coordinate]] = None, draw_decay_speed = 25.):
[j == point for j in self.joints] self.points: List[Coordinate] = points if points is not None else [] # when providing [] as default, it messes up instancing by reusing the same list
return False
DeltaT = float # delta_t
class DiffSegment():
DRAW_DECAY_SPEED = 25
def __init__(self, prediction: ProjectedTrack):
self.ptrack = prediction
self._last_diff_frame_idx = 0
self.finished = False
self.ready = False
self.points: List[Coordinate] = []
self._drawn_points = [] self._drawn_points = []
self.ready = len(self.points) == 0
self.draw_decay_speed = draw_decay_speed
def finish(self):
self.finished = True
# run on each track update received
def update_track(self, track: ProjectedTrack):
# migrate SceneraioScene function
start_frame_idx = max(self.ptrack.frame_index, self._last_diff_frame_idx)
traj_diff_steps_back = track.frame_index - start_frame_idx # positive value
pred_diff_steps_forward = start_frame_idx - self.ptrack.frame_index # positive value
if traj_diff_steps_back < 0 or len(track.history) < traj_diff_steps_back:
logger.warning("Track history doesn't reach prediction start. Should not be possible. Skip")
# elif len(ptrack.predictions[0]) < pred_diff_steps_back:
# logger.warning("Prediction does not reach prediction start. Should not be possible. Skip")
else:
trajectory = track.projected_history
# from start to as far as it gets
trajectory_range = trajectory[-1*traj_diff_steps_back:]
prediction_range = self.ptrack.predictions[0][pred_diff_steps_forward:] # in world coordinate space
line = []
for i, (p1, p2) in enumerate(zip(trajectory_range, prediction_range)):
offset_from_start = (pred_diff_steps_forward + i)
if offset_from_start % 4 == 0:
self.points.extend([p1, p2])
self._last_diff_frame_idx = track.frame_index
# pass
# # rewrite:
# if not self.finished:
# # build the coordinates for the line
# pass
# if not self.chain.arrived(self.points[-1]):
# pass # wait for drawing to complete
# else:
# # Move head towards person stats
# pass
# run each render tick
def update_drawn_positions(self, dt: DeltaT): def update_drawn_positions(self, dt: DeltaT):
if len(self.points) == 0: if len(self.points) == 0:
# nothing to draw yet # nothing to draw yet
@ -137,16 +83,172 @@ class DiffSegment():
self._drawn_points.append(self._drawn_points[-1]) self._drawn_points.append(self._drawn_points[-1])
self.ready = False self.ready = False
x = exponentialDecayRounded(self._drawn_points[-1][0], target[0], self.DRAW_DECAY_SPEED, dt, .05) x = exponentialDecayRounded(self._drawn_points[-1][0], target[0], self.draw_decay_speed, dt, .05)
y = exponentialDecayRounded(self._drawn_points[-1][1], target[1], self.DRAW_DECAY_SPEED, dt, .05) y = exponentialDecayRounded(self._drawn_points[-1][1], target[1], self.draw_decay_speed, dt, .05)
self._drawn_points[-1] = (float(x), float(y)) self._drawn_points[-1] = (float(x), float(y))
class ProceduralChain(LineGenerator):
MOVE_DECAY_SPEED = 50
link_size = .1 # 10cm
# angle_constraint = 5
def __init__(self, joints: List[Coordinate]):
self.joints: List[Coordinate] = joints
self.target: Coordinate = joints[-1]
self.ready = False
self.move_decay_speed = self.MOVE_DECAY_SPEED
@classmethod
def from_appendable_line(cls, al: AppendableLine) -> ProceduralChain:
# TODO: create more segments:
# last added points becomes the head of the chain
points = list(reversed(al.points))
linestring = LineString(points)
linestring = linestring.segmentize(cls.link_size)
joints = list(linestring.coords)
return cls(joints)
def update_drawn_positions(self, dt: DeltaT):
if self.ready:
return
# direction = np.array(self.joints[-1] - self.target)
# TODO: check self.joints empty, and stop then
x = exponentialDecayRounded(self.joints[0][0], self.target[0], self.move_decay_speed, dt, .05)
y = exponentialDecayRounded(self.joints[0][1], self.target[1], self.move_decay_speed, dt, .05)
self.joints[0] = (float(x), float(y))
for i, (joint, prev_joint) in enumerate(zip(self.joints[1:], self.joints), start=1):
diff = np.array(prev_joint) - np.array(joint)
direction = diff / np.linalg.norm(diff)
self.joints[i] = prev_joint - direction * self.link_size
if np.isclose(self.joints[0], self.target, atol=.05).all():
# self.ready = True
self.joints.pop(0)
if len(self.joints) == 0:
self.ready = True
self._drawn_points = self.joints
# void resolve(PVector pos) {
# angles.set(0, PVector.sub(pos, joints.get(0)).heading());
# joints.set(0, pos);
# for (int i = 1; i < joints.size(); i++) {
# float curAngle = PVector.sub(joints.get(i - 1), joints.get(i)).heading();
# angles.set(i, constrainAngle(curAngle, angles.get(i - 1), angleConstraint));
# joints.set(i, PVector.sub(joints.get(i - 1), PVector.fromAngle(angles.get(i)).setMag(linkSize)));
# }
# }
class DiffSegment():
DRAW_DECAY_SPEED = 25
def __init__(self, prediction: ProjectedTrack):
self.ptrack = prediction
self._last_diff_frame_idx = 0
self.finished = False
self.line = AppendableLine( draw_decay_speed=self.DRAW_DECAY_SPEED)
self.points: List[Coordinate] = []
self._drawn_points = []
self._target_track = prediction
def finish(self):
self.finished = True
# run on each track update received
def update_track(self, track: ProjectedTrack):
self._target_track = track
if self.finished:
# don't add new points if finished
return
# migrate SceneraioScene function
start_frame_idx = max(self.ptrack.frame_index, self._last_diff_frame_idx)
traj_diff_steps_back = track.frame_index - start_frame_idx # positive value
pred_diff_steps_forward = start_frame_idx - self.ptrack.frame_index # positive value
if traj_diff_steps_back < 0 or len(track.history) < traj_diff_steps_back:
logger.warning("Track history doesn't reach prediction start. Should not be possible. Skip")
# elif len(ptrack.predictions[0]) < pred_diff_steps_back:
# logger.warning("Prediction does not reach prediction start. Should not be possible. Skip")
else:
trajectory = track.projected_history
# from start to as far as it gets
trajectory_range = trajectory[-1*traj_diff_steps_back:]
prediction_range = self.ptrack.predictions[0][pred_diff_steps_forward:] # in world coordinate space
line = []
for i, (p1, p2) in enumerate(zip(trajectory_range, prediction_range)):
offset_from_start = (pred_diff_steps_forward + i)
if offset_from_start % 4 == 0:
self.line.points.extend([p1, p2])
self.points.extend([p1, p2])
self._last_diff_frame_idx = track.frame_index
# run each render tick
def update_drawn_positions(self, dt: DeltaT):
if isinstance(self.line, AppendableLine):
if self.finished and self.line.ready:
# convert when fully drawn
# print(self, "CONVERT LINE")
self.line = ProceduralChain.from_appendable_line(self.line)
if isinstance(self.line, ProceduralChain):
self.line.target = self._target_track.projected_history[-1]
# if len(self.points) == 0:
# # nothing to draw yet
# return
# # self._drawn_points = self.points
# if len(self._drawn_points) == 0:
# # create origin
# self._drawn_points.append(self.points[0])
# # and drawing head
# self._drawn_points.append(self.points[0])
# idx = len(self._drawn_points) - 1
# target = self.points[idx]
# if np.isclose(self._drawn_points[-1], target, atol=.05).all():
# # TODO: might want to migrate to np.isclose()
# if len(self._drawn_points) == len(self.points):
# self.ready = True
# return # done until a new point is added
# # add new point as drawing head
# self._drawn_points.append(self._drawn_points[-1])
# self.ready = False
# x = exponentialDecayRounded(self._drawn_points[-1][0], target[0], self.DRAW_DECAY_SPEED, dt, .05)
# y = exponentialDecayRounded(self._drawn_points[-1][1], target[1], self.DRAW_DECAY_SPEED, dt, .05)
# self._drawn_points[-1] = (float(x), float(y))
# if not self.finished or not self.line.ready:
self.line.update_drawn_positions(dt)
def as_renderable(self) -> RenderableLines: def as_renderable(self) -> RenderableLines:
# lines = []
color = SrgbaColor(0,0,1,1) color = SrgbaColor(0,0,1,1)
points = [RenderablePoint(p, color) for p in self._drawn_points] # lines = []
lines = [RenderableLine(points)] # points = [RenderablePoint(p, color) for p in self._drawn_points]
return RenderableLines(lines) # lines = [RenderableLine(points)]
# return RenderableLines(lines)
if not self.finished or not self.line.ready:
return self.line.as_renderable(color)
return self.line.as_renderable(color)
# points = [RenderablePoint(p, color) for p in self._drawn_points]
# lines = [RenderableLine(points)]
return RenderableLines([])
class ScenarioScene(Enum): class ScenarioScene(Enum):
@ -247,34 +349,9 @@ class TrackScenario(StateMachine):
if len(self.prediction_diffs) == 0: if len(self.prediction_diffs) == 0:
return return
self.prediction_diffs[-1].update_track(self.track) for diff in self.prediction_diffs:
diff.update_track(self.track)
# ptrack = self.predictions[-1]
# start_frame_idx = max(ptrack.frame_index, self._last_diff_frame_idx)
# traj_diff_steps_back = self.track.frame_index - start_frame_idx # positive value
# pred_diff_steps_forward = start_frame_idx - ptrack.frame_index # positive value
# if traj_diff_steps_back < 0 or len(self.track.history) < traj_diff_steps_back:
# logger.warning("Track history doesn't reach prediction start. Should not be possible. Skip")
# # elif len(ptrack.predictions[0]) < pred_diff_steps_back:
# # logger.warning("Prediction does not reach prediction start. Should not be possible. Skip")
# else:
# trajectory = self.track.get_projected_history(camera=self.camera)
# # from start to as far as it gets
# trajectory_range = trajectory[-1*traj_diff_steps_back:]
# prediction_range = ptrack.predictions[0][pred_diff_steps_forward:] # in world coordinate space
# line = []
# for p1, p2 in zip(trajectory_range[::4], prediction_range[::4]):
# self.diffs.append((p1, p2))
# # print(f"Diff for {self.track.frame_index}")
# # print(f"Start at {start_frame_idx=}, which is {traj_diff_steps_back} steps back and {pred_diff_steps_forward}steps forward")
# self._last_diff_frame_idx = self.track.frame_index
def add_prediction(self, track: ProjectedTrack): def add_prediction(self, track: ProjectedTrack):
@ -379,7 +456,7 @@ class DrawnScenario(TrackScenario):
self.drawn_text = "" self.drawn_text = ""
self.drawn_text_lines: List[RenderableLine] = [] self.drawn_text_lines: List[RenderableLine] = []
self.anomly_score = .3 # TODO: variable self.anomly_score = 0 # TODO: variable
super().__init__() super().__init__()
def update_drawn_positions(self) -> List: def update_drawn_positions(self) -> List:
@ -524,7 +601,13 @@ class DrawnScenario(TrackScenario):
# 2. Position Marker # 2. Position Marker
anomaly_marker_color = SrgbaColor(0.,0.,1, 1.-self.lost_factor()) # fadeout anomaly_marker_color = SrgbaColor(0.,0.,1, 1.-self.lost_factor()) # fadeout
lines.append(circle_arc(self.drawn_positions[-1][0], self.drawn_positions[-1][1], 1, t, self.anomly_score, anomaly_marker_color)) # lines.append(circle_arc(self.drawn_positions[-1][0], self.drawn_positions[-1][1], 1, t, self.anomly_score, anomaly_marker_color))
lines.append(circle_arc(
self.drawn_positions[-1][0], self.drawn_positions[-1][1],
max(.1, self.anomly_score * 2),
0, 1,
anomaly_marker_color)
)
# 3. Predictions # 3. Predictions
if len(self.drawn_predictions): if len(self.drawn_predictions):