from __future__ import annotations

from abc import ABC, abstractmethod
import argparse
from collections import defaultdict
from copy import deepcopy
from enum import IntFlag
from itertools import cycle
import json
import logging
from pathlib import Path
import time
import types
from typing import Iterable, Optional, Tuple, Union, List
import cv2
from dataclasses import dataclass, field
import dataclasses

from nptyping import Float64, NDArray, Shape
import numpy as np
from deep_sort_realtime.deep_sort.track import Track as DeepsortTrack
from deep_sort_realtime.deep_sort.track import TrackState as DeepsortTrackState
from bytetracker.byte_tracker import STrack as ByteTrackTrack
from bytetracker.basetrack import TrackState as ByteTrackTrackState
import pandas as pd
from shapely import Point

from trap.utils import get_bins, inv_lerp, lerp
from trajectron.environment import Environment, Node, Scene
from urllib.parse import urlparse
from cv2.typing import MatLike

logger = logging.getLogger('trap.base')


class UrlOrPath():
    """
    Some video sources are at a path (files), others at a URL (some cameras).
    Provide some utilities to easily deal with either.
    """
    def __init__(self, string):
        self.url = urlparse(str(string))

    def __str__(self) -> str:
        return self.url.geturl()

    def is_url(self) -> bool:
        return len(self.url.netloc) > 0

    def path(self) -> Path:
        if self.is_url():
            return Path(self.url.path)
        return Path(self.url.geturl())  # can include a scheme, such as C:/


class Space(IntFlag):
    Image = 1        # as detected in the image
    Undistorted = 2  # after applying lens undistortion
    World = 4        # after lens undistortion and homography
    Render = 8       # view space of the renderer


@dataclass
class Position:
    x: float
    y: float
    conf: float
    state: DetectionState
    frame_nr: int
    det_class: str


class DetectionState(IntFlag):
    Tentative = 1     # state before n_init (see DeepsortTrack)
    Confirmed = 2     # after tentative
    Lost = 4          # lost when DeepsortTrack.time_since_update > 0, but not Deleted
    Interpolated = 8  # a position estimated through interpolation of adjacent detections

    @classmethod
    def from_deepsort_track(cls, track: DeepsortTrack):
        if track.state == DeepsortTrackState.Tentative:
            return cls.Tentative
        if track.state == DeepsortTrackState.Confirmed:
            if track.time_since_update > 0:
                return cls.Lost
            return cls.Confirmed
        raise RuntimeError("Should not run into Deleted entries here")

    @classmethod
    def from_bytetrack_track(cls, track: ByteTrackTrack):
        if track.state == ByteTrackTrackState.New:
            return cls.Tentative
        if track.state == ByteTrackTrackState.Lost:
            return cls.Lost
        # if track.time_since_update > 0:
        if track.state == ByteTrackTrackState.Tracked:
            return cls.Confirmed
        raise RuntimeError("Should not run into Deleted entries here")


def H_from_path(path: Path):
    if path.suffix == '.json':
        with path.open('r') as fp:
            H = np.array(json.load(fp))
    else:
        H = np.loadtxt(path, delimiter=',')
    return H


PointList = List[Tuple[float, float]] | np.ndarray | cv2.typing.MatLike


def scale_homography(H: cv2.Mat, scale: float):
    """Scale the homography's output coordinates, so that applying it
    immediately yields points in the scaled (e.g. image) space."""
    new_H = H.copy()
    new_H[:2] = H[:2] * scale
    return new_H
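

# Illustrative sketch (made-up values): scaling only the first two rows
# multiplies the projected x and y by `scale`, while the perspective row H[2]
# stays untouched.
#
#   H = np.eye(3)
#   H2 = scale_homography(H, 2.0)
#   assert np.allclose(H2[:2], 2 * H[:2]) and np.allclose(H2[2], H[2])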


class DistortedCamera(ABC):
    @abstractmethod
    def undistort_img(self, img: MatLike):
        # implemented by subclasses; the ABC itself has no remap maps
        ...

    def project_img(self, undistorted_img: MatLike, scale: float = 1.0):
        w, h = undistorted_img.shape[1], undistorted_img.shape[0]
        if scale != 1:
            H = scale_homography(self.H, scale)
        else:
            H = self.H
        return cv2.warpPerspective(undistorted_img, H, (w, h))

    def img_to_world(self, img: MatLike, scale=1.):
        img = self.undistort_img(img)
        return self.project_img(img, scale)

    @abstractmethod
    def undistort_points(self, distorted_points: PointList):
        pass

    def project_point(self, point):
        return self.project_points([point])[0]

    def project_points(self, points: PointList, scale: float = 1.0):
        if scale != 1:
            H = scale_homography(self.H, scale)
        else:
            H = self.H

        coords = cv2.perspectiveTransform(np.array([points]), H)
        # if coords.shape[1:] == (1,2):
        coords = np.reshape(coords, (len(points), 2))

        return coords

    @classmethod
    def from_calibfile(cls, calibration_path, H, fps):
        with calibration_path.open('r') as fp:
            data = json.load(fp)
        camera = cls.from_calibdata(data, H, fps)

        return camera

    @classmethod
    def from_paths(cls, calibration_path: Path, h_path: Path, fps: float):
        H = H_from_path(h_path)
        with calibration_path.open('r') as fp:
            calibdata = json.load(fp)
        if 'type' in calibdata and calibdata['type'] == 'fisheye':
            camera = FisheyeCamera.from_calibdata(calibdata, H, fps)
        elif 'type' in calibdata and calibdata['type'] == 'undistorted':
            camera = UndistortedCamera(calibdata['fps'])
        else:
            camera = Camera.from_calibdata(calibdata, H, fps)

        return camera
        # return cls.from_calibfile(calibration_path, H, fps)

    def points_img_to_world(self, points: PointList, scale=1.):
        # undistort & project
        coords = self.undistort_points(points)
        coords = self.project_points(coords, scale)
        return coords
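

# Illustrative sketch (paths and values are made up): the usual pipeline is to
# load a camera once, then map image-space points (e.g. the feet of detections)
# into world space via undistortion followed by the homography.
#
#   camera = DistortedCamera.from_paths(Path('calibration.json'), Path('H.txt'), fps=12)
#   feet_img = [(640., 360.)]
#   feet_world = camera.points_img_to_world(feet_img)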


class FisheyeCamera(DistortedCamera):
    def __init__(self, dim1, dim2, dim3, K, D, new_K, scaled_K, balance, H, fps):
        # dimensions as per: https://medium.com/@kennethjiang/calibrate-fisheye-lens-using-opencv-part-2-13990f1b157f
        self.dim1 = dim1  # dimension of the original image
        self.dim2 = dim2  # dimension of the box to keep after un-distorting the image; influenced by balance
        self.dim3 = dim3  # dimension of the final box where OpenCV will put the undistorted image
        self.K = K
        self.D = D
        self.new_K = new_K
        self.scaled_K = scaled_K
        self.balance = balance

        self.H = H  # homography

        self._R = np.eye(3)
        self.fps = fps

        self.map1, self.map2 = cv2.fisheye.initUndistortRectifyMap(self.scaled_K, self.D, self._R, self.new_K, self.dim3, cv2.CV_16SC2)
        # self.map1, self.map2 = cv2.fisheye.initUndistortRectifyMap(self.scaled_K, self.D, self._R, self.new_K, self.dim3, cv2.CV_32FC1)

    def undistort_img(self, img: MatLike):
        # map1, map2 = adjust_remap_maps(self.map1, self.map2, 2, (0,0))
        # this only works for the undistort, but screws up the subsequent homography;
        # there needs to be a way to combine both this remap and warpPerspective into a
        # single remap call...
        # scale = 0.3
        # cx = self.dim3[0] / 2
        # cy = self.dim3[1] / 2

        # map1 = (self.map1 - cx) / scale + cx
        # map2 = (self.map2 - cy) / scale + cy

        # map1 += 900 # translate x (>0 left, <0 right)
        # map2 += 1500 # translate y (>0 up, <0 down)

        return cv2.remap(img, self.map1, self.map2, interpolation=cv2.INTER_LINEAR, borderMode=cv2.BORDER_CONSTANT)

    def undistort_points(self, distorted_points: PointList):
        points = cv2.fisheye.undistortPoints(np.array([distorted_points]).astype(np.float32), K=self.scaled_K, D=self.D, R=self._R, P=self.new_K)
        return points[0]

    @property
    def projected_w(self):
        return self.dim3[0]

    @property
    def projected_h(self):
        return self.dim3[1]

    @classmethod
    def from_calibdata(cls, data, H, fps):
        return cls(
            data['dim1'],
            data['dim2'],
            data['dim3'],
            np.array(data['K']),
            np.array(data['D']),
            np.array(data['new_K']),
            np.array(data['scaled_K']),
            data['balance'],
            H, fps)
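

# Illustrative sketch of the calibration JSON that FisheyeCamera.from_calibdata
# expects; the keys follow from_calibdata above, all numbers are made up:
#
#   {
#     "type": "fisheye",
#     "dim1": [1920, 1080], "dim2": [1920, 1080], "dim3": [1920, 1080],
#     "K": [[600, 0, 960], [0, 600, 540], [0, 0, 1]],
#     "D": [[0.1], [0.01], [0.001], [0.0001]],
#     "new_K": [[600, 0, 960], [0, 600, 540], [0, 0, 1]],
#     "scaled_K": [[600, 0, 960], [0, 600, 540], [0, 0, 1]],
#     "balance": 0.5
#   }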


class UndistortedCamera(DistortedCamera):
    def __init__(self, fps=10):
        self.fps = fps
        self.H = np.eye(3, 3)

    def undistort_img(self, img: MatLike):
        return deepcopy(img)

    def undistort_points(self, distorted_points: PointList):
        return deepcopy(distorted_points)


class Camera(DistortedCamera):
    def __init__(self, mtx: cv2.Mat, dist: cv2.Mat, w: float, h: float, H: cv2.Mat, fps: float):
        self.mtx = mtx
        self.dist = dist
        self.w = w
        self.h = h
        self.H = H
        self.fps = fps

        self.newcameramtx, self.roi = cv2.getOptimalNewCameraMatrix(self.mtx, self.dist, (self.w, self.h), 1, (self.w, self.h))

    @classmethod
    def from_calibdata(cls, data, H, fps):
        return cls(
            np.array(data['camera_matrix']),
            np.array(data['dist_coeff']),
            data['dim']['width'],
            data['dim']['height'],
            H, fps)

    @property
    def projected_w(self):
        return self.w

    @property
    def projected_h(self):
        return self.h

    def undistort_img(self, img: MatLike):
        return cv2.undistort(img, self.mtx, self.dist, None, self.newcameramtx)

    def undistort_points(self, distorted_points: PointList):
        points = cv2.undistortPoints(np.array([distorted_points]).astype('float32'), self.mtx, self.dist, None, self.newcameramtx)
        # reshape to (N, 2) regardless of the layout cv2 returns
        return points.reshape(-1, 2)
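

# Illustrative sketch (made-up matrices): with all-zero distortion coefficients
# the pinhole Camera leaves points nearly untouched, so points_img_to_world
# reduces to the homography alone.
#
#   mtx = np.array([[600., 0., 320.], [0., 600., 240.], [0., 0., 1.]])
#   cam = Camera(mtx, np.zeros((1, 5)), 640, 480, np.eye(3), fps=12)
#   cam.points_img_to_world([(320., 240.), (100., 100.)])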


@dataclass
class Detection:
    track_id: str  # deepsort track id association
    l: int         # left - image space
    t: int         # top - image space
    w: int         # width - image space
    h: int         # height - image space
    conf: float    # object detector probability
    state: DetectionState
    frame_nr: int
    det_class: str

    def get_foot_coords(self) -> list[float]:
        return [self.l + 0.5 * self.w, self.t + self.h]

    @classmethod
    def from_deepsort(cls, dstrack: DeepsortTrack, frame_nr: int):
        return cls(dstrack.track_id, *dstrack.to_ltwh(), dstrack.det_conf or 0, DetectionState.from_deepsort_track(dstrack), frame_nr, dstrack.det_class)

    @classmethod
    def from_bytetrack(cls, bstrack: ByteTrackTrack, frame_nr: int):
        return cls(bstrack.track_id, *bstrack.tlwh, bstrack.score, DetectionState.from_bytetrack_track(bstrack), frame_nr, bstrack.cls)

    def get_scaled(self, scale: float = 1):
        if scale == 1:
            return self

        return Detection(
            self.track_id,
            self.l * scale,
            self.t * scale,
            self.w * scale,
            self.h * scale,
            self.conf,
            self.state,
            self.frame_nr,
            self.det_class)

    def to_ltwh(self):
        return (int(self.l), int(self.t), int(self.w), int(self.h))

    def to_ltrb(self):
        return (int(self.l), int(self.t), int(self.l + self.w), int(self.t + self.h))
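

# Illustrative sketch (made-up values): the foot point is the bottom-centre of
# the bounding box, which is what gets projected to world space.
#
#   det = Detection('1', l=100, t=50, w=40, h=120, conf=0.9,
#                   state=DetectionState.Confirmed, frame_nr=0, det_class='person')
#   det.get_foot_coords()  # [120.0, 170]
#   det.to_ltrb()          # (100, 50, 140, 170)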


# Proxied Track, which caches the projected history
class ProjectedTrack(object):
    def __init__(self, track: Track, camera: Camera):
        self._track = track
        self.camera = camera  # keep, to wrap other calls
        self.projected_history = track.get_projected_history(camera=camera)

    # TODO wrap functions of Track()

    def __getattr__(self, attr):
        return getattr(self._track, attr)


@dataclass
class Track:
    """A bit of a haphazard wrapper around the 'real' tracker, to provide
    a history with which the predictor can work, as we can then deduce
    velocity and acceleration.
    """
    track_id: Optional[str] = None
    history: List[Detection] = field(default_factory=list)
    predictor_history: Optional[list] = None  # in image space
    predictions: Optional[list] = None
    fps: int = 12  # TODO)) convert this to camera? That way it incorporates H and dist; alternatively, each track is attached as a whole to a space
    source: Optional[int] = None  # to keep track of processed tracks
    lost: bool = False
    created_at: Optional[float] = None
    frame_index: int = 0
    updated_at: Optional[float] = None

    def __post_init__(self):
        if not self.created_at:
            self.created_at = time.time()
        if not self.updated_at:
            self.updated_at = time.time()

    def track_age(self) -> float:
        return time.time() - self.created_at

    def track_update_dt(self) -> float:
        return time.time() - self.updated_at

    def get_projected_history(self, H: Optional[cv2.Mat] = None, camera: Optional[DistortedCamera] = None) -> NDArray[Shape["*, 2"], Float64]:
        foot_coordinates = [d.get_foot_coords() for d in self.history]
        # TODO)) Undistort points before perspective transform
        if len(foot_coordinates):
            if camera:
                coords = camera.points_img_to_world(foot_coordinates)
                return coords
                # coords = cv2.undistortPoints(np.array([foot_coordinates]).astype('float32'), camera.mtx, camera.dist, None, camera.newcameramtx)
                # coords = cv2.perspectiveTransform(np.array(coords),camera.H)
                # return coords.reshape((coords.shape[0],2))
            else:
                coords = cv2.perspectiveTransform(np.array([foot_coordinates]), H)
                return coords[0]
        return np.empty(shape=(0, 2))

    def get_projected_history_as_dict(self, H, camera: Optional[DistortedCamera] = None) -> list[dict]:
        coords = self.get_projected_history(H, camera)
        return [{"x": c[0], "y": c[1]} for c in coords]

    def get_with_interpolated_history(self) -> Track:
        # fill gaps in the history with detections linearly interpolated
        # between the two adjacent, observed detections
        new_history = []
        for j in range(len(self.history)):
            a = self.history[j]
            new_history.append(Detection(a.track_id, a.l, a.t, a.w, a.h, a.conf, a.state, a.frame_nr, a.det_class))

            if j + 1 >= len(self.history):
                break

            b = self.history[j + 1]
            gap = b.frame_nr - a.frame_nr
            if gap < 1:
                logger.error(f"WARNING, gap between frames {a.frame_nr} -> {b.frame_nr} is not positive")
            if gap > 1:
                for g in range(1, gap):
                    l = lerp(a.l, b.l, g / gap)
                    t = lerp(a.t, b.t, g / gap)
                    w = lerp(a.w, b.w, g / gap)
                    h = lerp(a.h, b.h, g / gap)
                    conf = 0
                    state = DetectionState.Lost  # TODO)) should these be DetectionState.Interpolated?
                    frame_nr = a.frame_nr + g
                    new_history.append(Detection(a.track_id, l, t, w, h, conf, state, frame_nr, a.det_class))

        return self.get_with_new_history(new_history)
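
    # Illustrative sketch (made-up values): detections at frames 0 and 3 get
    # two interpolated detections at frames 1 and 2, with conf 0:
    #
    #   t = Track('1', [
    #       Detection('1', 0, 0, 10, 10, 1., DetectionState.Confirmed, 0, 'person'),
    #       Detection('1', 30, 30, 10, 10, 1., DetectionState.Confirmed, 3, 'person'),
    #   ])
    #   [d.l for d in t.get_with_interpolated_history().history]  # [0, 10.0, 20.0, 30]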

    def get_with_new_history(self, new_history: List[Detection]):
        return Track(
            self.track_id,
            new_history,
            self.predictor_history,
            self.predictions,
            self.fps,
            self.source,
            self.lost,
            self.created_at,
            self.frame_index,
            self.updated_at)

    def is_complete(self):
        # complete when consecutive detections are all exactly one frame apart
        diffs = [(b.frame_nr - a.frame_nr) for a, b in zip(self.history[:-1], self.history[1:])]
        return all([d == 1 for d in diffs])

    def get_sampled(self, step_size=1, offset=0):
        """Get a copy of the track, with only every n-th frame"""
        if not self.is_complete():
            t = self.get_with_interpolated_history()
        else:
            t = self

        return Track(
            t.track_id,
            t.history[offset::step_size],
            t.predictor_history,
            t.predictions,
            t.fps / step_size,
            self.source,
            self.lost,
            self.created_at,
            self.frame_index,
            self.updated_at)

    def get_simplified_history(self, distance: float, camera: Camera) -> list[tuple[float, float]]:
        """Simplify the track to a point every n-th meter of travelled distance.
        Useful both for predicting and for rendering with the laser."""
        if len(self.history) < 1:
            return []

        path = self.get_projected_history(H=None, camera=camera)
        new_path: List[List[float]] = [path[0]]
        lengths = np.sqrt(np.sum(np.diff(path, axis=0)**2, axis=1))
        # prepend 0 so that cum_lengths[i] is the distance travelled up to path[i]
        cum_lengths = np.concatenate(([0.], np.cumsum(lengths)))
        pos = distance
        for a, b, l_a, l_b in zip(path[:-1], path[1:], cum_lengths[:-1], cum_lengths[1:]):
            # skip segments that end before the next target position (pos);
            # because we run sequentially, the first segment with l_b > pos
            # is the one containing our next point
            if l_b <= pos:
                continue

            relative_t = inv_lerp(l_a, l_b, pos)
            x = lerp(a[0], b[0], relative_t)
            y = lerp(a[1], b[1], relative_t)
            new_path.append([x, y])
            pos += distance

        return new_path
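
    # Illustrative sketch (made-up values, using the identity UndistortedCamera):
    # a straight track advancing 1 m per frame, simplified every 2.5 m, keeps
    # the start point plus points at roughly 2.5 m and 5 m along the path:
    #
    #   dets = [Detection('1', float(i), 0., 0, 0, 1., DetectionState.Confirmed, i, 'person')
    #           for i in range(7)]
    #   Track('1', dets).get_simplified_history(2.5, UndistortedCamera())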

    def get_simplified_history_with_absolute_distance(self, distance: float, camera: Camera) -> list[tuple[float, float]]:
        # Similar to get_simplified_history, but measures the absolute
        # world-space distance to the last kept point, not the travelled
        # track length

        if len(self.history) < 1:
            return []

        path = self.get_projected_history(H=None, camera=camera)
        new_path: List[List[float]] = [path[0]]

        distance_sq = distance**2

        for a, b in zip(path[:-1], path[1:]):
            # skip this segment if point b is still within `distance` of the last kept point
            b_distance_sq = (b[0] - new_path[-1][0])**2 + (b[1] - new_path[-1][1])**2
            if b_distance_sq <= distance_sq:
                continue

            a_distance_sq = (a[0] - new_path[-1][0])**2 + (a[1] - new_path[-1][1])**2

            # note: interpolating in squared-distance space is an approximation
            relative_t = inv_lerp(a_distance_sq, b_distance_sq, distance_sq)
            x = lerp(a[0], b[0], relative_t)
            y = lerp(a[1], b[1], relative_t)
            new_path.append([x, y])

        return new_path

    def get_binned(self, bin_size, camera: Camera, bin_start=True):
        """
        For an experiment: what if we predict using only discrete positions, by mapping
        dx,dy to a grid. Thus a prediction can be one of 8 moves, or rather headings.
        See ~/notes/attachments example svg.
        """

        history = self.get_projected_history_as_dict(H=None, camera=camera)

        def round_to_grid_precision(x):
            factor = 1 / bin_size
            return round(x * factor) / factor

        new_history: List[dict] = []
        for i, (det0, det1) in enumerate(zip(history[:-1], history[1:])):
            if i == 0:
                new_history.append({
                    'x': round_to_grid_precision(det0['x']),
                    'y': round_to_grid_precision(det0['y'])
                } if bin_start else det0)
                continue
            if abs(det1['x'] - new_history[-1]['x']) < bin_size and abs(det1['y'] - new_history[-1]['y']) < bin_size:
                continue

            # det1 falls outside of the box [-bin_size:+bin_size] around the last binned point

            # 1. interpolate the exact point between det0 and det1 where this happens
            if abs(det1['x'] - new_history[-1]['x']) >= bin_size:
                if det1['x'] - new_history[-1]['x'] >= bin_size:
                    # det1 beyond the last binned point in +x
                    x = new_history[-1]['x'] + bin_size
                    f = inv_lerp(det0['x'], det1['x'], x)
                elif new_history[-1]['x'] - det1['x'] >= bin_size:
                    # det1 beyond the last binned point in -x
                    x = new_history[-1]['x'] - bin_size
                    f = inv_lerp(det0['x'], det1['x'], x)
                y = lerp(det0['y'], det1['y'], f)
            if abs(det1['y'] - new_history[-1]['y']) >= bin_size:
                if det1['y'] - new_history[-1]['y'] >= bin_size:
                    # det1 beyond the last binned point in +y
                    y = new_history[-1]['y'] + bin_size
                    f = inv_lerp(det0['y'], det1['y'], y)
                elif new_history[-1]['y'] - det1['y'] >= bin_size:
                    # det1 beyond the last binned point in -y
                    y = new_history[-1]['y'] - bin_size
                    f = inv_lerp(det0['y'], det1['y'], y)
                x = lerp(det0['x'], det1['x'], f)

            # 2. find the closest point on the rectangle (the rectangle's four corners, or 4 midpoints)
            points = get_bins(bin_size)
            points = [[new_history[-1]['x'] + p[0], new_history[-1]['y'] + p[1]] for p in points]

            distances = [np.linalg.norm([p[0] - x, p[1] - y]) for p in points]
            closest = np.argmin(distances)

            point = points[closest]

            new_history.append({'x': point[0], 'y': point[1]})
            # TODO: offsets to points: [history for ... in points]
        return new_history

    def to_dataframe(self, camera: Camera) -> pd.DataFrame:
        positions = self.get_projected_history(None, camera)
        velocity = np.gradient(positions, 1 / self.fps, axis=0)
        acceleration = np.gradient(velocity, 1 / self.fps, axis=0)

        # # we can calculate the heading based on the velocity components
        # heading = (np.arctan2(velocity[:,1], velocity[:,0]) * 180 / np.pi) % 360

        # # and derive it to get the rate of change of the heading
        # d_heading = np.gradient(heading, 1/self.fps, axis=0)

        data_columns = pd.MultiIndex.from_product([['position', 'velocity', 'acceleration'], ['x', 'y']])
        # data_columns = data_columns.append(pd.MultiIndex.from_tuples([('heading', '°'), ('heading', 'd°')]))

        # vx = derivative_of(x, scene.dt)
        # vy = derivative_of(y, scene.dt)
        # ax = derivative_of(vx, scene.dt)
        # ay = derivative_of(vy, scene.dt)

        data_dict = {
            ('position', 'x'): positions[:, 0],
            ('position', 'y'): positions[:, 1],
            ('velocity', 'x'): velocity[:, 0],
            ('velocity', 'y'): velocity[:, 1],
            ('acceleration', 'x'): acceleration[:, 0],
            ('acceleration', 'y'): acceleration[:, 1],
            # ('heading', '°'): heading,
            # ('heading', 'd°'): d_heading,
        }

        return pd.DataFrame(data_dict, columns=data_columns)
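
    # Illustrative sketch (made-up values): np.gradient with spacing 1/fps turns
    # per-frame positions into per-second velocities; a track moving 1 unit per
    # frame at 12 fps yields a constant velocity of 12 units/s:
    #
    #   positions = np.array([[0., 0.], [1., 0.], [2., 0.]])
    #   np.gradient(positions, 1/12, axis=0)[:, 0]  # -> [12., 12., 12.]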

    def to_flat_dataframe(self, camera: Camera) -> pd.DataFrame:
        positions = self.get_projected_history(None, camera)
        data = pd.DataFrame(positions, columns=['x', 'y'])

        data['dx'] = data['x'].diff()
        data['dy'] = data['y'].diff()

        return data.bfill()

    def to_trajectron_node(self, camera: Camera, env: Environment) -> Node:
        node_data = self.to_dataframe(camera)
        new_first_idx = self.history[0].frame_nr

        return Node(node_type=env.NodeType.PEDESTRIAN, node_id=self.track_id, data=node_data, first_timestep=new_first_idx)


@dataclass
class Frame:
    index: int
    img: np.ndarray
    time: float = field(default_factory=lambda: time.time())
    tracks: Optional[dict[str, Track]] = None
    H: Optional[np.ndarray] = None
    camera: Optional[Camera] = None
    maps: Optional[List[cv2.Mat]] = None
    log: dict = field(default_factory=lambda: {})  # settings used during processing; all intermediate nodes can store their config here

    def aslist(self) -> dict:
        # note: despite the name, this returns a dict keyed by track id
        return {t.track_id:
            {
                'id': t.track_id,
                'history': t.get_projected_history(self.H).tolist(),
                'det_conf': t.history[-1].conf,
                # 'det_conf': trajectory_data[node.id]['det_conf'],
                # 'bbox': trajectory_data[node.id]['bbox'],
                # 'history': history.tolist(),
                'predictions': t.predictions
            } for t in self.tracks.values()
        }

    def without_img(self):
        return Frame(self.index, None, self.time, self.tracks, self.H, self.camera, self.maps, self.log)


class DataclassJSONEncoder(json.JSONEncoder):
    def default(self, o):
        if isinstance(o, np.ndarray):
            return o.tolist()
        # if isinstance(o, np.float32):
        #     return "float32!{o}"
        if dataclasses.is_dataclass(o):
            if isinstance(o, Frame):
                tracks = {}
                for track_id, track in o.tracks.items():
                    track_obj = dataclasses.asdict(track)
                    track_obj['history'] = track.get_projected_history(None, o.camera)
                    tracks[track_id] = track_obj
                d = {
                    'index': o.index,
                    'time': o.time,
                    'tracks': tracks,
                    # the camera classes are not dataclasses, so serialise their attributes directly
                    'camera': o.camera.__dict__ if o.camera else None,
                }
            else:
                d = dataclasses.asdict(o)
            # if isinstance(o, Frame):
            #     # Don't send images over JSON
            #     del d['img']
            return d
        return super().default(o)
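

# Illustrative sketch (made-up values): serialising a Frame; numpy arrays such
# as the projected histories and the camera's H become nested lists.
#
#   frame = Frame(index=0, img=None, tracks={}, camera=UndistortedCamera())
#   json.dumps(frame, cls=DataclassJSONEncoder)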


def video_src_from_config(config) -> Iterable[UrlOrPath]:
    """Deprecated, now in video_source"""
    if config.video_loop:
        video_srcs: Iterable[UrlOrPath] = cycle(config.video_src)
    else:
        video_srcs: Iterable[UrlOrPath] = config.video_src
    return video_srcs


@dataclass
class Trajectory:
    # TODO)) Replace history and predictions in Track with Trajectory
    space: Space
    fps: int = 12
    points: List[Detection] = field(default_factory=list)

    def __iter__(self):
        for d in self.points:
            yield d


class HomographyAction(argparse.Action):
    def __init__(self, option_strings, dest, nargs=None, **kwargs):
        if nargs is not None:
            raise ValueError("nargs not allowed")
        super().__init__(option_strings, dest, **kwargs)

    def __call__(self, parser, namespace, values: Path, option_string=None):
        # load the matrix from json or csv, same logic as H_from_path()
        H = H_from_path(values)

        setattr(namespace, self.dest, values)
        setattr(namespace, 'H', H)


class CameraAction(argparse.Action):
    def __init__(self, option_strings, dest, nargs=None, **kwargs):
        if nargs is not None:
            raise ValueError("nargs not allowed")
        super().__init__(option_strings, dest, **kwargs)

    def __call__(self, parser, namespace, values, option_string=None):
        if values is None:
            setattr(namespace, self.dest, None)
        else:
            values = Path(values)
            with values.open('r') as fp:
                data = json.load(fp)
            if 'type' in data and data['type'] == 'fisheye':
                camera = FisheyeCamera.from_calibfile(Path(values), namespace.H, namespace.camera_fps)
            elif 'type' in data and data['type'] == 'undistorted':
                camera = UndistortedCamera(namespace.camera_fps)
            else:
                camera = Camera.from_calibfile(Path(values), namespace.H, namespace.camera_fps)
            # camera = Camera(np.array(data['camera_matrix']), np.array(data['dist_coeff']), data['dim']['width'], data['dim']['height'], namespace.H, namespace.camera_fps)

            setattr(namespace, 'camera', camera)


class LambdaParser(argparse.ArgumentParser):
    """Execute lambda functions given as default values"""
    def parse_args(self, args=None, namespace=None):
        args = super().parse_args(args, namespace)

        for key in vars(args):
            f = args.__dict__[key]
            # note: types.LambdaType is an alias of types.FunctionType, so this
            # calls any plain function left as a parsed value, not just lambdas
            if isinstance(f, types.LambdaType):
                print(f'Getting default value for {key}')
                args.__dict__[key] = f()

        return args
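

# Illustrative sketch (made-up flag names): defaults wrapped in a lambda are
# evaluated only after parsing, so expensive or stateful defaults are deferred:
#
#   parser = LambdaParser()
#   parser.add_argument('--fps', type=int, default=12)
#   parser.add_argument('--homography', default=lambda: np.eye(3))
#   args = parser.parse_args([])  # args.homography is now the evaluated np.eye(3)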