draw training lines

Ruben van de Ven 2025-10-31 19:21:49 +01:00
parent 9bcab3fae7
commit c6747ac8e6
6 changed files with 593 additions and 30 deletions

trap/anomaly.py (new file, +181 lines)

@@ -0,0 +1,181 @@
from __future__ import annotations
import logging
from typing import List
import numpy as np
from trap.base import ProjectedTrack
from trap.lines import AppendableLine, Coordinate, DeltaT, ProceduralChain, RenderableLines, SrgbaColor, StaticLine
logger = logging.getLogger('anomaly')
def calc_anomaly(segments: List[DiffSegment], window: int = 3):
"""Calculate anomaly score based on provided segments
considering a sliding window of the last n items
"""
relevant_segments = segments[-window:]
scores = [s.avg_score() for s in relevant_segments]
s = list(filter(lambda x: x is not None, scores))
return np.average(s)
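# Illustrative sketch, not part of this commit: minimal stand-in segments showing how
# calc_anomaly() averages only the scores inside the sliding window (here the last two).
class _FakeSegment:
    def __init__(self, score):
        self._score = score
    def avg_score(self):
        return self._score

assert calc_anomaly([_FakeSegment(1.0), _FakeSegment(2.0), _FakeSegment(4.0)], window=2) == 3.0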
class DiffSegment():
"""
A segment of a prediction track that can be diffed
against the actual track. The track is continuously updated.
When a new prediction comes in, the diff is marked as
finished, after which it is animated and added to the
Scenario's anomaly score.
"""
DRAW_DECAY_SPEED = 25
POINT_INTERVAL = 4
def __init__(self, prediction: ProjectedTrack):
self.ptrack = prediction
self._last_diff_frame_idx: int = 0
self.finished = False
self.line = StaticLine()
self.points: List[Coordinate] = []
self._drawn_points = []
self._target_track = prediction
self.score = 0
def finish(self):
self.finished = True
def nr_of_passed_points(self) -> int:
if not self._last_diff_frame_idx:
return 0
return self._last_diff_frame_idx - self.ptrack.frame_index
# if isinstance(self.line, AppendableLine):
# return self.line.nr_of_passed_points() * self.POINT_INTERVAL
# else:
# return len(self.points) * self.POINT_INTERVAL
def avg_score(self):
frames_passed = self.nr_of_passed_points()
if not frames_passed:
return None
else:
return self.score/frames_passed
# run on each track update received
def update_track(self, track: ProjectedTrack):
self._target_track = track
if self.finished:
# don't add new points if finished
return
# logic migrated from the ScenarioScene function
start_frame_idx = max(self.ptrack.frame_index, self._last_diff_frame_idx)
traj_diff_steps_back = track.frame_index - start_frame_idx # positive value
pred_diff_steps_forward = start_frame_idx - self.ptrack.frame_index # positive value
if traj_diff_steps_back < 0 or len(track.history) < traj_diff_steps_back:
logger.warning("Track history doesn't reach prediction start. Should not be possible. Skip")
# elif len(ptrack.predictions[0]) < pred_diff_steps_back:
# logger.warning("Prediction does not reach prediction start. Should not be possible. Skip")
else:
trajectory = track.projected_history
# from start to as far as it gets
trajectory_range = trajectory[-1*traj_diff_steps_back:]
prediction_range = self.ptrack.predictions[0][pred_diff_steps_forward:] # in world coordinate space
line = []
for i, (p1, p2) in enumerate(zip(trajectory_range, prediction_range)):
diff = (p1[0]-p2[0], p1[1]-p2[1])
self.score += np.linalg.norm(diff)
offset_from_start = (pred_diff_steps_forward + i)
if offset_from_start % self.POINT_INTERVAL == 0:
self.line.extend([p1, p2])
self.points.extend([p1, p2])
self._last_diff_frame_idx = track.frame_index
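# Illustrative sketch, not part of this commit: the per-frame value that update_track()
# adds to `score` is simply the Euclidean distance between the observed point and the
# predicted point at the same frame offset.
trajectory_range = [(0.0, 0.0), (1.0, 0.0), (2.0, 0.0)]   # hypothetical observed points
prediction_range = [(0.0, 0.0), (1.0, 1.0), (2.0, 2.0)]   # hypothetical predicted points
score = sum(np.linalg.norm((p1[0] - p2[0], p1[1] - p2[1]))
            for p1, p2 in zip(trajectory_range, prediction_range))
# score == 0.0 + 1.0 + 2.0 == 3.0; avg_score() later divides this by the frames passed.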
# # run each render tick
# def update_drawn_positions(self, dt: DeltaT):
# if isinstance(self.line, AppendableLine):
# if self.finished and self.line.ready:
# # convert when fully drawn
# # print(self, "CONVERT LINE")
# self.line = ProceduralChain.from_appendable_line(self.line)
# if isinstance(self.line, ProceduralChain):
# self.line.target = self._target_track.projected_history[-1]
# # if not self.finished or not self.line.ready:
# self.line.update_drawn_positions(dt)
def as_renderable(self) -> RenderableLines:
color = SrgbaColor(0,0,1,1)
# if not self.finished or not self.line.ready:
return self.line.as_renderable(color)
# return self.line.as_renderable(color)
def calculate_loitering_scores(track: ProjectedTrack, min_duration_to_linger, linger_factor, velocity_threshold, window = None):
"""
Calculates a loitering score (0-1) for a single track.
Args:
    track: A ProjectedTrack; its projected_history provides (x, y) world coordinates per frame.
    min_duration_to_linger: Minimum number of consecutive slow frames before a segment counts as lingering.
    linger_factor: Number of lingering frames needed to reach a score of 1 (score = lingering frames / linger_factor, clipped to 0-1).
    velocity_threshold: Maximum velocity (meters/frame) to consider as lingering.
    window: If given, only the last `window` positions of the history are considered.
Returns:
    A generator yielding a loitering score per frame
"""
total_frames = len(track.projected_history)
if total_frames < 2:
return  # Not enough data: as this is a generator, a return value would be discarded; no scores are yielded
offset = window * -1 if window is not None else 0
x_coords = [t[0] for t in track.projected_history[offset:]]
y_coords = [t[1] for t in track.projected_history[offset:]]
# Calculate velocities
velocities = np.sqrt(np.diff(x_coords)**2 + np.diff(y_coords)**2)
# Calculate distances
# distances = np.diff(x_coords)
# distances_y = np.diff(y_coords)
# distances_total = np.sqrt(distances**2 + distances_y**2)
linger_duration = 0
linger_frames = 0
for i in range(len(velocities)):
if velocities[i] < velocity_threshold:
linger_duration += 1
if linger_duration >= min_duration_to_linger:
linger_frames +=1
else:
# decay if moving faster
linger_duration = max(linger_duration - 3, 0)
linger_frames = max(linger_frames - 3, 0)
# Calculate loitering score
if total_frames > 0:
loitering_score = min(1, max(0, linger_frames / linger_factor))
else:
loitering_score = 0.0
yield loitering_score
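# Illustrative sketch, not part of this commit: a stand-in track that does not move yields
# a loitering score that climbs to 1; callers typically keep only the latest value, as
# stage.py does with scores[-1].
class _StationaryTrack:
    projected_history = [(0.0, 0.0)] * 100   # 100 frames without movement

scores = list(calculate_loitering_scores(_StationaryTrack(), min_duration_to_linger=12,
                                         linger_factor=60, velocity_threshold=0.1))
print(scores[-1])   # reaches 1 (clipped) once enough lingering frames have accumulated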

View file

@@ -25,6 +25,7 @@ from bytetracker.basetrack import TrackState as ByteTrackTrackState
import pandas as pd
from shapely import Point
from trap.anomaly import calculate_loitering_scores
from trap.utils import get_bins, inv_lerp, lerp
from trajectron.environment import Environment, Node, Scene
from urllib.parse import urlparse

View file

@@ -456,7 +456,7 @@ class LineAnimator(StaticLine):
# print(self, target_line, bool(target_line), target_line is not None)
self.target = target_line if target_line is not None else StaticLine()
self.ready = len(self.target) == 0
self.start_t = time.time()
self.start_t = time.perf_counter()
self.skip = False
def extend(self, coords):
@@ -478,15 +478,20 @@ class LineAnimator(StaticLine):
def is_ready(self):
return (self.ready or self.skip) and self.target.is_ready()
def start(self):
self.target.start()
self.start_t = time.time()
self.start_t = time.perf_counter()
return True
def is_running(self):
# when ready, consider not running
return bool(self.start_t) and not self.is_ready()
def running_for(self):
if self.start_t:
return time.time() - self.start_t
return time.perf_counter() - self.start_t
return 0.
@@ -619,6 +624,77 @@ class CropLine(LineAnimator):
def apply(self, target_line: RenderableLine, dt: DeltaT) -> RenderableLine:
target_line.points = target_line.points[-1 * self.max_points:]
return target_line
class CropAnimationLine(LineAnimator):
"""
Similar to SegmentLine, but cropping by point count instead of length, and animated over time
"""
def __init__(self, target_line = None, max_points = 200, assume_fps=12):
super().__init__(target_line)
self.max_points = max_points
self.assume_fps = assume_fps
# self.ready = True # static filter, always ready
# def set_frame_offset(self, frame_offset: int):
# self.frame_offset = frame_offset
def apply(self, target_line: RenderableLine, dt: DeltaT) -> RenderableLine:
dt = self.running_for()
frame_offset = int(dt * self.assume_fps)
max_points = int(self.max_points(dt)) if callable(self.max_points) else self.max_points
head = frame_offset + 1
tail = max(0, frame_offset + 1 - max_points)
# print(self.running_for(), frame_offset, head,tail)
target_line.points = target_line.points[tail:head]
self.ready = len(target_line.points) < 1
return target_line
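# Illustrative sketch, not part of this commit: how the [tail:head] window computed in
# apply() slides over the points for max_points=3 and assume_fps=2.
for t in (0.0, 1.0, 3.0):
    frame_offset = int(t * 2)            # assume_fps = 2
    head = frame_offset + 1
    tail = max(0, frame_offset + 1 - 3)  # max_points = 3
    print(t, (tail, head))               # (0, 1), then (0, 3), then (4, 7)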
class FadedEndsLine(LineAnimator):
"""
Static filter: fades both ends of the line. Always applied (not only when the line is cropped)
"""
def __init__(self, target_line = None, in_fade_steps: int = 30, out_fade_steps: int = 30):
super().__init__(target_line)
self.ready = True
self.fade_in_steps = in_fade_steps
self.fade_out_steps = out_fade_steps
def apply(self, target_line: RenderableLine, dt: DeltaT) -> RenderableLine:
l = len(target_line.points)
points = []
# TODO: fractional divide if fade_in and out are not equal
half_points = l // 2
fade_in = min(self.fade_in_steps, half_points)
fade_out = min(self.fade_out_steps, half_points)
for i, point in enumerate(target_line.points):
if i < fade_in:
t = i / self.fade_in_steps
elif i > (l - fade_out):
t = 1 - (i - (l - fade_out)) / self.fade_out_steps
else:
t = 1
alpha = t
if alpha >= 0:
alpha = min(1, alpha)
point.color = point.color.as_faded(alpha)
points.append(point)
return RenderableLine(points)
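# Illustrative sketch, not part of this commit: the alpha ramp produced by apply() for a
# 10-point line with fade_in_steps = fade_out_steps = 3 (full opacity in the middle, fading at both ends).
l, fade = 10, 3
alphas = []
for i in range(l):
    if i < fade:
        t = i / fade
    elif i > (l - fade):
        t = 1 - (i - (l - fade)) / fade
    else:
        t = 1
    alphas.append(round(min(1, t), 2))
print(alphas)   # [0.0, 0.33, 0.67, 1, 1, 1, 1, 1, 0.67, 0.33]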
class FadedTailLine(LineAnimator):
"""
@@ -984,6 +1060,9 @@ class LineAnimationStack():
def is_ready(self):
return self.tail.is_ready()
def is_running(self):
return self.tail.is_running()
class LineAnimationSequenceStep(NamedTuple):
line: LineAnimator
@@ -1013,7 +1092,7 @@ class LineAnimationSequence():
def start(self):
self.start_at = time.time()
self.start_at = time.perf_counter()
self.idx = 0
def as_renderable_line(self, dt: DeltaT):

trap/renderable.proto (new file, +50 lines)

@@ -0,0 +1,50 @@
syntax = "proto3";
package renderable;
// Enum for coordinate spaces
enum CoordinateSpace {
UNDEFINED=0;
CAMERA = 1;
UNDISTORTED_CAMERA = 2;
WORLD = 3;
LASER = 4;
RAW_LASER = 8;
}
// Message for RenderablePosition (Tuple[float, float])
message RenderablePosition {
float x = 1;
float y = 2;
}
// Message for SrgbaColor
message SrgbaColor {
float red = 1;
float green = 2;
float blue = 3;
float alpha = 4;
}
// Message for RenderablePoint
message RenderablePoint {
RenderablePosition position = 1;
SrgbaColor color = 2;
}
// Message for RenderableLine
message RenderableLine {
repeated RenderablePoint points = 1;
}
// Message for RenderableLines
message RenderableLines {
repeated RenderableLine lines = 1;
CoordinateSpace space = 2;
}
// Message to represent RenderableLayers (Dict[int, RenderableLines])
message RenderableLayers {
map<int32, RenderableLines> layers = 1;
}
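As a hedged sketch of how these messages could be used from Python (assuming the schema is
compiled with `protoc --python_out=. trap/renderable.proto`, which by default yields a module
named renderable_pb2; the module name and the snippet below are illustrations, not part of
this commit):

    import renderable_pb2 as pb

    layers = pb.RenderableLayers()
    lines = layers.layers[1]                 # message-valued map entries are created on access
    lines.space = pb.WORLD                   # proto3 top-level enum values are module constants
    point = lines.lines.add().points.add()
    point.position.x, point.position.y = 1.0, 2.0
    point.color.red, point.color.alpha = 1.0, 1.0
    payload = layers.SerializeToString()     # bytes ready to go over the wire (e.g. ZeroMQ)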

View file

@@ -8,7 +8,9 @@ from functools import partial
import json
import logging
from math import inf
import math
from pathlib import Path
import random
import time
import threading
from typing import Dict, Generator, List, Optional, Type, TypeVar
@@ -19,7 +21,7 @@ import zmq
from trap.anomaly import DiffSegment, calc_anomaly, calculate_loitering_scores
from trap.base import CameraAction, DataclassJSONEncoder, Frame, HomographyAction, ProjectedTrack, Track
from trap.counter import CounterSender
from trap.lines import AppendableLine, AppendableLineAnimator, Coordinate, CropLine, DashedLine, DeltaT, FadeOutJitterLine, FadeOutLine, FadedTailLine, LineAnimationStack, LineAnimator, NoiseLine, RenderableLayers, RenderableLine, RenderableLines, SegmentLine, SimplifyMethod, SrgbaColor, StaticLine, layers_to_message, load_lines_from_svg
from trap.lines import AppendableLine, AppendableLineAnimator, Coordinate, CoordinateSpace, CropAnimationLine, CropLine, DashedLine, DeltaT, FadeOutJitterLine, FadeOutLine, FadedEndsLine, FadedTailLine, LineAnimationStack, LineAnimator, NoiseLine, RenderableLayers, RenderableLine, RenderableLines, SegmentLine, SimplifyMethod, SrgbaColor, StaticLine, layers_to_message, load_lines_from_svg
from trap.node import Node
from trap.track_history import TrackHistory
@@ -98,6 +100,8 @@ class Scenario:
self.is_running = False
self.loitering_factor = 0
logger.info(f"Found {self.track_id}: {self.scene.name}")
def start(self):
@@ -114,7 +118,7 @@ class Scenario:
if self.take_over_at:
return
self.take_over_at = time.time()
self.take_over_at = time.perf_counter()
def taken_over(self):
self.is_running = False
@@ -122,7 +126,7 @@ class Scenario:
def takenover_for(self):
if self.take_over_at:
return time.time() - self.take_over_at
return time.perf_counter() - self.take_over_at
return None
def takeover_factor(self):
@@ -134,7 +138,7 @@ class Scenario:
def lost_for(self):
if self.scene is ScenarioScene.LOST:
return time.time() - self.state_change_at
return time.perf_counter() - self.state_change_at
return None
def lost_factor(self):
@@ -144,7 +148,7 @@ class Scenario:
return l/LOST_FADEOUT
def anomaly_factor(self):
return calc_anomaly(self.prediction_diffs, 10)
return calc_anomaly(self.prediction_diffs)
def deactivate(self):
self.take_over_at = None
@@ -161,7 +165,7 @@ class Scenario:
logger.info(f"Changing scene for {self.track_id}: {self.scene.name} -> {scene.name}")
self.scene = scene
self.state_change_at = time.time()
self.state_change_at = time.perf_counter()
def update_state(self):
self.check_lost() or self.check_loitering() or self.check_track()
@@ -175,7 +179,9 @@ class Scenario:
def check_loitering(self):
scores = [s for s in calculate_loitering_scores(self.track, LOITERING_DURATION_TO_LINGER, LOITERING_LINGER_FACTOR, LOITERING_VELOCITY_TRESHOLD/TRACK_ASSUMED_FPS, 150)]
if scores[-1] > .99:
self.loitering_factor = scores[-1]
if self.loitering_factor > .99:
self.set_scene(ScenarioScene.LOITERING)
return True
return False
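# Illustrative sketch, not part of this commit (the constant values below are hypothetical;
# the real LOITERING_* constants are defined elsewhere in stage.py): the scene flips to
# LOITERING once the latest score exceeds .99, i.e. after roughly this many near-stationary frames:
import math
DURATION_TO_LINGER, LINGER_FACTOR = 12, 120                     # hypothetical values
frames_needed = (DURATION_TO_LINGER - 1) + math.ceil(0.99 * LINGER_FACTOR)
print(frames_needed)                                            # 130 frames, about 11 s assuming 12 fps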
@@ -284,9 +290,12 @@ class DrawnScenario(Scenario):
# when rendering tracks from others similar/close to the current one
self.others_color = SrgbaColor(1,1,0,1)
self.line_others = LineAnimationStack(StaticLine([], self.others_color))
self.line_others.add(SegmentLine(self.line_others.tail, duration=3, anim_f=partial(SegmentLine.anim_grow, in_and_out=True, max_len=5)))
# self.line_others.add(SegmentLine(self.line_others.tail, duration=3, anim_f=partial(SegmentLine.anim_grow, in_and_out=True, max_len=5)))
self.line_others.add(CropAnimationLine(self.line_others.tail, lambda dt: 10 + math.sin(dt/4) * 70, assume_fps=TRACK_ASSUMED_FPS*2)) # speed up
self.line_others.add(NoiseLine(self.line_others.tail, amplitude=0, t_factor=.3))
# self.line_others.add(DashedLine(self.line_others.tail, t_factor=4, loop_offset=True))
# self.line_others.get(DashedLine).skip = True
self.line_others.add(FadedEndsLine(self.line_others.tail, 30, 30))
self.line_others.add(FadeOutLine(self.line_others.tail))
self.line_others.get(FadeOutLine).set_alpha(0)
@@ -340,9 +349,9 @@ class DrawnScenario(Scenario):
# special case: LOITERING
if self.scene is ScenarioScene.LOITERING or self.state_change_at:
if self.scene is ScenarioScene.LOITERING: # or self.state_change_at:
# logger.info('loitering')
transition = min(1, (time.time() - self.state_change_at)/1.4)
transition = min(1, (time.perf_counter() - self.state_change_at)/1.4)
# TODO: transition fade, using to_alpha(), so it can fade back in again:
@@ -362,26 +371,38 @@ class DrawnScenario(Scenario):
self.tracks_to_self_fetched_at = time.perf_counter()
# fetch lines nearby
track_ids = stage.history.get_nearest_tracks(current_position, 15)
track_ids = stage.history.get_nearest_tracks(current_position, 30)
self.track_ids_to_self = iter(track_ids)
self.tracks_to_self = stage.history.ids_as_trajectory(track_ids)
print(time.perf_counter() - t, "fetch delya")
print(time.perf_counter() - t, "fetch delay")
if self.tracks_to_self and self.line_others.is_ready():
current_history_id = next(self.track_ids_to_self)
current_history = next(self.tracks_to_self)
# if self.tracks_to_self and not len(self.line_others.root.points):
if self.tracks_to_self and not self.line_others.is_running():
try:
current_history = next(self.tracks_to_self)
current_history_id = next(self.track_ids_to_self)
self.line_others.get(CropAnimationLine).assume_fps += TRACK_ASSUMED_FPS*1.5 # faster each time
self.line_others.get(NoiseLine).amplitude = .05
logger.info(f"play history item: {current_history_id}")
self.line_others.get(FadeOutLine).set_alpha(1)
self.line_others.root.points = current_history
# print(self.line_others.root.points)
self.line_others.start()
logger.info(f"play history item: {current_history_id}")
self.line_others.get(FadeOutLine).set_alpha(1)
self.line_others.root.points = current_history
# print(self.line_others.root.points)
self.line_others.start()
except StopIteration as e:
pass
# logger.info("Exhausted similar tracks?")
else:
# reset loitering values
self.line_others.get(CropAnimationLine).assume_fps = TRACK_ASSUMED_FPS*2
self.line_others.get(NoiseLine).amplitude = 0
# special case: PLAY
elif self.scene is ScenarioScene.PLAY:
if self.scene is ScenarioScene.PLAY:
pass
# if self.scene is ScenarioScene.CORRECTED_PREDICTION:
# self.line_prediction.get(DashedLine).skip = False
@@ -391,8 +412,10 @@ class DrawnScenario(Scenario):
def to_renderable_lines(self, dt: DeltaT) -> RenderableLines:
# each scene is handled differently:
t1 = time.perf_counter()
# 1) history, fade out when lost
# self.line_history.get(StaticLine).color = SrgbaColor(1, 0, 1-self.anomaly_factor(), 1)
self.line_history.get(FadeOutJitterLine).set_alpha(1-self.lost_factor())
self.line_prediction.get(FadeOutLine).set_alpha(1-self.lost_factor())
self.line_history.get(NoiseLine).amplitude = self.lost_factor()
@@ -401,23 +424,60 @@ class DrawnScenario(Scenario):
track_age_in_frames = self.track_age() * TRACK_ASSUMED_FPS
self.line_history.get(FadedTailLine).set_frame_offset(track_age_in_frames)
t2 = time.perf_counter()
# 2) also fade-out when moving into loitering mode.
# when fading out is done, start drawing historical data
history_line = self.line_history.as_renderable_line(dt)
t3 = time.perf_counter()
prediction_line = self.line_prediction.as_renderable_line(dt)
t4 = time.perf_counter()
others_line = self.line_others.as_renderable_line(dt)
t5 = time.perf_counter()
# print(history_line)
# print(self.track_id, len(self.line_history.points), len(history_line))
timings = (t5-t4, t4-t3, t3-t2, t2-t1)
return RenderableLines([
history_line,
prediction_line,
others_line
])
]), timings
class DatasetDrawer():
def __init__(self, stage: Stage):
self.stage = stage
line_color = SrgbaColor(0,1,1,1)
self.track_line = LineAnimationStack(StaticLine([], line_color))
self.track_line.add(CropAnimationLine(self.track_line.tail, 100, assume_fps=TRACK_ASSUMED_FPS*30)) # speed up
# self.track_line.add(DashedLine(self.track_line.tail, t_factor=4, loop_offset=True))
# self.track_line.get(DashedLine).skip = True
self.track_line.add(FadedEndsLine(self.track_line.tail, 10, 10))
# self.track_line.add(FadeOutLine(self.track_line.tail))
# self.track_line.get(FadeOutLine).set_alpha(1)
def to_renderable_lines(self, dt: DeltaT):
lines = RenderableLines([], CoordinateSpace.WORLD)
if not self.track_line.is_running():
track_id = random.choice(list(self.stage.history.state.tracks.keys()))
# print('track_id', track_id)
positions = self.stage.history.state.track_histories[track_id]
self.track_line.root.points = positions
self.track_line.start()
lines.lines.append(
self.track_line.as_renderable_line(dt)
)
# print(lines)
return lines
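# Illustrative sketch, not part of this commit: DatasetDrawer only needs an object exposing
# history.state.tracks and history.state.track_histories, so a duck-typed stand-in (all names
# below are hypothetical) could drive it outside the full Stage node, assuming the untouched
# animation classes accept plain coordinate lists:
class _FakeState:
    tracks = {"1": None}                                     # only the keys are used here
    track_histories = {"1": [(0.0, 0.0), (1.0, 0.0), (2.0, 1.0)]}
class _FakeHistory:
    state = _FakeState()
class _FakeStage:
    history = _FakeHistory()

drawer = DatasetDrawer(_FakeStage())
training_lines = drawer.to_renderable_lines(dt=1 / 60)       # one render tick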
class Stage(Node):
@@ -441,6 +501,8 @@ class Stage(Node):
self.debug_lines = RenderableLines(load_lines_from_svg(self.config.debug_map, 100, debug_color))
self.history = TrackHistory(self.config.tracker_output_dir, self.config.camera, self.config.cache_path)
self.auxilary = DatasetDrawer(self)
@@ -536,10 +598,15 @@ class Stage(Node):
# TODO: sometimes very slow!
t1 = time.perf_counter()
timings = []
for scenario in self.active_scenarios:
lines.append_lines(scenario.to_renderable_lines(dt))
scenario_lines, timing = scenario.to_renderable_lines(dt)
lines.append_lines(scenario_lines)
timings.append(timing)
t2 = time.perf_counter()
training_lines = self.auxilary.to_renderable_lines(dt)
t2b = time.perf_counter()
rl = lines.as_simplified(SimplifyMethod.RDP, .003) # or segmentise (see shapely)
self.counter.set("stage.lines", len(lines.lines))
self.counter.set("stage.points_orig", lines.point_count())
@@ -550,6 +617,7 @@ class Stage(Node):
layers: RenderableLayers = {
1: lines,
2: self.debug_lines,
3: training_lines,
}
t4 = time.perf_counter()
@@ -564,12 +632,13 @@ class Stage(Node):
t6 = time.perf_counter()
t = (t2-t1, t3-t2, t4-t3, t5-t4, t6-t5)
t = (t2-t1, t3-t2b, t2b-t2, t4-t3, t5-t4, t6-t5)
if sum(t) > .1:
print(t)
print(len(lines.lines))
print(lines.point_count())
print(len(msg))
print('scenario timings:', timings)
# print(msg)
# exit()

trap/track_history.py (new file, +183 lines)

@@ -0,0 +1,183 @@
from dataclasses import dataclass
import logging
from pathlib import Path
import pickle
from threading import Lock
import time
from typing import Dict, Iterable, List, Optional, Set
import numpy as np
from trap.base import Camera, Track
from trap.lines import Coordinate
from trap.tracker import FinalDisplacementFilter, Smoother, TrackReader
from scipy.spatial import KDTree
logger = logging.getLogger('history')
@dataclass
class TrackHistoryState():
"""
The lock of TrackHistory is not picklable, so the picklable data lives in this separate state object
"""
tracks: Dict[str, Track]
track_histories: Dict[str, np.ndarray]
indexed_track_ids: List[str]
tree: KDTree
class TrackHistory():
def __init__(self, path: Path, camera: Camera, cache_path: Optional[Path]):
self.path = path
self.camera = camera
self.cache_path = cache_path
self.lock = Lock()
self.load_from_cache() or self.reload()
def load_from_cache(self):
if self.cache_path and self.cache_path.exists():
logger.debug("Load history state from cache")
with self.cache_path.open('rb') as fp:
try:
state = pickle.load(fp)
if not isinstance(state, TrackHistoryState):
raise RuntimeError("Pickled data is not a trackhistorystate")
self.state = state
return True
except Exception as e:
logger.warning(f"Cannot read cache {self.cache_path}: {e}")
return False
def build_tree(self):
reader = TrackReader(self.path, self.camera.fps)
logger.debug(f'loaded {len(reader)} tracks')
track_filter = FinalDisplacementFilter(2)
tracks = track_filter.apply(reader, self.camera)
logger.debug(f'after filtering left with {len(tracks)} tracks')
tracks: List[Track] = [t.get_with_interpolated_history() for t in tracks]
logger.debug(f'interpolated {len(tracks)} tracks')
# use convolution here, because precision does not matter and it is _way_ faster
smoother = Smoother(convolution=True)
tracks = [smoother.smooth_track(t) for t in tracks]
logger.debug(f'smoothed')
tracks = {track.track_id: track for track in tracks}
track_histories = {t.track_id: t.get_projected_history(camera=self.camera) for t in tracks.values()}
downsampled_histories = {t_id: self.downsample_history(h) for t_id, h in track_histories.items()}
logger.debug(f'projected to world space')
# Sample data (coordinates and metadata)
# coordinates = [(1, 2, 'Point A'), (3, 4, 'Point B'), (5, 6, 'Point C'), (7, 8, 'Point D')]
all_points = []
indexed_track_ids: List[str] = []
for track_id, history in downsampled_histories.items():
all_points.extend([
[point[0], point[1]] for point in history
])
indexed_track_ids.extend([track_id] * len(history))
# self.flat_idx = self.flat_histories[:,2]
# Create the KD-Tree
tree = KDTree(all_points)
logger.debug('built tree')
return TrackHistoryState(
tracks, track_histories, indexed_track_ids, tree
)
def reload(self):
state = self.build_tree()
# acquire the lock as briefly as possible
with self.lock:
self.state = state
if self.cache_path:
with self.cache_path.open('wb') as fp:
logger.debug("Writing history to cache")
pickle.dump(self.state, fp)
def get_nearest_tracks(self, point: Coordinate, k:int, max_r: Optional[float] = np.inf):
with self.lock:
distances, indexes = self.state.tree.query(point, k, distance_upper_bound=max_r)
# filter out padded results where no neighbour was found within max_r (distance is inf)
indexes = indexes[distances != np.inf]
track_ids: Set[str] = {self.state.indexed_track_ids[idx] for idx in indexes}
# nearby_indexes = self.tree.query_ball_point(point, r)
# track_ids = set([self.flat_idx[idx] for idx in nearby_indexes])
return track_ids
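# Illustrative sketch, not part of this commit: a query index maps back to a track id through
# the parallel indexed_track_ids list, and the inf-distance filter drops the padded results
# scipy returns when fewer than k neighbours lie within max_r.
_pts = [[0.0, 0.0], [1.0, 0.0], [5.0, 5.0]]
_ids = ['a', 'a', 'b']                                   # parallel to _pts
_d, _i = KDTree(_pts).query((0.9, 0.1), k=3, distance_upper_bound=2.0)
_i = _i[_d != np.inf]                                    # the third neighbour is out of range
print({_ids[idx] for idx in _i})                         # {'a'}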
def ids_as_trajectory(self, track_ids: Iterable[str]):
for track_id in track_ids:
yield self.state.tracks[track_id].get_projected_history(camera=self.camera)
@classmethod
def downsample_history(cls, history, cell_size=.3):
if not len(history):
return []
positions = np.unique(np.round(history / cell_size), axis=0) * cell_size
return positions
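# Illustrative sketch, not part of this commit: downsampling snaps points to a cell_size grid
# and drops duplicate cells, so near-identical positions collapse into one entry.
_history = np.array([[0.01, 0.02], [0.02, 0.01], [0.95, 0.0]])
print(TrackHistory.downsample_history(_history, cell_size=0.3))
# -> [[0.  0. ]
#     [0.9 0. ]]   (two grid cells remain out of three input points)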
if __name__ == "__main__":
path = Path("EXPERIMENTS/raw/hof3/")
logging.basicConfig(level=logging.DEBUG)
calibration_path = Path("../DATASETS/hof3/calibration.json")
homography_path = Path("../DATASETS/hof3/homography.json")
camera = Camera.from_paths(calibration_path, homography_path, 12)
# device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
s = time.time()
history = TrackHistory(path, camera, Path("/tmp/historystate_hof3.pcl"))
dt = time.time() - s
print(f'loaded {len(history.state.tracks)} tracks in {dt}s')
track = list(history.state.tracks.values())[25]
trajectory_crop = TrackHistory.downsample_history(history.state.track_histories[track.track_id])
trajectory_org = track.get_projected_history(camera=camera)
target_point = trajectory_org[len(trajectory_org)//2+90]
import matplotlib.pyplot as plt # Visualization
track_set = history.get_nearest_tracks(target_point, 10, max_r=np.inf)
plt.gca().set_aspect('equal')
plt.scatter(trajectory_crop[:,0], trajectory_crop[:,1], c='orange')
plt.plot(trajectory_org[:,0], trajectory_org[:,1], c='blue', alpha=1)
plt.scatter(target_point[0], target_point[1], c='red', alpha=1)
for track_id in track_set:
closeby = history.state.tracks[track_id].get_projected_history(camera=camera)
plt.plot(closeby[:,0], closeby[:,1], c='green', alpha=.1)
plt.show()