More animations for predictions

This commit is contained in:
Ruben van de Ven 2025-10-29 12:05:18 +01:00
parent e6d1457320
commit afe5accb9c
2 changed files with 223 additions and 39 deletions

View file

@ -4,12 +4,14 @@ from abc import ABC, abstractmethod
import copy
from dataclasses import dataclass
from enum import Enum, IntEnum
from functools import partial
import math
from pathlib import Path
import time
from typing import Dict, List, Optional, Tuple
from typing import Callable, Dict, List, NamedTuple, Optional, Tuple, Type, TypeVar
import numpy as np
from pyparsing import Opt
import shapely
import shapely.ops
from simplification.cutil import simplify_coords_idx, simplify_coords_vw_idx
@ -117,6 +119,13 @@ class RenderableLine():
if not is_last:
points.append(RenderablePoint(line.coords[-1], color.as_faded(0)))
return RenderableLine(points)
@classmethod
def from_linestring(cls, ls: shapely.geometry.LineString, color: SrgbaColor) -> RenderableLine:
points: List[RenderablePoint] = []
for coord in ls.coords:
points.append(RenderablePoint(coord, color))
return RenderableLine(points)
@dataclass
@ -466,7 +475,7 @@ class LineAnimator(StaticLine):
raise RuntimeError("Not Implemented")
def is_ready(self):
return self.ready and self.target.is_ready()
return (self.ready or self.skip) and self.target.is_ready()
def start(self):
self.target.start()
@ -475,7 +484,7 @@ class LineAnimator(StaticLine):
return True
def running_for(self):
if self.start:
if self.start_t:
return time.time() - self.start_t
return 0.
@ -497,7 +506,7 @@ class AppendableLineAnimator(LineAnimator):
def apply(self, target_line, dt: DeltaT) -> RenderableLine:
if len(target_line) == 0:
# nothing to draw yet
return RenderableLine()
return RenderableLine([])
@ -563,6 +572,37 @@ class FadeOutLine(LineAnimator):
point.color = point.color.as_faded(self.alpha)
return target_line
class FadeOutJitterLine(LineAnimator):
"""
Fade the line providing an alpha, 1 by default
"""
def __init__(self, target_line = None, frequency=.7, t_factor = 1.0):
"""
To make shape static over time, set t_factor = 0
"""
super().__init__(target_line)
self.alpha = 1
self.frequency = frequency
self.t_factor = t_factor
self.ready = True # filter holds no state, so always ready
def set_alpha(self, alpha: float):
self.alpha = min(1, max(0, alpha))
def apply(self, target_line: RenderableLine, dt: DeltaT) -> RenderableLine:
for i, point in enumerate(target_line.points):
noise = abs(snoise2(i * self.frequency, self.running_for() * self.t_factor % 1000, octaves=1))
alpha = np.clip(noise + 2 * self.alpha, 1, 2) - 1
point.color = point.color.as_faded(alpha)
return target_line
class CropLine(LineAnimator):
@ -739,16 +779,46 @@ class NoiseLine(LineAnimator):
return RenderableLine(points)
class SegmentLine(LineAnimator):
def __init__(self, target_line = None):
def __init__(self, target_line = None, length = 0.3, duration = 1., anim_f: Optional[Callable] = None):
super().__init__(target_line)
self.length = length
self.duration = duration
self.anim_f = anim_f or partial(self.anim_arrive, length=self.length)
@classmethod
def anim_arrive(cls, t: float, ls: shapely.geometry.LineString, length: float):
t = 1-t # reverse
start_pos = t * ls.length
end_pos = start_pos + length
return (start_pos, end_pos)
@classmethod
def anim_grow(cls, t: float, ls: shapely.geometry.LineString, reverse=False):
if reverse:
return (ls.length * t, ls.length)
else:
return (0, ls.length * t)
def apply(self, target_line: RenderableLine, dt: DeltaT):
if len(target_line) < 2:
self.ready = True
return target_line
i = self.running_for()
ls = target_line.as_linestring()
t = min(self.running_for()/self.duration, 1)
self.ready = t >= 1
start_pos, end_pos = self.anim_f(t, ls)
line = shapely.ops.substring(ls, start_pos, end_pos)
return RenderableLine.from_linestring(line, target_line.points[0].color)
return super().apply(target_line, dt)
class DashedLine(LineAnimator):
"""
@ -808,7 +878,8 @@ class DashedLine(LineAnimator):
multilinestring = self.dashed_line(ls, self.dash_len, self.gap_len, self.t_factor * self.running_for(), self.loop_offset)
self.ready = not bool(len(multilinestring.geoms))
# when looping, it is always ready, otherwise, only if totally gone
self.ready = self.loop_offset or not bool(len(multilinestring.geoms))
color = target_line.points[0].color
@ -864,4 +935,80 @@ class LineStringIncrementingDistanceOffset():
segment_length = line.coords[i+1].distance(line.coords[i])
start_at = cumulative_length
cumulative_length += float(segment_length)
yield (i, i+1), (start_at, cumulative_length)
yield (i, i+1), (start_at, cumulative_length)
L = TypeVar('U', bound=StaticLine)
class LineAnimationStack():
def __init__(self, line: StaticLine):
self.stack: Dict[Type[StaticLine], StaticLine] = {}
self.root = line
self.stack[line.__class__] = line
self.tail = line
def get(self, anim: Type[L]) -> L:
"""
Get item from the stack. E.g. `line.get(NoiseLine)`
"""
if anim in self.stack:
return self.stack[anim]
raise RuntimeError(f"Not in line stack {anim.__name__}")
@property
def last(self):
return self.tail
def add(self, line: LineAnimator):
if line.__class__ in self.stack:
raise RuntimeError(f"Cannot add twice {line.__class__.__name__}")
self.stack[line.__class__] = line
self.tail = line
def as_renderable_line(self, dt: DeltaT):
return self.tail.as_renderable_line(dt)
def start(self):
return self.tail.start()
def is_ready(self):
return self.tail.is_ready()
class LineAnimationSequenceStep(NamedTuple):
line: LineAnimator
ready_function: Callable
class LineAnimationSequence():
"""
WORK IN PROGRESS
"""
def __init__(self, target_line: StaticLine):
self.seq: List[LineAnimationSequenceStep] = []
self.ready = False
self.idx = 0
self.target = target_line
def steps(self):
for step in self.seq:
yield step
@classmethod
def ready_if_ready(cls, line: LineAnimator):
return line.is_ready()
@classmethod
def ready_after(cls, line: LineAnimator, duration: DeltaT):
return line.running_for() >= duration
def start(self):
self.start_at = time.time()
self.idx = 0
def as_renderable_line(self, dt: DeltaT):
return self.seq[self.idx].as_renderable_line(dt)

View file

@ -2,17 +2,18 @@ from argparse import ArgumentParser
from collections import defaultdict
from dataclasses import dataclass
from enum import Enum
from functools import partial
import logging
import time
import threading
from typing import Dict, List, Optional
from typing import Dict, List, Optional, Type, TypeVar
import zmq
from trap.anomaly import DiffSegment, calc_anomaly, calculate_loitering_scores
from trap.base import DataclassJSONEncoder, Frame, ProjectedTrack, Track
from trap.counter import CounterSender
from trap.lines import AppendableLine, AppendableLineAnimator, Coordinate, CropLine, DashedLine, DeltaT, FadeOutLine, FadedTailLine, NoiseLine, RenderableLayers, RenderableLine, RenderableLines, SimplifyMethod, SrgbaColor, StaticLine, load_lines_from_svg
from trap.lines import AppendableLine, AppendableLineAnimator, Coordinate, CropLine, DashedLine, DeltaT, FadeOutJitterLine, FadeOutLine, FadedTailLine, LineAnimationStack, LineAnimator, NoiseLine, RenderableLayers, RenderableLine, RenderableLines, SegmentLine, SimplifyMethod, SrgbaColor, StaticLine, load_lines_from_svg
from trap.node import Node
@ -27,7 +28,7 @@ OPTION_TRACK_NOISE = False
TRACK_ASSUMED_FPS = 12
TAKEOVER_FADEOUT = 3
LOST_FADEOUT = 3
LOST_FADEOUT = 2 # seconds
PREDICTION_INTERVAL: float|None = 20 # frames
PREDICTION_FADE_IN: float = 3
PREDICTION_FADE_SLOPE: float = -10
@ -238,6 +239,8 @@ class Scenario:
self.update_state()
class DrawnScenario(Scenario):
"""
@ -254,47 +257,81 @@ class DrawnScenario(Scenario):
self.last_update_t = time.perf_counter()
history_color = SrgbaColor(1.,0.,1.,1.)
self.line_history = StaticLine([], history_color)
self.line_history_smooth = AppendableLineAnimator(self.line_history, draw_decay_speed=25)
self.line_history_capped = CropLine(self.line_history_smooth, self.MAX_HISTORY)
self.line_history_disappearing = FadedTailLine(self.line_history_capped, TRACK_FADE_AFTER_DURATION * TRACK_ASSUMED_FPS, TRACK_END_FADE)
self.line_history_noisy = NoiseLine(self.line_history_disappearing, amplitude=0, t_factor=.3)
self.line_history_faded = FadeOutLine(self.line_history_noisy)
self.line_history_drawn = self.line_history_faded
history = StaticLine([], history_color)
self.line_history = LineAnimationStack(history)
self.line_history.add(AppendableLineAnimator(self.line_history.tail, draw_decay_speed=25))
self.line_history.add(CropLine(self.line_history.tail, self.MAX_HISTORY))
self.line_history.add(FadedTailLine(self.line_history.tail, TRACK_FADE_AFTER_DURATION * TRACK_ASSUMED_FPS, TRACK_END_FADE))
self.line_history.add(NoiseLine(self.line_history.tail, amplitude=0, t_factor=.3))
self.line_history.add(FadeOutJitterLine(self.line_history.tail, frequency=5, t_factor=.5))
self.active_ptrack: Optional[ProjectedTrack] = None
self.prediction_color = SrgbaColor(0,1,0,1)
self.line_prediction =StaticLine([], self.prediction_color)
self.line_prediction_dashed = DashedLine(self.line_prediction, t_factor=8)
self.line_prediction_dashed.skip = True
self.line_prediction_faded = FadeOutLine(self.line_prediction_dashed)
self.line_prediction_drawn = self.line_prediction_faded
self.line_prediction = LineAnimationStack(StaticLine([], self.prediction_color))
self.line_prediction.add(SegmentLine(self.line_prediction.tail, duration=.5))
self.line_prediction.add(DashedLine(self.line_prediction.tail, t_factor=4, loop_offset=True))
self.line_prediction.get(DashedLine).skip = True
self.line_prediction.add(FadeOutLine(self.line_prediction.tail))
# self.line_prediction_drawn = self.line_prediction_faded
def update(self):
super().update()
if self.track:
self.line_history.points = self.track.projected_history
self.line_history.root.points = self.track.projected_history
if len(self.prediction_tracks):
# TODO: only when animation is ready for it? or collect lines
if not self.active_ptrack:
self.active_ptrack = self.prediction_tracks[-1]
self.line_prediction.start() # reset positions
elif self.active_ptrack._track.updated_at < self.prediction_tracks[-1]._track.updated_at:
# logger.debug('pending change?', self.line_prediction_drawn.is_ready())
# switch only if ready
if self.line_prediction_drawn.is_ready():
# switch only if drawing animation is ready
if self.line_prediction.is_ready():
self.active_ptrack = self.prediction_tracks[-1]
self.line_prediction_drawn.start() # reset positions
self.line_prediction.get(SegmentLine).anim_f = partial(SegmentLine.anim_arrive, length=.3)
self.line_prediction.get(SegmentLine).duration = .5
self.line_prediction.get(DashedLine).skip = True
# print('restart')
self.line_prediction.start() # reset positions
# print(self.line_prediction.get(SegmentLine).running_for())
else:
if self.line_prediction.is_ready():
# little hack: check is dashedline skips, to only run this once per animation:
if self.line_prediction.get(DashedLine).skip:
# no new yet, but ready with anim, start stage 2
self.line_prediction.get(SegmentLine).anim_f = partial(SegmentLine.anim_grow)
self.line_prediction.get(SegmentLine).duration = 1
# self.line_prediction.get(SegmentLine).skip = True
self.line_prediction.get(DashedLine).skip = False
self.line_prediction.start()
else:
self.line_prediction.get(SegmentLine).anim_f = partial(SegmentLine.anim_grow, reverse=True)
self.line_prediction.get(SegmentLine).duration = 2
self.line_prediction.get(SegmentLine).start()
# self.line_prediction_dashed.set_offset_t(self.active_ptrack._track.track_update_dt() * 4)
self.line_prediction.points = self.active_ptrack._track.predictions[0]
self.line_prediction.root.points = self.active_ptrack._track.predictions[0]
if self.scene is ScenarioScene.CORRECTED_PREDICTION:
self.line_prediction_dashed.skip = False
if self.scene is ScenarioScene.LOITERING:
# special case: PLAY
transition = min(1, (time.time() - self.state_change_at)/1.4)
# TODO: transition fade, using to_alpha(), so it can fade back in again:
self.line_history.get(FadeOutJitterLine).set_alpha(1 - transition)
if transition > .999:
# fetch lines nearby
pass
elif self.scene is ScenarioScene.PLAY:
# special case: PLAY
pass
# if self.scene is ScenarioScene.CORRECTED_PREDICTION:
# self.line_prediction.get(DashedLine).skip = False
@ -303,21 +340,21 @@ class DrawnScenario(Scenario):
# 1) history, fade out when lost
self.line_history_faded.set_alpha(1-self.lost_factor())
self.line_prediction_faded.set_alpha(1-self.lost_factor())
self.line_history_noisy.amplitude = self.lost_factor()
self.line_history.get(FadeOutJitterLine).set_alpha(1-self.lost_factor())
self.line_prediction.get(FadeOutLine).set_alpha(1-self.lost_factor())
self.line_history.get(NoiseLine).amplitude = self.lost_factor()
# fade out history after max duration, given in frames
track_age_in_frames = self.track_age() * TRACK_ASSUMED_FPS
self.line_history_disappearing.set_frame_offset(track_age_in_frames)
self.line_history.get(FadedTailLine).set_frame_offset(track_age_in_frames)
# 2) also fade-out when moving into loitering mode.
# when fading out is done, start drawing historical data
history_line = self.line_history_drawn.as_renderable_line(dt)
prediction_line = self.line_prediction_drawn.as_renderable_line(dt)
history_line = self.line_history.as_renderable_line(dt)
prediction_line = self.line_prediction.as_renderable_line(dt)
# print(history_line)
# print(self.track_id, len(self.line_history.points), len(history_line))