# trap/trap/stage.py
from __future__ import annotations
from abc import ABC, abstractmethod
from argparse import ArgumentParser
from collections import defaultdict
from dataclasses import dataclass
from enum import Enum
import json
import logging
import math
import pickle
import time
from typing import Dict, List, Optional, Tuple
import numpy as np
from shapely import LineString, line_locate_point, linestrings
from statemachine import Event, State, StateMachine
from statemachine.exceptions import TransitionNotAllowed
import zmq
from trap import shapes
from trap.base import Camera, DataclassJSONEncoder, DistortedCamera, Frame, ProjectedTrack, Track
from trap.counter import CounterSender
from trap.laser_renderer import circle_points, rotateMatrix
from trap.lines import RenderableLine, RenderableLines, RenderablePoint, RenderablePosition, SrgbaColor, circle_arc
from trap.node import Node
from trap.timer import Timer
from trap.utils import exponentialDecay, exponentialDecayRounded, relativePointToPolar, relativePolarToPoint
logger = logging.getLogger('trap.stage')
Coordinate = Tuple[float, float]
DeltaT = float # delta_t in seconds
class LineGenerator(ABC):
@abstractmethod
def update_drawn_positions(self, dt: DeltaT):
pass
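# Subclasses (see AppendableLine and ProceduralChain below) maintain
# `self._drawn_points` (the points currently visible, consumed by as_renderable)
# and a `self.ready` flag that becomes True once drawing has caught up.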
def as_renderable(self, color: SrgbaColor) -> RenderableLines:
points = [RenderablePoint(p, color) for p in self._drawn_points]
lines = [RenderableLine(points)]
return RenderableLines(lines)
class AppendableLine(LineGenerator):
"""
A line generator that allows for points to be added over time.
Simply use `line.points.extend([p1, p2])`
"""
def __init__(self, points: Optional[List[Coordinate]] = None, draw_decay_speed = 25.):
self.points: List[Coordinate] = points if points is not None else [] # when providing [] as default, it messes up instancing by reusing the same list
self._drawn_points = []
self.ready = len(self.points) == 0
self.draw_decay_speed = draw_decay_speed
def update_drawn_positions(self, dt: DeltaT):
if len(self.points) == 0:
# nothing to draw yet
return
# self._drawn_points = self.points
if len(self._drawn_points) == 0:
# create origin
self._drawn_points.append(self.points[0])
# and drawing head
self._drawn_points.append(self.points[0])
idx = len(self._drawn_points) - 1
target = self.points[idx]
if np.isclose(self._drawn_points[-1], target, atol=.05).all():
if len(self._drawn_points) == len(self.points):
self.ready = True
return # done until a new point is added
# add new point as drawing head
self._drawn_points.append(self._drawn_points[-1])
self.ready = False
x = exponentialDecayRounded(self._drawn_points[-1][0], target[0], self.draw_decay_speed, dt, .05)
y = exponentialDecayRounded(self._drawn_points[-1][1], target[1], self.draw_decay_speed, dt, .05)
self._drawn_points[-1] = (float(x), float(y))
class ProceduralChain(LineGenerator):
"""A line that can be 'dragged' to a target. In which
it disappears."""
MOVE_DECAY_SPEED = 80 # speed at which the drawing head should approach the next point
VELOCITY_DAMPING = 10
VELOCITY_FACTOR = 2
link_size = .1 # 10cm
# angle_constraint = 5
def __init__(self, joints: List[Coordinate], scenario: DrawnScenario, use_velocity = False):
self.joints: List[Coordinate] = joints
self.target: Coordinate = joints[-1]
self.ready = False
self.move_decay_speed = self.MOVE_DECAY_SPEED
self.scenario = scenario
self.use_velocity = use_velocity
if self.use_velocity:
if len(self.joints) > 1:
self.v = np.array(self.joints[-2]) - np.array(self.joints[-1])
self.v /= np.linalg.norm(self.v) / 10
else:
self.v = np.array([0,0])
@classmethod
def from_appendable_line(cls, al: AppendableLine, scenario: DrawnScenario) -> ProceduralChain:
# TODO: create more segments:
# last added points becomes the head of the chain
points = list(reversed(al.points))
linestring = LineString(points)
linestring = linestring.segmentize(cls.link_size)
joints = list(linestring.coords)
return cls(joints, scenario)
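# Conversion sketch (illustrative target; this is how DiffSegment below hands a finished line over):
#
#   chain = ProceduralChain.from_appendable_line(line, scenario)
#   chain.target = (4., 2.)             # point towards which the head is dragged
#   chain.update_drawn_positions(1/60)  # head decays towards target, joints follow;
#                                       # consumed joints feed scenario.add_anomaly_length()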
def update_drawn_positions(self, dt: DeltaT):
if self.ready:
return
# direction = np.array(self.joints[-1] - self.target)
# TODO: check self.joints empty, and stop then
if self.use_velocity:
vx = exponentialDecayRounded(self.v[0], self.target[0] - self.joints[0][0], self.VELOCITY_DAMPING, dt, .05)
vy = exponentialDecayRounded(self.v[1], self.target[1] - self.joints[0][1], self.VELOCITY_DAMPING, dt, .05)
self.v = np.array([vx, vy])
self.joints[0] = (float(self.joints[0][0] + self.v[0] * dt * self.VELOCITY_FACTOR), float(self.joints[0][1] + self.v[1] * dt * self.VELOCITY_FACTOR))
else:
x = exponentialDecayRounded(self.joints[0][0], self.target[0], self.move_decay_speed, dt, .05)
y = exponentialDecayRounded(self.joints[0][1], self.target[1], self.move_decay_speed, dt, .05)
self.joints[0] = (float(x), float(y))
# Loop inspired by: https://github.com/argonautcode/animal-proc-anim/blob/main/Chain.pde
# see that code for angle constraints.
for i, (joint, prev_joint) in enumerate(zip(self.joints[1:], self.joints), start=1):
diff = np.array(prev_joint) - np.array(joint)
direction = diff / np.linalg.norm(diff)
self.joints[i] = prev_joint - direction * self.link_size
if np.isclose(self.joints[0], self.target, atol=.05).all():
# self.ready = True
# TODO: smooth transition instead of cutting off
self.joints.pop(0)
self.scenario.add_anomaly_length(self.link_size)
if len(self.joints) == 0:
self.ready = True
self._drawn_points = self.joints
class DiffSegment():
"""
A segment of a prediction track, that can be diffed
with a track. The track is continuously updated.
If a new prediction comes in, the diff is marked as
finished. After which it is animated and added to the
Scenario's anomaly score.
"""
DRAW_DECAY_SPEED = 25
def __init__(self, prediction: ProjectedTrack):
self.ptrack = prediction
self._last_diff_frame_idx = 0
self.finished = False
self.line = AppendableLine( draw_decay_speed=self.DRAW_DECAY_SPEED)
self.points: List[Coordinate] = []
self._drawn_points = []
self._target_track = prediction
def finish(self):
self.finished = True
# run on each track update received
def update_track(self, track: ProjectedTrack):
self._target_track = track
if self.finished:
# don't add new points if finished
return
# migrated from the ScenarioScene function
start_frame_idx = max(self.ptrack.frame_index, self._last_diff_frame_idx)
traj_diff_steps_back = track.frame_index - start_frame_idx # positive value
pred_diff_steps_forward = start_frame_idx - self.ptrack.frame_index # positive value
if traj_diff_steps_back < 0 or len(track.history) < traj_diff_steps_back:
logger.warning("Track history doesn't reach prediction start. Should not be possible. Skip")
# elif len(ptrack.predictions[0]) < pred_diff_steps_back:
# logger.warning("Prediction does not reach prediction start. Should not be possible. Skip")
else:
trajectory = track.projected_history
# from start to as far as it gets
trajectory_range = trajectory[-1*traj_diff_steps_back:]
prediction_range = self.ptrack.predictions[0][pred_diff_steps_forward:] # in world coordinate space
line = []
for i, (p1, p2) in enumerate(zip(trajectory_range, prediction_range)):
offset_from_start = (pred_diff_steps_forward + i)
if offset_from_start % 4 == 0:
self.line.points.extend([p1, p2])
self.points.extend([p1, p2])
self._last_diff_frame_idx = track.frame_index
# run each render tick
def update_drawn_positions(self, dt: DeltaT, scenario: DrawnScenario):
if isinstance(self.line, AppendableLine):
if self.finished and self.line.ready:
# convert when fully drawn
# print(self, "CONVERT LINE")
self.line = ProceduralChain.from_appendable_line(self.line, scenario)
if isinstance(self.line, ProceduralChain):
self.line.target = self._target_track.projected_history[-1]
# if len(self.points) == 0:
# # nothing to draw yet
# return
# # self._drawn_points = self.points
# if len(self._drawn_points) == 0:
# # create origin
# self._drawn_points.append(self.points[0])
# # and drawing head
# self._drawn_points.append(self.points[0])
# idx = len(self._drawn_points) - 1
# target = self.points[idx]
# if np.isclose(self._drawn_points[-1], target, atol=.05).all():
# # TODO: might want to migrate to np.isclose()
# if len(self._drawn_points) == len(self.points):
# self.ready = True
# return # done until a new point is added
# # add new point as drawing head
# self._drawn_points.append(self._drawn_points[-1])
# self.ready = False
# x = exponentialDecayRounded(self._drawn_points[-1][0], target[0], self.DRAW_DECAY_SPEED, dt, .05)
# y = exponentialDecayRounded(self._drawn_points[-1][1], target[1], self.DRAW_DECAY_SPEED, dt, .05)
# self._drawn_points[-1] = (float(x), float(y))
# if not self.finished or not self.line.ready:
self.line.update_drawn_positions(dt)
def as_renderable(self) -> RenderableLines:
color = SrgbaColor(0,0,1,1)
# lines = []
# points = [RenderablePoint(p, color) for p in self._drawn_points]
# lines = [RenderableLine(points)]
# return RenderableLines(lines)
return self.line.as_renderable(color)
class ScenarioScene(Enum):
DETECTED = 1
FIRST_PREDICTION = 2
CORRECTED_PREDICTION = 3
LOITERING = 4
PLAY = 5
LOST = -1
LOST_FADEOUT = 3
PREDICTION_INTERVAL: float|None = 20 # frames
PREDICTION_FADE_IN: float = 3
PREDICTION_FADE_SLOPE: float = -10
PREDICTION_FADE_AFTER_DURATION: float = 10 # seconds
PREDICTION_END_FADE = 2 #frames
# TRACK_MAX_POINTS = 100
TRACK_FADE_AFTER_DURATION = 10. # seconds
TRACK_END_FADE = 50 # points
TRACK_FADE_ASSUME_FPS = 12
# Don't render the first n points of the prediction,
# helps to avoid jitter in the line transition
# PREDICTION_OFFSET = int(TRACK_FADE_ASSUME_FPS * PREDICTION_INTERVAL * .8)
class TrackScenario(StateMachine):
detected = State(initial=True)
substantial = State()
first_prediction = State()
corrected_prediction = State()
loitering = State()
play = State()
lost = State(final=True)
receive_track = lost.from_(
detected, first_prediction, corrected_prediction, loitering, play, substantial, cond="track_is_lost"
) | corrected_prediction.to(loitering, cond="track_is_loitering") | detected.to(substantial, cond="track_is_long")
mark_lost = lost.from_(detected, substantial, first_prediction, corrected_prediction, loitering, play)
receive_prediction = detected.to(first_prediction) | substantial.to(first_prediction) | first_prediction.to(corrected_prediction, cond="prediction_is_stale") | corrected_prediction.to(play, cond="prediction_is_playing")
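# Rough state flow: detected -> substantial (long track) -> first_prediction ->
# corrected_prediction -> loitering/play; any state -> lost once the track is lost.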
def __init__(self):
self.track: Optional[ProjectedTrack] = None
self.camera: Optional[Camera] = None
# self.first_prediction_track: Optional[Track] = None
# self.prediction_track: Optional[Track] = None
self.predictions: List[Track] = []
self._last_diff_frame_idx: Optional[int] = 0
self.diffs: List[Tuple[Coordinate, Coordinate]] = []
self.prediction_diffs: List[DiffSegment] = []
super().__init__()
def track_is_long(self, track: ProjectedTrack):
return len(track.history) > 20
def track_is_lost(self, track: ProjectedTrack):
# return self._track and self._track.created_at < time.time() - 5
return track.lost # Note, for now this is not implemented in the tracker, see check_lost()
def track_is_loitering(self, track: ProjectedTrack):
# TODO)) Change to measure displacement over the last n seconds
return len(track.history) > (track.fps * 60) # seconds after which someone is loitering
def prediction_is_stale(self, track: ProjectedTrack):
# TODO use displacement instead of time
return bool(len(self.predictions) and self.predictions[-1].created_at < (time.time() - 2))
def prediction_is_playing(self, track):
return False
def check_lost(self):
if self.current_state is not self.lost and self.track and self.track.created_at < time.time() - 5:
self.mark_lost()
def set_track(self, track: ProjectedTrack):
if self.track and self.track.created_at > track.created_at:
# ignore old track
return
self.track = track
self.update_prediction_diff()
# check to change state
try:
self.receive_track(track)
except TransitionNotAllowed as e:
# state change is optional
pass
def update_prediction_diff(self):
"""
gather the diffs of the trajectory with the most recent prediction
"""
if len(self.prediction_diffs) == 0:
return
for diff in self.prediction_diffs:
diff.update_track(self.track)
def add_prediction(self, track: ProjectedTrack):
if not self.track:
# in the unlikely event that a prediction was passed in sooner than a track
self.set_track(track)
# if not self.first_prediction_track:
# self.first_prediction_track = track
if PREDICTION_INTERVAL is not None and len(self.predictions) and (track.frame_index - self.predictions[-1].frame_index) < PREDICTION_INTERVAL:
# just drop the prediction if predictions come in too quickly
return
self.predictions.append(track)
if len(self.prediction_diffs):
self.prediction_diffs[-1].finish() # existing diffing can end
# and create a new one
self.prediction_diffs.append(DiffSegment(track))
# check to change state
try:
self.receive_prediction(track)
except TransitionNotAllowed as e:
# state change is optional
pass
def after_receive_track(self, track: ProjectedTrack):
print('changed state')
def on_receive_track(self, track: ProjectedTrack):
# on event, only runs once, upon first track
print('updating track!')
def on_receive_prediction(self, track: ProjectedTrack):
# on event, because it happens for every receive, despite transition
print('updating prediction!')
# self.track = track
def after_receive_prediction(self, track: ProjectedTrack):
# after
pass
# self.prediction_track = track
# if not self.first_prediction_track:
# self.first_prediction_track = track
def on_enter_detected(self):
print(f"enter {self.current_state.id}")
def on_enter_substantial(self):
print(f"enter {self.current_state.id}")
def on_enter_first_prediction(self):
print(f"enter {self.current_state.id}")
def on_enter_corrected_prediction(self):
print(f"enter {self.current_state.id}")
def on_enter_loitering(self):
print(f"enter {self.current_state.id}")
def on_enter_play(self):
print(f"enter {self.current_state.id}")
def on_enter_lost(self):
print(f"enter {self.current_state.id}")
self.lost_at = time.time()
def lost_for(self):
if self.current_state is self.lost:
return time.time() - self.lost_at
return None
def lost_factor(self):
l = self.lost_for()
if not l:
return 0
return l/LOST_FADEOUT
def to_lines(self) -> List[RenderableLine]:
raise NotImplementedError("Not implemented yet")
class DrawnScenario(TrackScenario):
"""
TrackScenario contains the controls (scene state, target positions);
DrawnScenario does the actual drawing of the points, incl. transitions.
"""
ANOMALY_DECAY = .2 # speed with which the circle shrinks over time
DISTANCE_ANOMALY_FACTOR = .05 # the amount by which the difference counts towards the anomaly score
MAX_HISTORY = 80 # points of history of trajectory to display (preventing too long lines)
CUT_GAP = 5 # when adding a new prediction, keep the existing prediction until that point + this CUT_GAP margin
def __init__(self):
# self.created_at = time.time()
# self.track_id = track_id
self.last_update_t = time.perf_counter()
self.drawn_positions: List[Coordinate] = []
self.drawn_pred_history: List[Coordinate] = []
self.drawn_predictions: List[List[Coordinate]] = []
self._current_drawn_prediction: List[Coordinate] = []
self.drawn_text = ""
self.drawn_text_lines: List[RenderableLine] = []
self.anomaly_score = 0 # TODO: variable
self._drawn_anomaly_score = 0
super().__init__()
def add_anomaly_length(self, length: float):
"""
Add a length difference (in meters) between prediction and trajectory to the anomaly score.
"""
self.anomaly_score += length * self.DISTANCE_ANOMALY_FACTOR
if self.anomaly_score > 1:
self.anomaly_score = 1.
def decay_anomaly_score(self, dt: DeltaT):
if self.anomaly_score == 0:
return
self.anomaly_score = exponentialDecay(self.anomaly_score, 0, self.ANOMALY_DECAY, dt)
def update_drawn_positions(self) -> List:
'''
use dt to lerp the drawn positions in the direction of current prediction
'''
# TODO: make lerp, currently quick way to get results
def int_or_not(v):
"""quick wrapper to toggle int'ing"""
return v
# return int(v)
# 0. calculate dt
# if dt is None:
t = time.perf_counter()
dt: DeltaT = t - self.last_update_t
self.last_update_t = t
# 0. Update anomaly, slowly decreasing it over time
self.decay_anomaly_score(dt)
for diff in self.prediction_diffs:
diff.update_drawn_positions(dt, self)
# 1. track history, direct update
# positions = self._track.get_projected_history(None, self.camera)[-MAX_HISTORY:]
self.drawn_positions = self.track.projected_history[-self.MAX_HISTORY:]
# 3. predictions
if len(self.drawn_predictions) < len(self.predictions):
# first prediction
if len(self.drawn_predictions) == 0:
self.drawn_predictions.append(self.predictions[-1].predictions[0])
else:
# cut existing prediction
end_step = self.predictions[-1].frame_index - self.predictions[-2].frame_index + self.CUT_GAP
# print(end_step)
keep = self.drawn_predictions[-1][end_step:]
last_item: Coordinate = keep[-1]
self.drawn_predictions[-1] = self.drawn_predictions[-1][:end_step]
# print(self.predictions[-1].frame_index, self.predictions[-2].frame_index, end_step, len(keep))
ext = [last_item] * (len(self.predictions[-1].predictions[0]) - len(keep))
# print(ext)
keep.extend(ext)
self.drawn_predictions.append(keep)
for a, drawn_prediction in enumerate(self.drawn_predictions):
# origin = self.predictions[a].predictions[0][0]
origin = self.predictions[a].predictions[0][0]
for i, pos in enumerate(drawn_prediction):
# TODO: this should be done in polar space starting from origin (i.e. self.drawn_positions[-1])
# decay = max(3, (18/i) if i else 10) # points further away move with more delay
decay = 16
drawn_r, drawn_angle = relativePointToPolar( origin, drawn_prediction[i])
pred_r, pred_angle = relativePointToPolar(origin, self.predictions[a].predictions[0][i])
r = exponentialDecay(drawn_r, pred_r, decay, dt)
# make circular coordinates transition through the smaller arc
if drawn_angle - pred_angle > math.pi:
    pred_angle += math.pi * 2
elif pred_angle - drawn_angle > math.pi:
    pred_angle -= math.pi * 2
angle = exponentialDecay(drawn_angle, pred_angle, decay, dt)
x, y = relativePolarToPoint(origin, r, angle)
self.drawn_predictions[a][i] = int_or_not(x), int_or_not(y)
# self.drawn_predictions[i][0] = int(exponentialDecay(self.drawn_predictions[i][0], self.prediction_track.predictions[i][0], decay, dt))
# self.drawn_predictions[i][1] = int(exponentialDecay(self.drawn_predictions[i][1], self.prediction_track.predictions[i][1], decay, dt))
# self.drawn_predictions = []
# for a, (ptrack, next_ptrack) in enumerate(zip(self.predictions, [*self.predictions[1:], None])):
# prediction = ptrack.predictions[0] # only use one prediction per timestep/frame/track
# if next_ptrack is not None:
# # not the last one, cut off
# next_ptrack: ProjectedTrack = self.predictions[a+1]
# end_step = next_ptrack.frame_index - ptrack.frame_index
# else:
# end_step = None # not last item; show all
# self.drawn_predictions.append(ptrack.predictions[0][:end_step])
# Animate line as procedural chain https://www.youtube.com/watch?v=qlfh_rv6khY&t=183s
self._drawn_anomaly_score = exponentialDecay(self._drawn_anomaly_score, self.anomaly_score, 3, dt)
# print(self.drawn_predictions)
# line = []
# for i, pos in enumerate(ptrack.predictions):
# line.append((ptrack.predictions[i][0], ptrack.predictions[i][1]))
# print(line)
# if len(self.drawn_predictions) <= a:
# self.drawn_predictions.append(line)
# else:
# self.drawn_predictions[a] = line
# if self.prediction_track and self.prediction_track.predictions:
# prediction_offset = self._track.frame_index - self.prediction_track.frame_index
# if len(self.prediction_track.predictions):
# for a, drawn_prediction in enumerate(self.drawn_predictions):
# for i, pos in enumerate(drawn_prediction):
# # TODO: this should be done in polar space starting from origin (i.e. self.drawn_posision[-1])
# decay = max(3, (18/i) if i else 10) # points further away move with more delay
# decay = 16
# origin = self.drawn_positions[-1]
# drawn_r, drawn_angle = relativePointToPolar( origin, drawn_prediction[i])
# pred_r, pred_angle = relativePointToPolar(origin, self.prediction_track.predictions[a][i+prediction_offset])
# r = exponentialDecay(drawn_r, pred_r, decay, dt)
# angle = exponentialDecay(drawn_angle, pred_angle, decay, dt)
# x, y = relativePolarToPoint(origin, r, angle)
# self.drawn_predictions[a][i] = int_or_not(x), int_or_not(y)
# # self.drawn_predictions[i][0] = int(exponentialDecay(self.drawn_predictions[i][0], self.prediction_track.predictions[i][0], decay, dt))
# # self.drawn_predictions[i][1] = int(exponentialDecay(self.drawn_predictions[i][1], self.prediction_track.predictions[i][1], decay, dt))
# if len(self.prediction_track.predictions) > len(self.drawn_predictions):
# for pred in self.prediction_track.predictions[len(self.drawn_predictions):]:
# self.drawn_predictions.append(pred[prediction_offset:])
def to_renderable_lines(self) -> RenderableLines:
t = time.time()
track_age = t - self.track.created_at
lines = RenderableLines([])
# track_age_in_frames = int(track_age * TRACK_FADE_ASSUME_FPS)
# track_max_points = TRACK_FADE_AFTER_DURATION * TRACK_FADE_ASSUME_FPS - track_age_in_frames
# 1. Trajectory history
drawable_points, alphas = points_fade_out_alpha_mask(self.drawn_positions, track_age, TRACK_FADE_AFTER_DURATION, TRACK_END_FADE)
color = SrgbaColor(1.,0.,0.,1.-self.lost_factor())
points = [RenderablePoint(pos, color.as_faded(a)) for pos, a in zip(drawable_points, alphas)]
lines.append(RenderableLine(points))
# 2. Position Marker / anomaly score
anomaly_marker_color = SrgbaColor(0.,0.,1, 1.-self.lost_factor()) # fadeout
# lines.append(circle_arc(self.drawn_positions[-1][0], self.drawn_positions[-1][1], 1, t, self.anomaly_score, anomaly_marker_color))
lines.append(circle_arc(
self.drawn_positions[-1][0], self.drawn_positions[-1][1],
max(.1, self._drawn_anomaly_score * 1.),
0, 1,
anomaly_marker_color)
)
# 3. Predictions
if len(self.drawn_predictions):
color = SrgbaColor(0.,1,0.,1.-self.lost_factor())
prediction_track_age = time.time() - self.predictions[0].created_at
t_factor = prediction_track_age / PREDICTION_FADE_IN
# positions = [RenderablePosition.from_list(pos) for pos in self.drawn_positions]
for drawn_prediction in self.drawn_predictions:
# drawn_prediction, alphas1 = points_fade_out_alpha_mask(drawn_prediction, prediction_track_age, TRACK_FADE_AFTER_DURATION, TRACK_END_FADE, no_frame_max=True)
gradient = np.linspace(0, PREDICTION_FADE_SLOPE, len(drawn_prediction))
alphas2 = np.clip(gradient + t_factor * (-1*PREDICTION_FADE_SLOPE), 0, 1)
# print(alphas)
# colors = [color.with_alpha(np.clip(t_factor*3-(p_index/len(drawn_prediction)), 0, 1)) for p_index in range(len(drawn_prediction))]
# colors = [color.with_alpha(np.clip(t_factor*2-(p_index/len(drawn_prediction)), 0, 1)) for p_index in range(len(drawn_prediction))]
# apply both fade in and fade out mask:
colors = [color.as_faded(a2) for a2 in alphas2]
# colors = [color.as_faded(a1*a2) for a1, a2 in zip(alphas1, alphas2)]
# points = [RenderablePoint(pos, pos_color) for pos, pos_color in zip(drawn_prediction[PREDICTION_OFFSET:], colors[PREDICTION_OFFSET:])]
points = [RenderablePoint(pos, pos_color) for pos, pos_color in zip(drawn_prediction, colors)]
lines.append(RenderableLine(points))
# 4. Diffs
# for drawn_diff in self.drawn_diffs:
# color = SrgbaColor(0.,1,1.,1.-self.lost_factor())
# colors = [color.as_faded(1) for a2 in range(len(drawn_diff))]
# points = [RenderablePoint(pos, pos_color) for pos, pos_color in zip(drawn_diff, colors)]
# lines.append(RenderableLine(points))
for diff in self.prediction_diffs:
lines.append_lines(diff.as_renderable())
# # print(self.current_state)
# if self.current_state is self.first_prediction or self.current_state is self.corrected_prediction:
# shape = np.array(shapes.YOUR if time.time() % 2 > 1 else shapes.FUTURE)
# text = "your" if time.time() % 2 > 1 else "future"
# color = SrgbaColor(0.5,0.5,0.5,1.-self.lost_factor())
# line = self.get_text_lines(text, shape, color)
# if not line:
# POSITION_INDEX = 50
# draw_pos = self.drawn_predictions[0][POSITION_INDEX-1]
# current_pos = self.drawn_positions[-1]
# angle = np.arctan2(draw_pos[0]-current_pos[0], draw_pos[1]-current_pos[1]) + np.pi
# # for i, line in enumerate(shape):
# # if i != 0:
# # continue
# points = np.array(shape[:,:2])
# avg_x = np.average(points[:,0])
# avg_y = np.average(points[:,1])
# minx, maxx = np.min(points[:,0]), np.max(points[:,0])
# miny, maxy = np.min(points[:,1]), np.max(points[:,1])
# sx = maxx-minx
# sy = maxy-miny
# points[:,0] -= avg_x
# points[:,1] -= avg_y
# points /= (sx * 1.5) # scale to 1
# points @= rotateMatrix(angle)
# points += draw_pos
# points = [RenderablePoint.from_list(pos, color.with_alpha(intensity)) for pos, intensity in zip(points, shape[:,2])]
# self.drawn_text = text
# self.drawn_text_lines = [RenderableLine(points)]
# lines.extend(self.drawn_text_lines)
return lines
def get_text_lines(self, text, shape, color):
if self.drawn_text == text:
return self.drawn_text_lines
return None
# def circle_points(cx, cy, r, c: Color): PointList
# # r = r
# steps = 30
# pointlist: list[LaserPoint] = []
# for i in range(steps):
# x = int(cx + math.cos(i * (2*math.pi)/steps) * r)
# y = int(cy + math.sin(i * (2*math.pi)/steps)* r)
# pointlist.append(LaserPoint(x, y, c, blank=(i==(steps-1)or i==0)))
# return pointlist
def points_fade_out_alpha_mask(positions: List, track_age_seconds: float, fade_after_duration: float, fade_frames: int, no_frame_max=False):
track_age_in_frames = int(track_age_seconds * TRACK_FADE_ASSUME_FPS)
if not no_frame_max:
track_max_points = int(fade_after_duration * TRACK_FADE_ASSUME_FPS) - track_age_in_frames
else:
if track_age_seconds < fade_after_duration:
track_max_points = len(positions) #+ fade_frames
else:
FADE_DURATION = 2
t_fade = max(0, track_age_seconds - fade_after_duration) / FADE_DURATION
track_max_points = int(t_fade * len(positions) / FADE_DURATION)
drawable_points = positions[-track_max_points:]
alphas = []
for i, point in enumerate(drawable_points):
reverse_i = len(drawable_points) - i
fade_i = reverse_i - (track_max_points - fade_frames) # -90
fade_i /= fade_frames # / 10
fade_i = np.clip(fade_i, 0, 1)
alpha = 1 - fade_i
if alpha > 0:
alphas.append(alpha)
return drawable_points, alphas
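# Usage sketch (mirrors the call in DrawnScenario.to_renderable_lines above):
#
#   pts, alphas = points_fade_out_alpha_mask(drawn_positions, track_age,
#                                            TRACK_FADE_AFTER_DURATION, TRACK_END_FADE)
#   points = [RenderablePoint(p, color.as_faded(a)) for p, a in zip(pts, alphas)]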
# drawn_pred_history
# drawn_predictions
# @dataclass
# class RenderablePosition():
# x: float
# y: float
# @classmethod
# def from_list(cls, l: List[float, float]) -> RenderablePosition:
# return cls(x = float(l[0]), y=float(l[1]))
# TODO)) Or Shapely point?
class Stage(Node):
"""
Render a stage on which different TrackScenarios take place, into a
single image of lines, which can be passed to different renderers,
e.g. the laser or image renderers.
"""
FPS = 60
def setup(self):
# self.scenarios: List[DrawnScenario] = []
self.scenarios: Dict[str, DrawnScenario] = defaultdict(lambda: DrawnScenario())
self.trajectory_sock = self.sub(self.config.zmq_trajectory_addr)
self.prediction_sock = self.sub(self.config.zmq_prediction_addr)
self.stage_sock = self.pub(self.config.zmq_stage_addr)
self.counter = CounterSender()
self.camera: Optional[DistortedCamera] = None
def run(self):
prev_time = time.perf_counter()
while self.is_running.is_set():
self.tick()
# 1) poll & update
self.loop_receive()
# 2) render
self.loop_render()
# 3) sleep the remainder of the tick to maintain the desired FPS
now = time.perf_counter()
time_diff = (now - prev_time)
if time_diff < 1/self.FPS:
# print(f"sleep {1/self.FPS - time_diff}")
time.sleep(1/self.FPS - time_diff)
now += 1/self.FPS - time_diff
prev_time = now
def loop_receive(self):
# 1) receive predictions
try:
prediction_frame: Frame = self.prediction_sock.recv_pyobj(zmq.NOBLOCK)
for track_id, track in prediction_frame.tracks.items():
proj_track = ProjectedTrack(track, prediction_frame.camera)
self.scenarios[track_id].add_prediction(proj_track)
except zmq.ZMQError as e:
self.logger.debug(f'reuse prediction')
# 2) receive tracker tracks
try:
trajectory_frame: Frame = self.trajectory_sock.recv_pyobj(zmq.NOBLOCK)
for track_id, track in trajectory_frame.tracks.items():
proj_track = ProjectedTrack(track, trajectory_frame.camera)
# if not self.scenarios[track_id].camera:
# self.scenarios[track_id].camera = trajectory_frame.camera # little hack to pass camera!
self.scenarios[track_id].set_track(proj_track)
except zmq.ZMQError as e:
self.logger.debug(f'reuse tracks')
# 3) Remove stale tracks
for track_id, scenario in list(self.scenarios.items()):
# check when last tracker update was received
scenario.check_lost()
if scenario.lost_factor() > 1:
self.logger.info(f"rm track {track_id}")
del self.scenarios[track_id]
def loop_render(self):
lines = RenderableLines([])
for track_id, scenario in self.scenarios.items():
scenario.update_drawn_positions()
lines.append_lines(scenario.to_renderable_lines())
# print(lines)
# rl = RenderableLines(lines)
# with open('/tmp/lines.pcl', 'wb') as fp:
# pickle.dump(rl, fp)
rl = lines.as_simplified() # or segmentise (see shapely)
self.counter.set("stage.lines", len(lines.lines))
# print(rl.__dict__)
self.stage_sock.send_json(rl, cls=DataclassJSONEncoder)
# print(json.dumps(rl, cls=DataclassJSONEncoder))
@classmethod
def arg_parser(cls) -> ArgumentParser:
argparser = ArgumentParser()
argparser.add_argument('--zmq-trajectory-addr',
help='Manually specify communication addr for the trajectory messages',
type=str,
default="ipc:///tmp/feeds_traj")
argparser.add_argument('--zmq-prediction-addr',
help='Manually specify communication addr for the prediction messages',
type=str,
default="ipc:///tmp/feeds_preds")
argparser.add_argument('--zmq-stage-addr',
help='Manually specify communication addr for the stage messages (the rendered lines)',
type=str,
default="tcp://0.0.0.0:99174")
return argparser