From bcf798e1667f4ee60152b8e3230f0bc7e8956224 Mon Sep 17 00:00:00 2001 From: Ruben van de Ven Date: Fri, 16 May 2025 14:42:59 +0200 Subject: [PATCH] transition part of the prediction --- trap/stage.py | 108 +++++++++++++++++++++++++------------------------- 1 file changed, 54 insertions(+), 54 deletions(-) diff --git a/trap/stage.py b/trap/stage.py index 7cfbe2a..d00f547 100644 --- a/trap/stage.py +++ b/trap/stage.py @@ -6,6 +6,7 @@ from dataclasses import dataclass from enum import Enum import json import logging +import math import pickle import time from typing import Dict, List, Optional, Tuple @@ -457,6 +458,8 @@ class DrawnScenario(TrackScenario): ANOMALY_DECAY = .2 DISTANCE_ANOMALY_FACTOR = .05 + MAX_HISTORY = 80 + CUT_GAP = 5 def __init__(self): # self.created_at = time.time() @@ -466,6 +469,7 @@ class DrawnScenario(TrackScenario): self.drawn_positions: List[Coordinate] = [] self.drawn_pred_history: List[Coordinate] = [] self.drawn_predictions: List[List[Coordinate]] = [] + self._current_drawn_prediction: List[Coordinate] = [] self.drawn_text = "" self.drawn_text_lines: List[RenderableLine] = [] @@ -512,66 +516,62 @@ class DrawnScenario(TrackScenario): diff.update_drawn_positions(dt, self) # 1. track history, direct update - MAX_HISTORY = 80 + # positions = self._track.get_projected_history(None, self.camera)[-MAX_HISTORY:] - self.drawn_positions = self.track.projected_history - # self.drawn_positions = self.track.get_projected_history(None, self.camera) - # TODO)) Limit history to N points, or N lenght - # for i, pos in enumerate(self.drawn_positions): - # self.drawn_positions[i][0] = positions[i][0] - # self.drawn_positions[i][1] = positions[i][1] - - # if len(positions) > len(self.drawn_positions): - # self.drawn_positions.extend(positions[len(self.drawn_positions):]) - - # 2. history as seen by predictor (Trajectron) - # if self.prediction_track and self.prediction_track.predictor_history: - # for i, pos in enumerate(self.drawn_pred_history): - # if len(self.prediction_track.predictor_history) > i: - # self.drawn_pred_history[i][0] = int_or_not(exponentialDecay(self.drawn_pred_history[i][0], self.prediction_track.predictor_history[i][0], 16, dt)) - # self.drawn_pred_history[i][1] = int_or_not(exponentialDecay(self.drawn_pred_history[i][1], self.prediction_track.predictor_history[i][1], 16, dt)) - - # if len(self.prediction_track.predictor_history) > len(self.drawn_pred_history): - # self.drawn_pred_history.extend(positions[len(self.drawn_pred_history):]) - + self.drawn_positions = self.track.projected_history[-self.MAX_HISTORY:] + # 3. predictions - self.drawn_predictions = [] - self.drawn_diffs = [] - for a, (ptrack, next_ptrack) in enumerate(zip(self.predictions, [*self.predictions[1:], None])): + if len(self.drawn_predictions) < len(self.predictions): + # first prediction + if len(self.drawn_predictions) == 0: + self.drawn_predictions.append(self.predictions[-1].predictions[0]) + else: + # cut existing prediction + end_step = self.predictions[-1].frame_index - self.predictions[-2].frame_index + self.CUT_GAP + # print(end_step) + keep = self.drawn_predictions[-1][end_step:] + last_item: Coordinate = keep[-1] + self.drawn_predictions[-1] = self.drawn_predictions[-1][:end_step] + # print(self.predictions[-1].frame_index, self.predictions[-2].frame_index, end_step, len(keep)) + ext = [last_item] * (len(self.predictions[-1].predictions[0]) - len(keep)) + # print(ext) + keep.extend(ext) + self.drawn_predictions.append(keep) + + for a, drawn_prediction in enumerate(self.drawn_predictions): + # origin = self.predictions[a].predictions[0][0] + origin = self.predictions[a].predictions[0][0] + for i, pos in enumerate(drawn_prediction): + # TODO: this should be done in polar space starting from origin (i.e. self.drawn_posision[-1]) + decay = max(3, (18/i) if i else 10) # points further away move with more delay + decay = 16 + drawn_r, drawn_angle = relativePointToPolar( origin, drawn_prediction[i]) + pred_r, pred_angle = relativePointToPolar(origin, self.predictions[a].predictions[0][i]) + r = exponentialDecay(drawn_r, pred_r, decay, dt) + + # make circular coordinates transition through the smaller arc + if abs(drawn_angle - pred_angle) > math.pi: + pred_angle -= math.pi * 2 + angle = exponentialDecay(drawn_angle, pred_angle, decay, dt) + x, y = relativePolarToPoint(origin, r, angle) + self.drawn_predictions[a][i] = int_or_not(x), int_or_not(y) + # self.drawn_predictions[i][0] = int(exponentialDecay(self.drawn_predictions[i][0], self.prediction_track.predictions[i][0], decay, dt)) + # self.drawn_predictions[i][1] = int(exponentialDecay(self.drawn_predictions[i][1], self.prediction_track.predictions[i][1], decay, dt)) - prediction = ptrack.predictions[0] # only use one prediction per timestep/frame/track - if next_ptrack is not None: - # not the last one, cut off - next_ptrack: ProjectedTrack = self.predictions[a+1] - end_step = next_ptrack.frame_index - ptrack.frame_index + # self.drawn_predictions = [] + # for a, (ptrack, next_ptrack) in enumerate(zip(self.predictions, [*self.predictions[1:], None])): + + # prediction = ptrack.predictions[0] # only use one prediction per timestep/frame/track + # if next_ptrack is not None: + # # not the last one, cut off + # next_ptrack: ProjectedTrack = self.predictions[a+1] + # end_step = next_ptrack.frame_index - ptrack.frame_index - else: - end_step = None # not last item; show all - self.drawn_predictions.append(ptrack.predictions[0][:end_step]) + # else: + # end_step = None # not last item; show all + # self.drawn_predictions.append(ptrack.predictions[0][:end_step]) - # # diff - # diff_steps_back = ptrack.frame_index - self.track.frame_index - # if len(self.drawn_positions) < -1*diff_steps_back: - # logger.warning("Track history doesn't reach prediction start. Should not be possible. Skip") - # pass - # else: - # # trajectory_range = self.camera.[d.get_foot_coords() for d in trajectory_det_range] # in frame coordinate space - # trajectory_range = self.drawn_positions[diff_steps_back:diff_steps_back+end_step] - # prediction_range = ptrack.predictions[0][:end_step] # in world coordinate space - # line = [] - # for p1, p2 in zip(trajectory_range[::4], prediction_range[::4]): - # line.extend([ - # p1, p2 - # ]) - # if len(line): - # self.drawn_diffs.append(line) - - diff_line = [] - for p1, p2 in self.diffs[::4]: - diff_line.extend([p1, p2]) - # self.drawn_diffs.append(diff_line) - self.drawn_diffs = [diff_line] # Animate line as procedural chain https://www.youtube.com/watch?v=qlfh_rv6khY&t=183s