From c4cdda64e1d25f38a12fad6450031fbe1d49fa91 Mon Sep 17 00:00:00 2001 From: Ruben van de Ven Date: Tue, 28 Oct 2025 16:03:37 +0100 Subject: [PATCH] Rework stage line animations --- trap/counter.py | 7 + trap/lines.py | 685 +++++++++++++++++++++- trap/node.py | 4 +- trap/stage.py | 1384 +++++++++++---------------------------------- trap/stage_old.py | 1009 +++++++++++++++++++++++++++++++++ 5 files changed, 2020 insertions(+), 1069 deletions(-) create mode 100644 trap/stage_old.py diff --git a/trap/counter.py b/trap/counter.py index 540ff92..ec9cdfb 100644 --- a/trap/counter.py +++ b/trap/counter.py @@ -37,8 +37,15 @@ class CounterFpsSender(): self.is_finished = threading.Event() def tick(self): + """ + returns dt since previous tock + """ self.iterations += 1 self.snapshot() + if len(self.tocs) > 1: + return float(self.tocs[-1][0] - self.tocs[-2][0]) + else: + return 0. def snapshot(self): self.tocs.append((time.perf_counter(), self.iterations)) diff --git a/trap/lines.py b/trap/lines.py index e8c4bb8..b5ccef1 100644 --- a/trap/lines.py +++ b/trap/lines.py @@ -1,20 +1,33 @@ from __future__ import annotations +from abc import ABC, abstractmethod +import copy from dataclasses import dataclass from enum import Enum, IntEnum import math from pathlib import Path -from typing import Dict, List, Tuple +import time +from typing import Dict, List, Optional, Tuple import numpy as np +import shapely +import shapely.ops from simplification.cutil import simplify_coords_idx, simplify_coords_vw_idx import svgpathtools + +from noise import snoise2 + +from trap.utils import exponentialDecayRounded, inv_lerp + + """ See [notebook](../test_path_transforms.ipynb) for examples """ RenderablePosition = Tuple[float,float] +Coordinate = Tuple[float, float] +DeltaT = float # delta_t in seconds class CoordinateSpace(IntEnum): CAMERA = 1 @@ -34,6 +47,10 @@ class SrgbaColor(): def as_faded(self, alpha: float) -> SrgbaColor: return SrgbaColor(self.red, self.green, self.blue, self.alpha * alpha) + + def __eq__(self, other): + return math.isclose(self.red, other.red) and math.isclose(self.green, other.green) and math.isclose(self.blue, other.blue) and math.isclose(self.alpha, other.alpha) + @dataclass class RenderablePoint(): @@ -63,6 +80,9 @@ class SimplifyMethod(Enum): class RenderableLine(): points: List[RenderablePoint] + def __len__(self): + return len(self.points) + def as_simplified(self, method: SimplifyMethod = SimplifyMethod.RDP, factor = SIMPLIFY_FACTOR_RDP): linestring = [p.position for p in self.points] if method == SimplifyMethod.RDP: @@ -72,6 +92,32 @@ class RenderableLine(): points = [self.points[i] for i in indexes] return RenderableLine(points) + def as_linestring(self): + return shapely.geometry.LineString([p.position for p in self.points]) + + @classmethod + def from_multilinestring(cls, mls: shapely.geometry.MultiLineString, color: SrgbaColor) -> RenderableLine: + points: List[RenderablePoint] = [] + for i, line in enumerate(mls.geoms): + if len(line.coords) < 2: + continue + + is_first = i == 0 + is_last = i == (len(mls.geoms) - 1) + + # blank point + if not is_first: + points.append(RenderablePoint(line.coords[0], color.as_faded(0))) + + points.extend( + [RenderablePoint(pos, color) for pos in line.coords] + ) + + # blank point + if not is_last: + points.append(RenderablePoint(line.coords[-1], color.as_faded(0))) + return RenderableLine(points) + @dataclass class RenderableLines(): @@ -137,7 +183,7 @@ def cross_points(cx, cy, r, c: SrgbaColor): return [path, path2] -def load_lines_from_svg(svg_path: Path, scale: float, c: SrgbaColor) -> List[RenderableLine]: +def load_lines_from_svg(svg_path: Path, scale: float, c: SrgbaColor, max_len = 0.3) -> List[RenderableLine]: lines = [] paths, attributes = svgpathtools.svg2paths(svg_path) @@ -181,12 +227,641 @@ def load_lines_from_svg(svg_path: Path, scale: float, c: SrgbaColor) -> List[Ren # Create LineString from coordinates if len(coordinates) > 1: coordinates = (np.array(coordinates) / scale).tolist() - points = [RenderablePoint(pos, c) for pos in coordinates] + + # cut into smaller segments, so the laser corrections apply nicely + linestring = shapely.geometry.LineString(coordinates) + linestring = shapely.segmentize(linestring, max_len) + + + points = [RenderablePoint(pos, c) for pos in linestring.coords] line = RenderableLine(points) lines.append(line) - # linestring = shapely.geometry.LineString(coordinates) # linestrings.append(linestring) except Exception as e: print(f"Error processing path: {e}") - return lines \ No newline at end of file + return lines + + + +class LineGenerator(ABC): + @abstractmethod + def update_drawn_positions(self, dt: DeltaT): + pass + + def extend(self, *args): + self.points.extend(args) + + + def as_renderable(self, dt: DeltaT, color: SrgbaColor) -> RenderableLines: + points = [RenderablePoint(p, color) for p in self.get_drawn_points(dt)] + lines = [RenderableLine(points)] + return RenderableLines(lines) + +class StaticLine(LineGenerator): + def __init__(self, points: Optional[List[Coordinate]] = None): + self.target_points: List[Coordinate] = points if points is not None else [] + self._drawn_points = points + + def get_drawn_points(self, dt): + return self._drawn_points + + + def update_drawn_positions(self, dt): + # nothing to update + pass + +class AppendableLine(LineGenerator): + """ + A line generator that allows for points to be added over time. + Simply use `line.points.extend([p1, p2])` and it will animate in + """ + def __init__(self, points: Optional[List[Coordinate]] = None, draw_decay_speed = 25.): + self.target_points: List[Coordinate] = points if points is not None else [] # when providing [] as default, it messes up instancing by reusing the same list + self.drawn_points = [] + self.ready = len(self.target_points) == 0 + self.draw_decay_speed = draw_decay_speed + + def nr_of_passed_points(self): + """The number of points passed in the animation""" + return len(self.drawn_points) - 1 + + def update_drawn_positions(self, dt: DeltaT): + if len(self.target_points) == 0: + # nothing to draw yet + return + + # self._drawn_points = self.points + + if len(self.drawn_points) == 0: + # create origin + self.drawn_points.append(self.target_points[0]) + # and drawing head + self.drawn_points.append(self.target_points[0]) + + idx = len(self.drawn_points) - 1 + target = self.target_points[idx] + + if np.isclose(self.drawn_points[-1], target, atol=.05).all(): + # TODO: might want to migrate to np.isclose() + if len(self.drawn_points) == len(self.target_points): + self.ready = True + return # done until a new point is added + + # add new point as drawing head + self.drawn_points.append(self.drawn_points[-1]) + self.ready = False + + x = exponentialDecayRounded(self.drawn_points[-1][0], target[0], self.draw_decay_speed, dt, .05) + y = exponentialDecayRounded(self.drawn_points[-1][1], target[1], self.draw_decay_speed, dt, .05) + self.drawn_points[-1] = (float(x), float(y)) + +class ProceduralChain(LineGenerator): + """A line that can be 'dragged' to a target. In which + it disappears.""" + MOVE_DECAY_SPEED = 80 # speed at which the drawing head should approach the next point + VELOCITY_DAMPING = 10 + VELOCITY_FACTOR = 2 + link_size = .1 # 10cm + # angle_constraint = 5 + + def __init__(self, joints: List[Coordinate], use_velocity = False): + self.joints: List[Coordinate] = joints + self.target: Coordinate = joints[-1] + self.ready = False + self.move_decay_speed = self.MOVE_DECAY_SPEED + + self.use_velocity = use_velocity + if self.use_velocity: + if len(self.joints) > 1: + self.v = np.array(self.joints[-2]) - np.array(self.joints[-1]) + self.v /= np.linalg.norm(self.v) / 10 + else: + self.v = np.array([0,0]) + + @classmethod + def from_appendable_line(cls, al: AppendableLine) -> ProceduralChain: + # TODO: create more segments: + # last added points becomes the head of the chain + points = list(reversed(al.target_points)) + linestring = shapely.LineString(points) + linestring = linestring.segmentize(cls.link_size) + joints = list(linestring.coords) + + return cls(joints) + + def update_drawn_positions(self, dt: DeltaT): + if self.ready: + return + # direction = np.array(self.joints[-1] - self.target) + + # TODO: check self.joints empty, and stop then + if self.use_velocity: + vx = exponentialDecayRounded(self.v[0], self.target[0] - self.joints[0][0], self.VELOCITY_DAMPING, dt, .05) + vy = exponentialDecayRounded(self.v[1], self.target[1] - self.joints[0][1], self.VELOCITY_DAMPING, dt, .05) + self.v = np.array([vx, vy]) + self.joints[0] = (float(self.joints[0][0] + self.v[0] * dt * self.VELOCITY_FACTOR), float(self.joints[0][1] + self.v[1] * dt * self.VELOCITY_FACTOR)) + else: + x = exponentialDecayRounded(self.joints[0][0], self.target[0], self.move_decay_speed, dt, .05) + y = exponentialDecayRounded(self.joints[0][1], self.target[1], self.move_decay_speed, dt, .05) + self.joints[0] = (float(x), float(y)) + + # Loop inspired by: https://github.com/argonautcode/animal-proc-anim/blob/main/Chain.pde + # see that code for angle constrains. + for i, (joint, prev_joint) in enumerate(zip(self.joints[1:], self.joints), start=1): + diff = np.array(prev_joint) - np.array(joint) + direction = diff / np.linalg.norm(diff) + self.joints[i] = prev_joint - direction * self.link_size + + if np.isclose(self.joints[0], self.target, atol=.05).all(): + # self.ready = True + # TODO: smooth transition instead of cutting off + self.joints.pop(0) + if len(self.joints) == 0: + self.ready = True + + self._drawn_points = self.joints + +# class AnimatedLineMask(): +# """ +# given a line, create an animation by masking/skewing it +# """ +# def __init__(self, line: LineGenerator): +# self.line = line +# self.start_at = time.time() + +# @abstractmethod +# def apply(self, dt: DeltaT, color: SrgbaColor) -> RenderableLines: +# pass + +# class FadeoutMask(AnimatedLineMask): +# def __init__(self, line: LineGenerator, max_len = 200, fade_len = 20): +# self.line = LineGenerator + +# def apply(self, lines: RenderableLines, factor): +# for line in lines.lines: +# for point in line.points: +# point.color = point.color.as_faded(factor) + + +# class FadeoutTailMask(AnimatedLineMask): +# def __init__(self, line: LineGenerator, max_len = 200, fade_len = 20): +# self.line = LineGenerator + +class StaticLine(): + """ + Always a continuous line of monotonous color + """ + def __init__(self, points = None, color: SrgbaColor = None): + self.points: List[Coordinate] = points if points is not None else [] # when providing [] as default, it messes up instancing by reusing the same list + self.color = color if color else SrgbaColor(0,0,0,1) + + def get_positions(self): + return self.points + + def __len__(self): + return len(self.points) + + def extend(self, coords: List[Coordinate]): + self.points.extend(coords) + + def as_renderable_line(self, dt: DeltaT) -> RenderableLine: + points = [RenderablePoint(p, self.color) for p in self.points] + line = RenderableLine(points) + return line + + def is_ready(self): + return True + + def start(self): + return True + + + +class LineAnimator(StaticLine): + """ + Animate a line, can/should be nested. Always derives from a StaticLine + """ + def __init__(self, target_line: Optional[StaticLine] = None): + # print(self, target_line, bool(target_line), target_line is not None) + self.target = target_line if target_line is not None else StaticLine() + self.ready = len(self.target) == 0 + self.start_t = time.time() + self.skip = False + + def extend(self, coords): + return self.target.extend(coords) + + def __len__(self): + return len(self.target) + + def as_renderable_line(self, dt: DeltaT) -> RenderableLine: + target = self.target.as_renderable_line(dt) + if self.skip: + return target + + return self.apply(target, dt) + + def apply(self, target_line: RenderableLine, dt: DeltaT) -> RenderableLine: + raise RuntimeError("Not Implemented") + + def is_ready(self): + return self.ready and self.target.is_ready() + + def start(self): + self.target.start() + + self.start_t = time.time() + return True + + def running_for(self): + if self.start: + return time.time() - self.start_t + return 0. + + + +class AppendableLineAnimator(LineAnimator): + """ + Animate a line, can/should be nested. Always derives from a StaticLine + """ + def __init__(self, target_line = None, draw_decay_speed: float = 25., draw_decay_speed_init = 1000, transition_in_on_init = True): + super().__init__(target_line) + self.drawn_points: List[RenderablePoint] = [] + self.draw_decay_speed = draw_decay_speed + self.draw_decay_speed_init = draw_decay_speed_init # faster drawing until catching up + self.transition_in_on_init = transition_in_on_init # when False, immediately draw the full line + self.is_init = False + + + def apply(self, target_line, dt: DeltaT) -> RenderableLine: + if len(target_line) == 0: + # nothing to draw yet + return RenderableLine() + + + + if len(self.drawn_points) == 0: + if self.transition_in_on_init: + # create origin + self.drawn_points.append(target_line.points[0]) + else: + self.drawn_points.extend(target_line.points[:-1]) + # and a copy as drawing head + self.drawn_points.append(copy.deepcopy(self.drawn_points[-1])) + + idx = len(self.drawn_points) - 1 + target = target_line.points[idx] + + if np.isclose(self.drawn_points[-1].position, target.position, atol=.05).all(): + # TODO: might want to migrate to np.isclose() + if len(self.drawn_points) == len(target_line): + self.ready = True + self.is_init = True + return RenderableLine(self.drawn_points) # done until a new point is added + + # add new point as drawing head, by duplicating the current point + self.drawn_points.append(copy.deepcopy(self.drawn_points[-1]) ) + self.drawn_points[-1].color = target.color # set to color of the next point + self.ready = False + + decay_speed = self.draw_decay_speed if self.is_init else self.draw_decay_speed_init + x = exponentialDecayRounded(self.drawn_points[-1].position[0], target.position[0], decay_speed, dt, .05) + y = exponentialDecayRounded(self.drawn_points[-1].position[1], target.position[1], decay_speed, dt, .05) + + # handle color gradient: + # if self.drawn_points[-1].color != target.color: + # # t + # tx = inv_lerp(self.drawn_points[-2].position[0], target.position[0], x) + # ty = inv_lerp(self.drawn_points[-2].position[1], target.position[1], y) + # t = (tx+ty) / 2 + # TODO: this should set color to intermediate color, but this is hardly used, so skip it for sake of speed + + self.drawn_points[-1].position = (float(x), float(y)) + + + + return RenderableLine(self.drawn_points) + + +class FadeOutLine(LineAnimator): + """ + Fade the line providing an alpha, 1 by default + """ + def __init__(self, target_line = None): + super().__init__(target_line) + self.alpha = 1 + self.ready = True # filter holds no state, so always ready + + def set_alpha(self, alpha: float): + self.alpha = min(1, max(0, alpha)) + + + def apply(self, target_line: RenderableLine, dt: DeltaT) -> RenderableLine: + + for point in target_line.points: + point.color = point.color.as_faded(self.alpha) + + return target_line + + +class CropLine(LineAnimator): + """ + Crop the line at a max nr of points (thus not actual lenght!) + Keeps the tail, removes the start + """ + def __init__(self, target_line = None, max_points = 200): + super().__init__(target_line) + self.max_points = max_points + self.ready = True # static filter, always ready + + def apply(self, target_line: RenderableLine, dt: DeltaT) -> RenderableLine: + target_line.points = target_line.points[-1 * self.max_points:] + return target_line + +class FadedTailLine(LineAnimator): + """ + Fade the tail of the line, proving a max length + """ + def __init__(self, target_line = None, fade_after: int = 170, fade_frames: int = 30): + super().__init__(target_line) + self.fade_after = fade_after + self.fade_frames = fade_frames + self.frame_offset = 0 + + def set_frame_offset(self, frame_offset: int): + """ + This can be used to provide an additional offset between fade_frames and the lenght of + the actual track. Can e.g. be used to process the diff between last tracking time and + actual wall time (in case a track is lost, it keeps getting 'eaten') + """ + self.frame_offset = frame_offset + + + def apply(self, target_line: RenderableLine, dt: DeltaT) -> RenderableLine: + l = len(target_line.points) + points = [] + for i, point in enumerate(target_line.points): + reverse_i = l - i + fade_i = reverse_i - self.fade_after + self.frame_offset # -90 + fade_i /= self.fade_frames # / 10 + # fade_i = np.clip(fade_i, 0, 1) + alpha = 1 - fade_i + if alpha >= 0: + alpha = min(1, alpha) + point.color = point.color.as_faded(alpha) + points.append(point) + + self.ready = not bool(len(points)) + + return RenderableLine(points) + + +class NoiseLine(LineAnimator): + """ + Apply animated noise to line normals + """ + def __init__(self, target_line = None, amplitude=.3, frequency=.02, t_factor = 1.0): + super().__init__(target_line) + self.amplitude = amplitude + self.frequency = frequency + self.t_factor = t_factor + self.t = 0 + self.ready = True # static filter, always ready + + # Gemma3:27b prompt: "python. Given a list of coordinates, that describes a line: `drawable_points: List[Tuple[float,float]]` apply perlin noise over the normal of the line, that changes over time `dt`." + @classmethod + def apply_perlin_noise_to_line_normal(cls, drawable_points: np.ndarray, t: float, amplitude: float = 1.0, frequency: float = 1.0, fade_over_n_points = 8) -> List[RenderablePosition]: + """ + Applies Perlin noise to the normals of a line described by a list of coordinates, changing over time. + + Args: + drawable_points: A list of (x, y) tuples representing the points of the line. + dt: The time delta, used to animate the Perlin noise. + amplitude: The strength of the Perlin noise effect. + frequency: The frequency of the Perlin noise (how many waves per unit). + + Returns: + A new list of (x, y) tuples representing the line with Perlin noise applied to the normals. If drawable_points + has fewer than 2 points, it returns the original list unchanged. + + Raises: + TypeError: If drawable_points is not a list or dt is not a float. + ValueError: If the input points are not tuples of length 2. + """ + + if len(drawable_points) < 2: + return drawable_points # Nothing to do with fewer than 2 points + + # for point in drawable_points: + # if not isinstance(point, tuple) or len(point) != 2: + # raise ValueError("Each point in drawable_points must be a tuple of length 2.") + + + # noise = PerlinNoise(octaves=4) # You can adjust octaves for different noise patterns + + new_points = [] + for i in range(len(drawable_points)): + x, y = copy.deepcopy(drawable_points[i]) + + # Calculate the normal vector. We'll approximate it using the previous and next points. + if i == 0: + # For the first point, use the next point to estimate the normal + next_x, next_y = drawable_points[i + 1] + normal_x = next_y - y + normal_y = -(next_x - x) + elif i == len(drawable_points) - 1: + # For the last point, use the previous point + prev_x, prev_y = drawable_points[i - 1] + normal_x = y - prev_y + normal_y = -(x - prev_x) + else: + prev_x, prev_y = drawable_points[i - 1] + next_x, next_y = drawable_points[i + 1] + normal_x = next_y - prev_y + normal_y = -(next_x - prev_x) + + # Normalize the normal vector + norm = np.sqrt(normal_x**2 + normal_y**2) + if norm > 0: + normal_x /= norm + normal_y /= norm + + # Apply Perlin noise to the normal + # noise_x = noise([x * frequency, (y + dt) * frequency]) * amplitude * normal_x + # noise_y = noise([x * frequency, (y + dt) * frequency]) * amplitude * normal_y + noise = snoise2(i * frequency, t % 1000, octaves=4) + + use_amp = amplitude + if fade_over_n_points > 0: + rev_step = len(drawable_points) - i + amp_factor = rev_step / fade_over_n_points + if amp_factor < 1: + use_amp *= amp_factor + + noise_x = noise * use_amp * normal_x + noise_y = noise * use_amp * normal_y + + # print(noise_x, noise_y, dt, frequency, i, dt, snoise2(i * frequency, dt % 1000, octaves=4)) + + + # Add the noise to the point's coordinates + new_x = x + noise_x + new_y = y + noise_y + + new_points.append((new_x, new_y)) + + + return new_points + + def apply(self, target_line: RenderableLine, dt: DeltaT) -> RenderableLine: + + self.t += dt + + if self.amplitude < 0.01: + return target_line + + positions = np.array([p.position for p in target_line.points]) + new_positions = self.apply_perlin_noise_to_line_normal( + positions, + self.t * self.t_factor, + self.amplitude, + self.frequency + ) + + points = [] + for point, pos in zip(target_line.points, new_positions): + p = copy.deepcopy(point) + p.position = pos + points.append(p) + + + return RenderableLine(points) + +class SegmentLine(LineAnimator): + def __init__(self, target_line = None): + super().__init__(target_line) + + def apply(self, target_line: RenderableLine, dt: DeltaT): + if len(target_line) < 2: + return target_line + i = self.running_for() + ls = target_line.as_linestring() + + return super().apply(target_line, dt) + +class DashedLine(LineAnimator): + """ + Dashed line + """ + def __init__(self, target_line = None, dash_len: float = 1., gap_len: float = .5, t_factor: float = 1., loop_offset: bool = False): + super().__init__(target_line) + self.dash_len = dash_len + self.gap_len = gap_len + self.loop_offset = loop_offset + self.t_factor = t_factor + + # def set_offset_t(self, dt: DeltaT): + # self.offset_t = dt + + + @classmethod + def dashed_line(cls, line: shapely.geometry.LineString, dash_len: float, gap_len: float, offset: float = 0, loop_offset = True) -> shapely.geometry.MultiLineString: + total_length = line.length + + # index_helper = LineStringIncrementingDistanceOffset(line) + + segments = [] + + if loop_offset: + # by default, prepend skipped gap + pos = offset % (dash_len + gap_len) + + if pos > gap_len: + # TODO: use index_helper.get_index_and_offset to lerp the colors of all points on substring + segments.append(shapely.ops.substring(line, 0, pos - gap_len)) + else: + pos = offset + + while pos < total_length: + end = min(pos + dash_len, total_length) + if pos < end: + # TODO: use index_helper.get_index_and_offset to lerp the colors of all points on substring + dash = shapely.ops.substring(line, pos, end) + segments.append(dash) + pos += dash_len + gap_len + + # TODO: return all color together with the points + return shapely.geometry.MultiLineString(segments) + + + def apply(self, target_line: RenderableLine, dt: DeltaT) -> RenderableLine: + """ + warning, dashing (for now) removes all color + """ + + if len(target_line) < 2: + self.ready = True + return target_line + + ls = target_line.as_linestring() + multilinestring = self.dashed_line(ls, self.dash_len, self.gap_len, self.t_factor * self.running_for(), self.loop_offset) + + + self.ready = not bool(len(multilinestring.geoms)) + + color = target_line.points[0].color + + return RenderableLine.from_multilinestring(multilinestring, color) + + +IndexAndOffset = Tuple[int, float] + +class LineStringIncrementingDistanceOffset(): + """ + Helper for linestrings: find the index + lerp-offset, ASAP provided that + distance to look for is always increasing + """ + def __init__(self, ls: shapely.geometry.LineString): + self.ls = ls + self.gen = self.index_distances(self.ls) + self.current_range = next(self.gen) + self.pos = 0 + + def get_index_and_offset(self, length: float) -> IndexAndOffset: + if length > self.pos: + raise RuntimeError("Cannot reverse") + self.pos = length + + while True: + index_range = self.current_range[0] + start_distance = self.current_range[1][0] + end_distance = self.current_range[1][1] + + if length < end_distance: + # fill here + + segment_distance = length - start_distance + segment_length = end_distance - start_distance + + lerp = segment_distance / segment_length + + return index_range[0], lerp + else: + try: + self.current_range = next(self.gen) + except StopIteration as e: + # when exhausting the list return the last index + return len(self.ls.coords) - 1, 0. + + @classmethod + def index_distances(cls, line: shapely.geometry.LineString): + """ + Build a list of segments with the distance range they cover + """ + cumulative_length = 0 + for i in range(len(line.coords) - 1): + segment_length = line.coords[i+1].distance(line.coords[i]) + start_at = cumulative_length + cumulative_length += float(segment_length) + yield (i, i+1), (start_at, cumulative_length) \ No newline at end of file diff --git a/trap/node.py b/trap/node.py index b366d06..d78441b 100644 --- a/trap/node.py +++ b/trap/node.py @@ -22,6 +22,8 @@ class Node(): self._prev_loop_time = 0 + self.dt_since_last_tick = 0 + self.setup() @classmethod @@ -29,7 +31,7 @@ class Node(): return logging.getLogger(f"trap.{cls.__name__}") def tick(self): - self.fps_counter.tick() + self.dt_since_last_tick = self.fps_counter.tick() # with self.fps_counter.get_lock(): # self.fps_counter.value+=1 diff --git a/trap/stage.py b/trap/stage.py index feea32d..f3526aa 100644 --- a/trap/stage.py +++ b/trap/stage.py @@ -1,338 +1,32 @@ -from __future__ import annotations - -from abc import ABC, abstractmethod from argparse import ArgumentParser from collections import defaultdict from dataclasses import dataclass from enum import Enum -import json import logging -import math -import pickle import time -from typing import Dict, List, Optional, Tuple -from matplotlib.pyplot import isinteractive -import numpy as np -from shapely import LineString, MultiLineString, line_locate_point, linestrings +import threading +from typing import Dict, List, Optional -import shapely -from shapely.ops import substring -from statemachine import Event, State, StateMachine -from statemachine.exceptions import TransitionNotAllowed import zmq - -from sgan.sgan import data -from trap import shapes -from trap.base import Camera, DataclassJSONEncoder, DistortedCamera, Frame, ProjectedTrack, Track +from trap.anomaly import DiffSegment, calc_anomaly, calculate_loitering_scores +from trap.base import DataclassJSONEncoder, Frame, ProjectedTrack, Track from trap.counter import CounterSender -from trap.laser_renderer import circle_points, rotateMatrix -from trap.lines import RenderableLayers, RenderableLine, RenderableLines, RenderablePoint, RenderablePosition, SimplifyMethod, SrgbaColor, circle_arc, load_lines_from_svg +from trap.lines import AppendableLine, AppendableLineAnimator, Coordinate, CropLine, DashedLine, DeltaT, FadeOutLine, FadedTailLine, NoiseLine, RenderableLayers, RenderableLine, RenderableLines, SimplifyMethod, SrgbaColor, StaticLine, load_lines_from_svg from trap.node import Node -from trap.timer import Timer -from trap.utils import exponentialDecay, exponentialDecayRounded, lerp, relativePointToPolar, relativePolarToPoint -from noise import snoise2 logger = logging.getLogger('trap.stage') -Coordinate = Tuple[float, float] -DeltaT = float # delta_t in seconds - OPTION_RENDER_DEBUG = False OPTION_POSITION_MARKER = False OPTION_GROW_ANOMALY_CIRCLE = False # OPTION_RENDER_DIFF_SEGMENT = True OPTION_TRACK_NOISE = False -class LineGenerator(ABC): - @abstractmethod - def update_drawn_positions(self, dt: DeltaT): - pass - - def as_renderable(self, color: SrgbaColor) -> RenderableLines: - points = [RenderablePoint(p, color) for p in self._drawn_points] - lines = [RenderableLine(points)] - return RenderableLines(lines) - -class AppendableLine(LineGenerator): - """ - A line generator that allows for points to be added over time. - Simply use `line.points.extend([p1, p2])` - """ - def __init__(self, points: Optional[List[Coordinate]] = None, draw_decay_speed = 25.): - self.points: List[Coordinate] = points if points is not None else [] # when providing [] as default, it messes up instancing by reusing the same list - self._drawn_points = [] - self.ready = len(self.points) == 0 - self.draw_decay_speed = draw_decay_speed - - def nr_of_passed_points(self): - """The number of points passed in the animation""" - return len(self._drawn_points) - 1 - - def update_drawn_positions(self, dt: DeltaT): - if len(self.points) == 0: - # nothing to draw yet - return - - # self._drawn_points = self.points - - if len(self._drawn_points) == 0: - # create origin - self._drawn_points.append(self.points[0]) - # and drawing head - self._drawn_points.append(self.points[0]) - - idx = len(self._drawn_points) - 1 - target = self.points[idx] - - if np.isclose(self._drawn_points[-1], target, atol=.05).all(): - # TODO: might want to migrate to np.isclose() - if len(self._drawn_points) == len(self.points): - self.ready = True - return # done until a new point is added - # add new point as drawing head - self._drawn_points.append(self._drawn_points[-1]) - self.ready = False - - x = exponentialDecayRounded(self._drawn_points[-1][0], target[0], self.draw_decay_speed, dt, .05) - y = exponentialDecayRounded(self._drawn_points[-1][1], target[1], self.draw_decay_speed, dt, .05) - self._drawn_points[-1] = (float(x), float(y)) - -class ProceduralChain(LineGenerator): - """A line that can be 'dragged' to a target. In which - it disappears.""" - MOVE_DECAY_SPEED = 80 # speed at which the drawing head should approach the next point - VELOCITY_DAMPING = 10 - VELOCITY_FACTOR = 2 - link_size = .1 # 10cm - # angle_constraint = 5 - - def __init__(self, joints: List[Coordinate], scenario: DrawnScenario, use_velocity = False): - self.joints: List[Coordinate] = joints - self.target: Coordinate = joints[-1] - self.ready = False - self.move_decay_speed = self.MOVE_DECAY_SPEED - self.scenario = scenario - - self.use_velocity = use_velocity - if self.use_velocity: - if len(self.joints) > 1: - self.v = np.array(self.joints[-2]) - np.array(self.joints[-1]) - self.v /= np.linalg.norm(self.v) / 10 - else: - self.v = np.array([0,0]) - - @classmethod - def from_appendable_line(cls, al: AppendableLine, scenario: DrawnScenario) -> ProceduralChain: - # TODO: create more segments: - # last added points becomes the head of the chain - points = list(reversed(al.points)) - linestring = LineString(points) - linestring = linestring.segmentize(cls.link_size) - joints = list(linestring.coords) - - return cls(joints, scenario) - - def update_drawn_positions(self, dt: DeltaT): - if self.ready: - return - # direction = np.array(self.joints[-1] - self.target) - - # TODO: check self.joints empty, and stop then - if self.use_velocity: - vx = exponentialDecayRounded(self.v[0], self.target[0] - self.joints[0][0], self.VELOCITY_DAMPING, dt, .05) - vy = exponentialDecayRounded(self.v[1], self.target[1] - self.joints[0][1], self.VELOCITY_DAMPING, dt, .05) - self.v = np.array([vx, vy]) - self.joints[0] = (float(self.joints[0][0] + self.v[0] * dt * self.VELOCITY_FACTOR), float(self.joints[0][1] + self.v[1] * dt * self.VELOCITY_FACTOR)) - else: - x = exponentialDecayRounded(self.joints[0][0], self.target[0], self.move_decay_speed, dt, .05) - y = exponentialDecayRounded(self.joints[0][1], self.target[1], self.move_decay_speed, dt, .05) - self.joints[0] = (float(x), float(y)) - - # Loop inspired by: https://github.com/argonautcode/animal-proc-anim/blob/main/Chain.pde - # see that code for angle constrains. - for i, (joint, prev_joint) in enumerate(zip(self.joints[1:], self.joints), start=1): - diff = np.array(prev_joint) - np.array(joint) - direction = diff / np.linalg.norm(diff) - self.joints[i] = prev_joint - direction * self.link_size - - if np.isclose(self.joints[0], self.target, atol=.05).all(): - # self.ready = True - # TODO: smooth transition instead of cutting off - self.joints.pop(0) - self.scenario.add_anomaly_length(self.link_size) - if len(self.joints) == 0: - self.ready = True - - self._drawn_points = self.joints - - -class DiffSegment(): - """ - A segment of a prediction track, that can be diffed - with a track. The track is continously update. - If a new prediction comes in, the diff is marked as - finished. After which it is animated and added to the - Scenario's anomaly score. - """ - DRAW_DECAY_SPEED = 25 - POINT_INTERVAL = 4 - - def __init__(self, prediction: ProjectedTrack): - self.ptrack = prediction - self._last_diff_frame_idx = 0 - self.finished = False - - self.line = AppendableLine( draw_decay_speed=self.DRAW_DECAY_SPEED) - self.points: List[Coordinate] = [] - self._drawn_points = [] - self._target_track = prediction - - def finish(self): - self.finished = True - - def nr_of_passed_points(self): - if isinstance(self.line, AppendableLine): - return self.line.nr_of_passed_points() * self.POINT_INTERVAL - else: - return len(self.points) * self.POINT_INTERVAL - - # run on each track update received - def update_track(self, track: ProjectedTrack): - self._target_track = track - - if self.finished: - # don't add new points if finished - return - - # migrate SceneraioScene function - start_frame_idx = max(self.ptrack.frame_index, self._last_diff_frame_idx) - traj_diff_steps_back = track.frame_index - start_frame_idx # positive value - pred_diff_steps_forward = start_frame_idx - self.ptrack.frame_index # positive value - - if traj_diff_steps_back < 0 or len(track.history) < traj_diff_steps_back: - logger.warning("Track history doesn't reach prediction start. Should not be possible. Skip") - # elif len(ptrack.predictions[0]) < pred_diff_steps_back: - # logger.warning("Prediction does not reach prediction start. Should not be possible. Skip") - else: - trajectory = track.projected_history - - # from start to as far as it gets - trajectory_range = trajectory[-1*traj_diff_steps_back:] - prediction_range = self.ptrack.predictions[0][pred_diff_steps_forward:] # in world coordinate space - line = [] - for i, (p1, p2) in enumerate(zip(trajectory_range, prediction_range)): - offset_from_start = (pred_diff_steps_forward + i) - if offset_from_start % self.POINT_INTERVAL == 0: - self.line.points.extend([p1, p2]) - self.points.extend([p1, p2]) - - self._last_diff_frame_idx = track.frame_index - - - # run each render tick - def update_drawn_positions(self, dt: DeltaT, scenario: DrawnScenario): - if isinstance(self.line, AppendableLine): - if self.finished and self.line.ready: - # convert when fully drawn - # print(self, "CONVERT LINE") - self.line = ProceduralChain.from_appendable_line(self.line, scenario) - - if isinstance(self.line, ProceduralChain): - self.line.target = self._target_track.projected_history[-1] - - - # if not self.finished or not self.line.ready: - self.line.update_drawn_positions(dt) - - - - def as_renderable(self) -> RenderableLines: - color = SrgbaColor(0,0,1,1) - # lines = [] - # points = [RenderablePoint(p, color) for p in self._drawn_points] - # lines = [RenderableLine(points)] - # return RenderableLines(lines) - if not self.finished or not self.line.ready: - return self.line.as_renderable(color) - return self.line.as_renderable(color) - # points = [RenderablePoint(p, color) for p in self._drawn_points] - # lines = [RenderableLine(points)] - return RenderableLines([]) - - -class DiffSegmentScan(DiffSegment): - """ - Provide alternative diffing, in the form of a sort of scan line - Should be faster with the laser - TODO: This is work in progress, does not work yet! - """ - - def __init__(self, prediction: ProjectedTrack): - self.ptrack = prediction - self._target_track = prediction - self.finished = False - self._last_diff_frame_idx = 0 - - def finish(self): - self.finished = True - - def prediction_offset(self): - """Difference is starting moment between track and prediction""" - return self.ptrack.frame_index - self._target_track.frame_index - - def nr_of_passed_points(self): - """Number of points of the given ptrack that have passed""" - return len(self._target_track.projected_history) - 1 - self.prediction_offset() - # len(self.points) * self.POINT_INTERVAL - - # run on each track update received - def update_track(self, track: ProjectedTrack): - self._target_track = track - - if self.finished: - # don't add new points if finished - return - - start_frame_idx = max(self.ptrack.frame_index, self._last_diff_frame_idx) - traj_diff_steps_back = track.frame_index - start_frame_idx # positive value - pred_diff_steps_forward = start_frame_idx - self.ptrack.frame_index # positive value - self._last_diff_frame_idx = track.frame_index - - # run each render tick - def update_drawn_positions(self, dt: DeltaT, scenario: DrawnScenario): - # if not self.finished or not self.line.ready: - # self.line.update_drawn_positions(dt) - pass # TODO: use easing - - - - def as_renderable(self) -> RenderableLines: - if self.finished: - return RenderableLines([]) - color = SrgbaColor(0,0,1,1) - # steps_diff = self.nr_of_passed_points() - idx = self.nr_of_passed_points() - if len(self.ptrack.predictions[0]) < idx+1: - self.finish() - return RenderableLines([]) - points = [self._target_track.projected_history[-1], self.ptrack.predictions[0][idx]] - - points = [RenderablePoint(pos, color) for pos in points] - line = RenderableLine(points) - - return RenderableLines([line]) - - -class ScenarioScene(Enum): - DETECTED = 1 - FIRST_PREDICTION = 2 - CORRECTED_PREDICTION = 3 - LOITERING = 4 - PLAY = 4 - LOST = -1 +TRACK_ASSUMED_FPS = 12 +TAKEOVER_FADEOUT = 3 LOST_FADEOUT = 3 PREDICTION_INTERVAL: float|None = 20 # frames PREDICTION_FADE_IN: float = 3 @@ -342,171 +36,97 @@ PREDICTION_END_FADE = 2 #frames # TRACK_MAX_POINTS = 100 TRACK_FADE_AFTER_DURATION = 15. # seconds TRACK_END_FADE = 30 # points -TRACK_FADE_ASSUME_FPS = 12 +TRACK_FADE_ASSUME_FPS = TRACK_ASSUMED_FPS -# Don't render the first n points of the prediction, -# helps to avoid jitter in the line transition -# PREDICTION_OFFSET = int(TRACK_FADE_ASSUME_FPS * PREDICTION_INTERVAL * .8) - -class TrackScenario(StateMachine): - detected = State(initial=True) - substantial = State() - first_prediction = State() - corrected_prediction = State() - loitering = State() - play = State() - lost = State(final=True) - - receive_track = lost.from_( - detected, first_prediction, corrected_prediction, loitering, play, substantial, cond="track_is_lost" - ) | corrected_prediction.to(loitering, cond="track_is_loitering") | detected.to(substantial, cond="track_is_long") - - mark_lost = lost.from_(detected, substantial, first_prediction, corrected_prediction, loitering, play) - - receive_prediction = detected.to(first_prediction) | substantial.to(first_prediction) | first_prediction.to(corrected_prediction, cond="prediction_is_stale") | corrected_prediction.to(play, cond="prediction_is_playing") +# LOITERING_WINDOW = 8 * TRACK_ASSUMED_FPS +# LOITERING_DISTANCE = 1 # meter diff in LOITERING_WINDOW time +# LOITERING_MEDIAN_FILTER = TRACK_ASSUMED_FPS // 3 # frames: smooth out velocity over n frames +LOITERING_VELOCITY_TRESHOLD = .5 # m/s +LOITERING_DURATION_TO_LINGER = TRACK_ASSUMED_FPS * 1 # start counting as lingering after this many frames +LOITERING_LINGER_FACTOR = TRACK_ASSUMED_FPS * 4 # number of frames to reach loitering score of 1 (+LOITERING_DURATION_TO_LINGER) - def __init__(self): - self.track: ProjectedTrack = None - self.camera: Optional[Camera] = None - # self.first_prediction_track: Optional[Track] = None - # self.prediction_track: Optional[Track] = None - self.predictions: List[Track] = [] +class DefaultDictKeyed(dict): + def __init__(self, factory): + self.factory = factory + + def __missing__(self, key): + self[key] = self.factory(key) + return self[key] + +@dataclass +class SceneInfo: + priority: int + description: str = "" + takeover_possible: bool = False # whether to allow for other scenarios to steal the stage + +class ScenarioScene(Enum): + DETECTED = SceneInfo(4, "First detection") + SUBSTANTIAL = SceneInfo(6, "Multiple detections") + FIRST_PREDICTION = SceneInfo(10, "Prediction is ready") + CORRECTED_PREDICTION = SceneInfo(11, "Multiple predictions") + LOITERING = SceneInfo(7, "Foundto be loitering", takeover_possible=True) + PLAY = SceneInfo(7, description="After many predictions; just fooling around", takeover_possible=True) + LOST = SceneInfo(-1, description="Track lost", takeover_possible=True) + +Time = float + +class Scenario: + def __init__(self, track_id): + self.track_id = track_id + self.scene: ScenarioScene = ScenarioScene.DETECTED + self.start_time = 0. + self.current_time = 0 + self.take_over_at: Optional[Time] = None + + self.track: Optional[ProjectedTrack] = None + self.prediction_tracks: List[ProjectedTrack] = [] self._last_diff_frame_idx: Optional[int] = 0 - self.diffs: List[Tuple[Coordinate, Coordinate]] = [] self.prediction_diffs: List[DiffSegment] = [] - super().__init__() - def track_is_long(self, track: ProjectedTrack): - return len(track.history) > 20 + self.state_change_at = None - def track_is_lost(self, track: ProjectedTrack): - # return self._track and self._track.created_at < time.time() - 5 - return track.lost # Note, for now this is not implemented in the tacker, see check_lost() + self.is_running = False - def track_is_loitering(self, track: ProjectedTrack): - # TODO)) Change to measure displacement over the last n seconds - return len(track.history) > (track.fps * 60) # seconds after which someone is loitering - - def prediction_is_stale(self, track: ProjectedTrack): - # TODO use displacement instead of time - return bool(len(self.predictions) and self.predictions[-1].created_at < (time.time() - 2)) - - def prediction_is_playing(self, track): - return False - - def check_lost(self): - if self.current_state is not self.lost and self.track and self.track.updated_at < time.time() - 5: - self.mark_lost() - - def set_track(self, track: ProjectedTrack): - if self.track and self.track.created_at > track.created_at: - # ignore old track - return - - self.track = track - self.update_prediction_diff() + logger.info(f"Found {self.track_id}: {self.scene.name}") - # check to change state - try: - self.receive_track(track) - except TransitionNotAllowed as e: - # state change is optional - pass + def start(self): + # change when visible + logger.info(f"Start {self.track_id}: {self.scene.name}") + self.is_running = True - - def update_prediction_diff(self): - """ - gather the diffs of the trajectory with the most recent prediction - """ - if len(self.prediction_diffs) == 0: - return - - for diff in self.prediction_diffs: - diff.update_track(self.track) - - - - def add_prediction(self, track: ProjectedTrack): + def track_age(self): if not self.track: - # in case of the unlikely event that prediction was passed sooner - self.set_track(track) - - # if not self.first_prediction_track: - # self.first_prediction_track = track - - if PREDICTION_INTERVAL is not None and len(self.predictions) and (track.frame_index - self.predictions[-1].frame_index) < PREDICTION_INTERVAL: - # just drop tracks if the predictions come to quick + return 0 + return time.time() - self.track.updated_at + + def take_over(self): + if self.take_over_at: return - if track._track.predictions is None or not len(track._track.predictions): - # don't count to predictions if no prediction is set of given track (e.g. young tracks) - return + self.take_over_at = time.time() - - self.predictions.append(track) - if len(self.prediction_diffs): - self.prediction_diffs[-1].finish() # existing diffing can end - # and create a new one - self.prediction_diffs.append(DiffSegment(track)) - # self.prediction_diffs.append(DiffSegmentScan(track)) + def taken_over(self): + self.is_running = False + self.take_over_at = None - # check to change state - try: - self.receive_prediction(track) - except TransitionNotAllowed as e: - # state change is optional - pass + def takenover_for(self): + if self.take_over_at: + return time.time() - self.take_over_at + return None + + def takeover_factor(self): + l = self.takenover_for() + if not l: + return 0 + return l/TAKEOVER_FADEOUT - def after_receive_track(self, track: ProjectedTrack): - print('changed state') - def on_receive_track(self, track: ProjectedTrack): - # on event, only runs once, upon first track - print('updating track!') - - def on_receive_prediction(self, track: ProjectedTrack): - # on event, because it happens for every receive, despite transition - print('updating prediction!') - # self.track = track - - def after_receive_prediction(self, track: ProjectedTrack): - # after - pass - # self.prediction_track = track - # if not self.first_prediction_track: - # self.first_prediction_track = track - - def on_enter_corrected_prediction(self): - print('corrected!') - - def on_enter_detected(self): - print("DETECTED!") - - def on_enter_first_prediction(self): - print("Hello!") - - def on_enter_detected(self): - print(f"enter {self.current_state.id}") - def on_enter_substantial(self): - print(f"enter {self.current_state.id}") - def on_enter_first_prediction(self): - print(f"enter {self.current_state.id}") - def on_enter_corrected_prediction(self): - print(f"enter {self.current_state.id}") - def on_enter_loitering(self): - print(f"enter {self.current_state.id}") - def on_enter_play(self): - print(f"enter {self.current_state.id}") - def on_enter_lost(self): - print(f"enter {self.current_state.id}") - self.lost_at = time.time() - def lost_for(self): - if self.current_state is self.lost: - return time.time() - self.lost_at + if self.scene is ScenarioScene.LOST: + return time.time() - self.state_change_at return None def lost_factor(self): @@ -514,566 +134,323 @@ class TrackScenario(StateMachine): if not l: return 0 return l/LOST_FADEOUT - - def to_lines(self) -> List[RenderableLine]: - raise RuntimeError("Not implemented yet") + def anomaly_factor(self): + return calc_anomaly(self.prediction_diffs, 10) + + def deactivate(self): + self.take_over_at = None + + def update(self): + """Animation tick, check state.""" + # 1) lost_score: unlike other states, this runs for each rendering pass to handle crashing tracker + self.check_lost() -class DrawnScenario(TrackScenario): + + def set_scene(self, scene: ScenarioScene): + if self.scene is scene: + return + + logger.info(f"Changing scene for {self.track_id}: {self.scene.name} -> {scene.name}") + self.scene = scene + self.state_change_at = time.time() + + def update_state(self): + self.check_lost() or self.check_loitering() or self.check_track() + + + def check_lost(self): + if self.track and (self.track.lost or self.track.updated_at < time.time() - 5): + self.set_scene(ScenarioScene.LOST) + return True + return False + + def check_loitering(self): + scores = [s for s in calculate_loitering_scores(self.track, LOITERING_DURATION_TO_LINGER, LOITERING_LINGER_FACTOR, LOITERING_VELOCITY_TRESHOLD/TRACK_ASSUMED_FPS, 150)] + if scores[-1] > .99: + self.set_scene(ScenarioScene.LOITERING) + return True + return False + + def check_track(self): + predictions = len(self.prediction_tracks) + if predictions == 1: + self.set_scene(ScenarioScene.FIRST_PREDICTION) + return True + if predictions > 10: + self.set_scene(ScenarioScene.PLAY) + return True + if predictions: + self.set_scene(ScenarioScene.CORRECTED_PREDICTION) + return True + if self.track: + if len(self.track.projected_history) > TRACK_ASSUMED_FPS * 3: + self.set_scene(ScenarioScene.SUBSTANTIAL) + else: + self.set_scene(ScenarioScene.DETECTED) + return True + return False + + + # the tracker track: replace + def recv_track(self, track: ProjectedTrack): + if self.track and self.track.created_at > track.created_at: + # ignore old track + return + + self.track = track + self.update_prediction_diff() + + self.update_state() + + def update_prediction_diff(self): + """ + gather the diffs of the trajectory with the most recent prediction + """ + if len(self.prediction_diffs) == 0: + return + + self.prediction_diffs[-1].update_track(self.track) + + + # receive new predictions: accumulate + def recv_prediction(self, track: ProjectedTrack): + if not self.track: + # in case of the unlikely event that prediction was received sooner + self.recv_track(track) + + + if PREDICTION_INTERVAL is not None and len(self.prediction_tracks) and (track.frame_index - self.prediction_tracks[-1].frame_index) < PREDICTION_INTERVAL: + # just drop tracks if the predictions come to quick + return + + if track._track.predictions is None or not len(track._track.predictions): + # don't count to predictions if no prediction is set of given track (e.g. young tracks, that are still passed by the predictor) + return + + + self.prediction_tracks.append(track) + if len(self.prediction_diffs): + self.prediction_diffs[-1].finish() # existing diffing can end + # and create a new one + self.prediction_diffs.append(DiffSegment(track)) + # self.prediction_diffs.append(DiffSegmentScan(track)) + + self.update_state() + + +class DrawnScenario(Scenario): """ Scenario contains the controls (scene, target positions) DrawnScenario class does the actual drawing of points incl. transitions + This distinction is only for ordering the code """ - ANOMALY_DECAY = .2 # speed with which the cirlce shrinks over time - DISTANCE_ANOMALY_FACTOR = .03 # the ammount to which the difference counts to the anomaly score - MAX_HISTORY = 200 # points of history of trajectory to display (preventing too long lines) + MAX_HISTORY = 300 # points of history of trajectory to display (preventing too long lines) CUT_GAP = 5 # when adding a new prediction, keep the existing prediction until that point + this CUT_GAP margin - def __init__(self): - # self.created_at = time.time() - # self.track_id = track_id + def __init__(self, track_id): + super().__init__(track_id) self.last_update_t = time.perf_counter() + + history_color = SrgbaColor(1.,0.,1.,1.) + self.line_history = StaticLine([], history_color) + self.line_history_smooth = AppendableLineAnimator(self.line_history, draw_decay_speed=25) + self.line_history_capped = CropLine(self.line_history_smooth, self.MAX_HISTORY) + self.line_history_disappearing = FadedTailLine(self.line_history_capped, TRACK_FADE_AFTER_DURATION * TRACK_ASSUMED_FPS, TRACK_END_FADE) + self.line_history_noisy = NoiseLine(self.line_history_disappearing, amplitude=0, t_factor=.3) + self.line_history_faded = FadeOutLine(self.line_history_noisy) + self.line_history_drawn = self.line_history_faded + + + self.active_ptrack: Optional[ProjectedTrack] = None + self.prediction_color = SrgbaColor(0,1,0,1) + self.line_prediction =StaticLine([], self.prediction_color) + self.line_prediction_dashed = DashedLine(self.line_prediction, t_factor=8) + self.line_prediction_dashed.skip = True + self.line_prediction_faded = FadeOutLine(self.line_prediction_dashed) + self.line_prediction_drawn = self.line_prediction_faded - self.drawn_position: Optional[Coordinate] = None - self.drawn_positions: List[Coordinate] = [] - self.drawn_pred_history: List[Coordinate] = [] - self.drawn_predictions: List[List[Coordinate]] = [] - self._current_drawn_prediction: List[Coordinate] = [] - self.drawn_text = "" - self.drawn_text_lines: List[RenderableLine] = [] + def update(self): + super().update() + if self.track: + self.line_history.points = self.track.projected_history - self.anomaly_score = 0 # TODO: variable - self._drawn_anomaly_score = 0 - super().__init__() - - def add_anomaly_length(self, length: float): - """ - append a difference in meters between point - """ - self.anomaly_score += length * self.DISTANCE_ANOMALY_FACTOR - if self.anomaly_score > 1: - self.anomaly_score = 1. - - def decay_anomaly_score(self, dt: DeltaT): - if self.anomaly_score == 0: - return - - self.anomaly_score = exponentialDecay(self.anomaly_score, 0, self.ANOMALY_DECAY, dt) - - def update_drawn_positions(self) -> List: - ''' - use dt to lerp the drawn positions in the direction of current prediction - ''' - # TODO: make lerp, currently quick way to get results - - def int_or_not(v): - """quick wrapper to toggle int'ing""" - return v - # return int(v) - - # 0. calculate dt - # if dt is None: - t = time.perf_counter() - dt: DeltaT = t - self.last_update_t - self.last_update_t = t - - # 0. Update anomaly, slowly decreasing it over time - self.decay_anomaly_score(dt) - - # for diff in self.prediction_diffs: - # diff.update_drawn_positions(dt, self) - - # 1. track history, direct update - - # positions = self._track.get_projected_history(None, self.camera)[-MAX_HISTORY:] - # self.drawn_positions = self.track.projected_history[-self.MAX_HISTORY:] - self.drawn_positions = self.track.projected_history - if self.drawn_position is None: - self.drawn_position = self.drawn_positions[-1] - else: - self.drawn_position[0] = exponentialDecay(self.drawn_position[0], self.drawn_positions[-1][0], 13, dt) - self.drawn_position[1] = exponentialDecay(self.drawn_position[1], self.drawn_positions[-1][1], 13, dt) - - # 3. predictions - if len(self.drawn_predictions) < len(self.predictions): - # first prediction - if len(self.drawn_predictions) == 0: - last_pred = self.predictions[-1] - self.drawn_predictions.append(last_pred.predictions[0]) - else: - # if a new prediction has arised, transition from existing one. - # First, cut existing prediction - # CUT_GAP indicates that some is lost in the transition, to prevent glitches when velocity of person changes - end_step = self.predictions[-1].frame_index - self.predictions[-2].frame_index + self.CUT_GAP - keep = self.drawn_predictions[-1][end_step:] - last_item: Coordinate = (keep)[-1] - self.drawn_predictions[-1] = self.drawn_predictions[-1][:end_step] # cut the old part - # print(self.predictions[-1].frame_index, self.predictions[-2].frame_index, end_step, len(keep)) - # duplicate last item, so the new one has the same nr. of points as the incoming prediction (so it can actually transition) - ext = [last_item] * (len(self.predictions[-1].predictions[0]) - len(keep)) - keep.extend(ext) - self.drawn_predictions.append(keep) - - for a, drawn_prediction in enumerate(self.drawn_predictions): - # origin = self.predictions[a].predictions[0][0] - origin = self.predictions[a].predictions[0][0] - # associated_diff = self.prediction_diffs[a] - # progress = associated_diff.nr_of_passed_points() - for i, pos in enumerate(drawn_prediction): - # TODO: this should be done in polar space starting from origin (i.e. self.drawn_posision[-1]) - decay = max(3, (18/i) if i else 10) # points further away move with more delay - decay = 16 - drawn_r, drawn_angle = relativePointToPolar( origin, drawn_prediction[i]) - pred_r, pred_angle = relativePointToPolar(origin, self.predictions[a].predictions[0][i]) - r = exponentialDecay(drawn_r, pred_r, decay, dt) - - # make circular coordinates transition through the smaller arc - if abs(drawn_angle - pred_angle) > math.pi: - pred_angle -= math.pi * 2 - angle = exponentialDecay(drawn_angle, pred_angle, decay, dt) - x, y = relativePolarToPoint(origin, r, angle) - self.drawn_predictions[a][i] = int_or_not(x), int_or_not(y) - # self.drawn_predictions[i][0] = int(exponentialDecay(self.drawn_predictions[i][0], self.prediction_track.predictions[i][0], decay, dt)) - # self.drawn_predictions[i][1] = int(exponentialDecay(self.drawn_predictions[i][1], self.prediction_track.predictions[i][1], decay, dt)) - - # self.drawn_predictions = [] - # for a, (ptrack, next_ptrack) in enumerate(zip(self.predictions, [*self.predictions[1:], None])): - - # prediction = ptrack.predictions[0] # only use one prediction per timestep/frame/track - # if next_ptrack is not None: - # # not the last one, cut off - # next_ptrack: ProjectedTrack = self.predictions[a+1] - # end_step = next_ptrack.frame_index - ptrack.frame_index - - - # else: - # end_step = None # not last item; show all - # self.drawn_predictions.append(ptrack.predictions[0][:end_step]) - - - - # Animate line as procedural chain https://www.youtube.com/watch?v=qlfh_rv6khY&t=183s - - - self._drawn_anomaly_score = exponentialDecay(self._drawn_anomaly_score, self.anomaly_score, 3, dt) + if len(self.prediction_tracks): + # TODO: only when animation is ready for it? or collect lines + if not self.active_ptrack: + self.active_ptrack = self.prediction_tracks[-1] + elif self.active_ptrack._track.updated_at < self.prediction_tracks[-1]._track.updated_at: + # logger.debug('pending change?', self.line_prediction_drawn.is_ready()) + # switch only if ready + if self.line_prediction_drawn.is_ready(): + self.active_ptrack = self.prediction_tracks[-1] + self.line_prediction_drawn.start() # reset positions - # print(self.drawn_predictions) - # line = [] - # for i, pos in enumerate(ptrack.predictions): - # line.append((ptrack.predictions[i][0], ptrack.predictions[i][1])) - # print(line) + # self.line_prediction_dashed.set_offset_t(self.active_ptrack._track.track_update_dt() * 4) + self.line_prediction.points = self.active_ptrack._track.predictions[0] - # if len(self.drawn_predictions) <= a: - # self.drawn_predictions.append(line) - # else: - # self.drawn_predictions[a] = line + if self.scene is ScenarioScene.CORRECTED_PREDICTION: + self.line_prediction_dashed.skip = False - # if self.prediction_track and self.prediction_track.predictions: - # prediction_offset = self._track.frame_index - self.prediction_track.frame_index - # if len(self.prediction_track.predictions): - # for a, drawn_prediction in enumerate(self.drawn_predictions): - # for i, pos in enumerate(drawn_prediction): - # # TODO: this should be done in polar space starting from origin (i.e. self.drawn_posision[-1]) - # decay = max(3, (18/i) if i else 10) # points further away move with more delay - # decay = 16 - # origin = self.drawn_positions[-1] - # drawn_r, drawn_angle = relativePointToPolar( origin, drawn_prediction[i]) - # pred_r, pred_angle = relativePointToPolar(origin, self.prediction_track.predictions[a][i+prediction_offset]) - # r = exponentialDecay(drawn_r, pred_r, decay, dt) - # angle = exponentialDecay(drawn_angle, pred_angle, decay, dt) - # x, y = relativePolarToPoint(origin, r, angle) - # self.drawn_predictions[a][i] = int_or_not(x), int_or_not(y) - # # self.drawn_predictions[i][0] = int(exponentialDecay(self.drawn_predictions[i][0], self.prediction_track.predictions[i][0], decay, dt)) - # # self.drawn_predictions[i][1] = int(exponentialDecay(self.drawn_predictions[i][1], self.prediction_track.predictions[i][1], decay, dt)) - - # if len(self.prediction_track.predictions) > len(self.drawn_predictions): - # for pred in self.prediction_track.predictions[len(self.drawn_predictions):]: - # self.drawn_predictions.append(pred[prediction_offset:]) - - - def to_renderable_lines(self) -> RenderableLines: - t = time.time() - track_age = t - self.track.updated_at # Should be beginning - lines = RenderableLines([]) - - - # track_age_in_frames = int(track_age * TRACK_FADE_ASSUME_FPS) - # track_max_points = TRACK_FADE_AFTER_DURATION * TRACK_FADE_ASSUME_FPS - track_age_in_frames - - # 1. Trajectory history - # drawable_points, alphas = self.drawn_positions[:self.MAX_HISTORY], [1]*len(self.drawn_positions) - - # perlin/simplex noise - # dt: change speed. Divide to make slower - # amp: amplitude of noise - # frequency: make smaller to make longer waves - if OPTION_TRACK_NOISE: - noisy_points = apply_perlin_noise_to_line_normal(self.drawn_positions, t/5, .3, .02) - else: - noisy_points = self.drawn_positions - - drawable_points, alphas = points_fade_out_alpha_mask(noisy_points, track_age, TRACK_FADE_AFTER_DURATION, TRACK_END_FADE) - color = SrgbaColor(1.,0.,1.,1.-self.lost_factor()) - - # TODO: effect configuration - - - points = [RenderablePoint(pos, color.as_faded(a)) for pos, a in zip(drawable_points, alphas)] - # points = [RenderablePoint(pos, color.as_faded(a)) for pos, a in zip(drawable_points, alphas)] - - lines.append(RenderableLine(points)) - - # 2. Position Marker / anomaly score - - if OPTION_POSITION_MARKER: - anomaly_marker_color = SrgbaColor(0.,0.,1, 1.-self.lost_factor()) # fadeout - # lines.append(circle_arc(self.drawn_positions[-1][0], self.drawn_positions[-1][1], 1, t, self.anomaly_score, anomaly_marker_color)) - # last point, (but this draws line in circle, requiring a 'jump back' for the laser) - cx, cy = self.drawn_position[0], self.drawn_position[1], - - radius = max(.1, self._drawn_anomaly_score * 1.) if OPTION_GROW_ANOMALY_CIRCLE else .1 - - steps=0 - if len(self.drawn_positions) >= steps: - dx, dy = self.drawn_positions[-1][0] - self.drawn_positions[-steps][0], self.drawn_positions[-1][1] - self.drawn_positions[-steps][1], - diff = np.array([dx,dy]) - diff = diff/np.linalg.norm(diff) * radius * 1.1 - cx += diff[0] - cy += diff[1] - - lines.append(circle_arc( - cx, cy, - radius, - 0, 1, - anomaly_marker_color) - ) - - # 3. Predictions - if len(self.drawn_predictions): - color = SrgbaColor(0.,1,0.,1.-self.lost_factor()) - # positions = [RenderablePosition.from_list(pos) for pos in self.drawn_positions] - for a, drawn_prediction in enumerate(self.drawn_predictions): - if a < (len(self.drawn_predictions) - 1): - # not the newest: fade out: - deprecation_age = t - self.predictions[a+1].updated_at - if deprecation_age > PREDICTION_FADE_IN: - # old: skip drawing. - continue - else: - fade_factor = 1 - (deprecation_age / PREDICTION_FADE_IN) - color = color.as_faded(fade_factor) - - prediction_track_age = time.time() - self.predictions[a].updated_at - t_factor = prediction_track_age / PREDICTION_FADE_IN - - associated_diff = self.prediction_diffs[a] - progress = associated_diff.nr_of_passed_points() - - # drawn_prediction, alphas1 = points_fade_out_alpha_mask(drawn_prediction, prediction_track_age, TRACK_FADE_AFTER_DURATION, TRACK_END_FADE, no_frame_max=True) - - gradient = np.linspace(0, PREDICTION_FADE_SLOPE, len(drawn_prediction)) - alphas2 = np.clip(gradient + t_factor * (-1*PREDICTION_FADE_SLOPE), 0, 1) - # print(alphas) - - - # colors = [color.with_alpha(np.clip(t_factor*3-(p_index/len(drawn_prediction)), 0, 1)) for p_index in range(len(drawn_prediction))] - # colors = [color.with_alpha(np.clip(t_factor*2-(p_index/len(drawn_prediction)), 0, 1)) for p_index in range(len(drawn_prediction))] - - # apply both fade in and fade out mask: - colors = [color.as_faded(a2) for a2 in alphas2] - # colors = [color.as_faded(a1*a2) for a1, a2 in zip(alphas1, alphas2)] - - - # points = [RenderablePoint(pos, pos_color) for pos, pos_color in zip(drawn_prediction[PREDICTION_OFFSET:], colors[PREDICTION_OFFSET:])] - points = [RenderablePoint(pos, pos_color) for pos, pos_color in zip(drawn_prediction, colors)] - points = points[progress//2:] - ls = LineString(drawn_prediction) - if t_factor < 1: - ls = substring(ls, 0, t_factor*ls.length, ls.length) - - # print(prediction_track_age) - - # Option 1 : dashes - dashed = dashed_line(ls, .8, 1., prediction_track_age, False) - for line in dashed.geoms: - dash_points = [RenderablePoint(point, color) for point in line.coords] - lines.append(RenderableLine(dash_points)) - - # Option 2 : flash - flash_distance = prediction_track_age * 5 - # flashes = [] - # for i in range(10): - # flashes.append(substring(ls, flash_distance*i, flash_distance + .5)) - - # flash_multiline = shapely.union_all(flashes) - # flashes = flash_multiline.geoms if isinstance(flash_multiline, MultiLineString) else [flash_multiline] - # print(flashes) - # for flash_ls in flashes: - # flash_points = [RenderablePoint(point, color) for point in flash_ls.coords] - # if len(flash_points) > 1: - # lines.append(RenderableLine(flash_points)) - - - # flash_points = [RenderablePoint(point, color) for point in flash_ls.coords] - # if len(flash_points) > 1: - # lines.append(RenderableLine(flash_points)) - - - - # lines.append(RenderableLine(points)) - - # 4. Diffs - # for drawn_diff in self.drawn_diffs: - # color = SrgbaColor(0.,1,1.,1.-self.lost_factor()) - # colors = [color.as_faded(1) for a2 in range(len(drawn_diff))] - # points = [RenderablePoint(pos, pos_color) for pos, pos_color in zip(drawn_diff, colors)] - # lines.append(RenderableLine(points)) - - # if OPTION_RENDER_DIFF_SEGMENT: - # for diff in self.prediction_diffs: - # lines.append_lines(diff.as_renderable()) - # pass - - - # # print(self.current_state) - # if self.current_state is self.first_prediction or self.current_state is self.corrected_prediction: - # shape = np.array(shapes.YOUR if time.time() % 2 > 1 else shapes.FUTURE) - # text = "your" if time.time() % 2 > 1 else "future" - # color = SrgbaColor(0.5,0.5,0.5,1.-self.lost_factor()) - - # line = self.get_text_lines(text, shape, color) - # if not line: - # POSITION_INDEX = 50 - - # draw_pos = self.drawn_predictions[0][POSITION_INDEX-1] - # current_pos = self.drawn_positions[-1] - # angle = np.arctan2(draw_pos[0]-current_pos[0], draw_pos[1]-current_pos[1]) + np.pi - # # for i, line in enumerate(shape): - # # if i != 0: - # # continue - # points = np.array(shape[:,:2]) - - # avg_x = np.average(points[:,0]) - # avg_y = np.average(points[:,1]) - - # minx, maxx = np.min(points[:,0]), np.max(points[:,0]) - # miny, maxy = np.min(points[:,1]), np.max(points[:,1]) - - # sx = maxx-minx - # sy = maxy-miny - - # points[:,0] -= avg_x - # points[:,1] -= avg_y - # points /= (sx * 1.5) # scale to 1 - - # points @= rotateMatrix(angle) - - # points += draw_pos - - # points = [RenderablePoint.from_list(pos, color.with_alpha(intensity)) for pos, intensity in zip(points, shape[:,2])] - # self.drawn_text = text - # self.drawn_text_lines = [RenderableLine(points)] - # lines.extend(self.drawn_text_lines) - - - return lines - def get_text_lines(self, text, shape, color): - if self.drawn_text == text: - return self.drawn_text_lines - return None + + def to_renderable_lines(self, dt: DeltaT) -> RenderableLines: + # each scene is handled differently: + + + # 1) history, fade out when lost + self.line_history_faded.set_alpha(1-self.lost_factor()) + self.line_prediction_faded.set_alpha(1-self.lost_factor()) + self.line_history_noisy.amplitude = self.lost_factor() -# def circle_points(cx, cy, r, c: Color): PointList -# # r = r -# steps = 30 -# pointlist: list[LaserPoint] = [] -# for i in range(steps): -# x = int(cx + math.cos(i * (2*math.pi)/steps) * r) -# y = int(cy + math.sin(i * (2*math.pi)/steps)* r) -# pointlist.append(LaserPoint(x, y, c, blank=(i==(steps-1)or i==0))) - -# return pointlist + # fade out history after max duration, given in frames + track_age_in_frames = self.track_age() * TRACK_ASSUMED_FPS + self.line_history_disappearing.set_frame_offset(track_age_in_frames) -def points_fade_out_alpha_mask(positions: List, track_age_seconds: float, fade_after_duration: float, fade_frames: int, no_frame_max=False): - track_age_in_frames = int(track_age_seconds * TRACK_FADE_ASSUME_FPS) - if not no_frame_max: - track_max_points = int(fade_after_duration * TRACK_FADE_ASSUME_FPS) - track_age_in_frames - else: - if track_age_seconds < fade_after_duration: - track_max_points = len(positions) #+ fade_frames - else: - FADE_DURATION = 2 - t_fade = max(0, track_age_seconds - fade_after_duration) / FADE_DURATION - track_max_points = int(t_fade * len(positions) / FADE_DURATION) + + # 2) also fade-out when moving into loitering mode. + # when fading out is done, start drawing historical data + history_line = self.line_history_drawn.as_renderable_line(dt) + prediction_line = self.line_prediction_drawn.as_renderable_line(dt) - - drawable_points = positions[-track_max_points:] - alphas = [] - for i, point in enumerate(drawable_points): - reverse_i = len(drawable_points) - i - fade_i = reverse_i - (track_max_points - fade_frames) # -90 - fade_i /= fade_frames # / 10 - fade_i = np.clip(fade_i, 0, 1) - alpha = 1 - fade_i - if alpha > 0: - alphas.append(alpha) + # print(history_line) + # print(self.track_id, len(self.line_history.points), len(history_line)) + + return RenderableLines([ + history_line, + prediction_line + ]) - return drawable_points, alphas - - # drawn_pred_history - # drawn_predictions - - -# @dataclass -# class RenderablePosition(): -# x: float -# y: float - -# @classmethod -# def from_list(cls, l: List[float, float]) -> RenderablePosition: -# return cls(x = float(l[0]), y=float(l[1])) - -# TODO)) Or Shapely point? class Stage(Node): - """ - Render a stage, on which different TrackScenarios take place to a - single image of lines. Which can be passed to different renderers - E.g. the laser or image renderers. - """ FPS = 60 def setup(self): - # self.scenarios: List[DrawnScenario] = [] - self.scenarios: Dict[str, DrawnScenario] = defaultdict(lambda: DrawnScenario()) + self.active_scenarios: List[DrawnScenario] = [] # List of currently running Scenario instances + + + self.scenarios: Dict[str, DrawnScenario] = DefaultDictKeyed(lambda key: DrawnScenario(key)) self.frame_noimg_sock = self.sub(self.config.zmq_frame_noimg_addr) self.trajectory_sock = self.sub(self.config.zmq_trajectory_addr) self.prediction_sock = self.sub(self.config.zmq_prediction_addr) self.stage_sock = self.pub(self.config.zmq_stage_addr) - self.counter = CounterSender() - self.frame: Optional[Frame] = None - + if self.config.debug_map: debug_color = SrgbaColor(0.,0.,1.,1.) self.debug_lines = RenderableLines(load_lines_from_svg(self.config.debug_map, 100, debug_color)) - + + def run(self): - prev_time = time.perf_counter() - while self.is_running.is_set(): - - self.tick() - - # 1) poll & update + while self.run_loop_capped_fps(self.FPS): + dt = max(1/ self.FPS, self.dt_since_last_tick) # never dt of 0 self.loop_receive() + self.loop_update_scenarios() + self.loop_render(dt) - # 2) render - self.loop_render() - - # 3) calculate latency for desired FPS - now = time.perf_counter() - time_diff = (now - prev_time) - if time_diff < 1/self.FPS: - # print(f"sleep {1/self.FPS - time_diff}") - time.sleep(1/self.FPS - time_diff) - now += 1/self.FPS - time_diff - - prev_time = now def loop_receive(self): - # 1) receive frames - try: - camera_frame: Frame = self.frame_noimg_sock.recv_pyobj(zmq.NOBLOCK) - self.frame = camera_frame - except zmq.ZMQError as e: - pass - # 1) receive predictions try: prediction_frame: Frame = self.prediction_sock.recv_pyobj(zmq.NOBLOCK) for track_id, track in prediction_frame.tracks.items(): proj_track = ProjectedTrack(track, prediction_frame.camera) - self.scenarios[track_id].add_prediction(proj_track) + self.scenarios[track_id].recv_prediction(proj_track) except zmq.ZMQError as e: - self.logger.debug(f'reuse prediction') + # no msgs + pass # 2) receive tracker tracks try: trajectory_frame: Frame = self.trajectory_sock.recv_pyobj(zmq.NOBLOCK) for track_id, track in trajectory_frame.tracks.items(): proj_track = ProjectedTrack(track, trajectory_frame.camera) - # if not self.scenarios[track_id].camera: - # self.scenarios[track_id].camera = trajectory_frame.camera # little hack to pass camera! - self.scenarios[track_id].set_track(proj_track) + self.scenarios[track_id].recv_track(proj_track) except zmq.ZMQError as e: - self.logger.debug(f'reuse tracks') + pass + # self.logger.debug(f'reuse tracks') - # 3) Remove stale tracks + + def loop_update_scenarios(self): + """Update active scenarios and handle pauses/completions.""" + # 1) process timestep for all scenarios + for s in self.scenarios.values(): + s.update() + + + # 2) Remove stale tracks and take-overs for track_id, scenario in list(self.scenarios.items()): - # check when last tracker update was received - scenario.check_lost() - - if scenario.lost_factor() > 1: - self.logger.info(f"rm track {track_id}") + if scenario.lost_factor() >= 1: + if scenario in self.active_scenarios: + self.active_scenarios = list(filter(scenario.__ne__, self.active_scenarios)) + + self.logger.info(f"rm lost track {track_id}") del self.scenarios[track_id] + if scenario.takeover_factor() >= 1: + if scenario in self.active_scenarios: + self.active_scenarios = list(filter(scenario.__ne__, self.active_scenarios)) + scenario.taken_over() + - def loop_render(self): + # 3) determine set of pending scenarios (all except running) + pending_scenarios = [s for s in self.scenarios.values() if s not in self.active_scenarios] + # ... highest priority first + pending_scenarios.sort(key=lambda s: s.scene.value.priority, reverse=True) + + # 4) check if there's a slot free: + while len(self.active_scenarios) < self.config.max_active_scenarios and len(pending_scenarios): + scenario = pending_scenarios.pop(0) + self.active_scenarios.append(scenario) + scenario.start() + + # 5) Takeover Logic: If no space, try to replace a lower-priority active scenario + # which is in a scene in which takeover is possible + eligible_active_scenarios = [ + s for s in self.active_scenarios if s.scene.value.takeover_possible + ] + eligible_active_scenarios.sort(key=lambda s: s.scene.value.priority) + + if eligible_active_scenarios and pending_scenarios: + lowest_priority_active = eligible_active_scenarios[0] + highest_priority_waiting = pending_scenarios[0] + + if highest_priority_waiting.scene.value.priority > lowest_priority_active.scene.value.priority: + # Takeover! Stop the active scenario + # will be cleaned up in update() loop after animation finishes + # automatically triggering the start of the highest priority scene + lowest_priority_active.take_over() + + + def loop_render(self, dt: DeltaT): + """Draw all active scenarios onto the canvas.""" lines = RenderableLines([]) - - # 0. DEBUG lines: - + for scenario in self.active_scenarios: + lines.append_lines(scenario.to_renderable_lines(dt)) - - # 1. Draw each scenario: - for track_id, scenario in self.scenarios.items(): - scenario.update_drawn_positions() - - lines.append_lines(scenario.to_renderable_lines()) - - # print(lines) - # rl = RenderableLines(lines) - # with open('/tmp/lines.pcl', 'wb') as fp: - # pickle.dump(rl, fp) - # rl = lines rl = lines.as_simplified(SimplifyMethod.RDP, .003) # or segmentise (see shapely) self.counter.set("stage.lines", len(lines.lines)) self.counter.set("stage.points_orig", lines.point_count()) self.counter.set("stage.points", rl.point_count()) - - # debug_lines = RenderableLines([]) - # if self.frame and hasattr(self.frame.camera, 'debug_lines'): - # for points in self.frame.camera.debug_lines: - # line_points = [] - # # interpolate, so the laser can correct the lines - # for i in range(20): - # t = i / 19 - # x = lerp(points[0][0], points[1][0], t) - # y = lerp(points[0][1], points[1][1], t) - # line_points.append(RenderablePoint((x, y), debug_color)) - - # debug_lines.append(RenderableLine(line_points)) - layers: RenderableLayers = { - 1: rl, + 1: lines, 2: self.debug_lines, } - # print(rl.__dict__) self.stage_sock.send_json(obj=layers, cls=DataclassJSONEncoder) - # print(json.dumps(rl, cls=DataclassJSONEncoder)) + @classmethod def arg_parser(cls) -> ArgumentParser: @@ -1098,129 +475,10 @@ class Stage(Node): help='specify a map (svg-file) from which to load lines which will be overlayed', type=str, default="../DATASETS/hof3/map_hof.svg") + argparser.add_argument('--max-active-scenarios', + help='Maximum number of active scenarios that can be drawn at once (to not overlod the laser)', + type=int, + default=2) return argparser - - -# TODO place somewhere else: -# Gemma3:27b prompt: "python. Given a list of coordinates, that describes a line: `drawable_points: List[Tuple[float,float]]` apply perlin noise over the normal of the line, that changes over time `dt`." -def apply_perlin_noise_to_line_normal(drawable_points: np.ndarray, dt: float, amplitude: float = 1.0, frequency: float = 1.0, fade_over_n_points = 8) -> np.ndarray: - """ - Applies Perlin noise to the normals of a line described by a list of coordinates, changing over time. - - Args: - drawable_points: A list of (x, y) tuples representing the points of the line. - dt: The time delta, used to animate the Perlin noise. - amplitude: The strength of the Perlin noise effect. - frequency: The frequency of the Perlin noise (how many waves per unit). - - Returns: - A new list of (x, y) tuples representing the line with Perlin noise applied to the normals. If drawable_points - has fewer than 2 points, it returns the original list unchanged. - - Raises: - TypeError: If drawable_points is not a list or dt is not a float. - ValueError: If the input points are not tuples of length 2. - """ - - # if not isinstance(drawable_points, list): - # print(drawable_points, type(drawable_points)) - # raise TypeError("drawable_points must be a list.") - if not isinstance(dt, float): - raise TypeError("dt must be a float.") - - if len(drawable_points) < 2: - return drawable_points # Nothing to do with fewer than 2 points - - # for point in drawable_points: - # if not isinstance(point, tuple) or len(point) != 2: - # raise ValueError("Each point in drawable_points must be a tuple of length 2.") - - - # noise = PerlinNoise(octaves=4) # You can adjust octaves for different noise patterns - - new_points = [] - for i in range(len(drawable_points)): - x, y = drawable_points[i] - - # Calculate the normal vector. We'll approximate it using the previous and next points. - if i == 0: - # For the first point, use the next point to estimate the normal - next_x, next_y = drawable_points[i + 1] - normal_x = next_y - y - normal_y = -(next_x - x) - elif i == len(drawable_points) - 1: - # For the last point, use the previous point - prev_x, prev_y = drawable_points[i - 1] - normal_x = y - prev_y - normal_y = -(x - prev_x) - else: - prev_x, prev_y = drawable_points[i - 1] - next_x, next_y = drawable_points[i + 1] - normal_x = next_y - prev_y - normal_y = -(next_x - prev_x) - - # Normalize the normal vector - norm = np.sqrt(normal_x**2 + normal_y**2) - if norm > 0: - normal_x /= norm - normal_y /= norm - - # Apply Perlin noise to the normal - # noise_x = noise([x * frequency, (y + dt) * frequency]) * amplitude * normal_x - # noise_y = noise([x * frequency, (y + dt) * frequency]) * amplitude * normal_y - noise = snoise2(i * frequency, dt % 1000, octaves=4) - - use_amp = amplitude - if fade_over_n_points > 0: - rev_step = len(drawable_points) - i - amp_factor = rev_step / fade_over_n_points - if amp_factor < 1: - use_amp *= amp_factor - - noise_x = noise * use_amp * normal_x - noise_y = noise * use_amp * normal_y - - # print(noise_x, noise_y, dt, frequency, i, dt, snoise2(i * frequency, dt % 1000, octaves=4)) - - - # Add the noise to the point's coordinates - new_x = x + noise_x - new_y = y + noise_y - - new_points.append((new_x, new_y)) - - # print(drawable_points, new_points) - - return np.array(new_points) - - -import math - -def distance(p1, p2): - return math.hypot(p2[0] - p1[0], p2[1] - p1[1]) - - -def dashed_line(line: LineString, dash_len: float, gap_len: float, offset: float = 0, loop_offset = True) -> MultiLineString: - total_length = line.length - - segments = [] - - if loop_offset: - # by default, prepend skipped gap - pos = offset % (dash_len + gap_len) - - if pos > gap_len: - segments.append(substring(line, 0, pos - gap_len)) - else: - pos = offset - - while pos < total_length: - end = min(pos + dash_len, total_length) - if pos < end: - dash = substring(line, pos, end) - segments.append(dash) - pos += dash_len + gap_len - - return MultiLineString(segments) \ No newline at end of file diff --git a/trap/stage_old.py b/trap/stage_old.py new file mode 100644 index 0000000..ac64c7c --- /dev/null +++ b/trap/stage_old.py @@ -0,0 +1,1009 @@ +from __future__ import annotations + +from abc import ABC, abstractmethod +from argparse import ArgumentParser +from collections import defaultdict +from dataclasses import dataclass +from enum import Enum +import json +import logging +import math +import pickle +import time +from typing import Dict, List, Optional, Tuple +from matplotlib.pyplot import isinteractive +import numpy as np +from shapely import LineString, MultiLineString, line_locate_point, linestrings + +import shapely +from shapely.ops import substring +from statemachine import Event, State, StateMachine +from statemachine.exceptions import TransitionNotAllowed +import zmq + + +from sgan.sgan import data +from trap import shapes +from trap.anomaly import DiffSegment +from trap.base import Camera, DataclassJSONEncoder, DistortedCamera, Frame, ProjectedTrack, Track +from trap.counter import CounterSender +from trap.laser_renderer import circle_points, rotateMatrix +from trap.lines import AppendableLine, Coordinate, DeltaT, ProceduralChain, RenderableLayers, RenderableLine, RenderableLines, RenderablePoint, RenderablePosition, SimplifyMethod, SrgbaColor, circle_arc, load_lines_from_svg +from trap.node import Node +from trap.timer import Timer +from trap.utils import exponentialDecay, exponentialDecayRounded, lerp, relativePointToPolar, relativePolarToPoint + +from noise import snoise2 + +logger = logging.getLogger('trap.stage') + + +OPTION_RENDER_DEBUG = False +OPTION_POSITION_MARKER = False +OPTION_GROW_ANOMALY_CIRCLE = False +# OPTION_RENDER_DIFF_SEGMENT = True +OPTION_TRACK_NOISE = False + + +# class DiffSegmentScan(DiffSegment): +# """ +# Provide alternative diffing, in the form of a sort of scan line +# Should be faster with the laser +# TODO: This is work in progress, does not work yet! +# """ + +# def __init__(self, prediction: ProjectedTrack): +# self.ptrack = prediction +# self._target_track = prediction +# self.finished = False +# self._last_diff_frame_idx = 0 + +# def finish(self): +# self.finished = True + +# def prediction_offset(self): +# """Difference is starting moment between track and prediction""" +# return self.ptrack.frame_index - self._target_track.frame_index + +# def nr_of_passed_points(self): +# """Number of points of the given ptrack that have passed""" +# return len(self._target_track.projected_history) - 1 - self.prediction_offset() +# # len(self.points) * self.POINT_INTERVAL + +# # run on each track update received +# def update_track(self, track: ProjectedTrack): +# self._target_track = track + +# if self.finished: +# # don't add new points if finished +# return + +# start_frame_idx = max(self.ptrack.frame_index, self._last_diff_frame_idx) +# traj_diff_steps_back = track.frame_index - start_frame_idx # positive value +# pred_diff_steps_forward = start_frame_idx - self.ptrack.frame_index # positive value +# self._last_diff_frame_idx = track.frame_index + +# # run each render tick +# def update_drawn_positions(self, dt: DeltaT, scenario: DrawnScenario): +# # if not self.finished or not self.line.ready: +# # self.line.update_drawn_positions(dt) +# pass # TODO: use easing + + + +# def as_renderable(self) -> RenderableLines: +# if self.finished: +# return RenderableLines([]) +# color = SrgbaColor(0,0,1,1) +# # steps_diff = self.nr_of_passed_points() +# idx = self.nr_of_passed_points() +# if len(self.ptrack.predictions[0]) < idx+1: +# self.finish() +# return RenderableLines([]) +# points = [self._target_track.projected_history[-1], self.ptrack.predictions[0][idx]] + +# points = [RenderablePoint(pos, color) for pos in points] +# line = RenderableLine(points) + +# return RenderableLines([line]) + + +class ScenarioScene(Enum): + DETECTED = 1 + FIRST_PREDICTION = 2 + CORRECTED_PREDICTION = 3 + LOITERING = 4 + PLAY = 4 + LOST = -1 + +LOST_FADEOUT = 3 +PREDICTION_INTERVAL: float|None = 20 # frames +PREDICTION_FADE_IN: float = 3 +PREDICTION_FADE_SLOPE: float = -10 +PREDICTION_FADE_AFTER_DURATION: float = 8 # seconds +PREDICTION_END_FADE = 2 #frames +# TRACK_MAX_POINTS = 100 +TRACK_FADE_AFTER_DURATION = 15. # seconds +TRACK_END_FADE = 30 # points +TRACK_FADE_ASSUME_FPS = 12 + +# Don't render the first n points of the prediction, +# helps to avoid jitter in the line transition +# PREDICTION_OFFSET = int(TRACK_FADE_ASSUME_FPS * PREDICTION_INTERVAL * .8) + +class TrackScenario(StateMachine): + detected = State(initial=True) + substantial = State() + first_prediction = State() + corrected_prediction = State() + loitering = State() + play = State() + lost = State(final=True) + + receive_track = lost.from_( + detected, first_prediction, corrected_prediction, loitering, play, substantial, cond="track_is_lost" + ) | corrected_prediction.to(loitering, cond="track_is_loitering") | detected.to(substantial, cond="track_is_long") + + mark_lost = lost.from_(detected, substantial, first_prediction, corrected_prediction, loitering, play) + + receive_prediction = detected.to(first_prediction) | substantial.to(first_prediction) | first_prediction.to(corrected_prediction, cond="prediction_is_stale") | corrected_prediction.to(play, cond="prediction_is_playing") + + + def __init__(self): + self.track: ProjectedTrack = None + self.camera: Optional[Camera] = None + # self.first_prediction_track: Optional[Track] = None + # self.prediction_track: Optional[Track] = None + self.predictions: List[Track] = [] + + self._last_diff_frame_idx: Optional[int] = 0 + + self.diffs: List[Tuple[Coordinate, Coordinate]] = [] + self.prediction_diffs: List[DiffSegment] = [] + super().__init__() + + def track_is_long(self, track: ProjectedTrack): + return len(track.history) > 20 + + def track_is_lost(self, track: ProjectedTrack): + # return self._track and self._track.created_at < time.time() - 5 + return track.lost # Note, for now this is not implemented in the tacker, see check_lost() + + def track_is_loitering(self, track: ProjectedTrack): + # TODO)) Change to measure displacement over the last n seconds + return len(track.history) > (track.fps * 60) # seconds after which someone is loitering + + def prediction_is_stale(self, track: ProjectedTrack): + # TODO use displacement instead of time + return bool(len(self.predictions) and self.predictions[-1].created_at < (time.time() - 2)) + + def prediction_is_playing(self, track): + return False + + def check_lost(self): + if self.current_state is not self.lost and self.track and self.track.updated_at < time.time() - 5: + self.mark_lost() + + def set_track(self, track: ProjectedTrack): + if self.track and self.track.created_at > track.created_at: + # ignore old track + return + + self.track = track + self.update_prediction_diff() + + # check to change state + try: + self.receive_track(track) + except TransitionNotAllowed as e: + # state change is optional + pass + + + def update_prediction_diff(self): + """ + gather the diffs of the trajectory with the most recent prediction + """ + if len(self.prediction_diffs) == 0: + return + + for diff in self.prediction_diffs: + diff.update_track(self.track) + + + + def add_prediction(self, track: ProjectedTrack): + if not self.track: + # in case of the unlikely event that prediction was passed sooner + self.set_track(track) + + # if not self.first_prediction_track: + # self.first_prediction_track = track + + if PREDICTION_INTERVAL is not None and len(self.predictions) and (track.frame_index - self.predictions[-1].frame_index) < PREDICTION_INTERVAL: + # just drop tracks if the predictions come to quick + return + + if track._track.predictions is None or not len(track._track.predictions): + # don't count to predictions if no prediction is set of given track (e.g. young tracks) + return + + + self.predictions.append(track) + if len(self.prediction_diffs): + self.prediction_diffs[-1].finish() # existing diffing can end + # and create a new one + self.prediction_diffs.append(DiffSegment(track)) + # self.prediction_diffs.append(DiffSegmentScan(track)) + + # check to change state + try: + self.receive_prediction(track) + except TransitionNotAllowed as e: + # state change is optional + pass + + def after_receive_track(self, track: ProjectedTrack): + print('changed state') + + def on_receive_track(self, track: ProjectedTrack): + # on event, only runs once, upon first track + print('updating track!') + + def on_receive_prediction(self, track: ProjectedTrack): + # on event, because it happens for every receive, despite transition + print('updating prediction!') + # self.track = track + + def after_receive_prediction(self, track: ProjectedTrack): + # after + pass + # self.prediction_track = track + # if not self.first_prediction_track: + # self.first_prediction_track = track + + def on_enter_corrected_prediction(self): + print('corrected!') + + def on_enter_detected(self): + print("DETECTED!") + + def on_enter_first_prediction(self): + print("Hello!") + + def on_enter_detected(self): + print(f"enter {self.current_state.id}") + def on_enter_substantial(self): + print(f"enter {self.current_state.id}") + def on_enter_first_prediction(self): + print(f"enter {self.current_state.id}") + def on_enter_corrected_prediction(self): + print(f"enter {self.current_state.id}") + def on_enter_loitering(self): + print(f"enter {self.current_state.id}") + def on_enter_play(self): + print(f"enter {self.current_state.id}") + def on_enter_lost(self): + print(f"enter {self.current_state.id}") + self.lost_at = time.time() + + def lost_for(self): + if self.current_state is self.lost: + return time.time() - self.lost_at + return None + + def lost_factor(self): + l = self.lost_for() + if not l: + return 0 + return l/LOST_FADEOUT + + def to_lines(self) -> List[RenderableLine]: + raise RuntimeError("Not implemented yet") + + +class DrawnScenario(TrackScenario): + """ + Scenario contains the controls (scene, target positions) + DrawnScenario class does the actual drawing of points incl. transitions + """ + + ANOMALY_DECAY = .2 # speed with which the cirlce shrinks over time + DISTANCE_ANOMALY_FACTOR = .03 # the ammount to which the difference counts to the anomaly score + MAX_HISTORY = 200 # points of history of trajectory to display (preventing too long lines) + CUT_GAP = 5 # when adding a new prediction, keep the existing prediction until that point + this CUT_GAP margin + + def __init__(self): + # self.created_at = time.time() + # self.track_id = track_id + self.last_update_t = time.perf_counter() + + self.drawn_position: Optional[Coordinate] = None + self.drawn_positions: List[Coordinate] = [] + self.drawn_pred_history: List[Coordinate] = [] + self.drawn_predictions: List[List[Coordinate]] = [] + self._current_drawn_prediction: List[Coordinate] = [] + + self.drawn_text = "" + self.drawn_text_lines: List[RenderableLine] = [] + + self.anomaly_score = 0 # TODO: variable + self._drawn_anomaly_score = 0 + super().__init__() + + def add_anomaly_length(self, length: float): + """ + append a difference in meters between point + """ + self.anomaly_score += length * self.DISTANCE_ANOMALY_FACTOR + if self.anomaly_score > 1: + self.anomaly_score = 1. + + def decay_anomaly_score(self, dt: DeltaT): + if self.anomaly_score == 0: + return + + self.anomaly_score = exponentialDecay(self.anomaly_score, 0, self.ANOMALY_DECAY, dt) + + def update_drawn_positions(self) -> List: + ''' + use dt to lerp the drawn positions in the direction of current prediction + ''' + # TODO: make lerp, currently quick way to get results + + def int_or_not(v): + """quick wrapper to toggle int'ing""" + return v + # return int(v) + + # 0. calculate dt + # if dt is None: + t = time.perf_counter() + dt: DeltaT = t - self.last_update_t + self.last_update_t = t + + # 0. Update anomaly, slowly decreasing it over time + self.decay_anomaly_score(dt) + + # for diff in self.prediction_diffs: + # diff.update_drawn_positions(dt, self) + + # 1. track history, direct update + + # positions = self._track.get_projected_history(None, self.camera)[-MAX_HISTORY:] + # self.drawn_positions = self.track.projected_history[-self.MAX_HISTORY:] + self.drawn_positions = self.track.projected_history + if self.drawn_position is None: + self.drawn_position = self.drawn_positions[-1] + else: + self.drawn_position[0] = exponentialDecay(self.drawn_position[0], self.drawn_positions[-1][0], 13, dt) + self.drawn_position[1] = exponentialDecay(self.drawn_position[1], self.drawn_positions[-1][1], 13, dt) + + # 3. predictions + if len(self.drawn_predictions) < len(self.predictions): + # first prediction + if len(self.drawn_predictions) == 0: + last_pred = self.predictions[-1] + self.drawn_predictions.append(last_pred.predictions[0]) + else: + # if a new prediction has arised, transition from existing one. + # First, cut existing prediction + # CUT_GAP indicates that some is lost in the transition, to prevent glitches when velocity of person changes + end_step = self.predictions[-1].frame_index - self.predictions[-2].frame_index + self.CUT_GAP + keep = self.drawn_predictions[-1][end_step:] + last_item: Coordinate = (keep)[-1] + self.drawn_predictions[-1] = self.drawn_predictions[-1][:end_step] # cut the old part + # print(self.predictions[-1].frame_index, self.predictions[-2].frame_index, end_step, len(keep)) + # duplicate last item, so the new one has the same nr. of points as the incoming prediction (so it can actually transition) + ext = [last_item] * (len(self.predictions[-1].predictions[0]) - len(keep)) + keep.extend(ext) + self.drawn_predictions.append(keep) + + for a, drawn_prediction in enumerate(self.drawn_predictions): + # origin = self.predictions[a].predictions[0][0] + origin = self.predictions[a].predictions[0][0] + # associated_diff = self.prediction_diffs[a] + # progress = associated_diff.nr_of_passed_points() + for i, pos in enumerate(drawn_prediction): + # TODO: this should be done in polar space starting from origin (i.e. self.drawn_posision[-1]) + decay = max(3, (18/i) if i else 10) # points further away move with more delay + decay = 16 + drawn_r, drawn_angle = relativePointToPolar( origin, drawn_prediction[i]) + pred_r, pred_angle = relativePointToPolar(origin, self.predictions[a].predictions[0][i]) + r = exponentialDecay(drawn_r, pred_r, decay, dt) + + # make circular coordinates transition through the smaller arc + if abs(drawn_angle - pred_angle) > math.pi: + pred_angle -= math.pi * 2 + angle = exponentialDecay(drawn_angle, pred_angle, decay, dt) + x, y = relativePolarToPoint(origin, r, angle) + self.drawn_predictions[a][i] = int_or_not(x), int_or_not(y) + # self.drawn_predictions[i][0] = int(exponentialDecay(self.drawn_predictions[i][0], self.prediction_track.predictions[i][0], decay, dt)) + # self.drawn_predictions[i][1] = int(exponentialDecay(self.drawn_predictions[i][1], self.prediction_track.predictions[i][1], decay, dt)) + + # self.drawn_predictions = [] + # for a, (ptrack, next_ptrack) in enumerate(zip(self.predictions, [*self.predictions[1:], None])): + + # prediction = ptrack.predictions[0] # only use one prediction per timestep/frame/track + # if next_ptrack is not None: + # # not the last one, cut off + # next_ptrack: ProjectedTrack = self.predictions[a+1] + # end_step = next_ptrack.frame_index - ptrack.frame_index + + + # else: + # end_step = None # not last item; show all + # self.drawn_predictions.append(ptrack.predictions[0][:end_step]) + + + + # Animate line as procedural chain https://www.youtube.com/watch?v=qlfh_rv6khY&t=183s + + + self._drawn_anomaly_score = exponentialDecay(self._drawn_anomaly_score, self.anomaly_score, 3, dt) + + + # print(self.drawn_predictions) + # line = [] + # for i, pos in enumerate(ptrack.predictions): + # line.append((ptrack.predictions[i][0], ptrack.predictions[i][1])) + # print(line) + + # if len(self.drawn_predictions) <= a: + # self.drawn_predictions.append(line) + # else: + # self.drawn_predictions[a] = line + + + # if self.prediction_track and self.prediction_track.predictions: + # prediction_offset = self._track.frame_index - self.prediction_track.frame_index + # if len(self.prediction_track.predictions): + # for a, drawn_prediction in enumerate(self.drawn_predictions): + # for i, pos in enumerate(drawn_prediction): + # # TODO: this should be done in polar space starting from origin (i.e. self.drawn_posision[-1]) + # decay = max(3, (18/i) if i else 10) # points further away move with more delay + # decay = 16 + # origin = self.drawn_positions[-1] + # drawn_r, drawn_angle = relativePointToPolar( origin, drawn_prediction[i]) + # pred_r, pred_angle = relativePointToPolar(origin, self.prediction_track.predictions[a][i+prediction_offset]) + # r = exponentialDecay(drawn_r, pred_r, decay, dt) + # angle = exponentialDecay(drawn_angle, pred_angle, decay, dt) + # x, y = relativePolarToPoint(origin, r, angle) + # self.drawn_predictions[a][i] = int_or_not(x), int_or_not(y) + # # self.drawn_predictions[i][0] = int(exponentialDecay(self.drawn_predictions[i][0], self.prediction_track.predictions[i][0], decay, dt)) + # # self.drawn_predictions[i][1] = int(exponentialDecay(self.drawn_predictions[i][1], self.prediction_track.predictions[i][1], decay, dt)) + + # if len(self.prediction_track.predictions) > len(self.drawn_predictions): + # for pred in self.prediction_track.predictions[len(self.drawn_predictions):]: + # self.drawn_predictions.append(pred[prediction_offset:]) + + + def to_renderable_lines(self) -> RenderableLines: + t = time.time() + track_age = t - self.track.updated_at # Should be beginning + lines = RenderableLines([]) + + + # track_age_in_frames = int(track_age * TRACK_FADE_ASSUME_FPS) + # track_max_points = TRACK_FADE_AFTER_DURATION * TRACK_FADE_ASSUME_FPS - track_age_in_frames + + # 1. Trajectory history + # drawable_points, alphas = self.drawn_positions[:self.MAX_HISTORY], [1]*len(self.drawn_positions) + + # perlin/simplex noise + # dt: change speed. Divide to make slower + # amp: amplitude of noise + # frequency: make smaller to make longer waves + if OPTION_TRACK_NOISE: + noisy_points = apply_perlin_noise_to_line_normal(self.drawn_positions, t/5, .3, .02) + else: + noisy_points = self.drawn_positions + + drawable_points, alphas = points_fade_out_alpha_mask(noisy_points, track_age, TRACK_FADE_AFTER_DURATION, TRACK_END_FADE) + color = SrgbaColor(1.,0.,1.,1.-self.lost_factor()) + + # TODO: effect configuration + + + points = [RenderablePoint(pos, color.as_faded(a)) for pos, a in zip(drawable_points, alphas)] + # points = [RenderablePoint(pos, color.as_faded(a)) for pos, a in zip(drawable_points, alphas)] + + lines.append(RenderableLine(points)) + + # 2. Position Marker / anomaly score + + if OPTION_POSITION_MARKER: + anomaly_marker_color = SrgbaColor(0.,0.,1, 1.-self.lost_factor()) # fadeout + # lines.append(circle_arc(self.drawn_positions[-1][0], self.drawn_positions[-1][1], 1, t, self.anomaly_score, anomaly_marker_color)) + # last point, (but this draws line in circle, requiring a 'jump back' for the laser) + cx, cy = self.drawn_position[0], self.drawn_position[1], + + radius = max(.1, self._drawn_anomaly_score * 1.) if OPTION_GROW_ANOMALY_CIRCLE else .1 + + steps=0 + if len(self.drawn_positions) >= steps: + dx, dy = self.drawn_positions[-1][0] - self.drawn_positions[-steps][0], self.drawn_positions[-1][1] - self.drawn_positions[-steps][1], + diff = np.array([dx,dy]) + diff = diff/np.linalg.norm(diff) * radius * 1.1 + cx += diff[0] + cy += diff[1] + + lines.append(circle_arc( + cx, cy, + radius, + 0, 1, + anomaly_marker_color) + ) + + # 3. Predictions + if len(self.drawn_predictions): + color = SrgbaColor(0.,1,0.,1.-self.lost_factor()) + # positions = [RenderablePosition.from_list(pos) for pos in self.drawn_positions] + for a, drawn_prediction in enumerate(self.drawn_predictions): + if a < (len(self.drawn_predictions) - 1): + # not the newest: fade out: + deprecation_age = t - self.predictions[a+1].updated_at + if deprecation_age > PREDICTION_FADE_IN: + # old: skip drawing. + continue + else: + fade_factor = 1 - (deprecation_age / PREDICTION_FADE_IN) + color = color.as_faded(fade_factor) + + prediction_track_age = time.time() - self.predictions[a].updated_at + t_factor = prediction_track_age / PREDICTION_FADE_IN + + associated_diff = self.prediction_diffs[a] + progress = associated_diff.nr_of_passed_points() + + # drawn_prediction, alphas1 = points_fade_out_alpha_mask(drawn_prediction, prediction_track_age, TRACK_FADE_AFTER_DURATION, TRACK_END_FADE, no_frame_max=True) + + gradient = np.linspace(0, PREDICTION_FADE_SLOPE, len(drawn_prediction)) + alphas2 = np.clip(gradient + t_factor * (-1*PREDICTION_FADE_SLOPE), 0, 1) + # print(alphas) + + + # colors = [color.with_alpha(np.clip(t_factor*3-(p_index/len(drawn_prediction)), 0, 1)) for p_index in range(len(drawn_prediction))] + # colors = [color.with_alpha(np.clip(t_factor*2-(p_index/len(drawn_prediction)), 0, 1)) for p_index in range(len(drawn_prediction))] + + # apply both fade in and fade out mask: + colors = [color.as_faded(a2) for a2 in alphas2] + # colors = [color.as_faded(a1*a2) for a1, a2 in zip(alphas1, alphas2)] + + + # points = [RenderablePoint(pos, pos_color) for pos, pos_color in zip(drawn_prediction[PREDICTION_OFFSET:], colors[PREDICTION_OFFSET:])] + points = [RenderablePoint(pos, pos_color) for pos, pos_color in zip(drawn_prediction, colors)] + points = points[progress//2:] + ls = LineString(drawn_prediction) + if t_factor < 1: + ls = substring(ls, 0, t_factor*ls.length, ls.length) + + # print(prediction_track_age) + + # Option 1 : dashes + dashed = dashed_line(ls, .8, 1., prediction_track_age, False) + for line in dashed.geoms: + dash_points = [RenderablePoint(point, color) for point in line.coords] + lines.append(RenderableLine(dash_points)) + + # Option 2 : flash + flash_distance = prediction_track_age * 5 + # flashes = [] + # for i in range(10): + # flashes.append(substring(ls, flash_distance*i, flash_distance + .5)) + + # flash_multiline = shapely.union_all(flashes) + # flashes = flash_multiline.geoms if isinstance(flash_multiline, MultiLineString) else [flash_multiline] + # print(flashes) + # for flash_ls in flashes: + # flash_points = [RenderablePoint(point, color) for point in flash_ls.coords] + # if len(flash_points) > 1: + # lines.append(RenderableLine(flash_points)) + + + # flash_points = [RenderablePoint(point, color) for point in flash_ls.coords] + # if len(flash_points) > 1: + # lines.append(RenderableLine(flash_points)) + + + + # lines.append(RenderableLine(points)) + + # 4. Diffs + # for drawn_diff in self.drawn_diffs: + # color = SrgbaColor(0.,1,1.,1.-self.lost_factor()) + # colors = [color.as_faded(1) for a2 in range(len(drawn_diff))] + # points = [RenderablePoint(pos, pos_color) for pos, pos_color in zip(drawn_diff, colors)] + # lines.append(RenderableLine(points)) + + # if OPTION_RENDER_DIFF_SEGMENT: + # for diff in self.prediction_diffs: + # lines.append_lines(diff.as_renderable()) + # pass + + + # # print(self.current_state) + # if self.current_state is self.first_prediction or self.current_state is self.corrected_prediction: + # shape = np.array(shapes.YOUR if time.time() % 2 > 1 else shapes.FUTURE) + # text = "your" if time.time() % 2 > 1 else "future" + # color = SrgbaColor(0.5,0.5,0.5,1.-self.lost_factor()) + + # line = self.get_text_lines(text, shape, color) + # if not line: + # POSITION_INDEX = 50 + + # draw_pos = self.drawn_predictions[0][POSITION_INDEX-1] + # current_pos = self.drawn_positions[-1] + # angle = np.arctan2(draw_pos[0]-current_pos[0], draw_pos[1]-current_pos[1]) + np.pi + # # for i, line in enumerate(shape): + # # if i != 0: + # # continue + # points = np.array(shape[:,:2]) + + # avg_x = np.average(points[:,0]) + # avg_y = np.average(points[:,1]) + + # minx, maxx = np.min(points[:,0]), np.max(points[:,0]) + # miny, maxy = np.min(points[:,1]), np.max(points[:,1]) + + # sx = maxx-minx + # sy = maxy-miny + + # points[:,0] -= avg_x + # points[:,1] -= avg_y + # points /= (sx * 1.5) # scale to 1 + + # points @= rotateMatrix(angle) + + # points += draw_pos + + # points = [RenderablePoint.from_list(pos, color.with_alpha(intensity)) for pos, intensity in zip(points, shape[:,2])] + # self.drawn_text = text + # self.drawn_text_lines = [RenderableLine(points)] + # lines.extend(self.drawn_text_lines) + + + return lines + + def get_text_lines(self, text, shape, color): + if self.drawn_text == text: + return self.drawn_text_lines + return None + +# def circle_points(cx, cy, r, c: Color): PointList +# # r = r +# steps = 30 +# pointlist: list[LaserPoint] = [] +# for i in range(steps): +# x = int(cx + math.cos(i * (2*math.pi)/steps) * r) +# y = int(cy + math.sin(i * (2*math.pi)/steps)* r) +# pointlist.append(LaserPoint(x, y, c, blank=(i==(steps-1)or i==0))) + +# return pointlist + + +def points_fade_out_alpha_mask(positions: List, track_age_seconds: float, fade_after_duration: float, fade_frames: int, no_frame_max=False): + track_age_in_frames = int(track_age_seconds * TRACK_FADE_ASSUME_FPS) + if not no_frame_max: + track_max_points = int(fade_after_duration * TRACK_FADE_ASSUME_FPS) - track_age_in_frames + else: + if track_age_seconds < fade_after_duration: + track_max_points = len(positions) #+ fade_frames + else: + FADE_DURATION = 2 + t_fade = max(0, track_age_seconds - fade_after_duration) / FADE_DURATION + track_max_points = int(t_fade * len(positions) / FADE_DURATION) + + + + drawable_points = positions[-track_max_points:] + alphas = [] + for i, point in enumerate(drawable_points): + reverse_i = len(drawable_points) - i + fade_i = reverse_i - (track_max_points - fade_frames) # -90 + fade_i /= fade_frames # / 10 + fade_i = np.clip(fade_i, 0, 1) + alpha = 1 - fade_i + if alpha > 0: + alphas.append(alpha) + + return drawable_points, alphas + + # drawn_pred_history + # drawn_predictions + + +# @dataclass +# class RenderablePosition(): +# x: float +# y: float + +# @classmethod +# def from_list(cls, l: List[float, float]) -> RenderablePosition: +# return cls(x = float(l[0]), y=float(l[1])) + +# TODO)) Or Shapely point? + +class Stage(Node): + """ + Render a stage, on which different TrackScenarios take place to a + single image of lines. Which can be passed to different renderers + E.g. the laser or image renderers. + """ + + FPS = 60 + + def setup(self): + # self.scenarios: List[DrawnScenario] = [] + self.scenarios: Dict[str, DrawnScenario] = defaultdict(lambda: DrawnScenario()) + self.frame_noimg_sock = self.sub(self.config.zmq_frame_noimg_addr) + self.trajectory_sock = self.sub(self.config.zmq_trajectory_addr) + self.prediction_sock = self.sub(self.config.zmq_prediction_addr) + self.stage_sock = self.pub(self.config.zmq_stage_addr) + + self.counter = CounterSender() + self.frame: Optional[Frame] = None + + + if self.config.debug_map: + debug_color = SrgbaColor(0.,0.,1.,1.) + self.debug_lines = RenderableLines(load_lines_from_svg(self.config.debug_map, 100, debug_color)) + + + def run(self): + prev_time = time.perf_counter() + while self.is_running.is_set(): + + self.tick() + + # 1) poll & update + self.loop_receive() + + # 2) render + self.loop_render() + + # 3) calculate latency for desired FPS + now = time.perf_counter() + time_diff = (now - prev_time) + if time_diff < 1/self.FPS: + # print(f"sleep {1/self.FPS - time_diff}") + time.sleep(1/self.FPS - time_diff) + now += 1/self.FPS - time_diff + + prev_time = now + + def loop_receive(self): + # 1) receive frames + try: + camera_frame: Frame = self.frame_noimg_sock.recv_pyobj(zmq.NOBLOCK) + self.frame = camera_frame + except zmq.ZMQError as e: + pass + + # 1) receive predictions + try: + prediction_frame: Frame = self.prediction_sock.recv_pyobj(zmq.NOBLOCK) + for track_id, track in prediction_frame.tracks.items(): + proj_track = ProjectedTrack(track, prediction_frame.camera) + self.scenarios[track_id].add_prediction(proj_track) + except zmq.ZMQError as e: + self.logger.debug(f'reuse prediction') + + # 2) receive tracker tracks + try: + trajectory_frame: Frame = self.trajectory_sock.recv_pyobj(zmq.NOBLOCK) + for track_id, track in trajectory_frame.tracks.items(): + proj_track = ProjectedTrack(track, trajectory_frame.camera) + # if not self.scenarios[track_id].camera: + # self.scenarios[track_id].camera = trajectory_frame.camera # little hack to pass camera! + self.scenarios[track_id].set_track(proj_track) + except zmq.ZMQError as e: + self.logger.debug(f'reuse tracks') + + # 3) Remove stale tracks + for track_id, scenario in list(self.scenarios.items()): + # check when last tracker update was received + scenario.check_lost() + + if scenario.lost_factor() > 1: + self.logger.info(f"rm track {track_id}") + del self.scenarios[track_id] + + def loop_render(self): + lines = RenderableLines([]) + + + # 0. DEBUG lines: + + + + # 1. Draw each scenario: + for track_id, scenario in self.scenarios.items(): + scenario.update_drawn_positions() + + lines.append_lines(scenario.to_renderable_lines()) + + # print(lines) + # rl = RenderableLines(lines) + # with open('/tmp/lines.pcl', 'wb') as fp: + # pickle.dump(rl, fp) + # rl = lines + rl = lines.as_simplified(SimplifyMethod.RDP, .003) # or segmentise (see shapely) + self.counter.set("stage.lines", len(lines.lines)) + self.counter.set("stage.points_orig", lines.point_count()) + self.counter.set("stage.points", rl.point_count()) + + + # debug_lines = RenderableLines([]) + # if self.frame and hasattr(self.frame.camera, 'debug_lines'): + # for points in self.frame.camera.debug_lines: + # line_points = [] + # # interpolate, so the laser can correct the lines + # for i in range(20): + # t = i / 19 + # x = lerp(points[0][0], points[1][0], t) + # y = lerp(points[0][1], points[1][1], t) + # line_points.append(RenderablePoint((x, y), debug_color)) + + # debug_lines.append(RenderableLine(line_points)) + + layers: RenderableLayers = { + 1: rl, + 2: self.debug_lines, + } + + # print(rl.__dict__) + + self.stage_sock.send_json(obj=layers, cls=DataclassJSONEncoder) + + # print(json.dumps(rl, cls=DataclassJSONEncoder)) + + @classmethod + def arg_parser(cls) -> ArgumentParser: + argparser = ArgumentParser() + argparser.add_argument('--zmq-frame-noimg-addr', + help='Manually specity communication addr for the frame messages', + type=str, + default="ipc:///tmp/feeds_frame2") + argparser.add_argument('--zmq-trajectory-addr', + help='Manually specity communication addr for the trajectory messages', + type=str, + default="ipc:///tmp/feeds_traj") + argparser.add_argument('--zmq-prediction-addr', + help='Manually specity communication addr for the prediction messages', + type=str, + default="ipc:///tmp/feeds_preds") + argparser.add_argument('--zmq-stage-addr', + help='Manually specity communication addr for the stage messages (the rendered lines)', + type=str, + default="tcp://0.0.0.0:99174") + argparser.add_argument('--debug-map', + help='specify a map (svg-file) from which to load lines which will be overlayed', + type=str, + default="../DATASETS/hof3/map_hof.svg") + return argparser + + + + +# TODO place somewhere else: +# Gemma3:27b prompt: "python. Given a list of coordinates, that describes a line: `drawable_points: List[Tuple[float,float]]` apply perlin noise over the normal of the line, that changes over time `dt`." +def apply_perlin_noise_to_line_normal(drawable_points: np.ndarray, dt: float, amplitude: float = 1.0, frequency: float = 1.0, fade_over_n_points = 8) -> np.ndarray: + """ + Applies Perlin noise to the normals of a line described by a list of coordinates, changing over time. + + Args: + drawable_points: A list of (x, y) tuples representing the points of the line. + dt: The time delta, used to animate the Perlin noise. + amplitude: The strength of the Perlin noise effect. + frequency: The frequency of the Perlin noise (how many waves per unit). + + Returns: + A new list of (x, y) tuples representing the line with Perlin noise applied to the normals. If drawable_points + has fewer than 2 points, it returns the original list unchanged. + + Raises: + TypeError: If drawable_points is not a list or dt is not a float. + ValueError: If the input points are not tuples of length 2. + """ + + # if not isinstance(drawable_points, list): + # print(drawable_points, type(drawable_points)) + # raise TypeError("drawable_points must be a list.") + if not isinstance(dt, float): + raise TypeError("dt must be a float.") + + if len(drawable_points) < 2: + return drawable_points # Nothing to do with fewer than 2 points + + # for point in drawable_points: + # if not isinstance(point, tuple) or len(point) != 2: + # raise ValueError("Each point in drawable_points must be a tuple of length 2.") + + + # noise = PerlinNoise(octaves=4) # You can adjust octaves for different noise patterns + + new_points = [] + for i in range(len(drawable_points)): + x, y = drawable_points[i] + + # Calculate the normal vector. We'll approximate it using the previous and next points. + if i == 0: + # For the first point, use the next point to estimate the normal + next_x, next_y = drawable_points[i + 1] + normal_x = next_y - y + normal_y = -(next_x - x) + elif i == len(drawable_points) - 1: + # For the last point, use the previous point + prev_x, prev_y = drawable_points[i - 1] + normal_x = y - prev_y + normal_y = -(x - prev_x) + else: + prev_x, prev_y = drawable_points[i - 1] + next_x, next_y = drawable_points[i + 1] + normal_x = next_y - prev_y + normal_y = -(next_x - prev_x) + + # Normalize the normal vector + norm = np.sqrt(normal_x**2 + normal_y**2) + if norm > 0: + normal_x /= norm + normal_y /= norm + + # Apply Perlin noise to the normal + # noise_x = noise([x * frequency, (y + dt) * frequency]) * amplitude * normal_x + # noise_y = noise([x * frequency, (y + dt) * frequency]) * amplitude * normal_y + noise = snoise2(i * frequency, dt % 1000, octaves=4) + + use_amp = amplitude + if fade_over_n_points > 0: + rev_step = len(drawable_points) - i + amp_factor = rev_step / fade_over_n_points + if amp_factor < 1: + use_amp *= amp_factor + + noise_x = noise * use_amp * normal_x + noise_y = noise * use_amp * normal_y + + # print(noise_x, noise_y, dt, frequency, i, dt, snoise2(i * frequency, dt % 1000, octaves=4)) + + + # Add the noise to the point's coordinates + new_x = x + noise_x + new_y = y + noise_y + + new_points.append((new_x, new_y)) + + # print(drawable_points, new_points) + + return np.array(new_points) + + +import math + +def distance(p1, p2): + return math.hypot(p2[0] - p1[0], p2[1] - p1[1]) + + +def dashed_line(line: LineString, dash_len: float, gap_len: float, offset: float = 0, loop_offset = True) -> MultiLineString: + total_length = line.length + + segments = [] + + if loop_offset: + # by default, prepend skipped gap + pos = offset % (dash_len + gap_len) + + if pos > gap_len: + segments.append(substring(line, 0, pos - gap_len)) + else: + pos = offset + + while pos < total_length: + end = min(pos + dash_len, total_length) + if pos < end: + dash = substring(line, pos, end) + segments.append(dash) + pos += dash_len + gap_len + + return MultiLineString(segments) \ No newline at end of file