diff --git a/trap/stage.py b/trap/stage.py index 24c4605..306f869 100644 --- a/trap/stage.py +++ b/trap/stage.py @@ -1,5 +1,6 @@ from __future__ import annotations +from abc import ABC, abstractmethod from collections import defaultdict from dataclasses import dataclass from enum import Enum @@ -8,8 +9,9 @@ import logging import pickle import time from typing import Dict, List, Optional, Tuple +from matplotlib.pyplot import isinteractive import numpy as np -from shapely import line_locate_point +from shapely import LineString, line_locate_point, linestrings from statemachine import Event, State, StateMachine from statemachine.exceptions import TransitionNotAllowed import zmq @@ -29,89 +31,33 @@ from trap.utils import exponentialDecay, exponentialDecayRounded, relativePointT logger = logging.getLogger('trap.stage') Coordinate = Tuple[float, float] +DeltaT = float # delta_t in seconds # current_fraction = line_locate_point(new_line_string, Point(old_ls.coords[-1]), normalized=True) # new_fraction = current_fraction + stepsize # grown_string = shapely.ops.substring(new_line_string, 0, new_fraction, normalized=True) -class ProceduralChain(): - link_size = .1 # 10cm - # angle_constraint = 5 - - def __init__(self, first_joint: Coordinate, auto_grow_from: Optional[Coordinate]): - self.joints: List[Coordinate] = [] - self.auto_grow_from = auto_grow_from +class LineGenerator(ABC): + @abstractmethod + def update_drawn_positions(self, dt: DeltaT): pass - def move(self, position): - # - pass + def as_renderable(self, color: SrgbaColor) -> RenderableLines: + points = [RenderablePoint(p, color) for p in self._drawn_points] + lines = [RenderableLine(points)] + return RenderableLines(lines) - if self.auto_grow_from is not None: - pass - # if distance self.joints[-1] - self.auto_grow_from > link_size: - self.joints.append(self.auto_grow_from) - - def arrived(self, point: Coordinate): - [j == point for j in self.joints] - return False - - -DeltaT = float # delta_t - -class DiffSegment(): - DRAW_DECAY_SPEED = 25 - - def __init__(self, prediction: ProjectedTrack): - self.ptrack = prediction - self._last_diff_frame_idx = 0 - self.finished = False - self.ready = False - - self.points: List[Coordinate] = [] +class AppendableLine(LineGenerator): + """ + A line generator that allows for points to be added over time. + Simply use `line.points.extend([p1, p2])` + """ + def __init__(self, points: Optional[List[Coordinate]] = None, draw_decay_speed = 25.): + self.points: List[Coordinate] = points if points is not None else [] # when providing [] as default, it messes up instancing by reusing the same list self._drawn_points = [] + self.ready = len(self.points) == 0 + self.draw_decay_speed = draw_decay_speed - def finish(self): - self.finished = True - - # run on each track update received - def update_track(self, track: ProjectedTrack): - # migrate SceneraioScene function - start_frame_idx = max(self.ptrack.frame_index, self._last_diff_frame_idx) - traj_diff_steps_back = track.frame_index - start_frame_idx # positive value - pred_diff_steps_forward = start_frame_idx - self.ptrack.frame_index # positive value - - if traj_diff_steps_back < 0 or len(track.history) < traj_diff_steps_back: - logger.warning("Track history doesn't reach prediction start. Should not be possible. Skip") - # elif len(ptrack.predictions[0]) < pred_diff_steps_back: - # logger.warning("Prediction does not reach prediction start. Should not be possible. Skip") - else: - trajectory = track.projected_history - - # from start to as far as it gets - trajectory_range = trajectory[-1*traj_diff_steps_back:] - prediction_range = self.ptrack.predictions[0][pred_diff_steps_forward:] # in world coordinate space - line = [] - for i, (p1, p2) in enumerate(zip(trajectory_range, prediction_range)): - offset_from_start = (pred_diff_steps_forward + i) - if offset_from_start % 4 == 0: - self.points.extend([p1, p2]) - - self._last_diff_frame_idx = track.frame_index - # pass - - # # rewrite: - # if not self.finished: - # # build the coordinates for the line - - # pass - # if not self.chain.arrived(self.points[-1]): - # pass # wait for drawing to complete - # else: - # # Move head towards person stats - # pass - - # run each render tick def update_drawn_positions(self, dt: DeltaT): if len(self.points) == 0: # nothing to draw yet @@ -137,16 +83,172 @@ class DiffSegment(): self._drawn_points.append(self._drawn_points[-1]) self.ready = False - x = exponentialDecayRounded(self._drawn_points[-1][0], target[0], self.DRAW_DECAY_SPEED, dt, .05) - y = exponentialDecayRounded(self._drawn_points[-1][1], target[1], self.DRAW_DECAY_SPEED, dt, .05) + x = exponentialDecayRounded(self._drawn_points[-1][0], target[0], self.draw_decay_speed, dt, .05) + y = exponentialDecayRounded(self._drawn_points[-1][1], target[1], self.draw_decay_speed, dt, .05) self._drawn_points[-1] = (float(x), float(y)) + +class ProceduralChain(LineGenerator): + MOVE_DECAY_SPEED = 50 + link_size = .1 # 10cm + # angle_constraint = 5 + + def __init__(self, joints: List[Coordinate]): + self.joints: List[Coordinate] = joints + self.target: Coordinate = joints[-1] + self.ready = False + self.move_decay_speed = self.MOVE_DECAY_SPEED + + @classmethod + def from_appendable_line(cls, al: AppendableLine) -> ProceduralChain: + # TODO: create more segments: + # last added points becomes the head of the chain + points = list(reversed(al.points)) + linestring = LineString(points) + linestring = linestring.segmentize(cls.link_size) + joints = list(linestring.coords) + + return cls(joints) + + def update_drawn_positions(self, dt: DeltaT): + if self.ready: + return + # direction = np.array(self.joints[-1] - self.target) + + # TODO: check self.joints empty, and stop then + + x = exponentialDecayRounded(self.joints[0][0], self.target[0], self.move_decay_speed, dt, .05) + y = exponentialDecayRounded(self.joints[0][1], self.target[1], self.move_decay_speed, dt, .05) + self.joints[0] = (float(x), float(y)) + + for i, (joint, prev_joint) in enumerate(zip(self.joints[1:], self.joints), start=1): + + diff = np.array(prev_joint) - np.array(joint) + direction = diff / np.linalg.norm(diff) + self.joints[i] = prev_joint - direction * self.link_size + + if np.isclose(self.joints[0], self.target, atol=.05).all(): + # self.ready = True + self.joints.pop(0) + if len(self.joints) == 0: + self.ready = True + + self._drawn_points = self.joints + # void resolve(PVector pos) { + # angles.set(0, PVector.sub(pos, joints.get(0)).heading()); + # joints.set(0, pos); + # for (int i = 1; i < joints.size(); i++) { + # float curAngle = PVector.sub(joints.get(i - 1), joints.get(i)).heading(); + # angles.set(i, constrainAngle(curAngle, angles.get(i - 1), angleConstraint)); + # joints.set(i, PVector.sub(joints.get(i - 1), PVector.fromAngle(angles.get(i)).setMag(linkSize))); + # } + # } + + +class DiffSegment(): + DRAW_DECAY_SPEED = 25 + + def __init__(self, prediction: ProjectedTrack): + self.ptrack = prediction + self._last_diff_frame_idx = 0 + self.finished = False + + self.line = AppendableLine( draw_decay_speed=self.DRAW_DECAY_SPEED) + self.points: List[Coordinate] = [] + self._drawn_points = [] + self._target_track = prediction + + def finish(self): + self.finished = True + + # run on each track update received + def update_track(self, track: ProjectedTrack): + self._target_track = track + + if self.finished: + # don't add new points if finished + return + + # migrate SceneraioScene function + start_frame_idx = max(self.ptrack.frame_index, self._last_diff_frame_idx) + traj_diff_steps_back = track.frame_index - start_frame_idx # positive value + pred_diff_steps_forward = start_frame_idx - self.ptrack.frame_index # positive value + + if traj_diff_steps_back < 0 or len(track.history) < traj_diff_steps_back: + logger.warning("Track history doesn't reach prediction start. Should not be possible. Skip") + # elif len(ptrack.predictions[0]) < pred_diff_steps_back: + # logger.warning("Prediction does not reach prediction start. Should not be possible. Skip") + else: + trajectory = track.projected_history + + # from start to as far as it gets + trajectory_range = trajectory[-1*traj_diff_steps_back:] + prediction_range = self.ptrack.predictions[0][pred_diff_steps_forward:] # in world coordinate space + line = [] + for i, (p1, p2) in enumerate(zip(trajectory_range, prediction_range)): + offset_from_start = (pred_diff_steps_forward + i) + if offset_from_start % 4 == 0: + self.line.points.extend([p1, p2]) + self.points.extend([p1, p2]) + + self._last_diff_frame_idx = track.frame_index + + + # run each render tick + def update_drawn_positions(self, dt: DeltaT): + if isinstance(self.line, AppendableLine): + if self.finished and self.line.ready: + # convert when fully drawn + # print(self, "CONVERT LINE") + self.line = ProceduralChain.from_appendable_line(self.line) + + if isinstance(self.line, ProceduralChain): + self.line.target = self._target_track.projected_history[-1] + + # if len(self.points) == 0: + # # nothing to draw yet + # return + + # # self._drawn_points = self.points + + # if len(self._drawn_points) == 0: + # # create origin + # self._drawn_points.append(self.points[0]) + # # and drawing head + # self._drawn_points.append(self.points[0]) + + # idx = len(self._drawn_points) - 1 + # target = self.points[idx] + + # if np.isclose(self._drawn_points[-1], target, atol=.05).all(): + # # TODO: might want to migrate to np.isclose() + # if len(self._drawn_points) == len(self.points): + # self.ready = True + # return # done until a new point is added + # # add new point as drawing head + # self._drawn_points.append(self._drawn_points[-1]) + # self.ready = False + + # x = exponentialDecayRounded(self._drawn_points[-1][0], target[0], self.DRAW_DECAY_SPEED, dt, .05) + # y = exponentialDecayRounded(self._drawn_points[-1][1], target[1], self.DRAW_DECAY_SPEED, dt, .05) + # self._drawn_points[-1] = (float(x), float(y)) + + # if not self.finished or not self.line.ready: + self.line.update_drawn_positions(dt) + + def as_renderable(self) -> RenderableLines: - # lines = [] color = SrgbaColor(0,0,1,1) - points = [RenderablePoint(p, color) for p in self._drawn_points] - lines = [RenderableLine(points)] - return RenderableLines(lines) + # lines = [] + # points = [RenderablePoint(p, color) for p in self._drawn_points] + # lines = [RenderableLine(points)] + # return RenderableLines(lines) + if not self.finished or not self.line.ready: + return self.line.as_renderable(color) + return self.line.as_renderable(color) + # points = [RenderablePoint(p, color) for p in self._drawn_points] + # lines = [RenderableLine(points)] + return RenderableLines([]) class ScenarioScene(Enum): @@ -247,34 +349,9 @@ class TrackScenario(StateMachine): if len(self.prediction_diffs) == 0: return - self.prediction_diffs[-1].update_track(self.track) + for diff in self.prediction_diffs: + diff.update_track(self.track) - # ptrack = self.predictions[-1] - - # start_frame_idx = max(ptrack.frame_index, self._last_diff_frame_idx) - # traj_diff_steps_back = self.track.frame_index - start_frame_idx # positive value - # pred_diff_steps_forward = start_frame_idx - ptrack.frame_index # positive value - - # if traj_diff_steps_back < 0 or len(self.track.history) < traj_diff_steps_back: - # logger.warning("Track history doesn't reach prediction start. Should not be possible. Skip") - # # elif len(ptrack.predictions[0]) < pred_diff_steps_back: - # # logger.warning("Prediction does not reach prediction start. Should not be possible. Skip") - # else: - # trajectory = self.track.get_projected_history(camera=self.camera) - - # # from start to as far as it gets - # trajectory_range = trajectory[-1*traj_diff_steps_back:] - # prediction_range = ptrack.predictions[0][pred_diff_steps_forward:] # in world coordinate space - # line = [] - # for p1, p2 in zip(trajectory_range[::4], prediction_range[::4]): - # self.diffs.append((p1, p2)) - # # print(f"Diff for {self.track.frame_index}") - # # print(f"Start at {start_frame_idx=}, which is {traj_diff_steps_back} steps back and {pred_diff_steps_forward}steps forward") - - - # self._last_diff_frame_idx = self.track.frame_index - - def add_prediction(self, track: ProjectedTrack): @@ -379,7 +456,7 @@ class DrawnScenario(TrackScenario): self.drawn_text = "" self.drawn_text_lines: List[RenderableLine] = [] - self.anomly_score = .3 # TODO: variable + self.anomly_score = 0 # TODO: variable super().__init__() def update_drawn_positions(self) -> List: @@ -524,7 +601,13 @@ class DrawnScenario(TrackScenario): # 2. Position Marker anomaly_marker_color = SrgbaColor(0.,0.,1, 1.-self.lost_factor()) # fadeout - lines.append(circle_arc(self.drawn_positions[-1][0], self.drawn_positions[-1][1], 1, t, self.anomly_score, anomaly_marker_color)) + # lines.append(circle_arc(self.drawn_positions[-1][0], self.drawn_positions[-1][1], 1, t, self.anomly_score, anomaly_marker_color)) + lines.append(circle_arc( + self.drawn_positions[-1][0], self.drawn_positions[-1][1], + max(.1, self.anomly_score * 2), + 0, 1, + anomaly_marker_color) + ) # 3. Predictions if len(self.drawn_predictions):