transition part of the prediction

This commit is contained in:
Ruben van de Ven 2025-05-16 14:42:59 +02:00
parent 55d86eab45
commit bcf798e166

View file

@ -6,6 +6,7 @@ from dataclasses import dataclass
from enum import Enum
import json
import logging
import math
import pickle
import time
from typing import Dict, List, Optional, Tuple
@ -457,6 +458,8 @@ class DrawnScenario(TrackScenario):
ANOMALY_DECAY = .2
DISTANCE_ANOMALY_FACTOR = .05
MAX_HISTORY = 80
CUT_GAP = 5
def __init__(self):
# self.created_at = time.time()
@ -466,6 +469,7 @@ class DrawnScenario(TrackScenario):
self.drawn_positions: List[Coordinate] = []
self.drawn_pred_history: List[Coordinate] = []
self.drawn_predictions: List[List[Coordinate]] = []
self._current_drawn_prediction: List[Coordinate] = []
self.drawn_text = ""
self.drawn_text_lines: List[RenderableLine] = []
@ -512,66 +516,62 @@ class DrawnScenario(TrackScenario):
diff.update_drawn_positions(dt, self)
# 1. track history, direct update
MAX_HISTORY = 80
# positions = self._track.get_projected_history(None, self.camera)[-MAX_HISTORY:]
self.drawn_positions = self.track.projected_history
# self.drawn_positions = self.track.get_projected_history(None, self.camera)
# TODO)) Limit history to N points, or N lenght
# for i, pos in enumerate(self.drawn_positions):
# self.drawn_positions[i][0] = positions[i][0]
# self.drawn_positions[i][1] = positions[i][1]
# if len(positions) > len(self.drawn_positions):
# self.drawn_positions.extend(positions[len(self.drawn_positions):])
# 2. history as seen by predictor (Trajectron)
# if self.prediction_track and self.prediction_track.predictor_history:
# for i, pos in enumerate(self.drawn_pred_history):
# if len(self.prediction_track.predictor_history) > i:
# self.drawn_pred_history[i][0] = int_or_not(exponentialDecay(self.drawn_pred_history[i][0], self.prediction_track.predictor_history[i][0], 16, dt))
# self.drawn_pred_history[i][1] = int_or_not(exponentialDecay(self.drawn_pred_history[i][1], self.prediction_track.predictor_history[i][1], 16, dt))
# if len(self.prediction_track.predictor_history) > len(self.drawn_pred_history):
# self.drawn_pred_history.extend(positions[len(self.drawn_pred_history):])
self.drawn_positions = self.track.projected_history[-self.MAX_HISTORY:]
# 3. predictions
self.drawn_predictions = []
self.drawn_diffs = []
for a, (ptrack, next_ptrack) in enumerate(zip(self.predictions, [*self.predictions[1:], None])):
if len(self.drawn_predictions) < len(self.predictions):
# first prediction
if len(self.drawn_predictions) == 0:
self.drawn_predictions.append(self.predictions[-1].predictions[0])
else:
# cut existing prediction
end_step = self.predictions[-1].frame_index - self.predictions[-2].frame_index + self.CUT_GAP
# print(end_step)
keep = self.drawn_predictions[-1][end_step:]
last_item: Coordinate = keep[-1]
self.drawn_predictions[-1] = self.drawn_predictions[-1][:end_step]
# print(self.predictions[-1].frame_index, self.predictions[-2].frame_index, end_step, len(keep))
ext = [last_item] * (len(self.predictions[-1].predictions[0]) - len(keep))
# print(ext)
keep.extend(ext)
self.drawn_predictions.append(keep)
for a, drawn_prediction in enumerate(self.drawn_predictions):
# origin = self.predictions[a].predictions[0][0]
origin = self.predictions[a].predictions[0][0]
for i, pos in enumerate(drawn_prediction):
# TODO: this should be done in polar space starting from origin (i.e. self.drawn_posision[-1])
decay = max(3, (18/i) if i else 10) # points further away move with more delay
decay = 16
drawn_r, drawn_angle = relativePointToPolar( origin, drawn_prediction[i])
pred_r, pred_angle = relativePointToPolar(origin, self.predictions[a].predictions[0][i])
r = exponentialDecay(drawn_r, pred_r, decay, dt)
# make circular coordinates transition through the smaller arc
if abs(drawn_angle - pred_angle) > math.pi:
pred_angle -= math.pi * 2
angle = exponentialDecay(drawn_angle, pred_angle, decay, dt)
x, y = relativePolarToPoint(origin, r, angle)
self.drawn_predictions[a][i] = int_or_not(x), int_or_not(y)
# self.drawn_predictions[i][0] = int(exponentialDecay(self.drawn_predictions[i][0], self.prediction_track.predictions[i][0], decay, dt))
# self.drawn_predictions[i][1] = int(exponentialDecay(self.drawn_predictions[i][1], self.prediction_track.predictions[i][1], decay, dt))
prediction = ptrack.predictions[0] # only use one prediction per timestep/frame/track
if next_ptrack is not None:
# not the last one, cut off
next_ptrack: ProjectedTrack = self.predictions[a+1]
end_step = next_ptrack.frame_index - ptrack.frame_index
# self.drawn_predictions = []
# for a, (ptrack, next_ptrack) in enumerate(zip(self.predictions, [*self.predictions[1:], None])):
# prediction = ptrack.predictions[0] # only use one prediction per timestep/frame/track
# if next_ptrack is not None:
# # not the last one, cut off
# next_ptrack: ProjectedTrack = self.predictions[a+1]
# end_step = next_ptrack.frame_index - ptrack.frame_index
else:
end_step = None # not last item; show all
self.drawn_predictions.append(ptrack.predictions[0][:end_step])
# else:
# end_step = None # not last item; show all
# self.drawn_predictions.append(ptrack.predictions[0][:end_step])
# # diff
# diff_steps_back = ptrack.frame_index - self.track.frame_index
# if len(self.drawn_positions) < -1*diff_steps_back:
# logger.warning("Track history doesn't reach prediction start. Should not be possible. Skip")
# pass
# else:
# # trajectory_range = self.camera.[d.get_foot_coords() for d in trajectory_det_range] # in frame coordinate space
# trajectory_range = self.drawn_positions[diff_steps_back:diff_steps_back+end_step]
# prediction_range = ptrack.predictions[0][:end_step] # in world coordinate space
# line = []
# for p1, p2 in zip(trajectory_range[::4], prediction_range[::4]):
# line.extend([
# p1, p2
# ])
# if len(line):
# self.drawn_diffs.append(line)
diff_line = []
for p1, p2 in self.diffs[::4]:
diff_line.extend([p1, p2])
# self.drawn_diffs.append(diff_line)
self.drawn_diffs = [diff_line]
# Animate line as procedural chain https://www.youtube.com/watch?v=qlfh_rv6khY&t=183s