predict position instead of velocity

This commit is contained in:
Ruben van de Ven 2025-01-02 16:24:00 +01:00
parent 06181c8440
commit 41f319b9e2
3 changed files with 100 additions and 38 deletions

View file

@ -96,7 +96,7 @@
},
"pred_state": {
"PEDESTRIAN": {
"velocity": [
"position": [
"x",
"y"
]

View file

@ -501,7 +501,7 @@ def decorate_frame(frame: Frame, tracker_frame: Frame, prediction_frame: Frame,
# draw_track(img, track, int(track_id))
draw_trackjectron_history(img, track, int(track.track_id), convert_world_points_to_img_points)
anim_position = get_animation_position(track, frame)
draw_track_predictions(img, track, int(track.track_id)+1, config.camera, convert_world_points_to_img_points, anim_position=anim_position)
draw_track_predictions(img, track, int(track.track_id)+1, config.camera, convert_world_points_to_img_points, anim_position=anim_position, as_clusters=True)
cv2.putText(img, f"{len(track.predictor_history) if track.predictor_history else 'none'}", to_point(track.history[0].get_foot_coords()), cv2.FONT_HERSHEY_COMPLEX, 1, (255,255,255), 1)
if prediction_frame.maps:
for i, m in enumerate(prediction_frame.maps):

View file

@ -1,3 +1,4 @@
from __future__ import annotations
from argparse import Namespace
from dataclasses import dataclass
import json
@ -10,6 +11,7 @@ import jsonlines
import numpy as np
import pandas as pd
import shapely
from shapely.ops import split
import trap.tracker
from trap.config import parser
from trap.frame_emitter import Camera, Detection, DetectionState, video_src_from_config, Frame
@ -225,43 +227,87 @@ from sklearn.cluster import AgglomerativeClustering
@dataclass
class PointCluster:
point: np.ndarray
start: np.ndarray
source_points: List[np.ndarray]
probability: float
next_point_clusters: List[PointCluster]
def cluster_predictions_by_radius(start_point, lines, radius = .5):
def cluster_predictions_by_radius(start_point, lines: Iterable[np.ndarray] | LineString, radius = .5, p_factor = 1.) -> List[PointCluster]:
# start = lines[0][0]
p0 = Point(*start_point)
print(lines[0][0], start_point)
# print(lines[0][0], start_point)
circle = p0.buffer(radius).boundary
# print(lines)
# print([line.tolist() for line in lines])
linestrings = [LineString(line.tolist()) for line in lines]
intersections = [circle.intersection(line) for line in linestrings]
print(intersections)
intersections = [p if type(p) is Point else p.geoms[0] for p in intersections]
intersections = []
remaining_lines = []
for line in lines:
linestring = line if type(line) is LineString else LineString(line.tolist())
intersection = circle.intersection(linestring)
if type(intersection) is LineString and intersection.is_empty:
# No intersection with circle, a dangling endpoint that we can skip
continue
clustering = AgglomerativeClustering(None, linkage="ward", distance_threshold=radius/2)
# TODO)) test with cosine distance. because it should not be equal to radius
assigned_clusters = clustering.fit_predict(intersections)
if type(intersection) is not Point:
# with multiple intersections: use only the first one
intersection = intersection.geoms[0]
# set a buffer around the intersection to assure a match is fond oun the line
split_line = split(linestring, intersection.buffer(.01))
remaining_line = split_line.geoms[2] if len(split_line.geoms) > 2 else None
# print(intersection, split_line)
intersections.append(intersection)
remaining_lines.append(remaining_line)
if len(intersections) < 1:
return []
# linestrings = [LineString(line.tolist()) for line in lines]
# intersections = [circle.intersection(line) for line in linestrings]
# dangling_lines = [(type(i) is LineString and i.is_empty) for i in intersections]
# intersections = [False if is_end else (p if type(p) is Point else p.geoms[0]) for p, is_end in zip(intersections, dangling_lines)]
# as all intersections are on the same circle we can guestimate angle by
# estimating distance, as circumfence is 2*pi*r, thus distance ~ proportional with radius.
if len(intersections) > 1:
clustering = AgglomerativeClustering(None, linkage="ward", distance_threshold=2*math.pi * radius / 6)
coords = np.asarray([i.coords for i in intersections]).reshape((-1,2))
assigned_clusters = clustering.fit_predict(coords)
else:
assigned_clusters = [0] # only one item
clusters = defaultdict(lambda: [])
for point, c in zip(intersections, assigned_clusters):
clusters[c] = point
cluster_remainders = defaultdict(lambda: [])
for point, line, c in zip(intersections, remaining_lines, assigned_clusters):
clusters[c].append(point)
cluster_remainders[c].append(line)
points = []
for c, points in clusters:
line_clusters = []
for c, points in clusters.items():
mean = np.mean(points, axis=0)
point = len(points) / len(assigned_clusters)
prob = p_factor * len(points) / len(assigned_clusters)
points.append(PointCluster(mean, points, point))
split_lines = [shapely.ops.split(line, point) for line, point in zip(linestrings, intersections)]
remaining_lines = [l[1] for l in split_lines if len(l) > 1]
remaining_lines = cluster_remainders[c]
remaining_lines = list(filter(None, remaining_lines))
print(points)
next_points = cluster_predictions_by_radius(mean, remaining_lines, radius, prob)
line_clusters.append(PointCluster(mean, start_point, points, prob, next_points))
# split_lines = [shapely.ops.split(line, point) for line, point in zip(linestrings, intersections)]
# remaining_lines = [l[1] for l in split_lines if len(l) > 1]
# print(line_clusters)
return line_clusters
@ -280,7 +326,7 @@ def cluster_predictions_by_radius(start_point, lines, radius = .5):
def draw_track_predictions(img: cv2.Mat, track: Track, color_index: int, camera:Camera, convert_points: Optional[Callable], anim_position=.8):
def draw_track_predictions(img: cv2.Mat, track: Track, color_index: int, camera:Camera, convert_points: Optional[Callable], anim_position=.8, as_clusters=False):
"""
anim_position: 0-1
"""
@ -306,26 +352,42 @@ def draw_track_predictions(img: cv2.Mat, track: Track, color_index: int, camera:
line_points = np.concatenate(([current_point], pred_coords)) # 'current point' is amoving target
# print(pred_coords, current_point, line_points)
line_points = transition_path_points(line_points, slide_t)
if convert_points:
line_points = convert_points(line_points)
line_points = np.rint(line_points).astype(int)
# color = (128,0,128) if pred_i else (128,128,0)
lines.append(line_points)
# TODO)) implement:
# these points are alerayd projected. unlike `current_point` UNDO that, and cluster
# on actual (meter) positions.
cluster_predictions_by_radius(current_point, lines)
if as_clusters:
clusters = cluster_predictions_by_radius(current_point, lines, 1.5)
def draw_cluster(img, cluster: PointCluster):
points = convert_points([cluster.start, cluster.point])
# cv2 only draws to integer coordinates
points = np.rint(points).astype(int)
thickness = max(1, int(cluster.probability * 6))
if len(cluster.next_point_clusters) == 1:
# not a final point, nor a split:
cv2.line(img, points[0], points[1], color, thickness, lineType=cv2.LINE_AA)
else:
cv2.arrowedLine(img, points[0], points[1], color, thickness, cv2.LINE_AA)
for sub in cluster.next_point_clusters:
draw_cluster(img, sub)
# pass
# # cv2.circle(img, end, 2, color, 1, lineType=cv2.LINE_AA)
# print(clusters)
for cluster in clusters:
draw_cluster(img, cluster)
else:
# convert function (e.g. to project points to img space)
if convert_points:
lines = [convert_points(points) for points in lines]
# cv2 only draws to integer coordinates
lines = [np.rint(points).astype(int) for points in lines]
# draw in a single pass
line_points = line_points.reshape((1, -1,1,2))
cv2.polylines(img, lines, False, color, 2, cv2.LINE_AA)
# for start, end in zip(line_points[:-1], line_points[1:]):
# cv2.line(img, start, end, color, 2, lineType=cv2.LINE_AA)
# pass
# # cv2.circle(img, end, 2, color, 1, lineType=cv2.LINE_AA)
def draw_trackjectron_history(img: cv2.Mat, track: Track, color_index: int, convert_points: Optional[Callable]):
if not track.predictor_history: